# Copyright 2016-2019 Doug Latornell, 43ravens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GoMSS NEMO nowcast worker that prepares the YAML run
description file and bash run script for a nowcast, and launches the run.
"""
import logging
import os
from pathlib import Path
import shlex
import subprocess
import arrow
import nemo_cmd.api
import yaml
from nemo_cmd.namelist import namelist2dict, get_namelist_value
from nemo_nowcast import NowcastWorker
from nemo_nowcast.fileutils import FilePerms
# Worker name; passed to NowcastWorker in main() and used as the logger name.
NAME = "run_nemo"
# Module-level logger used by every function in this worker.
logger = logging.getLogger(NAME)
def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.run_nemo --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    # The run date defaults to the start of the current day.
    worker.cli.add_date_option(
        "--run-date",
        default=arrow.now().floor("day"),
        help="Date to execute the run for.",
    )
    worker.run(run_nemo, success, failure)
def success(parsed_args):
    """Log that the NEMO run was started and report the success message type.

    :param parsed_args: Parsed command-line arguments; only its ``run_date``
                        attribute is read, and it is formatted with
                        ``run_date.format()``.

    :returns: The string ``"success"``.
    """
    run_date = parsed_args.run_date
    logger.info(
        f'NEMO run for {run_date.format("YYYY-MM-DD")} started',
        extra={"date": run_date.format("YYYY-MM-DD HH:mm:ss ZZ")},
    )
    return "success"
def failure(parsed_args):
    """Log at critical level that the NEMO run failed and report the failure
    message type.

    :param parsed_args: Parsed command-line arguments; only its ``run_date``
                        attribute is read, and it is formatted with
                        ``run_date.format()``.

    :returns: The string ``"failure"``.
    """
    run_date = parsed_args.run_date
    logger.critical(
        f'NEMO run for {run_date.format("YYYY-MM-DD")} failed',
        extra={"date": run_date.format("YYYY-MM-DD HH:mm:ss ZZ")},
    )
    return "failure"
def run_nemo(parsed_args, config, *args):
    """Prepare and launch the NEMO run for the requested date.

    The steps are: write the YAML run description file, have
    :py:func:`nemo_cmd.api.prepare` create the temporary run directory from
    it, write the bash run script into that directory, delete the run
    description file, and launch the script.

    :param parsed_args: Parsed command-line arguments; ``run_date`` is read.
    :param config: Nowcast system configuration (accessed like a nested dict).
    :param args: Extra positional arguments; ignored.

    :returns: Checklist dict with a single ``"nowcast"`` item holding the run
              directory, the command used to launch the run, and the run date.
    """
    run_date = parsed_args.run_date

    desc_path = _create_run_desc_file(run_date, config)
    tmp_run_dir = nemo_cmd.api.prepare(desc_path)
    logger.debug(f"temporary run directory: {tmp_run_dir}")

    script_path = _create_run_script(run_date, desc_path, tmp_run_dir, config)
    # The run script has been built from the description file by now,
    # so the copy in the run prep directory can be removed.
    desc_path.unlink()

    exec_cmd = _launch_run_script(script_path)
    return {
        "nowcast": {
            "run dir": os.fspath(tmp_run_dir),
            "run exec cmd": exec_cmd,
            "run date": run_date.format("YYYY-MM-DD"),
        }
    }
def _create_run_desc_file(run_date, config):
    """Update the time namelist and write the YAML run description file.

    The file is written to the configured run prep directory and is named
    after the run id (``<YYYY-MM-DD>nowcast.yaml``).

    :param run_date: Date of the run; must provide ``format()``.
    :param config: Nowcast system configuration (accessed like a nested dict).

    :returns: Path of the run description file that was written.
    :rtype: :py:class:`pathlib.Path`
    """
    date_stamp = run_date.format("YYYY-MM-DD")
    run_id = f"{date_stamp}nowcast"

    # namelist.time has to be rewritten before the run description is built.
    _update_time_namelist(run_date, config["runs"]["duration"], config)
    description = _run_description(run_date, run_id, config)

    desc_path = Path(config["runs"]["run prep dir"]) / f"{run_id}.yaml"
    with desc_path.open("wt") as stream:
        yaml.dump(description, stream, default_flow_style=False)
    logger.debug(f"run description file: {desc_path}")
    return desc_path
def _update_time_namelist(run_date, run_duration, config):
    """Rewrite ``namelist.time`` in the run prep directory for this run.

    The first time step and the time step length of the previous day's run
    are read from the ``namelist_cfg`` file stored in that run's results
    directory. They are used to calculate the replacement lines, which
    overwrite the existing ``namelist.time`` file in place.

    :param run_date: Date of the run; must provide ``shift()`` and
                     ``format()``.
    :param run_duration: Run duration, passed through to
                         :py:func:`_calc_new_namelist_lines`.
    :param config: Nowcast system configuration (accessed like a nested dict).
    """
    prev_day = run_date.shift(days=-1).format("YYYY-MM-DD")
    prev_namelist_path = Path(config["results archive"]) / prev_day / "namelist_cfg"
    prev_namelist = namelist2dict(os.fspath(prev_namelist_path))

    prev_it000 = prev_namelist["namrun"][0]["nn_it000"]
    # 86400 seconds per day divided by the time step length
    # (rn_rdt is presumably in seconds; confirm against the NEMO namelist docs)
    timesteps_per_day = 86400 / prev_namelist["namdom"][0]["rn_rdt"]

    time_namelist_path = Path(config["runs"]["run prep dir"], "namelist.time")
    with time_namelist_path.open("rt") as stream:
        current_lines = list(stream)
    updated_lines = _calc_new_namelist_lines(
        run_date, run_duration, prev_it000, timesteps_per_day, current_lines
    )
    with time_namelist_path.open("wt") as stream:
        stream.writelines(updated_lines)
def _calc_new_namelist_lines(
    run_date, run_duration, prev_it000, timesteps_per_day, lines
):
    """Update the run timing and restart file values in namelist lines.

    Four values are replaced, each on the line where
    :py:func:`get_namelist_value` finds its key:

    * ``nn_it000``: ``prev_it000 + timesteps_per_day``
    * ``nn_itend``: restart time step ``+ run_duration * timesteps_per_day``
    * ``nn_date0``: ``run_date`` formatted as ``YYYYMMDD``
    * ``cn_ocerst_in``: ``"GoMSS_NOWCAST_<restart time step>_restart"``

    where the restart time step is
    ``(prev_it000 - 1) + int(run_duration) * timesteps_per_day``.

    ``lines`` is modified in place and also returned.

    :param run_date: Date of the run; must provide ``format()``.
    :param run_duration: Run duration (presumably in days, given that it
                         multiplies ``timesteps_per_day``).
    :param prev_it000: First time step number of the previous run.
    :param timesteps_per_day: Number of model time steps in one day.
    :param list lines: Lines of the time namelist.

    :returns: The updated list of namelist lines.
    """
    old_it000, it000_idx = get_namelist_value("nn_it000", lines)
    old_itend, itend_idx = get_namelist_value("nn_itend", lines)

    first_step = int(prev_it000 + timesteps_per_day)
    lines[it000_idx] = lines[it000_idx].replace(old_it000, str(first_step), 1)

    restart_step = int((prev_it000 - 1) + int(run_duration) * timesteps_per_day)
    last_step = int(restart_step + (run_duration * timesteps_per_day))
    lines[itend_idx] = lines[itend_idx].replace(old_itend, str(last_step), 1)

    old_date0, date0_idx = get_namelist_value("nn_date0", lines)
    new_date0 = run_date.format("YYYYMMDD")
    lines[date0_idx] = lines[date0_idx].replace(old_date0, new_date0)

    old_restart, restart_idx = get_namelist_value("cn_ocerst_in", lines)
    new_restart = f'"GoMSS_NOWCAST_{restart_step:08d}_restart"'
    lines[restart_idx] = lines[restart_idx].replace(old_restart, new_restart, 1)

    return lines
def _run_description(run_date, run_id, config):
    """Build the run description dict for the run.

    All file system paths in the result are resolved and stored as strings.
    Each namelist section file is taken from the run prep directory when it
    exists there, and otherwise from the ``EXP00`` directory of the NEMO
    configuration.

    :param run_date: Date of the run; must provide ``shift()`` and
                     ``format()``. The previous day's results directory is
                     used as the source of the restart files.
    :param str run_id: Run identifier stored under the ``run_id`` key.
    :param config: Nowcast system configuration (accessed like a nested dict).

    :returns: Run description.
    :rtype: dict
    """

    def resolved(path):
        # Resolved path as a string
        return os.fspath(path.resolve())

    runs = config["runs"]
    prep_dir = Path(runs["run prep dir"])
    config_name = runs["config name"]
    nemo_config_root = Path(runs["NEMO code"]) / "CONFIG"
    xios_root = Path(runs["XIOS code"])
    exp_dir = nemo_config_root / config_name / "EXP00"
    prev_day = run_date.shift(days=-1).format("YYYY-MM-DD")
    prev_results_dir = Path(config["results archive"]) / prev_day
    nowcast_sys = Path("~/nowcast-sys").expanduser()

    section_names = (
        "namelist.time",
        "namelist.domain",
        "namelist.surface",
        "namelist.lateral",
        "namelist.bottom",
        "namelist.tracers",
        "namelist.dynamics",
        "namelist.vertical",
        "namelist.compute",
    )
    # A section file in the run prep directory overrides the one in EXP00.
    namelist_paths = [
        resolved(prep_dir / section)
        if (prep_dir / section).exists()
        else resolved(exp_dir / section)
        for section in section_names
    ]

    return {
        "config name": config_name,
        "MPI decomposition": runs["mpi decomposition"],
        "run_id": run_id,
        "paths": {
            "NEMO code config": resolved(nemo_config_root),
            "XIOS": resolved(xios_root),
            "forcing": resolved(prep_dir),
            "runs directory": resolved(prep_dir),
        },
        "grid": {
            "coordinates": resolved(exp_dir / runs["coordinates"]),
            "bathymetry": resolved(exp_dir / runs["bathymetry"]),
            "land processor elimination": False,
        },
        "forcing": {
            "init": {"link to": resolved(prev_results_dir / "restart")},
            "surface_forcing": {"link to": resolved(prep_dir / "surface_forcing")},
            "open_boundaries": {"link to": resolved(prep_dir / "open_boundaries")},
        },
        "namelists": {"namelist_cfg": namelist_paths},
        "output": {
            "iodefs": resolved(prep_dir / "iodef.xml"),
            "domaindefs": resolved(exp_dir / "domain_def.xml"),
            "fielddefs": resolved(exp_dir / "field_def.xml"),
            "separate XIOS server": True,
            "XIOS servers": runs["xios servers"],
        },
        "vcs revisions": {
            "hg": [
                resolved(prep_dir / "../gomss-nemo-config"),
                resolved(prep_dir / "../GoMSS_Nowcast"),
                resolved(prep_dir / "../NEMO-Cmd"),
                resolved(prep_dir / "../NEMO_Nowcast"),
                resolved(nowcast_sys / "NEMO-ARCH"),
                resolved(nowcast_sys / "XIOS-ARCH"),
            ]
        },
    }
def _create_run_script(run_date, run_desc_filepath, run_dir, config):
    """Write the bash run script into the temporary run directory.

    The script is stored as :file:`GoMSS_NEMO.sh` and its permissions are set
    to ``FilePerms(user="rwx", group="rwx", other="r")``.

    :param run_date: Date of the run; its ``YYYY-MM-DD`` form names the
                     results directory below the results archive.
    :param run_desc_filepath: Path of the YAML run description file.
    :type run_desc_filepath: :py:class:`pathlib.Path`
    :param run_dir: Temporary run directory.
    :type run_dir: :py:class:`pathlib.Path`
    :param config: Nowcast system configuration (accessed like a nested dict).

    :returns: Path of the run script.
    :rtype: :py:class:`pathlib.Path`
    """
    run_results_dir = Path(config["results archive"]) / run_date.format("YYYY-MM-DD")
    script_text = _build_script(run_dir, run_desc_filepath, run_results_dir, config)

    script_path = run_dir / "GoMSS_NEMO.sh"
    script_path.write_text(script_text)
    script_path.chmod(FilePerms(user="rwx", group="rwx", other="r"))
    logger.debug(f"run script: {script_path}")
    return script_path
def _build_script(run_dir, run_desc_filepath, results_dir, config):
    """Assemble the text of the bash run script.

    The run description is loaded from its YAML file to obtain the MPI
    decomposition (``"<jpni>x<jpnj>"``) and the number of XIOS servers.
    The script consists of the shebang line followed by a blank line, then
    the definitions, execution, and clean-up sections, each separated from
    the next by a newline.

    :param run_dir: Temporary run directory.
    :param run_desc_filepath: Path of the YAML run description file.
    :type run_desc_filepath: :py:class:`pathlib.Path`
    :param results_dir: Directory in which the run results are to be stored.
    :param config: Nowcast system configuration (accessed like a nested dict).

    :returns: Run script text.
    :rtype: str
    """
    with run_desc_filepath.open("rt") as stream:
        run_desc = yaml.safe_load(stream)

    jpni, jpnj = (int(count) for count in run_desc["MPI decomposition"].split("x"))
    nemo_processors = jpni * jpnj
    xios_processors = int(run_desc["output"]["XIOS servers"])

    sections = (
        _definitions(run_desc, run_desc_filepath, run_dir, results_dir, config),
        _execute(nemo_processors, xios_processors),
        _cleanup(),
    )
    return "#!/bin/bash\n\n" + "\n".join(sections)
def _definitions(run_desc, run_desc_filepath, run_dir, results_dir, config):
    """Build the shell variable definitions section of the run script.

    One ``NAME="value"`` line is produced for each of ``RUN_ID``,
    ``RUN_DESC``, ``WORK_DIR``, ``RESULTS_DIR``, ``MPIRUN``, and ``GATHER``.

    :param dict run_desc: Run description; ``run_id`` is read.
    :param run_desc_filepath: Path of the run description file; only its
                              file name is used.
    :param run_dir: Temporary run directory.
    :param results_dir: Directory in which the run results are to be stored.
    :param config: Nowcast system configuration; ``runs/mpirun cmd`` and
                   ``runs/nemo cmd`` are read.

    :returns: Definitions section text, one definition per line.
    :rtype: str
    """
    runs_config = config["runs"]
    shell_vars = (
        ("RUN_ID", run_desc["run_id"]),
        ("RUN_DESC", run_desc_filepath.name),
        ("WORK_DIR", run_dir),
        ("RESULTS_DIR", results_dir),
        ("MPIRUN", runs_config["mpirun cmd"]),
        ("GATHER", f'{runs_config["nemo cmd"]} gather'),
    )
    return "".join(f'{name}="{value}"\n' for name, value in shell_vars)
def _execute(nemo_processors, xios_processors):
    """Build the execution section of the run script.

    The section creates the results directory, changes to the working
    directory, creates a :file:`restart` directory, runs :file:`nemo.exe` and
    :file:`xios_server.exe` through ``${MPIRUN}``, and finally runs the
    ``${GATHER}`` command. Progress messages and command output are appended
    to :file:`stdout` and :file:`stderr` in the results directory.

    :param int nemo_processors: Number of processes for :file:`nemo.exe`.
    :param int xios_processors: Number of processes for
                                :file:`xios_server.exe`.

    :returns: Execution section text.
    :rtype: str
    """
    script_lines = [
        "mkdir -p ${RESULTS_DIR}",
        "",
        "cd ${WORK_DIR}",
        'echo "working dir: $(pwd)" >>${RESULTS_DIR}/stdout',
        "mkdir -p restart",
        "",
        'echo "Starting run at $(date)" >>${RESULTS_DIR}/stdout',
        (
            f"${{MPIRUN}} -np {nemo_processors} ./nemo.exe"
            f" : -np {xios_processors} ./xios_server.exe"
            f" >>${{RESULTS_DIR}}/stdout 2>>${{RESULTS_DIR}}/stderr"
        ),
        'echo "Ended run at $(date)" >>${RESULTS_DIR}/stdout',
        "",
        'echo "Results gathering started at $(date)" >>${RESULTS_DIR}/stdout',
        "${GATHER} ${RESULTS_DIR} >>${RESULTS_DIR}/stdout 2>>${RESULTS_DIR}/stderr",
        'echo "Results gathering ended at $(date)" >>${RESULTS_DIR}/stdout',
    ]
    return "".join(f"{line}\n" for line in script_lines)
def _cleanup():
    """Build the clean-up section of the run script.

    The section removes the current working directory with ``rmdir`` and
    appends progress messages to :file:`stdout` in the results directory.

    :returns: Clean-up section text.
    :rtype: str
    """
    script_lines = (
        'echo "Deleting run directory" >>${RESULTS_DIR}/stdout',
        "rmdir $(pwd)",
        'echo "Finished at $(date)" >>${RESULTS_DIR}/stdout',
        # Empty final item so that the joined text ends with a newline
        "",
    )
    return "\n".join(script_lines)
def _launch_run_script(run_script_filepath):
    """Launch the run script with bash in a subprocess, without waiting for
    it to finish.

    The process id is read from the :py:class:`subprocess.Popen` object
    rather than by polling ``pgrep``. Because bash is executed directly
    (no intermediate shell), that id is the id of the bash process. This
    avoids an unthrottled polling loop that could never end if the script
    exited before ``pgrep`` saw it, or if the script path contained regular
    expression metacharacters.

    :param run_script_filepath: Path of the run script.
    :type run_script_filepath: :py:class:`pathlib.Path`

    :returns: Command string that identifies the launched run
              (``bash <run script path>``).
    :rtype: str
    """
    logger.info(f"launching {run_script_filepath}")
    run_exec_cmd = f"bash {run_script_filepath}"
    # Pass the script path as a single argument so that a path containing
    # white space is not split into several arguments.
    cmd = ["bash", str(run_script_filepath)]
    logger.debug(f"running command in subprocess: {cmd}")
    run_process = subprocess.Popen(cmd)
    logger.debug(f"run pid: {run_process.pid}")
    return run_exec_cmd
# Run the worker when this module is executed as a script.
if __name__ == "__main__":
    main()