Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update retrieve_data in era5.py #414

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions atlite/datasets/era5.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation
"""

import io
import logging
import os
import warnings
import weakref
import zipfile
from tempfile import mkstemp

import cdsapi
Expand Down Expand Up @@ -290,9 +292,9 @@ def retrieval_times(coords, static=False, monthly_requests=False):
time = coords["time"].to_index()
if static:
return {
"year": str(time[0].year),
"month": str(time[0].month),
"day": str(time[0].day),
"year": [str(time[0].year)],
"month": [str(time[0].month).zfill(2)],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If leading zeros are required, perhaps this is better (and "%d", "%Y" for day and year):

Suggested change
"month": [str(time[0].month).zfill(2)],
"month": [time[0].strftime("%m")],

"day": [str(time[0].day).zfill(2)],
"time": time[0].strftime("%H:00"),
}

Expand All @@ -304,16 +306,18 @@ def retrieval_times(coords, static=False, monthly_requests=False):
for month in t.month.unique():
query = {
"year": str(year),
"month": str(month),
"day": list(t[t.month == month].day.unique()),
"month": [str(month).zfill(2)],
"day": list(
t[t.month == month].day.unique().astype(str).str.zfill(2)
),
"time": ["%02d:00" % h for h in t[t.month == month].hour.unique()],
}
times.append(query)
else:
query = {
"year": str(year),
"month": list(t.month.unique()),
"day": list(t.day.unique()),
"year": [str(year)],
"month": list(t.month.unique().astype(str).str.zfill(2)),
"day": list(t.day.unique().astype(str).str.zfill(2)),
"time": ["%02d:00" % h for h in t.hour.unique()],
}
times.append(query)
Expand All @@ -338,36 +342,76 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
If you want to track the state of your request go to
https://cds-beta.climate.copernicus.eu/requests?tab=all
"""
request = {"product_type": "reanalysis", "format": "netcdf"}

# Set url for data download, this allows to switch to different data
# sources more easily.
url = "https://cds.climate.copernicus.eu/api"
Comment on lines +346 to +348
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you switch to other sources? Could you explain?
That should probably be a constant at module level, and maybe an optional argument

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Lukas, you are right, I just added it here for later improvement. For example, I have a local version in which I added the glofas data, but they are stored under a different url, it is not cds, but rather ads. Switching the url in the dataset is then just simpler then going to the "config" file of the cds-api. Furthermore, with a fixed url in the data retrieval, changes can be made more easily. It was just a comment here and will be taken up in future additions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that makes sense. But I don't think we want to hardcode the url in here without the ability to pass it as an argument. We are basically overriding any urls a user may have set up manually via the cds api config file without being able to change it. So just move it to an extra argument to get this merged. Future additions we can make anyways


request = {
"product_type": ["reanalysis"],
"data_format": "netcdf",
"download_format": "zip",
}

request.update(updates)

assert {"year", "month", "variable"}.issubset(
request
), "Need to specify at least 'variable', 'year' and 'month'"

client = cdsapi.Client(
info_callback=logger.debug, debug=logging.DEBUG >= logging.root.level
url=url, info_callback=logger.debug, debug=logging.DEBUG >= logging.root.level
)
result = client.retrieve(product, request)

if lock is None:
lock = nullcontext()

with lock:
fd, target = mkstemp(suffix=".nc", dir=tmpdir)
fd, target_zip = mkstemp(suffix=".zip", dir=tmpdir)
lkstrp marked this conversation as resolved.
Show resolved Hide resolved
os.close(fd)

# Inform user about data being downloaded as "* variable (year-month)"
timestr = f"{request['year']}-{request['month']}"
variables = atleast_1d(request["variable"])
varstr = "\n\t".join([f"{v} ({timestr})" for v in variables])
logger.info(f"CDS: Downloading variables\n\t{varstr}\n")
result.download(target)
result.download(target_zip)

# Open the .zip file in memory
with zipfile.ZipFile(target_zip, "r") as zf:
# Identify .nc files inside the .zip
nc_files = [name for name in zf.namelist() if name.endswith(".nc")]

if not nc_files:
raise FileNotFoundError(
"No .nc files found in the downloaded .zip archive."
)

if len(nc_files) == 1:
# If there's only one .nc file, read it into memory
with zf.open(nc_files[0]) as nc_file:
# Pass the in-memory file-like object to Xarray
ds = xr.open_dataset(
io.BytesIO(nc_file.read()), chunks=chunks or {}
)

else:
# If multiple .nc files, combine them using Xarray
datasets = []
for nc_file in nc_files:
with zf.open(nc_file) as file:
datasets.append(
xr.open_dataset(
io.BytesIO(file.read()), chunks=chunks or {}
)
)
# Combine datasets along temporal dimension
ds = xr.merge(datasets)

ds = xr.open_dataset(target, chunks=chunks or {})
if tmpdir is None:
logger.debug(f"Adding finalizer for {target}")
weakref.finalize(ds._file_obj._manager, noisy_unlink, target)
logging.debug(f"Adding finalizer for {target_zip}")
weakref.finalize(ds._file_obj._manager, noisy_unlink, target_zip)

return ds

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies = [
"numexpr",
"xarray>=2024.03.0",
"netcdf4",
"h5netcdf", # For new cds api. Maybe make optional / may not be needed with future netcdf4 updates
"dask>=2021.10.0",
"toolz",
"requests",
Expand Down