Source code for nowcast.workers.make_weather_forcing

# Copyright 2016-2019 Doug Latornell, 43ravens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GoMSS NEMO nowcast worker to assemble a weather forcing file from
a collection of HRDPS forecast files.

Concatenate the contents of the gzipped hourly netCDF files from
the Environment Canada GEM 2.5km HRDPS research model forecast
for a specified date; (defaults to today) to create a weather forcing
file for NEMO.

The concatenated file is a netCDF-4/HDF5 format file with LZ-deflated
variables to reduce its size.
The number of hours in the forcing file is the same as the number of hours
in the forecast
(see the :kbd:`weather:download:forecast hrs` value in the system configuration
file).

The final hour of the previous day's forecast is used as the 0th hour of
weather forcing to avoid restart artifacts from the HRDPS product.

The precipitation is transformed from the
cumulative metres of equivalent water to flux in kg/m^2/s.
The 0th hour flux is calculated from the final 2 hours accummulation in\
the previous day's forecast.

Solid phase precipitation flux is calculated by assuming that precipitation
at grid points where the value of the 2m air temperature (:kbd:`tair`) is
less than 273.15 Kelvin is snow.
The calculated :kbd:`snow` file is merged into the dataset created from the
HRDPS forecast files to produce the NEMO forcing file.
"""
import gzip
import logging
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory

import arrow
import numpy
import xarray

from nemo_nowcast import NowcastWorker, WorkerError

NAME = "make_weather_forcing"
logger = logging.getLogger(NAME)


[docs]def main(): """Set up and run the worker. For command-line usage see: :command:`python -m nowcast.workers.make_weather_forcing --help` """ worker = NowcastWorker(NAME, description=__doc__) worker.init_cli() worker.cli.add_date_option( "--weather-date", default=(arrow.now().floor("day")), help="Date for which to assemble the weather forcing file.", ) worker.run(make_weather_forcing, success, failure)
def success(parsed_args): ymd = parsed_args.weather_date.format("YYYY-MM-DD") logger.info(f"{ymd} weather forcing file created", extra={"weather_date": ymd}) msg_type = "success" return msg_type def failure(parsed_args): ymd = parsed_args.weather_date.format("YYYY-MM-DD") logger.critical( f"{ymd} weather forcing file creation failed", extra={"weather_date": ymd} ) msg_type = "failure" return msg_type def make_weather_forcing(parsed_args, config, *args): weather_date = parsed_args.weather_date ymd = weather_date.format("YYYY-MM-DD") logger.info( f"assembling hourly forecast files for {ymd} into 1-day forcing file", extra={"weather_date": ymd}, ) download_dir = Path(config["weather"]["download"]["dest dir"]) forecast_hrs = config["weather"]["download"]["forecast hrs"] download_filename_tmpl = config["weather"]["download"]["file template"] dest_dir = Path(config["weather"]["forcing"]["dest dir"]) filename_tmpl = config["weather"]["forcing"]["file template"] forcing_filepath = _unzip_concat_calc_precip_flux( weather_date, forecast_hrs, download_dir, download_filename_tmpl, dest_dir, filename_tmpl, ) logger.info(f"created {forcing_filepath}") checklist = { "weather date": ymd, "file": forcing_filepath.as_posix(), } return checklist def _unzip_concat_calc_precip_flux( weather_date, forecast_hrs, download_dir, download_filename_tmpl, dest_dir, filename_tmpl, ): yyyy, mm, dd = weather_date.format("YYYY-MM-DD").split("-") forcing_filepath = dest_dir / filename_tmpl.format(yyyy=yyyy, mm=mm, dd=dd) with TemporaryDirectory() as tmp_dir_name: # Note: All calculations must be completed within this context # because we are using dask in combination with xarray and so # the calculations are deferred until _store_as_netcdf() is called, # at which point the unzipped files in the temporary directory # are processed. gunzip_dir = Path(tmp_dir_name) for hr in (23, 24): nc_filepath = _unzip( weather_date.shift(days=-1), hr, download_dir, download_filename_tmpl, gunzip_dir, ) nc_filepaths = [nc_filepath] logger.debug(f"hr=0 weather from {nc_filepath}") for hr in range(1, forecast_hrs): nc_filepath = _unzip( weather_date, hr, download_dir, download_filename_tmpl, gunzip_dir ) nc_filepaths.append(nc_filepath) logger.debug(f"hr={hr} weather from {nc_filepath}") ds = xarray.open_mfdataset( [nc_filepath.as_posix() for nc_filepath in nc_filepaths] ) _reshape_lats_lons(ds) _precip_to_flux( ds, weather_date, forecast_hrs, gunzip_dir, download_filename_tmpl ) ds_with_snow = _solid_precip_flux(ds) _store_as_netcdf(ds_with_snow, forcing_filepath) if any(numpy.any(ds_with_snow.variables[var].isnull()) for var in ds_with_snow.variables): logger.critical(f"found None or NaN value(s) in {forcing_filepath}") raise WorkerError return forcing_filepath def _unzip(weather_date, hr, download_dir, download_filename_tmpl, gunzip_dir): gz_filename = Path( download_filename_tmpl.format( yyyymmdd=weather_date.format("YYYYMMDD"), hhh=f"{hr:03d}" ) ) gz_filepath = download_dir / gz_filename nc_filepath = gunzip_dir / gz_filename.stem with gzip.open(gz_filepath.open("rb")) as f_in: with nc_filepath.open("wb") as f_out: shutil.copyfileobj(f_in, f_out) logger.debug(f"gunzip-ed {gz_filepath} to {nc_filepath}") return nc_filepath def _reshape_lats_lons(ds): # NEMO requires that nav_lat and nav_lon variables be 2D (y, x), # not 3D (t, y, x) lats = ds.nav_lat[0, ...] del lats.coords["time_counter"] ds["nav_lat"] = lats lons = ds.nav_lon[0, ...] del lons.coords["time_counter"] ds["nav_lon"] = lons logger.debug("nav_lat and nav_lon variables reshaped to (y, x)") def _precip_to_flux(ds, weather_date, forecast_hrs, gunzip_dir, download_filename_tmpl): # Cumulative to flux conversion factor from # m/hr * 1000 kg/m^3 * 1/3600 hr/s = 1/3.6 ds.load() for hr in range(forecast_hrs - 1, 1, -1): ds.precip.values[hr] = (ds.precip.values[hr] - ds.precip.values[hr - 1]) / 3.6 logger.debug(f"hr={hr} precip flux calculated by de-accumulation") ds.precip.values[1] /= 3.6 logger.debug("hr=1 precip flux calculated by units conversion") day_m1_precip = {} yyyymmdd = weather_date.shift(days=-1).format("YYYYMMDD") for hr in (forecast_hrs - 2, forecast_hrs - 1): gz_filename = Path( download_filename_tmpl.format(yyyymmdd=yyyymmdd, hhh=f"{hr:03d}") ) nc_filepath = gunzip_dir / gz_filename.stem with xarray.open_dataset(nc_filepath.as_posix()) as hr_ds: day_m1_precip[hr] = hr_ds.precip.values[0, ...] ds.precip.values[0] = ( day_m1_precip[forecast_hrs - 1] - day_m1_precip[forecast_hrs - 2] ) / 3.6 logger.debug("hr=0 precip flux calculated from previous day") ds.precip.attrs["units"] = "kg m-2 s-1" ds.precip.attrs["long_name"] = "precipitation" ds.precip.attrs["valid_max"] = 3 logger.debug("cummulative precip converted to flux") def _solid_precip_flux(ds): # Precipitation flux at grid points where air temperature is >273.15K # is assumed to be snow snow = ds.precip.copy() snow.name = "snow" snow.attrs["short_name"] = "snow" snow.attrs["long_name"] = "solid precipitation flux" snow.values[ds.tair.values >= 273.15] = 0 logger.debug("solid phase precipitation flux calculated") ds_with_snow = xarray.merge((ds, snow)) return ds_with_snow def _store_as_netcdf(ds, forcing_filepath): encoding = {var: {"zlib": True} for var in ds.data_vars} encoding["time_counter"] = { "units": "seconds since 1970-01-01 00:00:00", "zlib": True, } ds.to_netcdf( forcing_filepath.as_posix(), encoding=encoding, unlimited_dims=("time_counter",) ) logger.debug(f"concatenated forecasts stored as {forcing_filepath}") if __name__ == "__main__": main() # pragma: no cover