
Commit 2b6a375

wip
pgierz committed Jan 9, 2025
1 parent 097c250 commit 2b6a375
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions src/pymorize/cmorizer.py
@@ -554,20 +554,24 @@ def check_rules_for_output_dir(self, output_dir):
         logger.warning(filepath)
 
     def process(self, parallel=None):
-        logger.debug("Process start!")
-        self._match_pipelines_in_rules()
-        if parallel is None:
-            parallel = self._pymorize_cfg.get("parallel", True)
-        if parallel:
-            logger.debug("Parallel processing...")
-            # FIXME(PG): This is mixed up, hard-coding to prefect for now...
-            workflow_backend = self._pymorize_cfg.get(
-                "pipeline_orchestrator", "prefect"
-            )
-            logger.debug(f"...with {workflow_backend}...")
-            return self.parallel_process(backend=workflow_backend)
-        else:
-            return self.serial_process()
+        with DaskContext(self._cluster):
+            # We encapsulate the flow in a context manager to ensure that the
+            # Dask cluster is available in the singleton, which could be used
+            # during unpickling to reattach it to a Pipeline.
+            logger.debug("Process start!")
+            self._match_pipelines_in_rules()
+            if parallel is None:
+                parallel = self._pymorize_cfg.get("parallel", True)
+            if parallel:
+                logger.debug("Parallel processing...")
+                # FIXME(PG): This is mixed up, hard-coding to prefect for now...
+                workflow_backend = self._pymorize_cfg.get(
+                    "pipeline_orchestrator", "prefect"
+                )
+                logger.debug(f"...with {workflow_backend}...")
+                return self.parallel_process(backend=workflow_backend)
+            else:
+                return self.serial_process()
 
     def parallel_process(self, backend="prefect"):
         if backend == "prefect":
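The new code comment leans on a singleton-style DaskContext. The real implementation is not part of this diff, so the class body below — the process-wide `_cluster` slot, the `get_cluster` helper, and the locking — is a minimal illustrative sketch only, not pymorize's actual API:

```python
# Sketch of a singleton-style DaskContext; names and structure are
# assumptions for illustration, since the diff does not show the class.
import threading


class DaskContext:
    """Context manager that parks the active Dask cluster in a process-wide slot."""

    _lock = threading.Lock()
    _cluster = None  # singleton slot consulted during unpickling

    def __init__(self, cluster):
        self.cluster = cluster

    def __enter__(self):
        # Publish the cluster for the duration of the `with` block.
        with DaskContext._lock:
            DaskContext._cluster = self.cluster
        return self.cluster

    def __exit__(self, exc_type, exc, tb):
        # Clear the slot on exit; do not swallow exceptions.
        with DaskContext._lock:
            DaskContext._cluster = None
        return False

    @classmethod
    def get_cluster(cls):
        """Lookup point for objects that need the cluster while being unpickled."""
        return cls._cluster
```

Keeping the cluster in a process-wide slot is what lets code running during unpickling find it again without the (unpicklable) cluster handle having to travel inside the pickle itself.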
@@ -595,11 +599,7 @@ def dynamic_flow():
logger.debug("...done!")

logger.debug("About to return dynamic_flow()...")
with DaskContext(self._cluster):
# We encapsulate the flow in a context manager to ensure that the
# Dask cluster is available in the singleton, which could be used
# during unpickling to reattach it to a Pipeline.
return dynamic_flow()
return dynamic_flow()

def _parallel_process_dask(self, external_client=None):
if external_client:
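The deleted lines suggest why the wrapper moved up into process(): in parallel_process it only covered the final return of the flow, whereas Pipeline objects can be pickled and unpickled at any point during the run. A hypothetical sketch of the unpickling path the code comment alludes to — all method and attribute names here are assumed for illustration and are not pymorize's actual Pipeline API:

```python
# Hypothetical sketch: a Pipeline drops its cluster handle when pickled and
# reattaches it from the DaskContext singleton when unpickled.
class Pipeline:
    def __init__(self, name, cluster=None):
        self.name = name
        self._cluster = cluster

    def __getstate__(self):
        # Dask cluster handles hold sockets and threads, so they cannot be
        # pickled; strip the handle before serialization.
        state = self.__dict__.copy()
        state["_cluster"] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Reattach from the singleton, which process() keeps populated for
        # the whole run via `with DaskContext(self._cluster):`.
        self._cluster = DaskContext.get_cluster()
```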
