diff --git a/atlite/datasets/era5.py b/atlite/datasets/era5.py
index 04e88015..0d683af5 100644
--- a/atlite/datasets/era5.py
+++ b/atlite/datasets/era5.py
@@ -8,10 +8,12 @@
 https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation
 """
+import io
 import logging
 import os
 import warnings
 import weakref
+import zipfile
 from tempfile import mkstemp
 import cdsapi
@@ -290,9 +292,9 @@ def retrieval_times(coords, static=False, monthly_requests=False):
     time = coords["time"].to_index()
     if static:
         return {
-            "year": str(time[0].year),
-            "month": str(time[0].month),
-            "day": str(time[0].day),
+            "year": [str(time[0].year)],
+            "month": [str(time[0].month).zfill(2)],
+            "day": [str(time[0].day).zfill(2)],
             "time": time[0].strftime("%H:00"),
         }
@@ -304,16 +306,18 @@ def retrieval_times(coords, static=False, monthly_requests=False):
         for month in t.month.unique():
             query = {
                 "year": str(year),
-                "month": str(month),
-                "day": list(t[t.month == month].day.unique()),
+                "month": [str(month).zfill(2)],
+                "day": list(
+                    t[t.month == month].day.unique().astype(str).str.zfill(2)
+                ),
                 "time": ["%02d:00" % h for h in t[t.month == month].hour.unique()],
             }
             times.append(query)
     else:
         query = {
-            "year": str(year),
-            "month": list(t.month.unique()),
-            "day": list(t.day.unique()),
+            "year": [str(year)],
+            "month": list(t.month.unique().astype(str).str.zfill(2)),
+            "day": list(t.day.unique().astype(str).str.zfill(2)),
             "time": ["%02d:00" % h for h in t.hour.unique()],
         }
         times.append(query)
@@ -338,7 +342,17 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
     If you want to track the state of your request go to
     https://cds-beta.climate.copernicus.eu/requests?tab=all
     """
-    request = {"product_type": "reanalysis", "format": "netcdf"}
+
+    # Set the URL for data download; this allows switching to different data
+    # sources more easily.
+    url = "https://cds.climate.copernicus.eu/api"
+
+    request = {
+        "product_type": ["reanalysis"],
+        "data_format": "netcdf",
+        "download_format": "zip",
+    }
+
     request.update(updates)
     assert {"year", "month", "variable"}.issubset(
@@ -346,7 +360,7 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
     ), "Need to specify at least 'variable', 'year' and 'month'"
     client = cdsapi.Client(
-        info_callback=logger.debug, debug=logging.DEBUG >= logging.root.level
+        url=url, info_callback=logger.debug, debug=logging.DEBUG >= logging.root.level
     )
     result = client.retrieve(product, request)
@@ -354,7 +368,7 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
         lock = nullcontext()
     with lock:
-        fd, target = mkstemp(suffix=".nc", dir=tmpdir)
+        fd, target_zip = mkstemp(suffix=".zip", dir=tmpdir)
         os.close(fd)
         # Inform user about data being downloaded as "* variable (year-month)"
@@ -362,12 +376,42 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
         variables = atleast_1d(request["variable"])
         varstr = "\n\t".join([f"{v} ({timestr})" for v in variables])
         logger.info(f"CDS: Downloading variables\n\t{varstr}\n")
-        result.download(target)
+        result.download(target_zip)
+
+        # Open the downloaded .zip archive
+        with zipfile.ZipFile(target_zip, "r") as zf:
+            # Identify .nc files inside the .zip
+            nc_files = [name for name in zf.namelist() if name.endswith(".nc")]
+
+            if not nc_files:
+                raise FileNotFoundError(
+                    "No .nc files found in the downloaded .zip archive."
+                )
+
+            if len(nc_files) == 1:
+                # If there's only one .nc file, read it into memory
+                with zf.open(nc_files[0]) as nc_file:
+                    # Pass the in-memory file-like object to Xarray
+                    ds = xr.open_dataset(
+                        io.BytesIO(nc_file.read()), chunks=chunks or {}
+                    )
+
+            else:
+                # If multiple .nc files, combine them using Xarray
+                datasets = []
+                for nc_file in nc_files:
+                    with zf.open(nc_file) as file:
+                        datasets.append(
+                            xr.open_dataset(
+                                io.BytesIO(file.read()), chunks=chunks or {}
+                            )
+                        )
+                # Merge the per-file datasets into a single dataset
+                ds = xr.merge(datasets)
-    ds = xr.open_dataset(target, chunks=chunks or {})
     if tmpdir is None:
-        logger.debug(f"Adding finalizer for {target}")
-        weakref.finalize(ds._file_obj._manager, noisy_unlink, target)
+        logger.debug(f"Adding finalizer for {target_zip}")
+        weakref.finalize(ds._file_obj._manager, noisy_unlink, target_zip)
     return ds
diff --git a/pyproject.toml b/pyproject.toml
index fec16fc2..5a9eb093 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,6 +36,7 @@ dependencies = [
     "numexpr",
     "xarray>=2024.03.0",
     "netcdf4",
+    "h5netcdf", # Required by the new CDS API; may become optional or unnecessary with future netcdf4 updates
     "dask>=2021.10.0",
     "toolz",
     "requests",