# Source code for nowcast.workers.run_nemo

# Copyright 2016-2019 Doug Latornell, 43ravens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GoMSS NEMO nowcast worker that prepares the YAML run
description file and bash run script for a nowcast, and launches the run.
"""
import logging
import os
from pathlib import Path
import shlex
import subprocess

import arrow
import nemo_cmd.api
import yaml
from nemo_cmd.namelist import namelist2dict, get_namelist_value

from nemo_nowcast import NowcastWorker
from nemo_nowcast.fileutils import FilePerms

# Worker name; passed to NowcastWorker in main() and used as the logger name
NAME = "run_nemo"
logger = logging.getLogger(NAME)


def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.run_nemo --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    # Default run date is today, floored to midnight
    worker.cli.add_date_option(
        "--run-date",
        default=arrow.now().floor("day"),
        help="Date to execute the run for.",
    )
    worker.run(run_nemo, success, failure)
def success(parsed_args):
    """Log that the NEMO run was started.

    :param parsed_args: Parsed command-line arguments; only ``run_date`` is used.
    :type parsed_args: :py:class:`argparse.Namespace`

    :return: Nowcast system message type.
    :rtype: str
    """
    logger.info(
        f'NEMO run for {parsed_args.run_date.format("YYYY-MM-DD")} started',
        extra={"date": parsed_args.run_date.format("YYYY-MM-DD HH:mm:ss ZZ")},
    )
    msg_type = "success"
    return msg_type


def failure(parsed_args):
    """Log that the NEMO run failed.

    :param parsed_args: Parsed command-line arguments; only ``run_date`` is used.
    :type parsed_args: :py:class:`argparse.Namespace`

    :return: Nowcast system message type.
    :rtype: str
    """
    logger.critical(
        f'NEMO run for {parsed_args.run_date.format("YYYY-MM-DD")} failed',
        extra={"date": parsed_args.run_date.format("YYYY-MM-DD HH:mm:ss ZZ")},
    )
    msg_type = "failure"
    return msg_type


def run_nemo(parsed_args, config, *args):
    """Prepare and launch the nowcast NEMO run for ``parsed_args.run_date``.

    Writes the YAML run description file, has :py:func:`nemo_cmd.api.prepare`
    set up the temporary run directory, writes the bash run script into that
    directory, deletes the run description file, and launches the run script.

    :param parsed_args: Parsed command-line arguments.
    :type parsed_args: :py:class:`argparse.Namespace`
    :param dict config: Nowcast system configuration.

    :return: Checklist with the run directory, the run execution command,
             and the run date.
    :rtype: dict
    """
    run_date = parsed_args.run_date
    run_desc_filepath = _create_run_desc_file(run_date, config)
    run_dir = nemo_cmd.api.prepare(run_desc_filepath)
    logger.debug(f"temporary run directory: {run_dir}")
    run_script_filepath = _create_run_script(
        run_date, run_desc_filepath, run_dir, config
    )
    # The run script builder reads the run description file, so it can only
    # be removed after the run script has been written
    run_desc_filepath.unlink()
    run_exec_cmd = _launch_run_script(run_script_filepath)
    checklist = {
        "nowcast": {
            "run dir": os.fspath(run_dir),
            "run exec cmd": run_exec_cmd,
            "run date": run_date.format("YYYY-MM-DD"),
        }
    }
    return checklist


def _create_run_desc_file(run_date, config):
    """Update namelist.time and write the YAML run description file.

    :param run_date: Date of the run.
    :type run_date: :py:class:`arrow.Arrow`
    :param dict config: Nowcast system configuration.

    :return: Path of the run description file in the run prep directory.
    :rtype: :py:class:`pathlib.Path`
    """
    run_id = f"{run_date.format('YYYY-MM-DD')}nowcast"
    run_duration = config["runs"]["duration"]
    _update_time_namelist(run_date, run_duration, config)
    run_desc = _run_description(run_date, run_id, config)
    run_prep_dir = Path(config["runs"]["run prep dir"])
    run_desc_filepath = run_prep_dir / f"{run_id}.yaml"
    with run_desc_filepath.open("wt") as f:
        yaml.dump(run_desc, f, default_flow_style=False)
    logger.debug(f"run description file: {run_desc_filepath}")
    return run_desc_filepath


def _update_time_namelist(run_date, run_duration, config):
    """Rewrite namelist.time in the run prep directory for ``run_date``.

    The starting time step and the model time step length are taken from the
    previous day's ``namelist_cfg`` in the results archive.

    :param run_date: Date of the run.
    :type run_date: :py:class:`arrow.Arrow`
    :param run_duration: Run duration from ``config["runs"]["duration"]``.
    :param dict config: Nowcast system configuration.
    """
    results_dir = Path(config["results archive"])
    prev_dmy = run_date.shift(days=-1).format("YYYY-MM-DD")
    prev_run_namelist = namelist2dict(
        os.fspath(results_dir / prev_dmy / "namelist_cfg")
    )
    prev_it000 = prev_run_namelist["namrun"][0]["nn_it000"]
    rdt = prev_run_namelist["namdom"][0]["rn_rdt"]
    timesteps_per_day = 86400 / rdt
    namelist_time = Path(config["runs"]["run prep dir"], "namelist.time")
    with namelist_time.open("rt") as f:
        lines = f.readlines()
    new_lines = _calc_new_namelist_lines(
        run_date, run_duration, prev_it000, timesteps_per_day, lines
    )
    with namelist_time.open("wt") as f:
        f.writelines(new_lines)


def _calc_new_namelist_lines(
    run_date, run_duration, prev_it000, timesteps_per_day, lines
):
    """Update the time-related values in the lines of namelist.time.

    Sets ``nn_it000``, ``nn_itend``, ``nn_date0`` and ``cn_ocerst_in``.
    Each value is replaced in place (first occurrence on its line only).

    :param run_date: Date of the run.
    :type run_date: :py:class:`arrow.Arrow`
    :param run_duration: Run duration, as a multiple of days.
    :param prev_it000: Previous run's ``nn_it000`` value.
    :param float timesteps_per_day: Number of model time steps per day.
    :param list lines: Lines of namelist.time; modified in place.

    :return: The updated ``lines`` list.
    :rtype: list
    """
    it000, it000_line = get_namelist_value("nn_it000", lines)
    itend, itend_line = get_namelist_value("nn_itend", lines)
    new_it000 = int(prev_it000 + timesteps_per_day)
    lines[it000_line] = lines[it000_line].replace(it000, str(new_it000), 1)
    # NOTE(review): nn_it000 advances by exactly one day, but the restart and
    # nn_itend arithmetic scale with run_duration; they only agree when
    # run_duration is 1 - confirm before using other durations.
    restart_timestep = int((prev_it000 - 1) + int(run_duration) * timesteps_per_day)
    new_itend = int(restart_timestep + (run_duration * timesteps_per_day))
    lines[itend_line] = lines[itend_line].replace(itend, str(new_itend), 1)
    date0, date0_line = get_namelist_value("nn_date0", lines)
    # Replace only the first occurrence, like the other values, so that a
    # matching date elsewhere on the line (e.g. in a comment) is untouched
    lines[date0_line] = lines[date0_line].replace(
        date0, run_date.format("YYYYMMDD"), 1
    )
    restart_src, restart_src_line = get_namelist_value("cn_ocerst_in", lines)
    lines[restart_src_line] = lines[restart_src_line].replace(
        restart_src, f'"GoMSS_NOWCAST_{restart_timestep:08d}_restart"', 1
    )
    return lines


def _run_description(run_date, run_id, config):
    """Build the run description data structure for the nowcast run.

    :param run_date: Date of the run.
    :type run_date: :py:class:`arrow.Arrow`
    :param str run_id: Run identifier.
    :param dict config: Nowcast system configuration.

    :return: Run description to be dumped to the YAML run description file.
    :rtype: dict
    """
    run_prep_dir = Path(config["runs"]["run prep dir"])
    config_name = config["runs"]["config name"]
    run_desc = {
        "config name": config_name,
        "MPI decomposition": config["runs"]["mpi decomposition"],
        "run_id": run_id,
    }
    nemo_code = Path(config["runs"]["NEMO code"])
    nemo_code_config = nemo_code / "CONFIG"
    xios_code = Path(config["runs"]["XIOS code"])
    run_desc["paths"] = {
        "NEMO code config": os.fspath(nemo_code_config.resolve()),
        "XIOS": os.fspath(xios_code.resolve()),
        "forcing": os.fspath(run_prep_dir.resolve()),
        "runs directory": os.fspath(run_prep_dir.resolve()),
    }
    config_dir = nemo_code_config / config_name / "EXP00"
    run_desc["grid"] = {
        "coordinates": os.fspath(
            (config_dir / config["runs"]["coordinates"]).resolve()
        ),
        "bathymetry": os.fspath(
            (config_dir / config["runs"]["bathymetry"]).resolve()
        ),
        "land processor elimination": False,
    }
    results_archive = Path(config["results archive"])
    prev_yyyymmdd = run_date.shift(days=-1).format("YYYY-MM-DD")
    run_desc["forcing"] = {
        # Initialize from the previous day's restart directory
        "init": {
            "link to": os.fspath(
                (results_archive / prev_yyyymmdd / "restart").resolve()
            )
        },
        "surface_forcing": {
            "link to": os.fspath((run_prep_dir / "surface_forcing").resolve())
        },
        "open_boundaries": {
            "link to": os.fspath((run_prep_dir / "open_boundaries").resolve())
        },
    }
    namelist_sections = (
        "namelist.time",
        "namelist.domain",
        "namelist.surface",
        "namelist.lateral",
        "namelist.bottom",
        "namelist.tracers",
        "namelist.dynamics",
        "namelist.vertical",
        "namelist.compute",
    )
    namelist_cfg = []
    for namelist in namelist_sections:
        # A section file in the run prep dir takes precedence over the one in
        # the configuration's EXP00 directory
        section_dir = (
            run_prep_dir if (run_prep_dir / namelist).exists() else config_dir
        )
        namelist_cfg.append(os.fspath((section_dir / namelist).resolve()))
    run_desc["namelists"] = {"namelist_cfg": namelist_cfg}
    run_desc["output"] = {
        "iodefs": os.fspath((run_prep_dir / "iodef.xml").resolve()),
        "domaindefs": os.fspath((config_dir / "domain_def.xml").resolve()),
        "fielddefs": os.fspath((config_dir / "field_def.xml").resolve()),
        "separate XIOS server": True,
        "XIOS servers": config["runs"]["xios servers"],
    }
    nowcast_sys = Path("~/nowcast-sys").expanduser()
    run_desc["vcs revisions"] = {
        "hg": [
            os.fspath((run_prep_dir / "../gomss-nemo-config").resolve()),
            os.fspath((run_prep_dir / "../GoMSS_Nowcast").resolve()),
            os.fspath((run_prep_dir / "../NEMO-Cmd").resolve()),
            os.fspath((run_prep_dir / "../NEMO_Nowcast").resolve()),
            os.fspath((nowcast_sys / "NEMO-ARCH").resolve()),
            os.fspath((nowcast_sys / "XIOS-ARCH").resolve()),
        ]
    }
    return run_desc


def _create_run_script(run_date, run_desc_filepath, run_dir, config):
    """Write the bash run script into the temporary run directory.

    :param run_date: Date of the run.
    :type run_date: :py:class:`arrow.Arrow`
    :param run_desc_filepath: Path of the YAML run description file.
    :type run_desc_filepath: :py:class:`pathlib.Path`
    :param run_dir: Temporary run directory.
    :param dict config: Nowcast system configuration.

    :return: Path of the run script.
    :rtype: :py:class:`pathlib.Path`
    """
    results_dir = Path(config["results archive"])
    script = _build_script(
        run_dir,
        run_desc_filepath,
        results_dir / run_date.format("YYYY-MM-DD"),
        config,
    )
    run_script_filepath = run_dir / "GoMSS_NEMO.sh"
    with run_script_filepath.open("wt") as f:
        f.write(script)
    run_script_filepath.chmod(FilePerms(user="rwx", group="rwx", other="r"))
    logger.debug(f"run script: {run_script_filepath}")
    return run_script_filepath


def _build_script(run_dir, run_desc_filepath, results_dir, config):
    """Build the text of the bash run script.

    :param run_dir: Temporary run directory.
    :param run_desc_filepath: Path of the YAML run description file.
    :type run_desc_filepath: :py:class:`pathlib.Path`
    :param results_dir: Directory that run results are gathered into.
    :param dict config: Nowcast system configuration.

    :return: Run script text.
    :rtype: str
    """
    with run_desc_filepath.open("rt") as f:
        run_desc = yaml.safe_load(f)
    # MPI decomposition is a "JPNIxJPNJ" string
    jpni, jpnj = map(int, run_desc["MPI decomposition"].split("x"))
    nemo_processors = jpni * jpnj
    xios_processors = int(run_desc["output"]["XIOS servers"])
    body = (
        f"{_definitions(run_desc, run_desc_filepath, run_dir, results_dir, config)}\n"
        f"{_execute(nemo_processors, xios_processors)}\n"
        f"{_cleanup()}"
    )
    script = "\n".join(("#!/bin/bash\n", body))
    return script


def _definitions(run_desc, run_desc_filepath, run_dir, results_dir, config):
    """Return the bash variable definitions section of the run script."""
    run_id = run_desc["run_id"]
    mpirun_cmd = config["runs"]["mpirun cmd"]
    nemo_cmd = config["runs"]["nemo cmd"]
    defns = (
        f'RUN_ID="{run_id}"\n'
        f'RUN_DESC="{run_desc_filepath.name}"\n'
        f'WORK_DIR="{run_dir}"\n'
        f'RESULTS_DIR="{results_dir}"\n'
        f'MPIRUN="{mpirun_cmd}"\n'
        f'GATHER="{nemo_cmd} gather"\n'
    )
    return defns


def _execute(nemo_processors, xios_processors):
    """Return the run-and-gather section of the run script.

    NEMO and the XIOS servers are started by a single ``${MPIRUN}`` command
    with the two executables separated by ``:``. The results are then gathered
    into ``${RESULTS_DIR}``.

    :param int nemo_processors: Number of MPI processes for ``nemo.exe``.
    :param int xios_processors: Number of MPI processes for ``xios_server.exe``.

    :rtype: str
    """
    mpirun = (
        f"${{MPIRUN}} -np {nemo_processors} ./nemo.exe"
        f" : -np {xios_processors} ./xios_server.exe"
    )
    script = (
        "mkdir -p ${RESULTS_DIR}\n"
        "\n"
        "cd ${WORK_DIR}\n"
        'echo "working dir: $(pwd)" >>${RESULTS_DIR}/stdout\n'
        "mkdir -p restart\n"
        "\n"
        'echo "Starting run at $(date)" >>${RESULTS_DIR}/stdout\n'
    )
    script += f"{mpirun} >>${{RESULTS_DIR}}/stdout 2>>${{RESULTS_DIR}}/stderr\n"
    script += (
        'echo "Ended run at $(date)" >>${RESULTS_DIR}/stdout\n'
        "\n"
        'echo "Results gathering started at $(date)" >>${RESULTS_DIR}/stdout\n'
        "${GATHER} ${RESULTS_DIR} "
        ">>${RESULTS_DIR}/stdout 2>>${RESULTS_DIR}/stderr\n"
        'echo "Results gathering ended at $(date)" >>${RESULTS_DIR}/stdout\n'
    )
    return script


def _cleanup():
    """Return the cleanup section of the run script.

    ``rmdir`` only removes an empty directory, so this presumably relies on
    the gather step having emptied the run directory - TODO confirm.
    """
    script = (
        'echo "Deleting run directory" >>${RESULTS_DIR}/stdout\n'
        "rmdir $(pwd)\n"
        'echo "Finished at $(date)" >>${RESULTS_DIR}/stdout\n'
    )
    return script


def _launch_run_script(run_script_filepath):
    """Launch the run script with bash in a subprocess, without waiting for it.

    :param run_script_filepath: Path of the run script.
    :type run_script_filepath: :py:class:`pathlib.Path`

    :return: Command used to execute the run script.
    :rtype: str
    """
    logger.info(f"launching {run_script_filepath}")
    run_exec_cmd = f"bash {run_script_filepath}"
    logger.debug(f"running command in subprocess: {shlex.split(run_exec_cmd)}")
    # The Popen object already holds the pid of the bash process. Polling
    # pgrep for it spun the CPU without sleeping, and never terminated if the
    # script exited before pgrep could match its command line.
    run_process = subprocess.Popen(shlex.split(run_exec_cmd))
    logger.debug(f"run pid: {run_process.pid}")
    return run_exec_cmd


if __name__ == "__main__":
    main()