From 2b6a375563a41788536f70552bde1d93b056ed01 Mon Sep 17 00:00:00 2001
From: Paul Gierz
Date: Thu, 9 Jan 2025 15:07:41 +0100
Subject: [PATCH] wip

---
 src/pymorize/cmorizer.py | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py
index 2648632..7f1bb46 100644
--- a/src/pymorize/cmorizer.py
+++ b/src/pymorize/cmorizer.py
@@ -554,20 +554,24 @@ def check_rules_for_output_dir(self, output_dir):
                 logger.warning(filepath)
 
     def process(self, parallel=None):
-        logger.debug("Process start!")
-        self._match_pipelines_in_rules()
-        if parallel is None:
-            parallel = self._pymorize_cfg.get("parallel", True)
-        if parallel:
-            logger.debug("Parallel processing...")
-            # FIXME(PG): This is mixed up, hard-coding to prefect for now...
-            workflow_backend = self._pymorize_cfg.get(
-                "pipeline_orchestrator", "prefect"
-            )
-            logger.debug(f"...with {workflow_backend}...")
-            return self.parallel_process(backend=workflow_backend)
-        else:
-            return self.serial_process()
+        with DaskContext(self._cluster):
+            # We encapsulate the flow in a context manager to ensure that the
+            # Dask cluster is available in the singleton, which could be used
+            # during unpickling to reattach it to a Pipeline.
+            logger.debug("Process start!")
+            self._match_pipelines_in_rules()
+            if parallel is None:
+                parallel = self._pymorize_cfg.get("parallel", True)
+            if parallel:
+                logger.debug("Parallel processing...")
+                # FIXME(PG): This is mixed up, hard-coding to prefect for now...
+                workflow_backend = self._pymorize_cfg.get(
+                    "pipeline_orchestrator", "prefect"
+                )
+                logger.debug(f"...with {workflow_backend}...")
+                return self.parallel_process(backend=workflow_backend)
+            else:
+                return self.serial_process()
 
     def parallel_process(self, backend="prefect"):
         if backend == "prefect":
@@ -595,11 +599,7 @@ def dynamic_flow():
             logger.debug("...done!")
 
         logger.debug("About to return dynamic_flow()...")
-        with DaskContext(self._cluster):
-            # We encapsulate the flow in a context manager to ensure that the
-            # Dask cluster is available in the singleton, which could be used
-            # during unpickling to reattach it to a Pipeline.
-            return dynamic_flow()
+        return dynamic_flow()
 
     def _parallel_process_dask(self, external_client=None):
         if external_client:
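
The change above relies on a DaskContext context manager that, per the inline comment, keeps the Dask cluster reachable through a singleton so it can be re-attached to a Pipeline during unpickling; by moving the wrapper from parallel_process() up to process(), the cluster is available for the whole run, not only while the flow object is being returned. The DaskContext class itself is not part of this diff; the following is a minimal sketch under assumed names (the class-level slot and the helper get_dask_cluster() are hypothetical and may differ from the real implementation in pymorize):

# Minimal sketch, not taken from the patch: a context manager that publishes
# the cluster in a class-level singleton slot for the duration of the block.
class DaskContext:
    _cluster = None  # singleton slot consulted during unpickling (assumed)

    def __init__(self, cluster):
        self.cluster = cluster

    def __enter__(self):
        # Make the cluster globally discoverable while the block runs.
        DaskContext._cluster = self.cluster
        return self.cluster

    def __exit__(self, exc_type, exc_value, traceback):
        # Clear the slot on exit; never suppress exceptions.
        DaskContext._cluster = None
        return False


def get_dask_cluster():
    """Hypothetical accessor a Pipeline could call while being unpickled."""
    return DaskContext._cluster

With process() wrapped in such a context manager, any Pipeline object deserialized by the workflow backend inside the with-block can recover the cluster from the singleton, which is the stated motivation for relocating the wrapper in this patch.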