Skip to content

Commit

Permalink
Merge branch 'main' into feat/find_eligible_input
Browse files Browse the repository at this point in the history
  • Loading branch information
pgierz committed Jul 24, 2024
2 parents e00d921 + 8f4b498 commit a8dda53
Show file tree
Hide file tree
Showing 10 changed files with 1,056 additions and 118 deletions.
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ def read(filename):
packages=find_packages(where="src", exclude=("tests",)),
install_requires=[
"cf_xarray",
"chemicals",
"click-loguru",
"dill",
"dpath",
"pendulum",
"pint-xarray",
"pyyaml",
"questionary",
Expand All @@ -44,6 +47,7 @@ def read(filename):
"isort",
"pre-commit",
"pytest",
"pytest-asyncio",
"pytest-cov",
"pytest-xdist",
"sphinx",
Expand Down
209 changes: 209 additions & 0 deletions src/pymorize/calendar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
"""
Yet another calendar implementation.
This module provides functions for creating date ranges.
The main components of this module are:
- ``year_bounds_major_digits``: generates a list of year ranges (bounds) where each range starts with a specific digit.
- ``date_ranges_from_bounds``: creates a list of date indexes from bounds
- ``date_ranges_from_year_bounds``: creates a list of date indexes from year bounds
- ``simple_ranges_from_bounds``: creates a list of simple ranges from bounds
Examples
--------
>>> year_bounds = year_bounds_major_digits(2000, 2010, 2, 2)
>>> print(year_bounds)
[[2000, 2001], [2002, 2003], [2004, 2005], [2006, 2007], [2008, 2009], [2010, 2010]]
>>> date_range = date_ranges_from_bounds(year_bounds, freq="Y")
>>> print(date_range)
([Timestamp('2000-12-31 00:00:00', freq='A-DEC'), Timestamp('2001-12-31 00:00:00', freq='A-DEC')], [Timestamp('2002-12-31 00:00:00', freq='A-DEC'), Timestamp('2003-12-31 00:00:00', freq='A-DEC')], [Timestamp('2004-12-31 00:00:00', freq='A-DEC'), Timestamp('2005-12-31 00:00:00', freq='A-DEC')], [Timestamp('2006-12-31 00:00:00', freq='A-DEC'), Timestamp('2007-12-31 00:00:00', freq='A-DEC')], [Timestamp('2008-12-31 00:00:00', freq='A-DEC'), Timestamp('2009-12-31 00:00:00', freq='A-DEC')], [Timestamp('2010-12-31 00:00:00', freq='A-DEC')])
>>> date_range = date_ranges_from_year_bounds(year_bounds, freq="Y")
>>> print(date_range)
([Timestamp('2000-12-31 00:00:00', freq='A-DEC'), Timestamp('2001-12-31 00:00:00', freq='A-DEC')], [Timestamp('2002-12-31 00:00:00', freq='A-DEC'), Timestamp('2003-12-31 00:00:00', freq='A-DEC')], [Timestamp('2004-12-31 00:00:00', freq='A-DEC'), Timestamp('2005-12-31 00:00:00', freq='A-DEC')], [Timestamp('2006-12-31 00:00:00', freq='A-DEC'), Timestamp('2007-12-31 00:00:00', freq='A-DEC')], [Timestamp('2008-12-31 00:00:00', freq='A-DEC'), Timestamp('2009-12-31 00:00:00', freq='A-DEC')], [Timestamp('2010-12-31 00:00:00', freq='A-DEC')])
"""

import pendulum
import xarray as xr
from loguru import logger


def year_bounds_major_digits(first, last, step, binning_digit, return_type=int):
    """
    Generate year ranges whose bins are aligned on a given ones digit.

    The span ``first``..``last`` (both inclusive) is split into bins. The
    first bin runs from ``first`` up to the year just before the first year
    whose ones digit equals ``binning_digit``. Every following bin starts on
    that aligned year and is ``step`` years long; the final bin is truncated
    at ``last``.

    Parameters
    ----------
    first : int
        The first year in the range.
    last : int
        The last year in the range (inclusive).
    step : int
        The length, in years, of every bin after the first one.
    binning_digit : int
        The ones digit (0-9) on which the bins after the first are aligned.
    return_type : type, optional
        The type of the elements in the returned list, either ``int`` or
        ``pendulum.DateTime``. Defaults to ``int``.

    Returns
    -------
    list
        A list of ``[start, end]`` lists. With ``pendulum.DateTime`` each
        year is converted to January 1st of that year.

    Raises
    ------
    ValueError
        If ``binning_digit`` is not in the range 0-9, or if ``return_type``
        is neither ``int`` nor ``pendulum.DateTime``.

    Examples
    --------
    >>> year_bounds_major_digits(2000, 2010, 2, 2)
    [[2000, 2001], [2002, 2003], [2004, 2005], [2006, 2007], [2008, 2009], [2010, 2010]]
    >>> year_bounds_major_digits(2000, 2010, 3, 3)
    [[2000, 2002], [2003, 2005], [2006, 2008], [2009, 2010]]
    >>> year_bounds_major_digits(2002, 2007, 2, 2)
    [[2002, 2003], [2004, 2005], [2006, 2007]]
    >>> year_bounds_major_digits(2000, 2001, 5, 5)
    [[2000, 2001]]
    """
    logger.debug(
        f"Running year_bounds_major_digits({first=}, {last=}, {step=}, {binning_digit=})"
    )
    if binning_digit >= 10:
        raise ValueError("Give a binning_digit less than 10")
    if binning_digit < 0:
        # ``year % 10`` is never negative, so a negative digit could never
        # be matched and no bin would ever be aligned.
        raise ValueError("Give a non-negative binning_digit")

    bounds = []
    current_location = bin_start = first
    # Only reported in the debug log: tells whether the leading bin will be
    # shorter than ``step``.
    first_bin_is_undersized = binning_digit in [
        i % 10 for i in range(first, first + step)
    ]
    logger.debug(f"first_bin_is_undersized: {first_bin_is_undersized}")

    # Phase 1: walk forward to the first year that sits on the binning digit
    # (or run past ``last`` if no such year exists in the span).
    while current_location <= last and current_location % 10 != binning_digit:
        current_location += 1

    # Emit the leading bin only if it is non-empty. When ``first`` already
    # sits on the binning digit there is nothing before it, so no inverted
    # ``[first, first - 1]`` entry is produced. When the digit is never
    # reached, ``current_location - 1 == last`` and the remainder is kept
    # instead of being silently dropped.
    if current_location > bin_start:
        bounds.append([bin_start, current_location - 1])
        logger.debug(f"Appending bounds {bin_start=}, {current_location-1=}")
        bin_start = current_location

    # Phase 2: regular bins of ``step`` years, the last one cut at ``last``.
    while current_location <= last:
        bin_end = bin_start + step
        current_location += 1
        if current_location == bin_end or current_location > last:
            bounds.append([bin_start, min(current_location - 1, last)])
            logger.debug(
                f"Appending bounds {bin_start=}, {min(current_location-1, last)=}"
            )
            bin_start = current_location

    if return_type is int:
        return [[int(i) for i in bound] for bound in bounds]
    elif return_type is pendulum.DateTime:
        return [[pendulum.datetime(int(i), 1, 1) for i in bound] for bound in bounds]
    else:
        raise ValueError("return_type must be either int or pendulum.DateTime")


def date_ranges_from_bounds(bounds, freq: str = "M", **kwargs):
    """
    Build one date range per ``(start, end)`` pair via ``xr.date_range``.

    Parameters
    ----------
    bounds : iterable of 2-item sequences
        Each item is unpacked into a start and an end value, which are
        passed unchanged as ``start`` and ``end`` to ``xr.date_range``.
    freq : str, optional
        Frequency string passed to ``xr.date_range``. Defaults to ``"M"``.
    **kwargs :
        Additional keyword arguments forwarded to ``xr.date_range``.

    Returns
    -------
    object or tuple
        If exactly one bound was given, the single date range itself is
        returned (not wrapped). Otherwise a tuple with one date range per
        bound, in input order; an empty ``bounds`` gives an empty tuple.

    Examples
    --------
    >>> bounds = [("2020-01-01", "2020-01-31"), ("2020-02-01", "2020-02-29")]
    >>> january, february = date_ranges_from_bounds(bounds, freq="D")
    >>> whole_year = date_ranges_from_bounds([("2020-01-01", "2020-12-31")])
    """
    date_indexes = tuple(
        xr.date_range(start=lower, end=upper, freq=freq, **kwargs)
        for lower, upper in bounds
    )
    # A lone range is handed back bare rather than as a 1-tuple.
    return date_indexes[0] if len(date_indexes) == 1 else date_indexes


def date_ranges_from_year_bounds(year_bounds, freq: str = "M", **kwargs):
    """
    Build date ranges from pairs of years.

    Every ``(start_year, end_year)`` pair is widened to span January 1st of
    the start year through December 31st of the end year (constructed with
    ``pendulum.datetime``) and then handed to ``date_ranges_from_bounds``.

    Parameters
    ----------
    year_bounds : list of lists or tuples
        Each item holds a start year and an end year.
    freq : str, optional
        Frequency string forwarded to ``date_ranges_from_bounds``.
        Defaults to ``"M"``.
    **kwargs :
        Additional keyword arguments forwarded to
        ``date_ranges_from_bounds``.

    Returns
    -------
    object or tuple
        Whatever ``date_ranges_from_bounds`` returns for the widened bounds.
    """
    bounds = []
    for first_year, last_year in year_bounds:
        jan_first = pendulum.datetime(first_year, 1, 1)
        dec_last = pendulum.datetime(last_year, 12, 31)
        bounds.append((jan_first, dec_last))
    return date_ranges_from_bounds(bounds, freq, **kwargs)


def simple_ranges_from_bounds(bounds):
    """
    Turn inclusive ``(start, end)`` bounds into ``range`` objects.

    Parameters
    ----------
    bounds : sequence of 2-item sequences
        Each item holds an inclusive integer start and end.

    Returns
    -------
    range or list of range
        A bare ``range(start, end + 1)`` when ``bounds`` has exactly one
        item, otherwise a list with one such range per item (an empty list
        for empty ``bounds``).
    """
    if len(bounds) != 1:
        return [range(lower, upper + 1) for lower, upper in bounds]
    # Exactly one bound: return the range itself rather than a 1-item list.
    lower, upper = bounds[0]
    return range(lower, upper + 1)


def assign_time_axis(da: xr.DataArray, taxis):
    """Return the result of assigning ``taxis`` as the ``time`` coordinate of ``da``."""
    with_time = da.assign_coords(time=taxis)
    return with_time
112 changes: 94 additions & 18 deletions src/pymorize/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,6 @@
- `linear_transform`: Applies a linear transformation to the data of a NetCDF file.
- `invert_z_axis`: Inverts the z-axis of a NetCDF file.
Classes:
- `Rule`: Defines a set of instructions to process a file and convert it to a CMOR standard.
- `CMORMetadata`: Inherits from `Rule` and adds CMOR metadata to a NetCDF file.
- `CMORizer`: Manages a set of `Rule` objects and applies them to NetCDF files.
Each `Rule` object represents a set of transformations that should be applied to a NetCDF file to
make it comply with the CMOR standard. The `CMORizer` class manages a collection of `Rule` objects and
applies the appropriate ones to each file.
The `CMORMetadata` class is a special type of `Rule` that adds CMOR metadata to a file.
The Full CMOR (yes, bad pun):
* Applied if no other rule sets are given for a file
* Adds CMOR metadata to the file
Expand All @@ -27,15 +16,102 @@
"""

import datetime
import json
import re
import sys
from functools import partial
from pathlib import Path

import questionary
import xarray as xr
import yaml
from loguru import logger

from .utils import generate_partial_function, get_callable_by_name

def convert_units(filepath: Path, source_units: str, target_units: str):
    """
    Converts the units of a NetCDF file in place, using the Pint library.

    Parameters
    ----------
    filepath : Path
        Path to the input file. The converted dataset is written back to
        this same path.
    source_units : str
        Units the data is expected to be in. This value is not applied to
        the data; it only appears in the log message.
    target_units : str
        Units passed to ``ds.pint.to`` for the conversion.
    """
    # ``load_dataset`` reads the data into memory and closes the file handle,
    # so the file can be safely overwritten below (``open_dataset`` would
    # keep a lazy handle open on the very file being rewritten).
    ds = xr.load_dataset(filepath)
    ds = ds.pint.quantify()
    ds = ds.pint.to(target_units)
    ds = ds.pint.dequantify()
    logger.info(f"Converted units of {filepath} from {source_units} to {target_units}")
    ds.to_netcdf(filepath)


def set_cmor_metadata(filepath: Path, cmor_metadata: dict, attrs_to_skip=None):
    """
    Adds CMOR metadata to a NetCDF file in place.

    Every entry of ``cmor_metadata`` whose key is not skipped is stored as a
    global attribute of the dataset, a timestamped note is appended to the
    ``history`` attribute, and the dataset is written back to ``filepath``.

    Parameters
    ----------
    filepath : Path
        Path to the input file. The updated dataset is written back to this
        same path.
    cmor_metadata : dict
        Dictionary with the CMOR metadata to be added to the file.
    attrs_to_skip : list of str, optional
        List of attributes to skip when adding CMOR metadata. When omitted
        or empty, ``["units", "cell_methods", "cell_measures"]`` is used.
    """
    attrs_to_skip = attrs_to_skip or ["units", "cell_methods", "cell_measures"]
    # ``load_dataset`` reads the data into memory and closes the file handle,
    # so the file can be safely overwritten below.
    ds = xr.load_dataset(filepath)
    for key, value in cmor_metadata.items():
        if key in attrs_to_skip:
            continue
        ds.attrs[key] = value
    hist_str = ds.attrs.get("history", "")
    # The file imports the ``datetime`` *module*, so the class must be
    # reached as ``datetime.datetime``.
    hist_str += f"\n{datetime.datetime.now()}: CMOR metadata added by ``pymorize``\n"
    # Store the extended history; without this the note would be discarded.
    ds.attrs["history"] = hist_str
    ds.to_netcdf(filepath)


def linear_transform(
    filepath: Path, execute: bool = False, slope: float = 1, offset: float = 0
):
    """
    Applies a linear transformation to the data of a NetCDF file.

    With ``execute=True`` the dataset is replaced by ``ds * slope + offset``
    and written back to ``filepath``. Otherwise nothing is modified and the
    intended change is only logged.

    Parameters
    ----------
    filepath : Path
        Path to the input file. When executing, the result is written back
        to this same path.
    execute : bool, optional
        If True, apply the transformation. If False (default), only log
        what would be done.
    slope : float, optional
        Multiplicative factor. Defaults to 1.
    offset : float, optional
        Additive term applied after the multiplication. Defaults to 0.
    """
    if execute:
        # ``load_dataset`` reads the data into memory and closes the file
        # handle, so the file can be safely overwritten below.
        ds = xr.load_dataset(filepath)
        ds = ds * slope + offset
        logger.info(f"Applied linear transformation to {filepath}")
        ds.to_netcdf(filepath)
    else:
        logger.info(f"Would apply linear transformation to {filepath}")
        logger.info(f"slope: {slope}, offset: {offset}")
        logger.info("Use `execute=True` to apply changes")


def invert_z_axis(filepath: Path, execute: bool = False, flip_sign: bool = False):
    """
    Inverts the z-axis of a NetCDF file.

    With ``execute=True`` the dataset is reindexed so that the ``z``
    coordinate runs in reverse order, optionally the sign of ``z`` is
    flipped, and the result is written back to ``filepath``. Otherwise
    nothing is modified and the intended change is only logged.

    Parameters
    ----------
    filepath : Path
        Path to the input file. When executing, the result is written back
        to this same path.
    execute : bool, optional
        If True, the function will execute the inversion. If False, it will
        only print the changes that would be made.
    flip_sign : bool, optional
        If True, additionally multiply the ``z`` coordinate by -1 after the
        order has been reversed. Defaults to False.
    """
    if execute:
        # ``load_dataset`` reads the data into memory and closes the file
        # handle, so the file can be safely overwritten below.
        ds = xr.load_dataset(filepath)
        ds = ds.reindex(z=ds.z[::-1])
        logger.info(f"Inverted order of z-axis of {filepath}")
        if flip_sign:
            # NOTE(review): in-place arithmetic on a dimension coordinate may
            # be rejected by some xarray versions -- confirm; if so,
            # ``assign_coords`` would be the alternative.
            ds["z"] *= -1
            logger.info(f"Flipped sign of z-axis of {filepath}")
        ds.to_netcdf(filepath)
    else:
        logger.info(f"Would invert z-axis of {filepath}")
        if flip_sign:
            logger.info("Would flip sign of z-axis")
        logger.info("Use `execute=True` to apply changes")
Loading

0 comments on commit a8dda53

Please sign in to comment.