750 configure workers (#777)
* Use itertuples over iterrows since iterrows is an enormous memory hog.

* Drop sources_df columns before renaming id column to avoid a copy of the whole dataframe in memory.

* Decrease default partition size to 15MB

* Don't split (large-in-memory) list of DataFrames into Dask bags (no performance hit).

* Don't write forced parquets in parallel (no performance hit for this).

* Initial configuration updates for processing options.

* Don't overwrite input DataFrame when writing parquets.

* Update CHANGELOG.md

* Address review comments.

* Copy YAML objects before revalidation so they can be garbage collected.

* Appease flake8

* Add processing options as optional with defaults.

* Filter processing config to parallel association.

* Add a function to determine the number of workers and partitions for Dask.

* Use config values for num_workers and max_partition_size throughout pipeline.

* Correct wording in config template.

* Update CHANGELOG.md

* Remove unused imports.

* Bump strictyaml to 1.6.2

* Use YAML 'null' to create Python None for the all-cores option.

* Make None the default in `calculate_workers_and_partitions` instead of 0

* Updated run config docs

* Allow null for num_workers_io and improve validation of processing parameters.

* Update num_workers_io default in docs.

---------

Co-authored-by: Dougal Dobie <[email protected]>
mauch and ddobie authored Nov 5, 2024
1 parent 19883c2 commit abfd38d
Showing 15 changed files with 934 additions and 715 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Added

- Added configuration options to specify number of workers and maximum partition size for parallel operations. [#777](https://github.com/askap-vast/vast-pipeline/pull/777)
- Added vast_pipeline.utils.delete_run.py to enable deletion of pipeline runs using raw SQL [#775](https://github.com/askap-vast/vast-pipeline/pull/775)

#### Changed
@@ -27,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### List of PRs

- [#777](https://github.com/askap-vast/vast-pipeline/pull/777): feat: Allow user to specify number of cores and memory size of partitions via configuration.
- [#776](https://github.com/askap-vast/vast-pipeline/pull/776): fix: Minor memory optimisations
- [#775](https://github.com/askap-vast/vast-pipeline/pull/775): fix, feat: Enabled deletion of pipeline runs directly using SQL rather than via django
- [#734](https://github.com/askap-vast/vast-pipeline/pull/734): Shortened forced fits measurement names
15 changes: 15 additions & 0 deletions docs/using/runconfig.md
@@ -127,6 +127,21 @@ Below is an example of a default `config.yaml` file. Note that no images or othe
# aggregate pair metrics that are stored in Source objects.
source_aggregate_pair_metrics_min_abs_vs: 4.3

processing:
# Options to control use of Dask parallelism
# NOTE: These are advanced options and you should only change them if you know what you are doing.

# The total number of workers available to Dask ('null' means use one less than all cores)
num_workers: null

# The number of workers to use for disk IO operations (e.g. when reading images for forced extraction)
num_workers_io: 5

# The default maximum size (in MB) to allow per partition of Dask DataFrames
# Increasing this will create fewer partitions and will potentially increase the memory footprint
# of parallel tasks.
max_partition_mb: 15

```

!!! note
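As a rough illustration of the documented defaults above (this snippet is not taken from the pipeline; the dictionary simply stands in for a parsed `processing` section), a `num_workers: null` setting translates to one worker fewer than the machine's core count:

```python
from psutil import cpu_count

# Stand-in for the values parsed from the `processing` block of config.yaml.
processing = {"num_workers": None, "num_workers_io": 5, "max_partition_mb": 15}

# YAML 'null' arrives as Python None, which means "one less than all cores".
n_workers = (
    cpu_count() - 1 if processing["num_workers"] is None else processing["num_workers"]
)
print(f"Dask will be given {n_workers} workers")
```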
1,390 changes: 734 additions & 656 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -35,7 +35,7 @@ whitenoise = "^5.2.0"
gevent = { version = "^21.1.2", optional = true }
gunicorn = { version = "^20.0.4", optional = true }
forced-phot = { git = "https://github.com/askap-vast/forced_phot.git" }
strictyaml = "^1.3.2"
strictyaml = "^1.6.2"
colorcet = "^2.0.6"
matplotlib = "^3.5.0"
holoviews = "^1.14.7"
15 changes: 15 additions & 0 deletions vast_pipeline/config_template.yaml.j2
@@ -163,3 +163,18 @@ variability:
# Only measurement pairs where the Vs metric exceeds this value are selected for the
# aggregate pair metrics that are stored in Source objects.
source_aggregate_pair_metrics_min_abs_vs: {{ source_aggregate_pair_metrics_min_abs_vs }}

processing:
# Options to control use of Dask parallelism
# NOTE: These are advanced options and you should only change them if you know what you are doing.

# The total number of workers available to Dask ('null' means use one less than all cores)
num_workers: {{ num_workers }}

# The number of workers to use for disk IO operations (e.g. when reading images for forced extraction)
num_workers_io: {{ num_workers_io }}

# The default maximum size (in MB) to allow per partition of Dask DataFrames
# Increasing this will create fewer partitions and will potentially increase the memory footprint
# of parallel tasks.
max_partition_mb: {{ max_partition_mb }}
16 changes: 8 additions & 8 deletions vast_pipeline/pipeline/association.py
@@ -7,7 +7,6 @@
import pandas as pd
from typing import Tuple, Dict, List
import dask.dataframe as dd
from psutil import cpu_count

from astropy import units as u
from astropy.coordinates import SkyCoord
@@ -20,7 +19,7 @@
reconstruct_associtaion_dfs
)
from vast_pipeline.pipeline.config import PipelineConfig
from vast_pipeline.utils.utils import StopWatch, calculate_n_partitions
from vast_pipeline.utils.utils import StopWatch, calculate_workers_and_partitions


logger = logging.getLogger(__name__)
@@ -1586,11 +1585,12 @@ def parallel_association(
# Add an increment to any new source values when using add_mode to avoid
# getting duplicates in the result later
id_incr_par_assoc = max(done_source_ids) if add_mode else 0

n_cpu = cpu_count() - 1
logger.debug(f"Running association with {n_cpu} CPUs")
n_partitions = calculate_n_partitions(images_df, n_cpu)

n_workers, n_partitions = calculate_workers_and_partitions(
images_df,
n_cpu=config['processing']['num_workers'],
max_partition_mb=config['processing']['max_partition_mb']
)
logger.debug(f"Running association with {n_workers} CPUs")
# pass each skyreg_group through the normal association process.
results = (
dd.from_pandas(images_df.set_index('skyreg_group'), npartitions=n_partitions)
@@ -1608,7 +1608,7 @@
id_incr_par_assoc=id_incr_par_assoc,
parallel=True,
meta=meta
).compute(n_workers=n_cpu, scheduler='processes')
).compute(n_workers=n_workers, scheduler='processes')
)

# results are the normal dataframe of results with the columns:
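The `calculate_workers_and_partitions` helper imported above lives in `vast_pipeline/utils/utils.py` and is not part of this diff. A minimal sketch of the behaviour implied by its call sites (worker count defaulting to one less than all cores when `None` is passed, partition count derived from the DataFrame's in-memory size and `max_partition_mb`) could look like this; the actual implementation may differ:

```python
from typing import Optional, Tuple

import pandas as pd
from psutil import cpu_count


def calculate_workers_and_partitions(
    df: Optional[pd.DataFrame],
    n_cpu: Optional[int] = None,
    max_partition_mb: int = 15,
) -> Tuple[int, int]:
    """Illustrative sketch: choose a Dask worker count and partition count."""
    # None (YAML 'null' in the run config) means one less than all cores.
    n_workers = max(cpu_count() - 1, 1) if n_cpu is None else n_cpu

    # Some call sites (e.g. forced extraction) pass no DataFrame because
    # they only need the worker count.
    if df is None:
        return n_workers, 1

    # Aim for partitions no larger than max_partition_mb of in-memory data.
    mem_mb = df.memory_usage(deep=True).sum() / 1e6
    n_partitions = max(int(mem_mb // max_partition_mb) + 1, n_workers)
    return n_workers, n_partitions
```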
25 changes: 24 additions & 1 deletion vast_pipeline/pipeline/config.py
@@ -16,7 +16,7 @@


def make_config_template(template_path: str, **kwargs) -> str:
"""Generate the contents of a run configuration file from on a Jinja2 template.
"""Generate the contents of a run configuration file from a Jinja2 template.
Args:
template_path: Path to a Jinja2 template.
@@ -114,6 +114,22 @@ class PipelineConfig:
"source_aggregate_pair_metrics_min_abs_vs": yaml.Float(),
}
),
yaml.Optional("processing"): yaml.Map(
{
yaml.Optional(
"num_workers",
default=settings.PIPE_RUN_CONFIG_DEFAULTS['num_workers']):
yaml.NullNone() | yaml.Int() | yaml.Str(),
yaml.Optional(
"num_workers_io",
default=settings.PIPE_RUN_CONFIG_DEFAULTS['num_workers_io']):
yaml.NullNone() | yaml.Int() | yaml.Str(),
yaml.Optional(
"max_partition_mb",
default=settings.PIPE_RUN_CONFIG_DEFAULTS['max_partition_mb']):
yaml.Int()
}
)
}
)
# path to default run config template
@@ -511,6 +527,13 @@ def validate(self, user: User = None):
if not os.path.exists(file):
raise PipelineConfigError(f"{file} does not exist.")

# ensure num_workers and num_workers_io are
# either None (from null in config yaml) or an integer
for param_name in ('num_workers', 'num_workers_io'):
param_value = self['processing'][param_name]
if (param_value is not None) and (type(param_value) is not int):
raise PipelineConfigError(f"{param_name} can only be an integer or 'null'")

def check_prev_config_diff(self) -> bool:
"""
Checks if the previous config file differs from the current config file. Used in
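For reference, the `yaml.NullNone() | yaml.Int() | yaml.Str()` union in the schema above is what turns a literal `null` in the run configuration into Python `None` by the time the `validate` check runs. A standalone sketch of that strictyaml behaviour (not pipeline code):

```python
import strictyaml as yaml

# A null/int union like the one used for num_workers in the schema above.
schema = yaml.Map({"num_workers": yaml.NullNone() | yaml.Int()})

print(yaml.load("num_workers: null", schema).data)  # {'num_workers': None}
print(yaml.load("num_workers: 4", schema).data)     # {'num_workers': 4}
```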
13 changes: 10 additions & 3 deletions vast_pipeline/pipeline/finalise.py
@@ -88,7 +88,9 @@ def final_operations(
source_aggregate_pair_metrics_min_abs_vs: float,
add_mode: bool,
done_source_ids: List[int],
previous_parquets: Dict[str, str]
previous_parquets: Dict[str, str],
n_cpu: int = 0,
max_partition_mb: int = 15
) -> Tuple[int, int]:
"""
Performs the final operations of the pipeline:
@@ -136,7 +138,9 @@
)
log_total_memory_usage()

srcs_df = parallel_groupby(sources_df)
srcs_df = parallel_groupby(sources_df,
n_cpu=n_cpu,
max_partition_mb=max_partition_mb)

mem_usage = get_df_memory_usage(srcs_df)
logger.info('Groupby-apply time: %.2f seconds', timer.reset())
@@ -179,7 +183,10 @@
# create measurement pairs, aka 2-epoch metrics
if calculate_pairs:
timer.reset()
measurement_pairs_df = calculate_measurement_pair_metrics(sources_df)
measurement_pairs_df = calculate_measurement_pair_metrics(
sources_df,
n_cpu=n_cpu,
max_partition_mb=max_partition_mb)
logger.info(
'Measurement pair metrics time: %.2f seconds',
timer.reset())
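`parallel_groupby` and `calculate_measurement_pair_metrics` are defined elsewhere in the pipeline; what changes here is simply that the configured worker count and partition size are now passed through to them. The general Dask pattern they rely on (a partitioned DataFrame groupby computed on the process scheduler) looks roughly like this generic, self-contained example, which is not the pipeline's actual code:

```python
import pandas as pd
import dask.dataframe as dd

if __name__ == "__main__":
    sources_df = pd.DataFrame({
        "source": [1, 1, 2, 2, 3],
        "flux_peak": [1.2, 1.4, 3.1, 2.9, 0.7],
    })

    n_workers = 2     # would come from config['processing']['num_workers']
    n_partitions = 2  # in the pipeline, derived from max_partition_mb

    # Partitioned groupby-aggregate computed with the multiprocessing scheduler.
    srcs_df = (
        dd.from_pandas(sources_df, npartitions=n_partitions)
        .groupby("source")["flux_peak"]
        .agg(["mean", "max"])
        .compute(num_workers=n_workers, scheduler="processes")
    )
    print(srcs_df)
```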
24 changes: 15 additions & 9 deletions vast_pipeline/pipeline/forced_extraction.py
@@ -5,7 +5,6 @@
import pandas as pd
import dask.dataframe as dd
import dask.bag as db
from psutil import cpu_count
from glob import glob

from astropy import units as u
Expand All @@ -19,7 +18,7 @@
from vast_pipeline.pipeline.loading import make_upload_measurements

from forced_phot import ForcedPhot
from ..utils.utils import StopWatch
from ..utils.utils import StopWatch, calculate_workers_and_partitions
from vast_pipeline.image.utils import open_fits


@@ -259,7 +258,7 @@ def finalise_forced_dfs(
def parallel_extraction(
df: pd.DataFrame, df_images: pd.DataFrame, df_sources: pd.DataFrame,
min_sigma: float, edge_buffer: float, cluster_threshold: float,
allow_nan: bool, add_mode: bool, p_run_path: str
allow_nan: bool, add_mode: bool, p_run_path: str, n_workers: int = 5
) -> pd.DataFrame:
"""
Parallelize forced extraction with Dask
@@ -289,6 +288,8 @@ def parallel_extraction(
True when the pipeline is running in add image mode.
p_run_path:
The system path of the pipeline run output.
n_workers:
The desired number of workers for Dask.
Returns:
Dataframe with forced extracted measurements data, columns are
@@ -374,7 +375,7 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
npartitions=len(list_meas_parquets)
)
.map(get_data_from_parquet, p_run_path, add_mode)
.compute()
.compute(num_workers=n_workers, scheduler="processes")
)
mapping = pd.DataFrame(mapping)
# remove not used columns from images_df and merge into mapping
@@ -393,7 +394,6 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
)
del col_to_drop

n_cpu = cpu_count() - 1
bags = db.from_sequence(list_to_map, npartitions=len(list_to_map))
forced_dfs = (
bags.map(lambda x: extract_from_image(
Expand All @@ -402,7 +402,7 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
allow_nan=allow_nan,
**x
))
.compute()
.compute(num_workers=n_workers, scheduler='processes')
)
del bags
# create intermediates dfs combining the mapping data and the forced
@@ -497,7 +497,8 @@ def forced_extraction(
sources_df: pd.DataFrame, cfg_err_ra: float, cfg_err_dec: float,
p_run: Run, extr_df: pd.DataFrame, min_sigma: float, edge_buffer: float,
cluster_threshold: float, allow_nan: bool, add_mode: bool,
done_images_df: pd.DataFrame, done_source_ids: List[int]
done_images_df: pd.DataFrame, done_source_ids: List[int],
n_cpu: int = 5
) -> Tuple[pd.DataFrame, int]:
"""
Check and extract expected measurements, and associate them with the
@@ -533,6 +534,8 @@
done_source_ids:
List of the source ids that were already present in the previous
run (used in add image mode).
n_cpu:
The desired number of workers for Dask.
Returns:
The `sources_df` with the extracted sources added.
@@ -604,11 +607,14 @@
f" (from {total_to_extract} total)"
)

# Don't care about n_partitions in this step
n_workers, _ = calculate_workers_and_partitions(None, n_cpu)

timer.reset()
extr_df = parallel_extraction(
extr_df, images_df, sources_df[['source', 'image', 'flux_peak']],
min_sigma, edge_buffer, cluster_threshold, allow_nan, add_mode,
p_run.path
p_run.path, n_workers=n_workers
)
logger.info(
'Force extraction step time: %.2f seconds', timer.reset()
@@ -712,7 +718,7 @@ def forced_extraction(
n_forced = (
dd.read_parquet(forced_parquets, columns=['id'])
.count()
.compute()
.compute(num_workers=n_workers, scheduler='processes')
.values[0]
)
else:
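The per-image extraction above is distributed with `dask.bag`, with the worker count now taken from the configuration instead of `psutil.cpu_count() - 1`. A small standalone example of that bag pattern (illustrative only; `fake_extract` and its inputs are placeholders, not the pipeline's `extract_from_image`):

```python
import dask.bag as db


def fake_extract(task: dict) -> dict:
    """Placeholder for the per-image forced-extraction call."""
    return {"image": task["image"], "n_sources": len(task["coords"])}


if __name__ == "__main__":
    list_to_map = [
        {"image": "image_1.fits", "coords": [(0.1, -30.0), (0.2, -30.1)]},
        {"image": "image_2.fits", "coords": [(1.5, -29.8)]},
    ]
    n_workers = 2  # from config['processing']['num_workers_io']

    # One partition per task, computed on the multiprocessing scheduler.
    bags = db.from_sequence(list_to_map, npartitions=len(list_to_map))
    forced = bags.map(fake_extract).compute(
        num_workers=n_workers, scheduler="processes"
    )
    print(forced)
```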
13 changes: 10 additions & 3 deletions vast_pipeline/pipeline/main.py
@@ -266,6 +266,8 @@ def process_pipeline(self, p_run: Run) -> None:
sources_df.loc[sources_df['forced'] == False, missing_source_cols],
images_df,
skyregs_df,
n_cpu=self.config['processing']['num_workers'],
max_partition_mb=self.config['processing']['max_partition_mb']
)

# STEP #4 New source analysis
@@ -274,7 +276,9 @@
missing_sources_df,
self.config["new_sources"]["min_sigma"],
self.config["source_monitoring"]["edge_buffer_scale"],
p_run
p_run,
n_cpu=self.config['processing']['num_workers_io'],
max_partition_mb=self.config['processing']['max_partition_mb']
)

# Drop column no longer required in missing_sources_df.
@@ -299,7 +303,8 @@
self.config["source_monitoring"]["allow_nan"],
self.add_mode,
done_images_df,
done_source_ids
done_source_ids,
n_cpu=self.config['processing']['num_workers_io']
)
mem_usage = get_df_memory_usage(sources_df)
logger.debug(f"Step 5: sources_df memory usage: {mem_usage}MB")
@@ -319,7 +324,9 @@
self.config["variability"]["source_aggregate_pair_metrics_min_abs_vs"],
self.add_mode,
done_source_ids,
self.previous_parquets
self.previous_parquets,
n_cpu=self.config['processing']['num_workers'],
max_partition_mb=self.config['processing']['max_partition_mb']
)

log_total_memory_usage()