# Source code for nowcast.workers.watch_nemo

# Copyright 2016-2019 Doug Latornell, 43ravens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GoMSS NEMO nowcast worker that monitors and reports on the
progress of a NEMO run.
"""
import logging
import os
from pathlib import Path
import shlex
import subprocess
import time

import arrow
from nemo_cmd.namelist import namelist2dict
from nemo_nowcast import NowcastWorker, WorkerError

NAME = "watch_nemo"
logger = logging.getLogger(NAME)

POLL_INTERVAL = 2 * 60  # seconds


def main():
    """Set up and run the worker.

    Creates a :py:class:`NowcastWorker` named by :py:data:`NAME`,
    initializes its command-line interface, and runs it with
    :py:func:`watch_nemo` as the work function and :py:func:`success` /
    :py:func:`failure` as the outcome handlers.

    For command-line usage see:

    :command:`python -m nowcast.workers.watch_nemo --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.run(watch_nemo, success, failure)


def success(parsed_args):
    """Log that the NEMO run completed and report the outcome.

    :param parsed_args: Parsed command-line arguments; not used here.

    :return: The message type string ``"success"``.
    """
    logger.info("NEMO run completed")
    msg_type = "success"
    return msg_type


def failure(parsed_args):
    """Log that the NEMO run failed and report the outcome.

    :param parsed_args: Parsed command-line arguments; not used here.

    :return: The message type string ``"failure"``.
    """
    logger.critical("NEMO run failed")
    msg_type = "failure"
    return msg_type


def watch_nemo(parsed_args, config, tell_manager):
    """Watch a NEMO run until its process ends, then confirm its success.

    The run description is requested from the manager via
    ``tell_manager("need", "NEMO run")``.  While the run process exists,
    the ``time.step`` file in the run directory is read every
    :py:data:`POLL_INTERVAL` seconds and the progress is logged.

    :param parsed_args: Parsed command-line arguments; not used here.
    :param config: Nowcast configuration; passed on to
                   :py:func:`_confirm_run_success`.
    :param tell_manager: Callable used to request the run info from the
                         manager.

    :return: Checklist ``{"nowcast": {"run date": ..., "completed": True}}``.

    :raises WorkerError: If the post-run checks find that the run failed.
    """
    run_info = tell_manager("need", "NEMO run").payload["nowcast"]
    run_date = arrow.get(run_info["run date"])
    pid = _find_run_pid(run_info)
    logger.debug(f"run pid: {pid}")
    # Get run time steps and date info from namelist
    run_dir = Path(run_info["run dir"])
    namelist = namelist2dict(os.fspath(run_dir / "namelist_cfg"))
    namrun = namelist["namrun"][0]
    it000 = namrun["nn_it000"]
    itend = namrun["nn_itend"]
    date0 = arrow.get(str(namrun["nn_date0"]), "YYYYMMDD")
    rdt = namelist["namdom"][0]["rn_rdt"]
    # Watch for the run process to end
    while _pid_exists(pid):
        try:
            with (run_dir / "time.step").open("rt") as f:
                time_step = int(f.read().strip())
        except FileNotFoundError:
            # time.step file not found; assume that run is young and it
            # hasn't been created yet, or has finished and it has been
            # moved to the results directory
            msg = "time.step not found; continuing to watch..."
        except ValueError:
            # The file is written by the running model, so it can be read
            # while empty or only partly written; that must not kill the
            # watcher while the run is still alive
            msg = "time.step not readable; continuing to watch..."
        else:
            model_seconds = (time_step - it000) * rdt
            model_time = date0.shift(seconds=model_seconds).format(
                "YYYY-MM-DD HH:mm:ss UTC"
            )
            fraction_done = (time_step - it000) / (itend - it000)
            msg = (
                f"timestep: {time_step} = {model_time}, "
                f"{fraction_done:.1%} complete"
            )
        logger.info(msg)
        time.sleep(POLL_INTERVAL)
    run_succeeded = _confirm_run_success(run_date, run_dir, itend, config)
    if not run_succeeded:
        raise WorkerError
    checklist = {
        "nowcast": {"run date": run_info["run date"], "completed": run_succeeded}
    }
    return checklist


def _pgrep_cmd(run_exec_cmd):
    """Build the ``pgrep`` argument list that looks up the run process.

    The command line to match is passed as a single argument so that
    quotes, backslashes, or other special characters in it reach ``pgrep``
    unchanged instead of being re-parsed.

    :param str run_exec_cmd: Full command line of the run process.

    :return: Argument list suitable for :py:func:`subprocess.run`.
    """
    return ["pgrep", "--newest", "--exact", "--full", run_exec_cmd]


def _find_run_pid(run_info):
    """Return the pid of the newest process whose command line exactly
    matches ``run_info["run exec cmd"]``.

    ``pgrep`` is re-run until it reports a match; a non-zero exit status
    from it is taken to mean that the process has not been spawned yet.

    :param dict run_info: Run description containing the ``"run exec cmd"``
                          item.

    :return: Process id of the run.
    :rtype: int
    """
    run_exec_cmd = run_info["run exec cmd"]
    cmd = _pgrep_cmd(run_exec_cmd)
    logger.debug(f'searching processes for "{run_exec_cmd}"')
    while True:
        try:
            proc = subprocess.run(
                cmd, stdout=subprocess.PIPE, check=True, universal_newlines=True
            )
        except subprocess.CalledProcessError:
            # Process has not yet been spawned; pause briefly so that we
            # don't spin at full speed launching pgrep processes
            time.sleep(1)
        else:
            return int(proc.stdout)


def _pid_exists(pid):
    """Check whether pid exists in the current process table.

    From: http://stackoverflow.com/a/6940314

    :param int pid: Process id to check.

    :return: :py:obj:`True` if signal 0 can be sent to the process or
             permission to do so is denied; :py:obj:`False` if ``pid`` is
             negative or the process lookup fails.
    :rtype: bool

    :raises ValueError: If ``pid`` is 0.
    """
    if pid < 0:
        return False
    if pid == 0:
        # According to "man 2 kill" PID 0 refers to every process
        # in the process group of the calling process.
        # On certain systems 0 is a valid PID but we have no way
        # to know that in a portable fashion.
        raise ValueError("invalid PID 0")
    try:
        # Signal 0 performs the existence/permission checks only
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # PermissionError clearly means there's a process to deny access to
        return True
    # Any other OSError propagates to the caller
    return True


def _confirm_run_success(run_date, run_dir, itend, config):
    """Check the run's output files for evidence that the run failed.

    All of the checks are always performed, and each failed one is logged
    at the critical level.  The checks are done in the dated directory
    under ``config["results archive"]``, or in ``run_dir`` if that
    directory does not exist (which itself counts as a failure).

    :param run_date: Run date; its ``YYYY-MM-DD`` format names the results
                     directory.
    :param run_dir: Temporary run directory.
    :type run_dir: :py:class:`pathlib.Path`
    :param int itend: Expected final time step number.
    :param config: Nowcast configuration providing ``"results archive"``.

    :return: :py:obj:`True` if every check passed, otherwise
             :py:obj:`False`.
    :rtype: bool
    """
    run_succeeded = True
    results_dir = Path(config["results archive"], run_date.format("YYYY-MM-DD"))
    if not results_dir.exists():
        run_succeeded = False
        logger.critical(f"No results directory: {results_dir}")
        # Continue the rest of the checks in the temporary run directory
        results_dir = run_dir
    if (results_dir / "output.abort.nc").exists():
        run_succeeded = False
        logger.critical(f'Run aborted: {results_dir/"output.abort.nc"}')
    try:
        with (results_dir / "time.step").open("rt") as f:
            time_step = int(f.read().strip())
    except FileNotFoundError:
        run_succeeded = False
        logger.critical("Run failed; no time.step file")
    except ValueError:
        # Empty or garbled file: report a failed run instead of crashing
        # before the remaining checks are done
        run_succeeded = False
        logger.critical("Run failed; time.step file has no valid time step")
    else:
        if time_step != itend:
            run_succeeded = False
            logger.critical(
                f"Run failed: final time step is {time_step} not {itend}"
            )
    try:
        with (results_dir / "ocean.output").open("rt") as f:
            if any("E R R O R" in line for line in f):
                run_succeeded = False
                logger.critical(
                    f"Run failed; "
                    f'1 or more E R R O R in: {results_dir/"ocean.output"}'
                )
    except FileNotFoundError:
        run_succeeded = False
        logger.critical("Run failed; no ocean.output file")
    try:
        with (results_dir / "solver.stat").open("rt") as f:
            if any("NaN" in line for line in f):
                run_succeeded = False
                logger.critical(f'Run failed; NaN in: {results_dir/"solver.stat"}')
    except FileNotFoundError:
        run_succeeded = False
        logger.critical("Run failed; no solver.stat file")
    if not (results_dir / "restart").exists():
        run_succeeded = False
        logger.critical("Run failed; no restart/ directory")
    return run_succeeded


if __name__ == "__main__":
    main()  # pragma: no cover