Cannot use meteodatalab in multiple threads #48

Open
frazane opened this issue Nov 5, 2024 · 0 comments

frazane commented Nov 5, 2024

meteodatalab is sometimes used for workloads that are mostly I/O-bound, where running tasks in multiple threads would make sense. At the moment this does not work: when embarrassingly parallel tasks are submitted to multiple threads, only the first submitted task succeeds and the others fail. This is because of the way the eccodes definition paths are set in the first thread. Related to #49.

Minimal reproducible example

from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

from meteodatalab.data_source import FileDataSource
from meteodatalab import grib_decoder

ROOT_DIR = Path("/store_new/mch/msopr/ml/KENDA_pp/2015")
OUT_DIR = Path("/tmp/")
laffiles = sorted([str(f) for f in ROOT_DIR.glob("laf201512012*")])
outfiles = [OUT_DIR / f"laf_lff_{f[-10:]}" for f in laffiles]


def do_something(laffile, outfile):
    # Load param "T" on levels 1-3 from a single GRIB file.
    lafds = FileDataSource(datafiles=[laffile])
    ds = grib_decoder.load(lafds, {"param": "T", "levelist": [1, 2, 3]})

    Path(outfile).parent.mkdir(parents=True, exist_ok=True)
    with open(outfile, "wb") as fof:
        print(f"Saving to {outfile}")
        for var in ds.values():
            grib_decoder.save(var, fof)

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    for laffile, outfile in zip(laffiles, outfiles):
        futures.append(executor.submit(do_something, laffile, outfile))

    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Failed for {future}")
            raise ValueError("Error!") from e

print("Done!")

This gives the following traceback (from one of the threads):

    ds = grib_decoder.load(lafds, {"param": "T", "levelist": [1, 2, 3]})
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/mch/fzanetta/miniforge3/envs/model-archive-process/lib/python3.12/site-packages/meteodatalab/grib_decoder.py", line 271, in load
    buffer_map = _load_buffer_map(source, request)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/mch/fzanetta/miniforge3/envs/model-archive-process/lib/python3.12/site-packages/meteodatalab/grib_decoder.py", line 198, in _load_buffer_map
    for field in fs:
                 ^^
  File "/scratch/mch/fzanetta/miniforge3/envs/model-archive-process/lib/python3.12/site-packages/meteodatalab/data_source.py", line 87, in _
    with grib_def_ctx(grib_def):
         ^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/mch/fzanetta/miniforge3/envs/model-archive-process/lib/python3.12/contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/scratch/mch/fzanetta/miniforge3/envs/model-archive-process/lib/python3.12/site-packages/meteodatalab/data_source.py", line 34, in cosmo_grib_defs
    raise RuntimeError(f"{path} does not exist")
RuntimeError: /scratch/mch/fzanetta/miniforge3/envs/model-archive-process/share/eccodes-cosmo-resources/definitions:/scratch/mch/fzanetta/miniforge3/envs/model-archive-process/share/eccodes/definitions does not exist
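
The error message shows two directories joined by a colon being tested as if they were a single path. One plausible reading, sketched below, is that the context manager reads the process-wide definitions path, checks that it exists, and then overwrites it with a joined value, so a second thread entering while the first is still inside reads the already-joined value. The sketch uses neither eccodes nor meteodatalab: `get_definitions_path`, `set_definitions_path` and `grib_defs` are hypothetical stand-ins, and the body of `grib_defs` is an assumption inferred from the traceback, not the library's actual code.

```python
import tempfile
import threading
from contextlib import contextmanager
from pathlib import Path

# Hypothetical stand-in for a process-wide definitions path (not the eccodes API).
_definitions_path = ""


def get_definitions_path():
    return _definitions_path


def set_definitions_path(value):
    global _definitions_path
    _definitions_path = value


@contextmanager
def grib_defs(extra_dir):
    # Assumed behaviour: validate both directories, then prepend the extra one.
    paths = (Path(extra_dir), Path(get_definitions_path()))
    for path in paths:
        if not path.exists():
            raise RuntimeError(f"{path} does not exist")
    restore = get_definitions_path()
    set_definitions_path(":".join(map(str, paths)))
    try:
        yield
    finally:
        set_definitions_path(restore)


def demo():
    errors = []
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp) / "eccodes"
        extra = Path(tmp) / "cosmo"
        base.mkdir()
        extra.mkdir()
        set_definitions_path(str(base))

        first_inside = threading.Event()
        second_done = threading.Event()

        def first():
            # Stay inside the context until the second thread has tried to enter.
            with grib_defs(extra):
                first_inside.set()
                second_done.wait(timeout=5)

        def second():
            first_inside.wait(timeout=5)
            try:
                # The global path is now "extra:base", which is not a directory.
                with grib_defs(extra):
                    pass
            except RuntimeError as exc:
                errors.append(str(exc))
            finally:
                second_done.set()

        threads = [threading.Thread(target=first), threading.Thread(target=second)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    return errors


if __name__ == "__main__":
    print(demo())
```

Under these assumptions the second thread fails with a message of the same shape as the one above, while the first thread completes normally.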

Current workaround

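As a workaround, I am replacing this line in data_source.py (line 34 in the traceback above)

raise RuntimeError(f"{path} does not exist")

with del path, so the failing check no longer raises.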
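
A possible alternative to removing the check would be to make the context manager safe to enter from several threads at once. The following is only a sketch of that idea, not a patch for meteodatalab: it uses a lock and a counter so that the definitions path is changed when the first thread enters and restored when the last one leaves. The module-level `_definitions_path`, `get_definitions_path`, `shared_grib_defs` and `run_demo` are hypothetical names, the paths are placeholders, and the sketch assumes every thread wants the same extra definitions directory.

```python
import threading
from contextlib import contextmanager

# Hypothetical stand-in for a process-wide definitions path (not the eccodes API).
_definitions_path = "/base/definitions"

_lock = threading.Lock()
_active = 0      # number of threads currently inside the context
_saved = None    # value to restore when the last thread leaves


def get_definitions_path():
    return _definitions_path


@contextmanager
def shared_grib_defs(extra_dir):
    global _active, _saved, _definitions_path
    with _lock:
        if _active == 0:
            # Only the first thread in modifies the shared path.
            _saved = _definitions_path
            _definitions_path = f"{extra_dir}:{_saved}"
        _active += 1
    try:
        yield
    finally:
        with _lock:
            _active -= 1
            if _active == 0:
                # Only the last thread out restores the original path.
                _definitions_path = _saved
                _saved = None


def run_demo(n_threads=4):
    barrier = threading.Barrier(n_threads)
    seen = []

    def worker():
        with shared_grib_defs("/cosmo/definitions"):
            barrier.wait(timeout=5)  # every thread is inside the context here
            seen.append(get_definitions_path())
            barrier.wait(timeout=5)  # nobody leaves before all have read

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen


if __name__ == "__main__":
    print(run_demo())
    print(get_definitions_path())
```

With this design, overlapping threads all see the same joined path and the original value comes back once the last one exits. Where patching the library is not an option, running the tasks in separate processes (for example with `concurrent.futures.ProcessPoolExecutor`) should also avoid the shared state, at the cost of process start-up overhead.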
