diff --git a/.github/workflows/CI-test.yaml b/.github/workflows/CI-test.yaml
index 1a38313f..66bb9036 100644
--- a/.github/workflows/CI-test.yaml
+++ b/.github/workflows/CI-test.yaml
@@ -32,6 +32,18 @@ jobs:
       - name: Install package
        run: |
          python -m pip install .[dev]
+      - name: Lint with flake8
+        run: |
+          # stop the build if there are Python syntax errors or undefined names
+          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
+          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
+          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
+      # - name: Check internet connectivity of CI runner
+      #   run: |
+      #     sudo apt-get install -y curl
+      #     curl -s https://packagecloud.io/install/repositories/ookla/speedtest-cli/script.deb.sh | sudo bash
+      #     sudo apt-get install -y speedtest
+      #     speedtest --accept-license
       - name: Test if data will work (Meta-Test)
        run: |
          export HDF5_DEBUG=1
diff --git a/conftest.py b/conftest.py
index 1f28f001..fdfc9c6b 100644
--- a/conftest.py
+++ b/conftest.py
@@ -6,6 +6,7 @@
     "tests.fixtures.config_files",
     "tests.fixtures.configs",
     "tests.fixtures.environment",
+    "tests.fixtures.example_data.awicm_recom",
     "tests.fixtures.example_data.fesom_2p6_pimesh",
     "tests.fixtures.example_data.pi_uxarray",
     "tests.fixtures.fake_data.fesom_mesh",
diff --git a/setup.cfg b/setup.cfg
index 53f17450..1f273341 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -8,6 +8,6 @@ tag_prefix = 'v'
 max-line-length = 120
 [flake8]
 max-line-length = 120
-exclude = cmip6-cmor-tables/CMIP6_CVs/src
+exclude = cmip6-cmor-tables/CMIP6_CVs/src,CMIP7_DReq_Software/
 [isort]
 profile = black
diff --git a/setup.py b/setup.py
index 884985d8..f5be4bf6 100644
--- a/setup.py
+++ b/setup.py
@@ -30,6 +30,13 @@ def read(filename):
     long_description=read("README.rst"),
     package_dir={"": "src"},
     packages=find_packages(where="src", exclude=("tests",)),
+    # NOTE: Please keep this list sorted! In vim, you can use
+    # visual-block mode (Ctrl-V) to select the lines and then `:sort`,
+    # or use the vim-ism (starting anywhere in the list)::
+    #
+    #     vi[:sort
+    #
+    # meaning: [v]isual [i]nside square brackets, command mode, sort, enter.
     install_requires=[
         "bokeh",
         "cerberus",
@@ -47,8 +54,10 @@ def read(filename):
         "flox",
         "h5netcdf",
         "imohash",
-        "netcdf4",  # NOTE(PG): Shouldn't be a prereq for xarray?
+        "joblib",
+        "netcdf4",
         "numbagg",
+        "numpy",
         "pendulum",
         "pint-xarray",
         "prefect[dask]",
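The NOTE in setup.py relies on contributors keeping ``install_requires`` sorted by hand. A hypothetical guard (not part of this diff) could enforce it in the test suite::

    # sketch: assert the dependency list stays alphabetically sorted;
    # in practice the list would be parsed out of setup.py rather than copied
    deps = ["bokeh", "cerberus", "flox", "h5netcdf", "imohash",
            "joblib", "netcdf4", "numbagg", "numpy", "pendulum"]
    assert deps == sorted(deps), "install_requires is no longer sorted"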
diff --git a/src/pymorize/cli.py b/src/pymorize/cli.py
index d84a50a9..dc4db121 100644
--- a/src/pymorize/cli.py
+++ b/src/pymorize/cli.py
@@ -11,6 +11,8 @@
 from rich.traceback import install as rich_traceback_install
 from streamlit.web import cli as stcli
 
+from pymorize.fesom_1p4.nodes_to_levels import convert
+
 from . import _version, caching, dev_utils
 from .cmorizer import CMORizer
 from .filecache import fc
@@ -136,7 +138,11 @@ def cache(verbose, quiet, logfile, profile_mem):
     return 0
 
 
-################################################################################
+@click.group()
+def scripts():
+    return 0
+
+
 ################################################################################
 ################################################################################
 
@@ -219,6 +225,22 @@ def directory(config_file, output_dir, verbose, quiet, logfile, profile_mem):
     cmorizer.check_rules_for_output_dir(output_dir)
 
 
+################################################################################
+################################################################################
+################################################################################
+
+################################################################################
+# COMMANDS FOR scripts
+################################################################################
+
+
+@scripts.group()
+def fesom1():
+    pass
+
+
+fesom1.add_command(convert, name="nodes-to-levels")
+
 ################################################################################
 ################################################################################
 ################################################################################
@@ -270,6 +292,7 @@ def populate_cache(files: List, verbose, quiet, logfile, profile_mem):
 cli.add_command(validate)
 cli.add_command(develop)
 cli.add_command(ssh_tunnel_cli, name="ssh-tunnel")
+cli.add_command(scripts)
 
 
 def main():
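The new ``scripts`` group nests the converter under ``pymorize scripts fesom1 nodes-to-levels``. The wiring can be smoke-tested without SLURM or data using click's test runner (a sketch; the command names come from the registrations above)::

    from click.testing import CliRunner

    from pymorize.cli import cli

    # invoke the nested group exactly as a user would on the command line
    runner = CliRunner()
    result = runner.invoke(cli, ["scripts", "fesom1", "nodes-to-levels", "--help"])
    assert result.exit_code == 0
    print(result.output)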
diff --git a/src/pymorize/fesom_1p4/__init__.py b/src/pymorize/fesom_1p4/__init__.py
new file mode 100644
index 00000000..3456a66a
--- /dev/null
+++ b/src/pymorize/fesom_1p4/__init__.py
@@ -0,0 +1,3 @@
+# Move to top-level namespace:
+from .load_mesh_data import *  # noqa: F401, F403
+from .nodes_to_levels import *  # noqa: F401, F403
diff --git a/src/pymorize/fesom_1p4/load_mesh_data.py b/src/pymorize/fesom_1p4/load_mesh_data.py
new file mode 100644
index 00000000..b55d76e3
--- /dev/null
+++ b/src/pymorize/fesom_1p4/load_mesh_data.py
@@ -0,0 +1,430 @@
+# This file is part of pyfesom
+#
+################################################################################
+#
+# Original code by Dmitry Sidorenko, 2013
+#
+# Modifications:
+#   Nikolay Koldunov, 2016
+#   - optimisation of reading ASCII files (add pandas dependency)
+#   - move loading and processing of the mesh to the mesh class itself
+#
+#   Paul Gierz, 2024
+#   - extracted relevant functions from original code only for pymorize
+#   - general cleanup of booleans (usepickle, usejoblib)
+#
+################################################################################
+
+import logging
+import math as mt
+import os
+import pickle
+from datetime import datetime
+
+import joblib
+import numpy as np
+import pandas as pd
+
+
+def scalar_r2g(al, be, ga, rlon, rlat):
+    """
+    Converts rotated coordinates to geographical coordinates.
+
+    Parameters
+    ----------
+    al : float
+        alpha Euler angle
+    be : float
+        beta Euler angle
+    ga : float
+        gamma Euler angle
+    rlon : array
+        1d array of longitudes in rotated coordinates
+    rlat : array
+        1d array of latitudes in rotated coordinates
+
+    Returns
+    -------
+    lon : array
+        1d array of longitudes in geographical coordinates
+    lat : array
+        1d array of latitudes in geographical coordinates
+
+    """
+
+    rad = mt.pi / 180
+    al = al * rad
+    be = be * rad
+    ga = ga * rad
+    rotate_matrix = np.zeros(shape=(3, 3))
+    rotate_matrix[0, 0] = np.cos(ga) * np.cos(al) - np.sin(ga) * np.cos(be) * np.sin(al)
+    rotate_matrix[0, 1] = np.cos(ga) * np.sin(al) + np.sin(ga) * np.cos(be) * np.cos(al)
+    rotate_matrix[0, 2] = np.sin(ga) * np.sin(be)
+    rotate_matrix[1, 0] = -np.sin(ga) * np.cos(al) - np.cos(ga) * np.cos(be) * np.sin(
+        al
+    )
+    rotate_matrix[1, 1] = -np.sin(ga) * np.sin(al) + np.cos(ga) * np.cos(be) * np.cos(
+        al
+    )
+    rotate_matrix[1, 2] = np.cos(ga) * np.sin(be)
+    rotate_matrix[2, 0] = np.sin(be) * np.sin(al)
+    rotate_matrix[2, 1] = -np.sin(be) * np.cos(al)
+    rotate_matrix[2, 2] = np.cos(be)
+
+    rotate_matrix = np.linalg.pinv(rotate_matrix)
+
+    rlat = rlat * rad
+    rlon = rlon * rad
+
+    # Rotated Cartesian coordinates:
+    xr = np.cos(rlat) * np.cos(rlon)
+    yr = np.cos(rlat) * np.sin(rlon)
+    zr = np.sin(rlat)
+
+    # Geographical Cartesian coordinates:
+    xg = rotate_matrix[0, 0] * xr + rotate_matrix[0, 1] * yr + rotate_matrix[0, 2] * zr
+    yg = rotate_matrix[1, 0] * xr + rotate_matrix[1, 1] * yr + rotate_matrix[1, 2] * zr
+    zg = rotate_matrix[2, 0] * xr + rotate_matrix[2, 1] * yr + rotate_matrix[2, 2] * zr
+
+    # Geographical coordinates:
+    lat = np.arcsin(zg)
+    lon = np.arctan2(yg, xg)
+
+    # Pin the longitude of points with xg == yg == 0 (the poles) to zero.
+    # NOTE: np.where returns a tuple of index arrays, so a bare `if a:` is
+    # always true; check the first index array's size instead.
+    a = np.where((np.abs(xg) + np.abs(yg)) == 0)
+    if a[0].size > 0:
+        lon[a] = 0
+
+    lat = lat / rad
+    lon = lon / rad
+
+    return (lon, lat)
+
+
+def load_mesh(path, abg=[50, 15, -90], get3d=True, usepickle=True, usejoblib=False):
+    """Loads a FESOM mesh.
+
+    Parameters
+    ----------
+    path : str
+        Path to the directory with mesh files
+    abg : list
+        alpha, beta and gamma Euler angles. Default [50, 15, -90]
+    get3d : bool
+        Whether to load the complete 3d mesh or only the 2d nodes.
+    usepickle : bool
+        Cache the mesh object with pickle (default True).
+    usejoblib : bool
+        Cache the mesh object with joblib (default False). Mutually
+        exclusive with ``usepickle``.
+
+    Returns
+    -------
+    mesh : object
+        fesom_mesh object
+    """
+    python_version = "3"
+    path = os.path.abspath(path)
+    if usepickle and usejoblib:
+        raise ValueError(
+            "Both `usepickle` and `usejoblib` set to True, select only one"
+        )
+
+    if usepickle:
+        pickle_file = os.path.join(path, "pickle_mesh_py3")
+
+    if usejoblib:
+        joblib_file = os.path.join(path, "joblib_mesh")
+
+    if usepickle and (os.path.isfile(pickle_file)):
+        print("The usepickle == True")
+        print("The pickle file for python 3 exists.")
+        print("The mesh will be loaded from {}".format(pickle_file))
+
+        ifile = open(pickle_file, "rb")
+        mesh = pickle.load(ifile)
+        ifile.close()
+        return mesh
+
+    elif usepickle and not os.path.isfile(pickle_file):
+        print("The usepickle == True")
+        print("The pickle file for python 3 does not exist")
+        print("The mesh will be saved to {}".format(pickle_file))
+
+        mesh = fesom_mesh(path=path, abg=abg, get3d=get3d)
+        logging.info("Use pickle to save the mesh information")
+        print("Save mesh to binary format")
+        outfile = open(pickle_file, "wb")
+        pickle.dump(mesh, outfile)
+        outfile.close()
+        return mesh
+
+    elif not usepickle and not usejoblib:
+        mesh = fesom_mesh(path=path, abg=abg, get3d=get3d)
+        return mesh
+
+    if usejoblib and os.path.isfile(joblib_file):
+        print("The usejoblib == True")
+        print("The joblib file for python {} exists.".format(str(python_version)))
+        print("The mesh will be loaded from {}".format(joblib_file))
+
+        mesh = joblib.load(joblib_file)
+        return mesh
+
+    elif usejoblib and not os.path.isfile(joblib_file):
+        print("The usejoblib == True")
+        print("The joblib file for python {} does not exist".format(str(python_version)))
+        print("The mesh will be saved to {}".format(joblib_file))
+
+        mesh = fesom_mesh(path=path, abg=abg, get3d=get3d)
+        logging.info("Use joblib to save the mesh information")
+        print("Save mesh to binary format")
+        joblib.dump(mesh, joblib_file)
+
+        return mesh
+
+
+class fesom_mesh(object):
+    """Creates an instance of the FESOM mesh.
+
+    This class creates an instance that contains information about a
+    FESOM mesh. At present the class works with the ASCII representation
+    of the FESOM grid, but should be extended to also read the netCDF
+    version (probably UGRID convention).
+
+    The minimum requirement is to provide the path to the directory where
+    the following files are located (not necessarily all of them will
+    be used):
+
+    - nod2d.out
+    - nod3d.out
+    - elem2d.out
+    - aux3d.out
+
+    Parameters
+    ----------
+    path : str
+        Path to the directory with mesh files
+
+    abg : list
+        alpha, beta and gamma Euler angles. Default [50, 15, -90]
+
+    get3d : bool
+        Whether to load the complete 3d mesh or only the 2d nodes.
+
+    Attributes
+    ----------
+    path : str
+        Path to the directory with mesh files
+    x2 : array
+        x position (lon) of the surface node
+    y2 : array
+        y position (lat) of the surface node
+    n2d : int
+        number of 2d nodes
+    e2d : int
+        number of 2d elements (triangles)
+    n3d : int
+        number of 3d nodes
+    nlev : int
+        number of vertical levels
+    zlevs : array
+        array of vertical level depths
+    voltri : array
+        array with 2d volume of triangles
+    alpha : float
+        Euler angle alpha
+    beta : float
+        Euler angle beta
+    gamma : float
+        Euler angle gamma
+
+    Returns
+    -------
+    mesh : object
+        fesom_mesh object
+    """
+
+    def __init__(self, path, abg=[50, 15, -90], get3d=True):
+        self.path = os.path.abspath(path)
+
+        if not os.path.exists(self.path):
+            raise IOError('The path "{}" does not exist'.format(self.path))
+
+        self.alpha = abg[0]
+        self.beta = abg[1]
+        self.gamma = abg[2]
+
+        self.nod2dfile = os.path.join(self.path, "nod2d.out")
+        self.elm2dfile = os.path.join(self.path, "elem2d.out")
+        self.aux3dfile = os.path.join(self.path, "aux3d.out")
+        self.nod3dfile = os.path.join(self.path, "nod3d.out")
+
+        self.n3d = 0
+        self.e2d = 0
+        self.nlev = 0
+        self.zlevs = []
+        # self.x2 = []
+        # self.y2 = []
+        # self.elem = []
+        self.n32 = []
+        # self.no_cyclic_elem = []
+        self.topo = []
+        self.voltri = []
+
+        logging.info("load 2d part of the grid")
+        start = datetime.now()
+        self.read2d()
+        end = datetime.now()
+        print("Load 2d part of the grid in {} second(s)".format(end - start))
+
+        if get3d:
+            logging.info("load 3d part of the grid")
+            start = datetime.now()
+            self.read3d()
+            end = datetime.now()
+            print("Load 3d part of the grid in {} second(s)".format(end - start))
+
+    def read2d(self):
+        """Reads only the surface part of the mesh.
+        Useful if your mesh is large and you only want to visualize the surface.
+ """ + file_content = pd.read_csv( + self.nod2dfile, + delim_whitespace=True, + skiprows=1, + names=["node_number", "x", "y", "flag"], + ) + self.x2 = file_content.x.values + self.y2 = file_content.y.values + self.ind2d = file_content.flag.values + self.n2d = len(self.x2) + + file_content = pd.read_csv( + self.elm2dfile, + delim_whitespace=True, + skiprows=1, + names=["first_elem", "second_elem", "third_elem"], + ) + self.elem = file_content.values - 1 + self.e2d = np.shape(self.elem)[0] + + ########################################### + # here we compute the volumes of the triangles + # this should be moved into fesom generan mesh output netcdf file + # + r_earth = 6371000.0 + rad = np.pi / 180 + edx = self.x2[self.elem] + edy = self.y2[self.elem] + ed = np.array([edx, edy]) + + jacobian2D = ed[:, :, 1] - ed[:, :, 0] + jacobian2D = np.array([jacobian2D, ed[:, :, 2] - ed[:, :, 0]]) + for j in range(2): + mind = [i for (i, val) in enumerate(jacobian2D[j, 0, :]) if val > 355] + pind = [i for (i, val) in enumerate(jacobian2D[j, 0, :]) if val < -355] + jacobian2D[j, 0, mind] = jacobian2D[j, 0, mind] - 360 + jacobian2D[j, 0, pind] = jacobian2D[j, 0, pind] + 360 + + jacobian2D = jacobian2D * r_earth * rad + + for k in range(2): + jacobian2D[k, 0, :] = jacobian2D[k, 0, :] * np.cos(edy * rad).mean(axis=1) + + self.voltri = abs(np.linalg.det(np.rollaxis(jacobian2D, 2))) / 2.0 + + # compute the 2D lump operator + # cnt = np.array((0,) * self.n2d) + self.lump2 = np.array((0.0,) * self.n2d) + for i in range(3): + for j in range(self.e2d): + n = self.elem[j, i] + # cnt[n]=cnt[n]+1 + self.lump2[n] = self.lump2[n] + self.voltri[j] + self.lump2 = self.lump2 / 3.0 + + self.x2, self.y2 = scalar_r2g( + self.alpha, self.beta, self.gamma, self.x2, self.y2 + ) + d = self.x2[self.elem].max(axis=1) - self.x2[self.elem].min(axis=1) + self.no_cyclic_elem = [i for (i, val) in enumerate(d) if val < 100] + + return self + + def read3d(self): + """ + Reads 3d part of the mesh. + """ + self.n3d = int(open(self.nod3dfile).readline().rstrip()) + df = pd.read_csv( + self.nod3dfile, + delim_whitespace=True, + skiprows=1, + names=["node_number", "x", "y", "z", "flag"], + ) + zcoord = -df.z.values + self.zlevs = np.unique(zcoord) + + with open(self.aux3dfile) as f: + self.nlev = int(next(f)) + self.n32 = np.fromiter( + f, dtype=np.int32, count=self.n2d * self.nlev + ).reshape(self.n2d, self.nlev) + + self.topo = np.zeros(shape=(self.n2d)) + for prof in self.n32: + ind_nan = prof[prof > 0] + ind_nan = ind_nan[-1] + self.topo[prof[0] - 1] = zcoord[ind_nan - 1] + + return self + + def __repr__(self): + meshinfo = """ +FESOM mesh: +path = {} +alpha, beta, gamma = {}, {}, {} +number of 2d nodes = {} +number of 2d elements = {} +number of 3d nodes = {} + + """.format( + self.path, + str(self.alpha), + str(self.beta), + str(self.gamma), + str(self.n2d), + str(self.e2d), + str(self.n3d), + ) + return meshinfo + + def __str__(self): + return self.__repr__() + + +def ind_for_depth(depth, mesh): + """ + Return indexes that belong to certain depth. + + Parameters + ---------- + depth : float + desired depth. Note there will be no interpolation, the model level + that is closest to desired depth will be selected. + mesh : object + FESOM mesh object + + Returns + ------- + ind_depth : 1d array + vector with the size equal to the size of the surface nodes with index values where + we have data values and missing values where we don't have data values. + ind_noempty : 1d array + vector with indexes of the `ind_depth` that have data values. 
+    ind_empty : 1d array
+        vector with indices of `ind_depth` that do not have data values.
+    """
+
+    # Find the index of the model level closest to the required depth.
+    dind = (abs(mesh.zlevs - depth)).argmin()
+    # Select data from that level and find indices with values and with NaNs.
+    ind_depth = mesh.n32[:, dind] - 1
+    ind_noempty = np.where(ind_depth >= 0)[0]
+    ind_empty = np.where(ind_depth < 0)[0]
+    return ind_depth, ind_noempty, ind_empty
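Taken together, ``load_mesh`` and ``ind_for_depth`` produce the per-level masks that the converter below relies on. A minimal end-to-end sketch (the mesh path is a placeholder; a real directory must contain nod2d.out, elem2d.out, aux3d.out, and nod3d.out)::

    import numpy as np

    from pymorize.fesom_1p4.load_mesh_data import ind_for_depth, load_mesh, scalar_r2g

    # rotate two points with the default FESOM Euler angles
    lon, lat = scalar_r2g(50, 15, -90, np.array([0.0, 10.0]), np.array([0.0, 45.0]))

    # first call builds the mesh and writes pickle_mesh_py3 next to the mesh
    # files; subsequent calls load that cache (usejoblib works analogously)
    mesh = load_mesh("/path/to/mesh", abg=[50, 15, -90], get3d=True)

    # node indices of the model level closest to 100 m depth
    ind_depth, ind_noempty, ind_empty = ind_for_depth(100.0, mesh)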
+ """ + variable = ds_in.name + + # Apply interpolation per time step + level_data = xr.apply_ufunc( + interpolate_to_levels, + # data_var, + ds_in, + input_core_dims=[["nodes_3d"]], + output_core_dims=[["depth", "ncells"]], + vectorize=True, + dask="parallelized", + kwargs={"mesh": mesh, "indices": indices}, + output_dtypes=[np.float32], + output_sizes={"depth": len(mesh.zlevs), "ncells": mesh.n2d}, + ) + + # Build the output dataset + coords = { + "time": ds_in["time"], + "depth": ("depth", mesh.zlevs), + "ncells": ("ncells", np.arange(mesh.n2d)), + } + attrs = ds_in.attrs.copy() + attrs.update( + { + "grid_type": "unstructured", + "description": f"{variable} interpolated to FESOM levels", + } + ) + + # FIXME(PG): This works but is hard to read. Level_data is already an xr.DataArray, so + # we need to get out the "data" attribute again... + ds_out = xr.Dataset( + {variable: (["time", "depth", "ncells"], level_data.data, attrs)}, + coords=coords, + ) + + # Add metadata for time and depth + ds_out["time"].attrs = ds_in["time"].attrs + ds_out["depth"].attrs = { + "units": "m", + "long_name": "depth", + "positive": "down", + "axis": "Z", + } + return ds_out + + +def interpolate_dataset(ds_in, variable, mesh, indices): + """Interpolate Dataset -> Interpolate DataArray converter function""" + da_in = ds_in[variable] + return interpolate_dataarray(da_in, mesh, indices) + + +def nodes_to_levels(data, rule): + mesh = rule.mesh_path + mesh = load_mesh(mesh) + indices = indicies_from_mesh(mesh) + data = interpolate_dataarray(data, mesh, indices) + return data + + +# FIXME(PG): This should be... a method of the mesh object?? +def indicies_from_mesh(mesh): + # Precompute depth indices + indices = { + "ind_depth_all": [], + "ind_noempty_all": [], + "ind_empty_all": [], + } + for zlev in mesh.zlevs: + ind_depth, ind_noempty, ind_empty = ind_for_depth(zlev, mesh) + indices["ind_depth_all"].append(ind_depth) + indices["ind_noempty_all"].append(ind_noempty) + indices["ind_empty_all"].append(ind_empty) + + return indices + + +def _convert(meshpath, ipath, opath, variable, ncore, skip): + """Main CLI for FESOM unstructured-to-structured conversion.""" + mesh = load_mesh(meshpath, usepickle=False, usejoblib=True) + indices = indicies_from_mesh(mesh) + + # Define a reusable processor function + def processor(ds_in): + return interpolate_dataset(ds_in, variable, mesh, indices) + + # Process all input files + for ifile in ipath: + ofile = os.path.join(opath, f"{os.path.basename(ifile)[:-3]}_levels.nc") + process_dataset(ifile, ofile, processor, skip=skip) + + +@click.command() +@click.argument("meshpath", type=click.Path(exists=True), required=True) +@click.argument("ipath", nargs=-1, type=click.Path(exists=True), required=True) +@click.argument("opath", nargs=1, required=False, default="./") +@click.argument("variable", nargs=1, required=False, default="temp") +@click.option( + "--ncore", + "-n", + default=2, + help="Number of cores (parallel processes)", + show_default=True, +) +@click.option( + "--skip", + "-s", + is_flag=True, + help="Skip the calculation if the output file already exists.", +) +def convert(meshpath, ipath, opath, variable, ncore, skip): + """Main CLI for FESOM unstructured-to-structured conversion.""" + _convert(meshpath, ipath, opath, variable, ncore, skip) + + +if __name__ == "__main__": + convert() diff --git a/tests/configs/test_config_awicm_1p0_recom.yaml b/tests/configs/test_config_awicm_1p0_recom.yaml new file mode 100644 index 00000000..f05f4453 --- /dev/null +++ 
diff --git a/tests/configs/test_config_awicm_1p0_recom.yaml b/tests/configs/test_config_awicm_1p0_recom.yaml
new file mode 100644
index 00000000..f05f4453
--- /dev/null
+++ b/tests/configs/test_config_awicm_1p0_recom.yaml
@@ -0,0 +1,69 @@
+pymorize:
+  version: "unreleased"
+  use_xarray_backend: True
+  warn_on_no_rule: False
+  minimum_jobs: 8
+  maximum_jobs: 10
+general:
+  name: "awicm_1p0_recom"
+  description: "This is a test configuration using esm-tools generated AWICM 1.0 + REcoM test data"
+  maintainer: "pgierz"
+  email: "pgierz@awi.de"
+  cmor_version: "CMIP6"
+  mip: "CMIP"
+  frequency: "mon"
+  CMIP_Tables_Dir: "./cmip6-cmor-tables/Tables"
+rules:
+  - name: "temp_with_levels"
+    experiment_id: "piControl"
+    output_directory: "./output"
+    source_id: "FESOM"
+    variant_label: "r1i1p1f1"
+    inputs:
+      - path: "REPLACE_ME/outdata/fesom"
+        pattern: "thetao.fesom..*.nc"
+    cmor_variable: "thetao"
+    model_variable: "thetao"
+    mesh_path: "REPLACE_ME/input/fesom/mesh"
+    pipelines:
+      - level_regridder
+pipelines:
+  - name: level_regridder
+    steps:
+      - pymorize.gather_inputs.load_mfdataset
+      - pymorize.generic.get_variable
+      - pymorize.fesom_1p4.nodes_to_levels
+      - pymorize.caching.manual_checkpoint
+      - pymorize.generic.trigger_compute
+      - pymorize.generic.show_data
+distributed:
+  worker:
+    memory:
+      target: 0.6      # Target 60% of worker memory usage
+      spill: 0.7       # Spill to disk when 70% of memory is used
+      pause: 0.8       # Pause workers if memory usage exceeds 80%
+      terminate: 0.95  # Terminate workers at 95% memory usage
+    resources:
+      CPU: 4           # Assign 4 CPUs per worker
+    death-timeout: 60  # Worker timeout if no heartbeat (seconds)
+# SLURM-specific settings for launching workers
+jobqueue:
+  slurm:
+    queue: compute        # SLURM queue/partition to submit jobs
+    project: ab0246       # SLURM project/account name
+    cores: 4              # Number of cores per worker
+    memory: 128GB         # Memory per worker
+    walltime: '00:30:00'  # Maximum walltime per job
+    # interface: ib0      # Network interface for communication
+    job-extra:            # Additional SLURM job options
+      - '--exclusive'     # Run on exclusive nodes
+    # How to launch workers and scheduler
+    worker-template:
+      # Command to launch a Dask worker via SLURM
+      command: |
+        srun --ntasks=1 --cpus-per-task=4 --mem=128G dask-worker \
+          --nthreads 4 --memory-limit 128GB --death-timeout 60
+    # Command to launch the Dask scheduler
+    scheduler-template:
+      command: |
+        srun --ntasks=1 --cpus-per-task=1 dask-scheduler
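The ``jobqueue.slurm`` block mirrors dask-jobqueue's configuration keys. For orientation, roughly the same cluster spelled directly against the dask_jobqueue API (a sketch; newer dask-jobqueue releases spell ``project`` as ``account`` and ``job-extra`` as ``job_extra_directives``)::

    from dask_jobqueue import SLURMCluster

    # approximately the jobqueue.slurm block above, as Python
    cluster = SLURMCluster(
        queue="compute",
        account="ab0246",
        cores=4,
        memory="128GB",
        walltime="00:30:00",
        job_extra_directives=["--exclusive"],
    )
    cluster.scale(jobs=2)  # submit two SLURM jobs, one worker each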
diff --git a/tests/fixtures/config_files.py b/tests/fixtures/config_files.py
index 165b0438..2ff1d62a 100644
--- a/tests/fixtures/config_files.py
+++ b/tests/fixtures/config_files.py
@@ -26,3 +26,8 @@ def pi_uxarray_config_cmip7():
 @pytest.fixture
 def fesom_2p6_pimesh_esm_tools_config():
     return TEST_ROOT / "configs" / "test_config_fesom_2p6_pimesh.yaml"
+
+
+@pytest.fixture
+def awicm_1p0_recom_config():
+    return TEST_ROOT / "configs" / "test_config_awicm_1p0_recom.yaml"
diff --git a/tests/fixtures/example_data/awicm_recom.py b/tests/fixtures/example_data/awicm_recom.py
new file mode 100644
index 00000000..5bdaf4ab
--- /dev/null
+++ b/tests/fixtures/example_data/awicm_recom.py
@@ -0,0 +1,48 @@
+"""Example data for the FESOM model."""
+
+import os
+import tarfile
+from pathlib import Path
+
+import pytest
+import requests
+
+URL = "https://nextcloud.awi.de/s/DaQjtTS9xB7o7pL/download/awicm_1p0_recom.tar"
+"""str : URL to download the example data from."""
+
+
+@pytest.fixture(scope="session")
+def awicm_1p0_recom_download_data(tmp_path_factory):
+    cache_dir = tmp_path_factory.getbasetemp() / "cached_data"
+    cache_dir.mkdir(exist_ok=True)
+    data_path = cache_dir / "awicm_1p0_recom.tar"
+
+    if not data_path.exists():
+        response = requests.get(URL)
+        response.raise_for_status()
+        with open(data_path, "wb") as f:
+            f.write(response.content)
+        print(f"Data downloaded: {data_path}.")
+    else:
+        print(f"Using cached data: {data_path}.")
+
+    return data_path
+
+
+@pytest.fixture(scope="session")
+def awicm_1p0_recom_data(awicm_1p0_recom_download_data):
+    data_dir = Path(awicm_1p0_recom_download_data).parent / "awicm_1p0_recom"
+    if not data_dir.exists():
+        with tarfile.open(awicm_1p0_recom_download_data, "r") as tar:
+            tar.extractall(data_dir)
+        print(f"Data extracted to: {data_dir}.")
+    else:
+        print(f"Using cached extraction: {data_dir}.")
+
+    for root, dirs, files in os.walk(data_dir):
+        print(f"Root: {root}")
+        for file in files:
+            print(f"File: {os.path.join(root, file)}")
+
+    print(f">>> RETURNING: {data_dir / 'awicm_1p0_recom'}")
+    return data_dir / "awicm_1p0_recom"
diff --git a/tests/fixtures/example_data/fesom_2p6_pimesh.py b/tests/fixtures/example_data/fesom_2p6_pimesh.py
index 8f7cc3f7..42457edb 100644
--- a/tests/fixtures/example_data/fesom_2p6_pimesh.py
+++ b/tests/fixtures/example_data/fesom_2p6_pimesh.py
@@ -1,5 +1,6 @@
 """Example data for the FESOM model."""
 
+import shutil
 import tarfile
 from pathlib import Path
 
@@ -30,6 +31,17 @@ def fesom_2p6_esm_tools_download_data(tmp_path_factory):
 
 @pytest.fixture(scope="session")
 def fesom_2p6_pimesh_esm_tools_data(fesom_2p6_esm_tools_download_data):
+    I_need_to_make_a_local_copy = True
+    # Check if you have a local copy.
+    # Useful for testing on your local laptop.
+    local_cache_path = Path(
+        "~/.cache/pytest/github.com/esm-tools/pymorize"
+    ).expanduser()
+    local_cache_path = local_cache_path / "fesom_2p6_pimesh"
+    if local_cache_path.exists():
+        I_need_to_make_a_local_copy = False
+        print(f"Using local cache: {local_cache_path}")
+        return local_cache_path
     data_dir = Path(fesom_2p6_esm_tools_download_data).parent / "fesom_2p6_pimesh"
     if not data_dir.exists():
         with tarfile.open(fesom_2p6_esm_tools_download_data, "r") as tar:
@@ -44,4 +56,20 @@ def fesom_2p6_pimesh_esm_tools_data(fesom_2p6_esm_tools_download_data):
     #         print(f"File: {os.path.join(root, file)}")
 
     # print(f">>> RETURNING: {data_dir / 'fesom_2p6_pimesh' }")
+    if I_need_to_make_a_local_copy:
+        local_cache_path.mkdir(parents=True, exist_ok=True)
+        try:
+            shutil.copytree(
+                data_dir / "fesom_2p6_pimesh",
+                local_cache_path,
+                dirs_exist_ok=True,
+                ignore_dangling_symlinks=True,
+            )
+            # (data_dir / "fesom_2p6_pimesh").copy(local_cache_path, follow_symlinks=True)
+            print(f"Local cache created: {local_cache_path}")
+        except Exception as e:
+            print(f"Failed to create local cache: {e}")
+            # Remove the local cache
+            shutil.rmtree(local_cache_path)
+    print(f">>> RETURNING: {data_dir / 'fesom_2p6_pimesh'}")
     return data_dir / "fesom_2p6_pimesh"
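The session-scoped fixtures chain together, so a test only names ``awicm_1p0_recom_data`` and receives a downloaded, extracted tree. A hypothetical consumer, for illustration (the directory layout matches the paths used by the tests below)::

    # hypothetical test exercising the fixture chain
    def test_example_data_layout(awicm_1p0_recom_data):
        exp_dir = awicm_1p0_recom_data / "awi-esm-1-1-lr_kh800" / "piControl"
        assert (exp_dir / "outdata" / "fesom").is_dir()
        assert (exp_dir / "input" / "fesom" / "mesh").is_dir()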
diff --git a/tests/fixtures/sample_rules.py b/tests/fixtures/sample_rules.py
index b9ab8637..35053e54 100644
--- a/tests/fixtures/sample_rules.py
+++ b/tests/fixtures/sample_rules.py
@@ -28,6 +28,29 @@ def fesom_2p6_esmtools_temp_rule(fesom_2p6_pimesh_esm_tools_data):
     )
 
 
+@pytest.fixture
+def fesom_2p6_esmtools_temp_rule_without_data():
+    pymorize_config = PymorizeConfigManager.from_pymorize_cfg({})
+    return Rule.from_dict(
+        {
+            "name": "temp",
+            "experiment_id": "piControl",
+            "output_directory": "./output",
+            "source_id": "FESOM",
+            "variant_label": "r1i1p1f1",
+            "inputs": [
+                {
+                    "path": "REPLACE_ME/outdata/fesom",
+                    "pattern": "temp.fesom..*.nc",
+                },
+            ],
+            "cmor_variable": "thetao",
+            "model_variable": "temp",
+            "_pymorize_cfg": pymorize_config,
+        }
+    )
+
+
 @pytest.fixture
 def pi_uxarray_temp_rule(pi_uxarray_data):
     pymorize_config = PymorizeConfigManager.from_pymorize_cfg({})
diff --git a/tests/integration/test_awicm_recom.py b/tests/integration/test_awicm_recom.py
new file mode 100644
index 00000000..167f156a
--- /dev/null
+++ b/tests/integration/test_awicm_recom.py
@@ -0,0 +1,23 @@
+import yaml
+
+from pymorize.cmorizer import CMORizer
+from pymorize.logging import logger
+
+
+def test_process(awicm_1p0_recom_config, awicm_1p0_recom_data):
+    logger.info(f"Processing {awicm_1p0_recom_config}")
+    with open(awicm_1p0_recom_config, "r") as f:
+        cfg = yaml.safe_load(f)
+    for rule in cfg["rules"]:
+        for input in rule["inputs"]:
+            input["path"] = input["path"].replace(
+                "REPLACE_ME",
+                str(f"{awicm_1p0_recom_data}/awi-esm-1-1-lr_kh800/piControl/"),
+            )
+        if "mesh_path" in rule:
+            rule["mesh_path"] = rule["mesh_path"].replace(
+                "REPLACE_ME",
+                str(f"{awicm_1p0_recom_data}/awi-esm-1-1-lr_kh800/piControl/"),
+            )
+    cmorizer = CMORizer.from_dict(cfg)
+    cmorizer.process()
diff --git a/tests/meta/test_prefect_can_serialize_rules.py b/tests/meta/test_prefect_can_serialize_rules.py
deleted file mode 100644
index 8935bc04..00000000
--- a/tests/meta/test_prefect_can_serialize_rules.py
+++ /dev/null
@@ -1,184 +0,0 @@
-from prefect import flow, task
-from prefect.cache_policies import INPUTS, TASK_SOURCE
-
-
-def test_prefect_can_serialize_rules(simple_rule):
-    @task
-    def my_step(data, rule):
-        return data
-
-    @flow
-    def my_flow():
-        data_in = None
-        data_out = my_step(data_in, simple_rule)
-        return data_out
-
-    my_flow()
-
-
-def test_prefect_can_serialize_rules_with_cache(simple_rule):
-    @task(cache_policy=TASK_SOURCE + INPUTS)
-    def my_step(data, rule):
-        return data
-
-    @flow
-    def my_flow():
-        data_in = None
-        data_out = my_step(data_in, simple_rule)
-        return data_out
-
-    my_flow()
-
-
-def test_prefect_can_serialize_rules_with_cache_in_nested_flow(simple_rule):
-    @task(cache_policy=TASK_SOURCE + INPUTS)
-    def my_step(data, rule):
-        return data
-
-    @flow
-    def my_flow():
-        data_in = None
-        data_out = my_step(data_in, simple_rule)
-        return data_out
-
-    @flow
-    def my_flow_wrapper():
-        return my_flow()
-
-    my_flow_wrapper()
-
-
-def test_prefect_can_serialize_as_pipeline(simple_rule):
-    class Pipeline:
-        @task(cache_policy=TASK_SOURCE + INPUTS)
-        def my_step(self, data, rule):
-            return data
-
-        @flow
-        def run(self):
-            data_in = None
-            data_out = self.my_step(data_in, simple_rule)
-            return data_out
-
-    class CMORizer:
-        def __init__(self, pipelines):
-            self.pipelines = pipelines
-
-        @flow
-        def run(self):
-            results = []
-            for pipeline in self.pipelines:
-                results.append(pipeline.run())
-            return results
-
-    pl = Pipeline()
-    cmorizer = CMORizer([pl])
-    cmorizer.run()
-
-
-def test_prefect_can_serialize_as_pipeline_with_cache(simple_rule):
-    class Pipeline:
-        @task
-        def my_step(self, data, rule):
-            return data
-
-        @flow
-        def run(self):
-            data_in = None
-            data_out = self.my_step(data_in, simple_rule)
-            return data_out
-
-    class CMORizer:
-        def __init__(self, pipelines):
-            self.pipelines = pipelines
-
-        @flow
-        def run(self):
-            results = []
-            for pipeline in self.pipelines:
-                results.append(pipeline.run())
-            return results
-
-    pl = Pipeline()
-    cmorizer = CMORizer([pl])
-    cmorizer.run()
-
-
-def test_prefect_can_serialize_simplified():
-    @task
-    def my_step(data, rule):
-        return data
-
-    class Pipeline:
-
-        STEPS = [my_step]
-
-        @flow
-        def run(self, data, rule_spec):
-            for step in self.STEPS:
-                data = step(data, rule_spec)
-            return data
-
-    class CMORizer:
-        def __init__(self, rules, pipelines):
-            self.pipelines = pipelines
-            self.rules = rules
-
-        @flow
-        def run(self):
-            results = []
-            for rule in self.rules:
-                data = None
-                for pipeline in rule.pipelines:
-                    data = pipeline.run(data, rule)
-                results.append(data)
-            return results
-
-    class Rule:
-        def __init__(self, pipelines):
-            self.pipelines = pipelines
-
-    pl = Pipeline()
-    rule = Rule([pl])
-    cmorizer = CMORizer([rule], [pl])
-    cmorizer.run()
-
-
-def test_prefect_can_serialize_simplified_with_cache():
-    @task(cache_policy=TASK_SOURCE + INPUTS)
-    def my_step(data, rule):
-        return data
-
-    class Pipeline:
-
-        STEPS = [my_step]
-
-        @flow
-        def run(self, data, rule_spec):
-            for step in self.STEPS:
-                data = step(data, rule_spec)
-            return data
-
-    class CMORizer:
-        def __init__(self, rules, pipelines):
-            self.pipelines = pipelines
-            self.rules = rules
-
-        @flow
-        def run(self):
-            results = []
-            for rule in self.rules:
-                data = None
-                for pipeline in rule.pipelines:
-                    data = pipeline.run(data, rule)
-                results.append(data)
-            return results
-
-    class Rule:
-        def __init__(self, pipelines):
-            self.pipelines = pipelines
-
-    pl = Pipeline()
-    rule = Rule([pl])
-    cmorizer = CMORizer([rule], [pl])
-    cmorizer.run()
diff --git a/tests/meta/test_pyfesom_load_mesh.py b/tests/meta/test_pyfesom_load_mesh.py
new file mode 100644
index 00000000..7b06034b
--- /dev/null
+++ b/tests/meta/test_pyfesom_load_mesh.py
@@ -0,0 +1,15 @@
+from pymorize.fesom_1p4 import load_mesh_data
+
+
+def test_load_mesh_awicm_1p0_recom(awicm_1p0_recom_data):
+    try:
+        mesh = load_mesh_data.load_mesh(
+            f"{awicm_1p0_recom_data}/awi-esm-1-1-lr_kh800/piControl/input/fesom/mesh/"
+        )
+    except Exception as e:
+        for path in (
+            awicm_1p0_recom_data / "awi-esm-1-1-lr_kh800/piControl/input/fesom/mesh/"
+        ).iterdir():
+            print(path)
+        raise e
+    assert mesh is not None
diff --git a/tests/meta/test_xarray_open_mfdataset.py b/tests/meta/test_xarray_open_mfdataset.py
index 966a56a8..101307e7 100644
--- a/tests/meta/test_xarray_open_mfdataset.py
+++ b/tests/meta/test_xarray_open_mfdataset.py
@@ -4,6 +4,20 @@
 import xarray as xr
 
 
+@pytest.mark.parametrize(
+    "engine",
+    [
+        "netcdf4",
+    ],
+)
+def test_open_awicm_1p0_recom(awicm_1p0_recom_data, engine):
+    ds = xr.open_mfdataset(
+        f"{awicm_1p0_recom_data}/awi-esm-1-1-lr_kh800/piControl/outdata/fesom/*.nc",
+        engine=engine,
+    )
+    assert isinstance(ds, xr.Dataset)
+
+
 @pytest.mark.parametrize(
     "engine",
     [
diff --git a/tests/unit/test_fesom_1p4_nodes_to_levels.py b/tests/unit/test_fesom_1p4_nodes_to_levels.py
new file mode 100644
index 00000000..be488cb0
--- /dev/null
+++ b/tests/unit/test_fesom_1p4_nodes_to_levels.py
@@ -0,0 +1,18 @@
+import xarray as xr
+
+from pymorize.fesom_1p4 import indicies_from_mesh, interpolate_dataarray, load_mesh
+
+
+def test_nodes_to_levels_with_awicm_1p0_recom_data(awicm_1p0_recom_data):
+    outdata_path_stub = "awi-esm-1-1-lr_kh800/piControl/outdata/fesom/"
+    outdata_files = sorted(list((awicm_1p0_recom_data / outdata_path_stub).iterdir()))
+    # NOTE(PG): Just check the first file, for this test
+    ds = xr.open_mfdataset(outdata_files).thetao
+    mesh = load_mesh(
+        f"{awicm_1p0_recom_data}/awi-esm-1-1-lr_kh800/piControl/input/fesom/mesh/"
+    )
+    indices = indicies_from_mesh(mesh)
+    ds_out = interpolate_dataarray(ds, mesh, indices)
+    # NOTE(PG): For now, just check if the output object is created
+    # FIXME(PG): Correctness check...
+    assert ds_out is not None
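The final unit test only asserts that an object comes back; once the FIXME above is addressed, a dimension check would be the natural next step. A sketch continuing the test body above (same ``ds``, ``mesh``, and ``ds_out``)::

    # sketch of the missing correctness check: output should be organised as
    # (time, depth, ncells), with one depth entry per mesh level
    assert set(ds_out[ds.name].dims) == {"time", "depth", "ncells"}
    assert ds_out.sizes["depth"] == len(mesh.zlevs)
    assert ds_out.sizes["ncells"] == mesh.n2d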