Source code for marrmotflow.core

"""Core functionality for MarrmotFlow package."""
# internal imports
from ._default_dicts import (
    default_forcing_units,
    default_forcing_vars,
    default_model_dict
)

from .templating import render_models

# built-in imports
import glob
import os
import warnings
import json
import re

from typing import Dict, Sequence, Union

# third-party imports
import xarray as xr
import geopandas as gpd
import pandas as pd
import pint_xarray
import pint
import pyet
import numpy as np
import timezonefinder

from scipy.io import savemat

# define types
try:
    from os import PathLike
except ImportError:
    PathLike = str
else:
    PathLike = Union[str, PathLike]

[docs] class MARRMOTWorkflow: """A class representing a MARRMOT workflow."""
[docs] def __init__( self, name: str = "MARRMOTModels", cat: gpd.GeoDataFrame | PathLike = None, forcing_vars: Dict[str, str] = {}, forcing_files: Sequence[PathLike] | PathLike = None, # type: ignore forcing_units: Dict[str, str] = {}, pet_method: str = "hamon", model_number: Sequence[int] | int = [7, 37], # HBV-96 and GR4J as default models forcing_time_zone: str = None, model_time_zone: str = None, ) -> 'MARRMOTWorkflow': """ Initialize the MARRMOT workflow with forcing variables, files, units, PET method, and model number. Parameters ---------- forcing_vars : Dict[str, str], optional Dictionary of forcing variables and their units, by default {}. The mandatory keys to this dictionary are: - 'precip': Precipitation variable name - 'temp': Temperature variable name The values must be present in the `forcing_files`. forcing_files : Sequence[PathLike] | PathLike, optional Sequence of paths to forcing files or a single path, by default None forcing_units : Dict[str, str], optional Dictionary of units for the forcing variables, by default {}. The keys must match the keys in `forcing_vars`, and the values must be valid pint units. pet_method : str, optional Method for calculating potential evapotranspiration, by default "hamon" model_number : Sequence[int] | int, optional Model number(s) to be used in the workflow, by default [7, 37] (HBV-96 and GR4J) Raises ------ ValueError If forcing files are not provided or are not in the correct format. TypeError If forcing files are not a sequence or a PathLike object. Notes ----- - `pet_method` only accepts "hamon" as a valid method. Other methods will be added in the future. """ # assing the name of the workflow self.name = name # assign the catchment (cat) as a GeoDataFrame or PathLike if cat is None: raise ValueError("Catchment (cat) must be provided as a GeoDataFrame or PathLike.") if isinstance(cat, gpd.GeoDataFrame): self.cat = cat elif isinstance(cat, PathLike): self.cat = gpd.read_file(cat) else: raise TypeError("cat must be a GeoDataFrame or a PathLike object.") # assign forcing variables self.forcing_vars = forcing_vars # if not a list of forcing files and is a PathLike, read the files using glob.glob if isinstance(forcing_files, PathLike): forcing_files = glob.glob(os.path.join(forcing_files, "**/*.nc*"), recursive=True) # if not, then the user must provide a list of forcing files in NetCDF format # assigning a class attribute self.forcing_files = forcing_files # assign forcing units self.forcing_units = forcing_units # assign pet method self.pet_method = pet_method # assign model number if isinstance(model_number, int): model_number = [model_number] self.model_number = model_number # assign time zones self.forcing_time_zone = forcing_time_zone self.model_time_zone = model_time_zone # assign an output object self.output_mat = None # Placeholder for output matrix
[docs] @classmethod def from_dict( cls: 'MARRMOTWorkflow', init_dict: Dict = {}, ) -> 'MARRMOTWorkflow': """ Constructor to use a dictionary to instantiate """ if len(init_dict) == 0: raise KeyError("`init_dict` cannot be empty") assert isinstance(init_dict, dict), "`init_dict` must be a `dict`" return cls(**init_dict)
[docs] @classmethod def from_json( cls: 'MARRMOTWorkflow', json_str: str, ) -> 'MARRMOTWorkflow': """ Constructor to use a loaded JSON string """ # building customized MARRMOTWorkflow's JSON string decoder object decoder = json.JSONDecoder(object_hook=MARRMOTWorkflow._json_decoder) json_dict = decoder.decode(json_str) # return class instance return cls.from_dict(json_dict)
[docs] @classmethod def from_json_file( cls: 'MARRMOTWorkflow', json_file: 'str', ) -> 'MARRMOTWorkflow': """ Constructor to use a JSON file path """ with open(json_file) as f: json_dict = json.load(f, object_hook=MARRMOTWorkflow._json_decoder) return cls.from_dict(json_dict)
@staticmethod def _env_var_decoder(s): """ OS environmental variable decoder """ # RE patterns env_pat = r'\$(.*?)/' bef_pat = r'(.*?)\$.*?/?' aft_pat = r'\$.*?(/.*)' # strings after re matches e = re.search(env_pat, s).group(1) b = re.search(bef_pat, s).group(1) a = re.search(aft_pat, s).group(1) # extract environmental variable v = os.getenv(e) # return full: before+env_var+after if v: return b+v+a return s @staticmethod def _is_valid_integer(obj): """ Check if a string represents a valid integer. """ try: int(obj) return True except (ValueError, TypeError): return False @staticmethod def _json_decoder(obj): """ Decoding typical JSON strings returned into valid Python objects """ if obj in ["true", "True", "TRUE"]: return True elif obj in ["false", "False", "FALSE"]: return False elif isinstance(obj, str): if '$' in obj: return MARRMOTWorkflow._env_var_decoder(obj) if MARRMOTWorkflow._is_valid_integer(obj): return int(obj) elif isinstance(obj, dict): return {MARRMOTWorkflow._json_decoder(k): MARRMOTWorkflow._json_decoder(v) for k, v in obj.items()} return obj
[docs] @staticmethod def datetime64_to_matlab_datenum_fast(dt64): """ Python's datetime64 to MATLAB datenum conversion. """ dt64 = pd.to_datetime(dt64) matlab_epoch = 719529 # 1970-01-01 unix_epoch = np.datetime64('1970-01-01', 'D') days = (dt64.values - unix_epoch) / np.timedelta64(1, 'D') # Convert to MATLAB datenum format in integer format return (matlab_epoch + days).astype(int)
# class methods
[docs] def run(self): """Run the workflow.""" self.init_forcing_files() # defines self.df self.init_pet() # defines self.pet self.init_model_file(self.model_number) # print a message about the timezones print(f"Using forcing time zone: {self.forcing_time_zone}") print(f"Using model time zone: {self.model_time_zone}") return f"Workflow executed successfully with {len(self.forcing_files)} forcing files."
[docs] def save(self, save_path: PathLike): # type: ignore """Save the workflow output to a specified path.""" if self.forcing is None or self.pet is None: raise ValueError("No output matrix to save. Run the workflow first.") # Create .mat file using the scipy.io.savemat function # the dataframe must be a cobination of self.df and self.pet combined_data = { 'name': self.name, 'gaugeID': f'{self.name} gauge', 'dates_as_datenum': MARRMOTWorkflow.datetime64_to_matlab_datenum_fast(self.forcing.index), 'precip': self.forcing['precip'], 'temp': self.forcing['temp'], 'pet': self.pet, 'delta_t': 1, # Assuming daily data, so delta_t is 1 day } # create the output directory if it does not exist if not os.path.exists(save_path): os.makedirs(save_path, exist_ok=True) # save the combined data to a .mat file savemat(os.path.join(save_path, 'marrmot_data.mat'), {'marrmot_data': combined_data}) # save the model file into a .m file model_file_path = os.path.join(save_path, 'marrmot_model.m') with open(model_file_path, 'w') as f: f.write(self.model) return f"Outputs saved to {save_path}"
def __str__(self): return f"MARRMOTWorkflow(forcing_files={self.forcing_files})"
[docs] def init_forcing_files(self): """Initialize forcing files.""" # check the timezones if self.forcing_time_zone is None: warnings.warn( "Forcing time zone is not set. Defaulting to 'UTC'.", UserWarning ) self.forcing_time_zone = "UTC" # check the model time-zone if self.model_time_zone is None: # try extracting the time zone from the provided `cat` file # calculate area's centroid coordinates---basically what is done # in the `init_class` method warnings.warn( "No `model_time_zone` provided in the settings. " "Autodetecting the time zone using `timezonefinder` " "based on the centroid coordinates of the catchment.", UserWarning, ) if not self.cat.crs: warnings.warn( "Catchment (cat) does not have a CRS. A default of " "EPSG:4326 will be used for latitude calculation.", UserWarning) self.cat.set_crs(epsg=4326, inplace=True) # if crs is not set to EPSG:4326, then convert it to EPSG:4326 elif self.cat.crs != 'EPSG:4326': self.cat.to_crs(epsg=4326, inplace=True) # calculate the latitude from the catchment geometry lat = self.cat.geometry.centroid.y.mean() lng = self.cat.geometry.centroid.x.mean() # extracing the model time zone from the coordinates self.model_time_zone = timezonefinder.TimezoneFinder().timezone_at( lat=lat, lng=lng ) # Print the model time zone if self.model_time_zone: warnings.warn( f"Autodetected model time zone: {self.model_time_zone}", UserWarning, ) # if the model time zone is None, then assume UTC # and warn the user else: self.model_time_zone = 'UTC' warnings.warn( "No `model_time_zone` provided in the settings and" " autodetection using `timezonefinder` failed." " Assuming UTC time zone.", UserWarning, ) _ureg = pint.UnitRegistry(force_ndarray_like=True) # read the forcing files using xarray and create a dataset ds = xr.open_mfdataset( self.forcing_files, combine='by_coords', parallel=False, engine='netcdf4' ) # adjust the model time zone if self.model_time_zone != self.forcing_time_zone: # convert the dataset to the forcing time zone ds = ds.assign_coords({ 'time': ds.time.to_index().tz_localize(self.forcing_time_zone).tz_convert(self.model_time_zone).tz_localize(None) }) # rename the dataset variables to match the forcing_vars # and assign pint units to the dataset variables rename_vars_dict = {} for key, value in self.forcing_vars.items(): if value in ds.variables: rename_vars_dict[value] = default_forcing_vars.get(key) ds = ds.rename(rename_vars_dict) # drop the variable not in self.forcing_vars ds = ds[list(default_forcing_vars.values())] # assign pint units to the dataset variables renamed_forcing_units = {} for key, value in self.forcing_units.items(): new_key = default_forcing_vars.get(key) renamed_forcing_units[new_key] = value ds = ds.pint.quantify(units=renamed_forcing_units, unit_registry=_ureg) # convert the dataset units to the default forcing units renamed_to_forcing_units = {} for key, value in default_forcing_units.items(): new_key = default_forcing_vars.get(key) renamed_to_forcing_units[new_key] = value ds = ds.pint.to(units=renamed_to_forcing_units) # after unit conversion, dequantify the dataset ds = ds.pint.dequantify() # resample the dataset to daily frequency if it is not already # first converting the dataset to a pandas.DataFrame df = ds.to_dataframe() # if a multi-index is present, drop any level not named `time` if df.index.nlevels > 1: df = df.reset_index(level=[level for level in df.index.names if level != 'time']) # resample the precipitation varaible to daily frequency df = df.resample('D').mean() # drop any column not named 'precip' or 'temp' df = df[list(default_forcing_vars.values())] # create the new attribute self.forcing = df return
[docs] def init_pet(self): """Initialize potential evapotranspiration (PET) calculation.""" if self.pet_method != "hamon": raise ValueError(f"Invalid PET method: {self.pet_method}. Only 'hamon' is supported.") # FIXME: More flexibility is needed in the future for different PET methods # extract the latitude from the catchment object, first checking if `cat` has a crs if not self.cat.crs: warnings.warn( "Catchment (cat) does not have a CRS. A default of " "EPSG:4326 will be used for latitude calculation.", UserWarning) self.cat.set_crs(epsg=4326, inplace=True) # if crs is not set to EPSG:4326, then convert it to EPSG:4326 elif self.cat.crs != 'EPSG:4326': self.cat.to_crs(epsg=4326, inplace=True) # calculate the latitude from the catchment geometry lat = self.cat.geometry.centroid.y.mean() # calculate the potential evapotranspiration using the Hamon method self.pet = pyet.temperature.hamon( tmean=self.forcing['temp'], lat=lat, ) return
[docs] def init_model_file( self, model_number: Sequence[int] | int ) -> None: """Initialize the model file for the given model number.""" if isinstance(model_number, int): model_number = [model_number] # create a list of model files using the model_number model_files = [] for num in model_number: # check if the model number is in the default model dict if num in default_model_dict: model_name = default_model_dict[num] model_files.append(model_name) else: raise ValueError(f"Model number {num} in MARRMoT is not supported.") # create the content of the model file self.model = render_models(model_files) # return the rendered content return