# Copyright 2016-2019 Doug Latornell, 43ravens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GoMSS NEMO nowcast worker that monitors and reports on the
progress of a NEMO run.
"""
import logging
import os
from pathlib import Path
import shlex
import subprocess
import time
import arrow
from nemo_cmd.namelist import namelist2dict
from nemo_nowcast import NowcastWorker, WorkerError
# Time to wait between checks on the progress of the run: 2 minutes,
# expressed in seconds
POLL_INTERVAL = 120

# Name of the worker; also used as the name of the logger that the worker's
# messages are emitted on
NAME = "watch_nemo"
logger = logging.getLogger(NAME)
def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.watch_nemo --help`
    """
    # The module docstring doubles as the description given to the worker
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    # Hand the worker its work function along with the success and failure
    # functions defined in this module
    worker.run(watch_nemo, success, failure)
def success(parsed_args):
    """Log the completion of the NEMO run at the info level.

    :param parsed_args: Not used by this function.

    :return: The message type string :kbd:`success`.
    :rtype: str
    """
    logger.info("NEMO run completed")
    return "success"
def failure(parsed_args):
    """Log the failure of the NEMO run at the critical level.

    :param parsed_args: Not used by this function.

    :return: The message type string :kbd:`failure`.
    :rtype: str
    """
    logger.critical("NEMO run failed")
    return "failure"
def watch_nemo(parsed_args, config, tell_manager):
    """Watch a NEMO run process until it ends, logging the run's progress,
    then confirm that the run succeeded.

    The run to watch is described by the :kbd:`nowcast` item of the payload
    of the reply to the :kbd:`need` / :kbd:`NEMO run` message sent via
    :kbd:`tell_manager`.
    While the run process exists, the :file:`time.step` file in the run
    directory is read once every :py:data:`POLL_INTERVAL` seconds and the
    time step, the corresponding model time, and the fraction of the run
    that is complete are logged at the info level.

    :param parsed_args: Not used by this function.

    :param config: Passed on to :py:func:`_confirm_run_success`.

    :param tell_manager: Callable used to request the run information;
                         called as :kbd:`tell_manager("need", "NEMO run")`.

    :return: Checklist :kbd:`{"nowcast": {"run date": ..., "completed": ...}}`.
    :rtype: dict

    :raises: :py:exc:`WorkerError` if the run is not confirmed as successful.
    """
    run_info = tell_manager("need", "NEMO run").payload["nowcast"]
    run_date = arrow.get(run_info["run date"])
    pid = _find_run_pid(run_info)
    logger.debug(f"run pid: {pid}")
    # Get run time steps and date info from namelist
    run_dir = Path(run_info["run dir"])
    namelist = namelist2dict(os.fspath(run_dir / "namelist_cfg"))
    namrun = namelist["namrun"][0]
    it000 = namrun["nn_it000"]
    itend = namrun["nn_itend"]
    date0 = arrow.get(str(namrun["nn_date0"]), "YYYYMMDD")
    rdt = namelist["namdom"][0]["rn_rdt"]
    # Watch for the run process to end
    while _pid_exists(pid):
        try:
            # Keep the try body down to the read and parse so that only
            # problems with the time.step file itself are handled here
            with (run_dir / "time.step").open("rt") as f:
                time_step = int(f.read().strip())
        except FileNotFoundError:
            # time.step file not found; assume that run is young and it
            # hasn't been created yet, or has finished and it has been
            # moved to the results directory
            msg = "time.step not found; continuing to watch..."
        except ValueError:
            # time.step file is empty or its contents are not an integer;
            # assume that it was read while it was being re-written and
            # try again at the next poll rather than ending the watch
            msg = "time.step is empty or not an integer; continuing to watch..."
        else:
            model_seconds = (time_step - it000) * rdt
            model_time = date0.shift(seconds=model_seconds).format(
                "YYYY-MM-DD HH:mm:ss UTC"
            )
            fraction_done = (time_step - it000) / (itend - it000)
            msg = (
                f"timestep: {time_step} = {model_time}, "
                f"{fraction_done:.1%} complete"
            )
        logger.info(msg)
        time.sleep(POLL_INTERVAL)
    run_succeeded = _confirm_run_success(run_date, run_dir, itend, config)
    if not run_succeeded:
        raise WorkerError
    checklist = {
        "nowcast": {"run date": run_info["run date"], "completed": run_succeeded}
    }
    return checklist
def _find_run_pid(run_info):
    """Return the process id of the newest process whose full command line
    exactly matches the run's execution command.

    :command:`pgrep` is run repeatedly, with a 1 second pause between
    attempts, until it exits with a zero status.

    :param dict run_info: Run information; its :kbd:`run exec cmd` item is
                          the command line to search the process table for.

    :return: Process id parsed from the output of :command:`pgrep`.
    :rtype: int
    """
    run_exec_cmd = run_info["run exec cmd"]
    # Build the argument list directly so that the run command is passed to
    # pgrep as a single argument no matter what quotes, backslashes, or
    # whitespace it contains
    cmd = ["pgrep", "--newest", "--exact", "--full", run_exec_cmd]
    logger.debug(f'searching processes for "{run_exec_cmd}"')
    while True:
        try:
            proc = subprocess.run(
                cmd, stdout=subprocess.PIPE, check=True, universal_newlines=True
            )
        except subprocess.CalledProcessError:
            # Process has not yet been spawned; pause briefly so that we
            # don't spin the CPU launching pgrep back-to-back
            time.sleep(1)
            continue
        return int(proc.stdout)
def _pid_exists(pid):
"""Check whether pid exists in the current process table.
From: http://stackoverflow.com/a/6940314
"""
if pid < 0:
return False
if pid == 0:
# According to "man 2 kill" PID 0 refers to every process
# in the process group of the calling process.
# On certain systems 0 is a valid PID but we have no way
# to know that in a portable fashion.
raise ValueError("invalid PID 0")
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
# PermissionError clearly means there's a process to deny access to
return True
except OSError:
raise
return True
def _confirm_run_success(run_date, run_dir, itend, config):
    """Check the run's results for signs of failure, logging each problem
    found at the critical level.

    The checks are done in the directory named for the run date in
    :kbd:`config["results archive"]`, or in :kbd:`run_dir` if that
    directory does not exist.
    The run is deemed to have failed if any of the following is true:

    * the results directory does not exist
    * an :file:`output.abort.nc` file exists
    * :file:`time.step` is missing, or its value is not :kbd:`itend`
    * :file:`ocean.output` is missing, or contains :kbd:`E R R O R`
    * :file:`solver.stat` is missing, or contains :kbd:`NaN`
    * there is no :file:`restart` directory

    :param run_date: Run date; its :kbd:`format("YYYY-MM-DD")` value is used
                     as the name of the results directory.

    :param run_dir: Temporary run directory used for the checks when the
                    results directory is missing.
    :type run_dir: :py:class:`pathlib.Path`

    :param int itend: Time step that the run is expected to end at.

    :param dict config: Configuration; only its :kbd:`results archive` item
                        is read.

    :return: :py:obj:`True` if none of the checks found a problem,
             otherwise :py:obj:`False`.
    :rtype: bool
    """
    ok = True

    results_dir = Path(config["results archive"], run_date.format("YYYY-MM-DD"))
    if not results_dir.exists():
        ok = False
        logger.critical(f"No results directory: {results_dir}")
        # Continue the rest of the checks in the temporary run directory
        results_dir = run_dir

    abort_file = results_dir / "output.abort.nc"
    if abort_file.exists():
        ok = False
        logger.critical(f"Run aborted: {abort_file}")

    # Final time step must be the one that the run was set up to end at
    try:
        with (results_dir / "time.step").open("rt") as f:
            final_step = int(f.read().strip())
    except FileNotFoundError:
        ok = False
        logger.critical("Run failed; no time.step file")
    else:
        if final_step != itend:
            ok = False
            logger.critical(
                f"Run failed: final time step is {final_step} not {itend}"
            )

    # Scan stops at the first line that contains the error marker
    ocean_output = results_dir / "ocean.output"
    try:
        with ocean_output.open("rt") as f:
            errors_found = any("E R R O R" in line for line in f)
    except FileNotFoundError:
        ok = False
        logger.critical("Run failed; no ocean.output file")
    else:
        if errors_found:
            ok = False
            logger.critical(
                f"Run failed; 1 or more E R R O R in: {ocean_output}"
            )

    # Scan stops at the first line that contains a NaN
    solver_stat = results_dir / "solver.stat"
    try:
        with solver_stat.open("rt") as f:
            nan_found = any("NaN" in line for line in f)
    except FileNotFoundError:
        ok = False
        logger.critical("Run failed; no solver.stat file")
    else:
        if nan_found:
            ok = False
            logger.critical(f"Run failed; NaN in: {solver_stat}")

    restart_dir = results_dir / "restart"
    if not restart_dir.exists():
        ok = False
        logger.critical("Run failed; no restart/ directory")

    return ok
if __name__ == "__main__":
    # Run the worker when this module is executed as a script
    main()  # pragma: no cover