From 192289f44d29b79d02043943a00782b7f6791b23 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 29 Jun 2023 12:45:24 +1000 Subject: [PATCH 01/48] Initial changes --- vast_pipeline/pipeline/finalise.py | 34 ++++++++++++++++++++++++++++++ vast_pipeline/pipeline/loading.py | 10 +++++++++ vast_pipeline/pipeline/main.py | 5 ++++- 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/finalise.py b/vast_pipeline/pipeline/finalise.py index b02a27d49..471d42a5b 100644 --- a/vast_pipeline/pipeline/finalise.py +++ b/vast_pipeline/pipeline/finalise.py @@ -133,6 +133,8 @@ def final_operations( sources_df.source.unique().shape[0] ) srcs_df = parallel_groupby(sources_df) + mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"Initial srcs_df memory: {mem_usage}MB") logger.info('Groupby-apply time: %.2f seconds', timer.reset()) # add new sources @@ -145,6 +147,8 @@ def final_operations( how="left", ) srcs_df["new_high_sigma"] = srcs_df["new_high_sigma"].fillna(0.0) + mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"srcs_df memory after adding new sources: {mem_usage}MB") # calculate nearest neighbour srcs_skycoord = SkyCoord( @@ -159,12 +163,16 @@ def final_operations( # add the separation distance in degrees srcs_df['n_neighbour_dist'] = d2d.deg + mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"srcs_df memory after nearest-neighbour: {mem_usage}MB") # create measurement pairs, aka 2-epoch metrics if calculate_pairs: timer.reset() measurement_pairs_df = calculate_measurement_pair_metrics(sources_df) logger.info('Measurement pair metrics time: %.2f seconds', timer.reset()) + mem_usage = measurement_pairs_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"measurement_pairs_df memory: {mem_usage}MB") # calculate measurement pair metric aggregates for sources by finding the row indices # of the aggregate max of the abs(m) metric for each flux type. @@ -189,6 +197,8 @@ def final_operations( "m_abs_significant_max_int": 0.0, }) logger.info("Measurement pair aggregate metrics time: %.2f seconds", timer.reset()) + mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"srcs_df memory after calculate_pairs: {mem_usage}MB") else: logger.info( "Skipping measurement pair metric calculation as specified in the run configuration." 
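The hunks above repeat one idiom many times: take the deep pandas memory footprint of a DataFrame, convert it to megabytes, and emit it at DEBUG level. A minimal helper capturing that idiom is sketched below; the name log_df_mem is illustrative only and does not exist in the pipeline.

import logging

import pandas as pd

logger = logging.getLogger(__name__)


def log_df_mem(df: pd.DataFrame, label: str) -> None:
    """Log the deep memory footprint of a DataFrame in MB."""
    # memory_usage(deep=True) also measures object (string) columns,
    # which is slower than the shallow default but far more accurate.
    mem_mb = df.memory_usage(deep=True).sum() / 1e6
    logger.debug(f"{label} memory: {mem_mb:.1f}MB")

With such a helper, each two-line pair above collapses to a single call, e.g. log_df_mem(srcs_df, "srcs_df after nearest-neighbour").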
@@ -201,18 +211,31 @@ def final_operations( # upload new ones first (new id's are fetched) src_done_mask = srcs_df.index.isin(done_source_ids) srcs_df_upload = srcs_df.loc[~src_done_mask].copy() + mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"srcs_df_upload initial memory: {mem_usage}MB") + srcs_df_upload = make_upload_sources(srcs_df_upload, p_run, add_mode) + mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"srcs_df_upload memory after upload: {mem_usage}MB") # And now update srcs_df_update = srcs_df.loc[src_done_mask].copy() + mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"srcs_df_update memory: {mem_usage}MB") logger.info( f"Updating {srcs_df_update.shape[0]} sources with new metrics.") + srcs_df = update_sources(srcs_df_update, batch_size=1000) + mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"srcs_df_update memory after update: {mem_usage}MB") # Add back together if not srcs_df_upload.empty: srcs_df = pd.concat([srcs_df, srcs_df_upload]) else: srcs_df = make_upload_sources(srcs_df, p_run, add_mode) + mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"srcs_df memory after upload_sources: {mem_usage}MB") + # gather the related df, upload to db and save to parquet file # the df will look like # @@ -230,11 +253,15 @@ def final_operations( .explode("related_list") .rename(columns={"id": "from_source_id", "related_list": "to_source_id"}) ) + mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"related_df memory: {mem_usage}MB") # for the column 'from_source_id', replace relation source ids with db id related_df["to_source_id"] = related_df["to_source_id"].map(srcs_df["id"].to_dict()) # drop relationships with the same source related_df = related_df[related_df["from_source_id"] != related_df["to_source_id"]] + mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"related_df memory after calcs: {mem_usage}MB") # write symmetrical relations to parquet related_df.to_parquet( @@ -256,7 +283,12 @@ def final_operations( ) logger.debug(f'Add mode: #{related_df.shape[0]} relations to upload.') + mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"related_df memory after partitioning: {mem_usage}MB") + make_upload_related_sources(related_df) + mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"related_df memory after upload: {mem_usage}MB") del related_df @@ -272,6 +304,8 @@ def final_operations( sources_df.drop('related', axis=1) .merge(srcs_df.rename(columns={'id': 'source_id'}), on='source') ) + mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"sources_df memory after srcs_df merge: {mem_usage}MB") if add_mode: # Load old associations so the already uploaded ones can be removed diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index a7d8f459e..901837bb5 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -165,6 +165,9 @@ def make_upload_sources( Returns: The input dataframe with the 'id' column added. ''' + logger.debug("Uploading sources...") + mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"sources_df memory usage: {mem_usage}MB") # create sources in DB with transaction.atomic(): if (add_mode is False and @@ -204,6 +207,8 @@ def make_upload_related_sources(related_df: pd.DataFrame) -> None: None. 
""" logger.info('Populate "related" field of sources...') + mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"related_df memory usage: {mem_usage}MB") bulk_upload_model(RelatedSource, related_models_generator(related_df)) @@ -220,6 +225,8 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None: None. """ logger.info('Upload associations...') + mem_usage = associations_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"associations_df memory usage: {mem_usage}MB") bulk_upload_model( Association, association_models_generator(associations_df) ) @@ -237,6 +244,9 @@ def make_upload_measurements(measurements_df: pd.DataFrame) -> pd.DataFrame: Returns: Original DataFrame with the database ID attached to each row. """ + logger.info("Upload measurements...") + mem_usage = measurements_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"measurements_df memory usage: {mem_usage}MB") meas_dj_ids = bulk_upload_model( Measurement, measurement_models_generator(measurements_df), diff --git a/vast_pipeline/pipeline/main.py b/vast_pipeline/pipeline/main.py index 6e1fe1721..d383d7483 100644 --- a/vast_pipeline/pipeline/main.py +++ b/vast_pipeline/pipeline/main.py @@ -232,7 +232,8 @@ def process_pipeline(self, p_run: Run) -> None: self.previous_parquets, done_images_df ) - + mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"Step 2: sources_df memory usage: {mem_usage}MB") # Obtain the number of selavy measurements for the run # n_selavy_measurements = sources_df. nr_selavy_measurements = sources_df['id'].unique().shape[0] @@ -285,6 +286,8 @@ def process_pipeline(self, p_run: Run) -> None: done_images_df, done_source_ids ) + mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"Step 5: sources_df memory usage: {mem_usage}MB") del missing_sources_df From 93c4e427d8f931148a697e9097d9c7a0bbeffe95 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 4 Jul 2023 10:37:06 +1000 Subject: [PATCH 02/48] Bump to 1.0.1 for reference --- vast_pipeline/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vast_pipeline/_version.py b/vast_pipeline/_version.py index b02bd1570..053832bc7 100644 --- a/vast_pipeline/_version.py +++ b/vast_pipeline/_version.py @@ -1 +1 @@ -__version__ = '1.0.0dev' +__version__ = '1.0.1dev' From 1a0eebb1f19fefaecd3fa3dd431bc56d72528dae Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 4 Jul 2023 13:09:06 +1000 Subject: [PATCH 03/48] Added forced photometry timer debug statements --- vast_pipeline/pipeline/forced_extraction.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index 33bb9e04d..2cc07f7e8 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -147,14 +147,16 @@ def extract_from_image( df['wavg_dec'].values, unit=(u.deg, u.deg) ) - + timer = StopWatch() FP = ForcedPhot(image, background, noise) + logger.debug("Time to initialise FP for {image}: {timer.reset()}") flux, flux_err, chisq, DOF, cluster_id = FP.measure( P_islands, cluster_threshold=cluster_threshold, allow_nan=allow_nan, edge_buffer=edge_buffer ) + logger.debug("Time to measure FP for {image}: {timer.reset()}") df['flux_int'] = flux * 1.e3 df['flux_int_err'] = flux_err * 1.e3 df['chi_squared_fit'] = chisq From a05bb866c54a79b67bb78cf0bccf9c037281d99d Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 4 Jul 2023 13:29:51 +1000 
Subject: [PATCH 04/48] Fixed previous commit --- vast_pipeline/pipeline/forced_extraction.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index 2cc07f7e8..b30eb1716 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -149,14 +149,14 @@ def extract_from_image( ) timer = StopWatch() FP = ForcedPhot(image, background, noise) - logger.debug("Time to initialise FP for {image}: {timer.reset()}") + logger.debug(f"Time to initialise FP for {image}: {timer.reset()}") flux, flux_err, chisq, DOF, cluster_id = FP.measure( P_islands, cluster_threshold=cluster_threshold, allow_nan=allow_nan, edge_buffer=edge_buffer ) - logger.debug("Time to measure FP for {image}: {timer.reset()}") + logger.debug(f"Time to measure FP for {image}: {timer.reset()}") df['flux_int'] = flux * 1.e3 df['flux_int_err'] = flux_err * 1.e3 df['chi_squared_fit'] = chisq From fa0bda6e4a2b1649ce4958314e8fa22b22cc5e02 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 4 Jul 2023 13:55:21 +1000 Subject: [PATCH 05/48] Log number of sources being forced fit --- vast_pipeline/pipeline/forced_extraction.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index b30eb1716..93f722faa 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -149,14 +149,16 @@ def extract_from_image( ) timer = StopWatch() FP = ForcedPhot(image, background, noise) - logger.debug(f"Time to initialise FP for {image}: {timer.reset()}") + logger.debug(f"Time to initialise FP for {image}: {timer.reset()}s") + + logger.debug(f"Running forced fits for {len(df)} sources in {image}...") flux, flux_err, chisq, DOF, cluster_id = FP.measure( P_islands, cluster_threshold=cluster_threshold, allow_nan=allow_nan, edge_buffer=edge_buffer ) - logger.debug(f"Time to measure FP for {image}: {timer.reset()}") + logger.debug(f"Time to measure FP for {image}: {timer.reset()}s") df['flux_int'] = flux * 1.e3 df['flux_int_err'] = flux_err * 1.e3 df['chi_squared_fit'] = chisq From 1f5c7c692ee4fa33d0aca1264ed80638f69f8801 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 4 Jul 2023 15:14:24 +1000 Subject: [PATCH 06/48] Updated forced fitting scheduler to processes --- vast_pipeline/pipeline/forced_extraction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index 93f722faa..305a7c208 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -368,7 +368,7 @@ def image_data_func(image_name: str) -> Dict[str, Any]: allow_nan=allow_nan, **x )) - .compute() + .compute(scheduler='processes', num_workers=n_cpu) ) del bags # create intermediates dfs combining the mapping data and the forced From 84c442333928410d8c6cd1cca0964ffe391b7c2d Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Wed, 5 Jul 2023 13:53:08 +1000 Subject: [PATCH 07/48] Added timing debug loggging to new_sources.py --- vast_pipeline/pipeline/new_sources.py | 32 ++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index c5b751147..937bf767a 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ 
b/vast_pipeline/pipeline/new_sources.py @@ -83,12 +83,14 @@ def get_image_rms_measurements( # input dataframe is empty, nothing to do return group image = group.iloc[0]['img_diff_rms_path'] - + get_rms_timer = StopWatch() with fits.open(image) as hdul: header = hdul[0].header wcs = WCS(header, naxis=2) data = hdul[0].data.squeeze() - + + logger.debug("{image} - Time to load fits: {get_rms_timer.reset()}s") + # Here we mimic the forced fits behaviour, # sources within 3 half BMAJ widths of the image # edges are ignored. The user buffer is also @@ -105,12 +107,15 @@ def get_image_rms_measurements( ) npix = int(round(npix * edge_buffer)) - + + get_rms_timer.reset() coords = SkyCoord( group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg) ) + logger.debug("{image} - Time to generate SkyCoord: {get_rms_timer.reset()}s") array_coords = gen_array_coords_from_wcs(coords, wcs) + logger.debug("{image} - Time to generate array_coords: {get_rms_timer.reset()}s") # check for pixel wrapping x_valid = np.logical_or( @@ -130,6 +135,8 @@ def get_image_rms_measurements( valid_indexes = group[valid].index.values group = group.loc[valid_indexes] + + logger.debug("{image} - Time to get valid indices: {get_rms_timer.reset()}s") if group.empty: # early return if all sources failed range check @@ -142,11 +149,14 @@ def get_image_rms_measurements( # Now we also need to check proximity to NaN values # as forced fits may also drop these values + get_rms_timer.reset() coords = SkyCoord( group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg) ) + logger.debug("{image} - Time to generate second SkyCoord: {get_rms_timer.reset()}s") array_coords = gen_array_coords_from_wcs(coords, wcs) + logger.debug("{image} - Time to generate second array_coords: {get_rms_timer.reset()}s") acceptable_no_nan_dist = int( round(bmaj.to('arcsec').value / 2. / pixelscale.value) @@ -154,6 +164,7 @@ def get_image_rms_measurements( nan_valid = [] + get_rms_timer.reset() # Get slices of each source and check NaN is not included. for i,j in zip(array_coords[0], array_coords[1]): sl = tuple(( @@ -166,6 +177,8 @@ def get_image_rms_measurements( nan_valid.append(True) valid_indexes = group[nan_valid].index.values + + logger.debug("{image} - Time to get second valid indices: {get_rms_timer.reset()}s") if np.any(nan_valid): # only run if there are actual values to measure @@ -315,6 +328,7 @@ def new_sources( # ['VAST_0127-73A.EPOCH08.I.fits'] | # ----------------------------------+ timer = StopWatch() + debug_timer = StopWatch() logger.info("Starting new source analysis.") @@ -328,6 +342,8 @@ def new_sources( run=p_run ).values(*tuple(cols)) )).set_index('name') + + logger.debug(f"Time to make images_df: {debug_timer.reset()}s") # Get rid of sources that are not 'new', i.e. sources which the # first sky region image is not in the image list @@ -336,6 +352,7 @@ def new_sources( ].drop( columns=['in_primary'] ) + logger.debug(f"Time to make new_sources_df: {debug_timer.reset()}s") # Check if the previous sources would have actually been seen # i.e. are the previous images sensitive enough @@ -368,6 +385,8 @@ def new_sources( 'rms_median': 'img_diff_rms_median', 'noise_path': 'img_diff_rms_path' }) + + logger.debug(f"Time to reset & merge image info into new_sources_df: {debug_timer.reset()}s") # Select only those images that come before the detection image # in time. 
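The timing statements in this hunk reset a shared StopWatch after every step and interpolate the elapsed seconds into each message (the f-prefixes missing here are added in PATCH 08). The same per-step timing can be sketched with only the standard library as a context manager; log_duration is a hypothetical name, and StopWatch is assumed to behave as the surrounding code implies, with reset() returning the elapsed seconds.

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)


@contextmanager
def log_duration(label: str):
    """Log how long the wrapped block took, at DEBUG level."""
    start = time.perf_counter()
    try:
        yield
    finally:
        logger.debug(f"{label}: {time.perf_counter() - start:.2f}s")

# Hypothetical usage mirroring the hunk above:
# with log_duration(f"{image} - load fits"):
#     with fits.open(image) as hdul:
#         data = hdul[0].data.squeeze()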
@@ -381,6 +400,8 @@ def new_sources( left_on=['source', 'detection'], right_on=['source', 'image'], how='left' ).drop(columns=['image']) + + logger.debug(f"Time to merge detection fluxes into new_sources_df: {debug_timer.reset()}s") # calculate the sigma of the source if it was placed in the # minimum rms region of the previous images @@ -393,6 +414,8 @@ def new_sources( new_sources_df = new_sources_df.loc[ new_sources_df['diff_sigma'] >= min_sigma ] + + logger.debug(f"Time to do new_sources_df threshold calcs: {debug_timer.reset()}s") # Now have list of sources that should have been seen before given # previous images minimum rms values. @@ -413,6 +436,7 @@ def new_sources( new_sources_df = parallel_get_rms_measurements( new_sources_df, edge_buffer=edge_buffer ) + logger.debug(f"Time to get rms measurements: {debug_timer.reset()}s") # this removes those that are out of range new_sources_df['img_diff_true_rms'] = ( @@ -443,6 +467,8 @@ def new_sources( # moving forward only the new_high_sigma columns is needed, drop all others. new_sources_df = new_sources_df[['new_high_sigma']] + + logger.debug(f"Time to to do final cleanup steps {debug_timer.reset()}s") logger.info( 'Total new source analysis time: %.2f seconds', timer.reset_init() From 9b41f7b493120770a8979c1f229fc8b14216544b Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Wed, 5 Jul 2023 14:09:48 +1000 Subject: [PATCH 08/48] Fixed f-strings --- vast_pipeline/pipeline/new_sources.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index 937bf767a..ee3bbaa17 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -89,7 +89,7 @@ def get_image_rms_measurements( wcs = WCS(header, naxis=2) data = hdul[0].data.squeeze() - logger.debug("{image} - Time to load fits: {get_rms_timer.reset()}s") + logger.debug(f"{image} - Time to load fits: {get_rms_timer.reset()}s") # Here we mimic the forced fits behaviour, # sources within 3 half BMAJ widths of the image @@ -112,10 +112,10 @@ def get_image_rms_measurements( coords = SkyCoord( group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg) ) - logger.debug("{image} - Time to generate SkyCoord: {get_rms_timer.reset()}s") + logger.debug(f"{image} - Time to generate SkyCoord: {get_rms_timer.reset()}s") array_coords = gen_array_coords_from_wcs(coords, wcs) - logger.debug("{image} - Time to generate array_coords: {get_rms_timer.reset()}s") + logger.debug(f"{image} - Time to generate array_coords: {get_rms_timer.reset()}s") # check for pixel wrapping x_valid = np.logical_or( @@ -136,7 +136,7 @@ def get_image_rms_measurements( group = group.loc[valid_indexes] - logger.debug("{image} - Time to get valid indices: {get_rms_timer.reset()}s") + logger.debug(f"{image} - Time to get valid indices: {get_rms_timer.reset()}s") if group.empty: # early return if all sources failed range check @@ -153,10 +153,10 @@ def get_image_rms_measurements( coords = SkyCoord( group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg) ) - logger.debug("{image} - Time to generate second SkyCoord: {get_rms_timer.reset()}s") + logger.debug(f"{image} - Time to generate second SkyCoord: {get_rms_timer.reset()}s") array_coords = gen_array_coords_from_wcs(coords, wcs) - logger.debug("{image} - Time to generate second array_coords: {get_rms_timer.reset()}s") + logger.debug(f"{image} - Time to generate second array_coords: {get_rms_timer.reset()}s") acceptable_no_nan_dist = int( 
round(bmaj.to('arcsec').value / 2. / pixelscale.value) @@ -178,7 +178,7 @@ def get_image_rms_measurements( valid_indexes = group[nan_valid].index.values - logger.debug("{image} - Time to get second valid indices: {get_rms_timer.reset()}s") + logger.debug(f"{image} - Time to get second valid indices: {get_rms_timer.reset()}s") if np.any(nan_valid): # only run if there are actual values to measure From 63e80c3b3686127861183c61573a5913c88bd620 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Fri, 7 Jul 2023 10:18:31 +1000 Subject: [PATCH 09/48] Added association_models_generator debug logging --- vast_pipeline/pipeline/model_generator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vast_pipeline/pipeline/model_generator.py b/vast_pipeline/pipeline/model_generator.py index 310babae0..291786252 100644 --- a/vast_pipeline/pipeline/model_generator.py +++ b/vast_pipeline/pipeline/model_generator.py @@ -87,6 +87,7 @@ def association_models_generator( Returns: An iterable generator object containing the yielded Association objects. """ + self.logger.debug(f"Building {len(assoc_df} association generators") for i, row in assoc_df.iterrows(): yield Association( meas_id=row['id'], @@ -94,6 +95,7 @@ def association_models_generator( d2d=row['d2d'], dr=row['dr'], ) + self.logger.debug(f"Built {len(assoc_df} association generators") def related_models_generator( From 8ab0f67ca6f7a5961fefcfe1cd069a44c7b25b89 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Fri, 7 Jul 2023 10:19:11 +1000 Subject: [PATCH 10/48] Drop associations batch size to 100 --- vast_pipeline/pipeline/loading.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index 901837bb5..16f9767bf 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -228,7 +228,9 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None: mem_usage = associations_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"associations_df memory usage: {mem_usage}MB") bulk_upload_model( - Association, association_models_generator(associations_df) + Association, + association_models_generator(associations_df), + batch_size=1000 ) From d7f95d1ec39b83da4ff99450e0f76983846292f7 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Fri, 7 Jul 2023 13:20:03 +1000 Subject: [PATCH 11/48] Added get_memory_usage function --- vast_pipeline/pipeline/utils.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index 9169f25a1..b6726fb42 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -29,6 +29,7 @@ from vast_pipeline.models import ( Band, Image, Run, SkyRegion ) +import psutil logger = logging.getLogger(__name__) @@ -948,6 +949,7 @@ def get_src_skyregion_merged_df( # VAST_0127-73A.EPOCH01.I.fits | True | # ------------------------------+--------------+ logger.info("Creating ideal source coverage df...") + #logger.debug(sources_df.head()) merged_timer = StopWatch() @@ -1707,3 +1709,15 @@ def write_parquets( ) return skyregs_df + +def get_memory_usage(): + """ + This function gets the current memory usage and returns a string. + + Returns: + A string containing the current resource usage. 
+ """ + mem = psutil.virtual_memory()[3] #resource usage in bytes + mem = mem / 1024**3 #resource usage in GB + + return f"Current memory usage: {mem}GB" From ba4f91bff2076865af1295f49407cce4d08cd4af Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Fri, 7 Jul 2023 13:22:04 +1000 Subject: [PATCH 12/48] Added get_memory_usage to finalise.py --- vast_pipeline/pipeline/finalise.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/finalise.py b/vast_pipeline/pipeline/finalise.py index 471d42a5b..c253090d6 100644 --- a/vast_pipeline/pipeline/finalise.py +++ b/vast_pipeline/pipeline/finalise.py @@ -14,7 +14,7 @@ update_sources ) from vast_pipeline.pipeline.pairs import calculate_measurement_pair_metrics -from vast_pipeline.pipeline.utils import parallel_groupby +from vast_pipeline.pipeline.utils import parallel_groupby, get_memory_usage logger = logging.getLogger(__name__) @@ -135,6 +135,7 @@ def final_operations( srcs_df = parallel_groupby(sources_df) mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"Initial srcs_df memory: {mem_usage}MB") + logger.debug(get_memory_usage()) logger.info('Groupby-apply time: %.2f seconds', timer.reset()) # add new sources @@ -149,6 +150,7 @@ def final_operations( srcs_df["new_high_sigma"] = srcs_df["new_high_sigma"].fillna(0.0) mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"srcs_df memory after adding new sources: {mem_usage}MB") + logger.debug(get_memory_usage()) # calculate nearest neighbour srcs_skycoord = SkyCoord( @@ -165,6 +167,7 @@ def final_operations( srcs_df['n_neighbour_dist'] = d2d.deg mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"srcs_df memory after nearest-neighbour: {mem_usage}MB") + logger.debug(get_memory_usage()) # create measurement pairs, aka 2-epoch metrics if calculate_pairs: @@ -173,6 +176,7 @@ def final_operations( logger.info('Measurement pair metrics time: %.2f seconds', timer.reset()) mem_usage = measurement_pairs_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"measurement_pairs_df memory: {mem_usage}MB") + logger.debug(get_memory_usage()) # calculate measurement pair metric aggregates for sources by finding the row indices # of the aggregate max of the abs(m) metric for each flux type. @@ -199,6 +203,7 @@ def final_operations( logger.info("Measurement pair aggregate metrics time: %.2f seconds", timer.reset()) mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"srcs_df memory after calculate_pairs: {mem_usage}MB") + logger.debug(get_memory_usage()) else: logger.info( "Skipping measurement pair metric calculation as specified in the run configuration." 
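Note that psutil.virtual_memory()[3] is the system-wide "used" field (index 3 of the svmem tuple), so get_memory_usage reports host memory, including every other process on the machine or container. If per-process numbers were wanted instead, a sketch along the following lines would report the resident set size of the pipeline process itself; get_process_memory_usage is illustrative and not part of the patch.

import os

import psutil


def get_process_memory_usage() -> str:
    """Return the resident set size of the current process in GB."""
    # memory_info().rss is the physical memory held by this process only,
    # unlike virtual_memory().used, which is machine-wide.
    rss_gb = psutil.Process(os.getpid()).memory_info().rss / 1024**3
    return f"Current process RSS: {rss_gb:.3f}GB"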
@@ -213,20 +218,24 @@ def final_operations( srcs_df_upload = srcs_df.loc[~src_done_mask].copy() mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6 logger.debug(f"srcs_df_upload initial memory: {mem_usage}MB") + logger.debug(get_memory_usage()) srcs_df_upload = make_upload_sources(srcs_df_upload, p_run, add_mode) mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6 logger.debug(f"srcs_df_upload memory after upload: {mem_usage}MB") + logger.debug(get_memory_usage()) # And now update srcs_df_update = srcs_df.loc[src_done_mask].copy() mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6 logger.debug(f"srcs_df_update memory: {mem_usage}MB") + logger.debug(get_memory_usage()) logger.info( f"Updating {srcs_df_update.shape[0]} sources with new metrics.") srcs_df = update_sources(srcs_df_update, batch_size=1000) mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6 logger.debug(f"srcs_df_update memory after update: {mem_usage}MB") + logger.debug(get_memory_usage()) # Add back together if not srcs_df_upload.empty: srcs_df = pd.concat([srcs_df, srcs_df_upload]) @@ -235,6 +244,7 @@ def final_operations( mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"srcs_df memory after upload_sources: {mem_usage}MB") + logger.debug(get_memory_usage()) # gather the related df, upload to db and save to parquet file # the df will look like @@ -255,6 +265,7 @@ def final_operations( ) mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"related_df memory: {mem_usage}MB") + logger.debug(get_memory_usage()) # for the column 'from_source_id', replace relation source ids with db id related_df["to_source_id"] = related_df["to_source_id"].map(srcs_df["id"].to_dict()) @@ -262,6 +273,7 @@ def final_operations( related_df = related_df[related_df["from_source_id"] != related_df["to_source_id"]] mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"related_df memory after calcs: {mem_usage}MB") + logger.debug(get_memory_usage()) # write symmetrical relations to parquet related_df.to_parquet( @@ -285,10 +297,12 @@ def final_operations( mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"related_df memory after partitioning: {mem_usage}MB") + logger.debug(get_memory_usage()) make_upload_related_sources(related_df) mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"related_df memory after upload: {mem_usage}MB") + logger.debug(get_memory_usage()) del related_df @@ -306,6 +320,7 @@ def final_operations( ) mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"sources_df memory after srcs_df merge: {mem_usage}MB") + logger.debug(get_memory_usage()) if add_mode: # Load old associations so the already uploaded ones can be removed From 0e6bf3adae7906af5c377a55fa61f5fb5b52fe18 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Fri, 7 Jul 2023 13:25:16 +1000 Subject: [PATCH 13/48] Added get_memory_usage to loading.py --- vast_pipeline/pipeline/loading.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index 16f9767bf..b4c34707d 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -18,7 +18,9 @@ Association, Band, Measurement, SkyRegion, Source, RelatedSource, Run, Image ) -from vast_pipeline.pipeline.utils import get_create_img, get_create_img_band +from vast_pipeline.pipeline.utils import ( + get_create_img, get_create_img_band, 
get_memory_usage +) from vast_pipeline.utils.utils import StopWatch @@ -29,7 +31,8 @@ def bulk_upload_model( djmodel: models.Model, generator: Iterable[Generator[models.Model, None, None]], - batch_size: int=10_000, return_ids: bool=False + batch_size: int=10_000, return_ids: bool=False, + log_mem_usage: False ) -> List[int]: ''' Bulk upload a list of generator objects of django models to db. @@ -52,6 +55,8 @@ def bulk_upload_model( bulk_ids = [] while True: items = list(islice(generator, batch_size)) + if log_mem_usage: + logger.debug(get_memory_usage()) if not items: break out_bulk = djmodel.objects.bulk_create(items) @@ -168,6 +173,7 @@ def make_upload_sources( logger.debug("Uploading sources...") mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"sources_df memory usage: {mem_usage}MB") + logger.debug(get_memory_usage()) # create sources in DB with transaction.atomic(): if (add_mode is False and @@ -209,6 +215,7 @@ def make_upload_related_sources(related_df: pd.DataFrame) -> None: logger.info('Populate "related" field of sources...') mem_usage = related_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"related_df memory usage: {mem_usage}MB") + logger.debug(get_memory_usage()) bulk_upload_model(RelatedSource, related_models_generator(related_df)) @@ -227,10 +234,12 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None: logger.info('Upload associations...') mem_usage = associations_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"associations_df memory usage: {mem_usage}MB") + logger.debug(get_memory_usage()) bulk_upload_model( Association, association_models_generator(associations_df), - batch_size=1000 + batch_size=100, + log_mem_usage=True ) @@ -249,6 +258,7 @@ def make_upload_measurements(measurements_df: pd.DataFrame) -> pd.DataFrame: logger.info("Upload measurements...") mem_usage = measurements_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"measurements_df memory usage: {mem_usage}MB") + logger.debug(get_memory_usage()) meas_dj_ids = bulk_upload_model( Measurement, measurement_models_generator(measurements_df), From 1cc2f0e9de5ed955c34d9606dbc956b88b8f83ab Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Fri, 7 Jul 2023 13:27:54 +1000 Subject: [PATCH 14/48] Added get_memory_usage to main.py --- vast_pipeline/pipeline/main.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/main.py b/vast_pipeline/pipeline/main.py index d383d7483..d838f9c6a 100644 --- a/vast_pipeline/pipeline/main.py +++ b/vast_pipeline/pipeline/main.py @@ -28,7 +28,8 @@ get_src_skyregion_merged_df, group_skyregions, get_parallel_assoc_image_df, - write_parquets + write_parquets, + get_mem_usage ) from .errors import MaxPipelineRunsError @@ -234,6 +235,7 @@ def process_pipeline(self, p_run: Run) -> None: ) mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"Step 2: sources_df memory usage: {mem_usage}MB") + logger.debug(get_memory_usage()) # Obtain the number of selavy measurements for the run # n_selavy_measurements = sources_df. 
nr_selavy_measurements = sources_df['id'].unique().shape[0] @@ -288,8 +290,10 @@ def process_pipeline(self, p_run: Run) -> None: ) mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"Step 5: sources_df memory usage: {mem_usage}MB") + logger.debug(get_memory_usage()) del missing_sources_df + logger.debug(get_memory_usage()) # STEP #6: finalise the df getting unique sources, calculating # metrics and upload data to database @@ -303,6 +307,7 @@ def process_pipeline(self, p_run: Run) -> None: done_source_ids, self.previous_parquets ) + logger.debug(get_memory_usage()) # calculate number processed images nr_img_processed = len(images) From b181cd7518b9996fc1ffa47bad45419ac4539776 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Fri, 7 Jul 2023 14:48:48 +1000 Subject: [PATCH 15/48] Cleaned things up --- vast_pipeline/pipeline/loading.py | 5 +++-- vast_pipeline/pipeline/main.py | 2 +- vast_pipeline/pipeline/model_generator.py | 4 ++-- vast_pipeline/pipeline/utils.py | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index b4c34707d..3b9adc486 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -31,8 +31,9 @@ def bulk_upload_model( djmodel: models.Model, generator: Iterable[Generator[models.Model, None, None]], - batch_size: int=10_000, return_ids: bool=False, - log_mem_usage: False + batch_size: int=10_000, + return_ids: bool=False, + log_mem_usage: bool=False, ) -> List[int]: ''' Bulk upload a list of generator objects of django models to db. diff --git a/vast_pipeline/pipeline/main.py b/vast_pipeline/pipeline/main.py index d838f9c6a..1abf16069 100644 --- a/vast_pipeline/pipeline/main.py +++ b/vast_pipeline/pipeline/main.py @@ -29,7 +29,7 @@ group_skyregions, get_parallel_assoc_image_df, write_parquets, - get_mem_usage + get_memory_usage ) from .errors import MaxPipelineRunsError diff --git a/vast_pipeline/pipeline/model_generator.py b/vast_pipeline/pipeline/model_generator.py index 291786252..c81da32ba 100644 --- a/vast_pipeline/pipeline/model_generator.py +++ b/vast_pipeline/pipeline/model_generator.py @@ -87,7 +87,7 @@ def association_models_generator( Returns: An iterable generator object containing the yielded Association objects. 
""" - self.logger.debug(f"Building {len(assoc_df} association generators") + logger.debug(f"Building {len(assoc_df)} association generators") for i, row in assoc_df.iterrows(): yield Association( meas_id=row['id'], @@ -95,7 +95,7 @@ def association_models_generator( d2d=row['d2d'], dr=row['dr'], ) - self.logger.debug(f"Built {len(assoc_df} association generators") + logger.debug(f"Built {len(assoc_df)} association generators") def related_models_generator( diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index b6726fb42..e5a3354a2 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -1720,4 +1720,4 @@ def get_memory_usage(): mem = psutil.virtual_memory()[3] #resource usage in bytes mem = mem / 1024**3 #resource usage in GB - return f"Current memory usage: {mem}GB" + return f"Current memory usage: {mem:.3f}GB" From 43d9d2f3ce1e55bd6635929916ab14f757afdb8b Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 13 Jul 2023 13:52:17 +1000 Subject: [PATCH 16/48] Hardcode n_cpu --- vast_pipeline/pipeline/forced_extraction.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index 305a7c208..da3390f96 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -359,7 +359,8 @@ def image_data_func(image_name: str) -> Dict[str, Any]: ) del col_to_drop - n_cpu = cpu_count() - 1 + #n_cpu = cpu_count() - 1 # this doesn't work because cpu_count returns the number of CPUs in the machine, not the container. + n_cpu = 6 bags = db.from_sequence(list_to_map, npartitions=len(list_to_map)) forced_dfs = ( bags.map(lambda x: extract_from_image( From 9c30c4380c23873a2036fa8f8d1887bfd96914bf Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Sun, 30 Jul 2023 12:54:35 +1000 Subject: [PATCH 17/48] Shorten forced fit measurment name --- vast_pipeline/pipeline/forced_extraction.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index da3390f96..98094126b 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -208,7 +208,7 @@ def finalise_forced_dfs( df['component_id'] = df['island_id'].str.replace( 'island', 'component' ) + 'a' - img_prefix = image.split('.')[0] + '_' + img_prefix = ""#image.split('.')[0] + '_' df['name'] = img_prefix + df['component_id'] # assign all the other columns # convert fluxes to mJy @@ -579,7 +579,7 @@ def forced_extraction( ) # make measurement names unique for db constraint - extr_df['name'] = extr_df['name'] + f'_f_run{p_run.id:06d}' + extr_df['name'] = extr_df['name'] + f'_f_run{p_run.id:03d}' # select sensible flux values and set the columns with fix values values = { From 029418ebf4242b66721c34247ac0bc829cbdcae4 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Sun, 30 Jul 2023 13:03:37 +1000 Subject: [PATCH 18/48] Add some further logging --- vast_pipeline/pipeline/forced_extraction.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index 98094126b..f0ee8edae 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -647,6 +647,16 @@ def forced_extraction( extr_df = extr_df[col_order + remaining] # upload the measurements, a column 'id' is returned 
with the DB id + long_names = extr_df.loc[extr_df['name'].str.len() > 63] + long_comps = extr_df.loc[extr_df['component_id'].str.len() > 63] + long_isls = extr_df.loc[extr_df['island_id'].str.len() > 63] + + logger.debug("Entries with long names:") + logger.debug(long_names) + logger.debug("Entries with long component ids:") + logger.debug(long_comps) + logger.debug("Entries with long island ids:") + logger.debug(long_isls) extr_df = make_upload_measurements(extr_df) extr_df = extr_df.rename(columns={'source_tmp_id': 'source'}) From 2cf9970a5130b2e41ace5b593f22b9abc600f22d Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Sun, 30 Jul 2023 13:46:08 +1000 Subject: [PATCH 19/48] Only log if long --- vast_pipeline/pipeline/forced_extraction.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index f0ee8edae..6887620fb 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -651,12 +651,15 @@ def forced_extraction( long_comps = extr_df.loc[extr_df['component_id'].str.len() > 63] long_isls = extr_df.loc[extr_df['island_id'].str.len() > 63] - logger.debug("Entries with long names:") - logger.debug(long_names) - logger.debug("Entries with long component ids:") - logger.debug(long_comps) - logger.debug("Entries with long island ids:") - logger.debug(long_isls) + if len(long_names) > 0: + logger.debug("Entries with long names:") + logger.debug(long_names) + if len(long_comps) > 0: + logger.debug("Entries with long component ids:") + logger.debug(long_comps) + if len(long_isls) > 0: + logger.debug("Entries with long island ids:") + logger.debug(long_isls) extr_df = make_upload_measurements(extr_df) extr_df = extr_df.rename(columns={'source_tmp_id': 'source'}) From e95351f526b2eb88a9cc61856c33789ee3d7a5c5 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 14 Dec 2023 13:45:34 +1100 Subject: [PATCH 20/48] Fixed indent --- vast_pipeline/pipeline/new_sources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index 97efdf338..55d54288c 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -84,7 +84,7 @@ def get_image_rms_measurements( return group image = group.iloc[0]['img_diff_rms_path'] get_rms_timer = StopWatch() - with open_fits(image) as hdul: + with open_fits(image) as hdul: header = hdul[0].header wcs = WCS(header, naxis=2) data = hdul[0].data.squeeze() From ea30f62621428662ec4e98ea5dcf2f69491c467d Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 14 Dec 2023 17:44:35 +1100 Subject: [PATCH 21/48] Fix syntax error --- vast_pipeline/pipeline/forced_extraction.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index 16c4cf933..fd9b10911 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -166,6 +166,7 @@ def extract_from_image( Dictionary with input dataframe with added columns (flux_int, flux_int_err, chi_squared_fit) and image name. 
""" + timer = StopWatch() # create the skycoord obj to pass to the forced extraction # see usage https://github.com/dlakaplan/forced_phot P_islands = SkyCoord( From cb22d332a63aa1c334339e28b5518f58f74b9570 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 6 Feb 2024 16:12:12 +1100 Subject: [PATCH 22/48] Revert association size to 1e5 --- vast_pipeline/pipeline/loading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index 3b9adc486..c8aa692f1 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -239,7 +239,7 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None: bulk_upload_model( Association, association_models_generator(associations_df), - batch_size=100, + batch_size=10000, log_mem_usage=True ) From e77d538c5b43a16eb72d6a299c9d8862eb59f9e6 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 13 Feb 2024 17:47:25 +1100 Subject: [PATCH 23/48] Temporarily hardcode n_cpu=10 --- vast_pipeline/pipeline/association.py | 64 ++++++++++++++++++++- vast_pipeline/pipeline/forced_extraction.py | 4 +- vast_pipeline/pipeline/new_sources.py | 2 +- vast_pipeline/pipeline/pairs.py | 2 +- vast_pipeline/pipeline/utils.py | 4 +- 5 files changed, 67 insertions(+), 9 deletions(-) diff --git a/vast_pipeline/pipeline/association.py b/vast_pipeline/pipeline/association.py index b66940ce7..565c26ae7 100644 --- a/vast_pipeline/pipeline/association.py +++ b/vast_pipeline/pipeline/association.py @@ -935,6 +935,14 @@ def advanced_association( ''' # read the needed sources fields # Step 1: get matches within semimajor axis of image. + logger.debug(f"bw_max: {bw_max}") + logger.debug(f"skyc2: {skyc2}") + logger.debug(f"skyc1: {skyc1}") + logger.debug(f"skyc1 RA NaN: {np.isnan(skyc1.ra.deg).sum()}") + logger.debug(f"skyc1 Dec NaN: {np.isnan(skyc1.dec.deg).sum()}") + logger.debug(f"skyc2 RA NaN: {np.isnan(skyc2.ra.deg).sum()}") + logger.debug(f"skyc2 Dec NaN: {np.isnan(skyc2.dec.deg).sum()}") + idx_skyc1, idx_skyc2, d2d, d3d = skyc2.search_around_sky( skyc1, bw_max ) @@ -1221,6 +1229,13 @@ def association( skyc1_srcs['dec'].values, unit=(u.deg, u.deg) ) + + logger.debug(skyc1_srcs) + logger.debug(skyc1_srcs.query('ra.isnull() | dec.isnull()', engine='python')) + logger.debug(skyc1_srcs.query('ra.isnull()', engine='python')) + logger.debug(f"skyc1: {skyc1}") + logger.debug(f"skyc1: {skyc1[np.isnan(skyc1.ra.deg)]}") + logger.debug("\n\n\n\n") for it, epoch in enumerate(unique_epochs[start_epoch:]): logger.info('Association iteration: #%i%s', it + 1, skyreg_tag) @@ -1245,6 +1260,16 @@ def association( skyc2_srcs['dec'].values, unit=(u.deg, u.deg) ) + + logger.debug("skyc2_srcs:") + logger.debug(skyc2_srcs) + logger.debug(skyc2_srcs.query('ra.isnull() | dec.isnull()', engine='python')) + logger.debug(f"skyc2: {skyc2}") + + logger.debug("skyc1_srcs:") + logger.debug(skyc1_srcs) + logger.debug(skyc1_srcs.query('ra.isnull() | dec.isnull()', engine='python')) + logger.debug(f"skyc1: {skyc1}") if method == 'basic': sources_df, skyc1_srcs = basic_association( @@ -1298,7 +1323,19 @@ def association( ) sources_df = sources_df.drop(['ra_wrap'], axis=1) - + + logger.debug(f"weight_ew NaN counts: {sources_df['weight_ew'].isna().sum()}") + logger.debug(f"weight_ew inf counts: {np.isinf(sources_df['weight_ew']).sum()}") + logger.debug(f"weight_ns NaN counts: {sources_df['weight_ns'].isna().sum()}") + logger.debug(f"interim_ew NaN counts: {sources_df['interim_ew'].isna().sum()}") + 
logger.debug(f"interim_ew inf counts: {np.isinf(sources_df['interim_ew']).sum()}") + logger.debug(f"interim_ns NaN counts: {sources_df['interim_ns'].isna().sum()}") + logger.debug(f"interim_ns inf counts: {np.isinf(sources_df['interim_ns']).sum()}") + logger.debug(f"weight_ew zero counts: {(sources_df['weight_ew'] == 0.0).sum()}") + logger.debug(f"weight_ns zero counts: {(sources_df['weight_ns'] == 0.0).sum()}") + + logger.debug(f"uncertainty ew: {sources_df['uncertainty_ew']}") + tmp_srcs_df = ( sources_df.loc[ (sources_df['source'] != -1) & (sources_df['forced'] == False), @@ -1319,6 +1356,9 @@ def association( wm_dec = tmp_srcs_df['interim_ns'].sum() / tmp_srcs_df['weight_ns'].sum() wm_uncertainty_ns = 1. / np.sqrt(tmp_srcs_df['weight_ns'].sum()) + logger.debug(f"wm_ra NaN counts: {wm_ra.isna().sum()}") + logger.debug(f"wm_dec NaN counts: {wm_dec.isna().sum()}") + weighted_df = ( pd.concat( [wm_ra, wm_uncertainty_ew, wm_dec, wm_uncertainty_ns], @@ -1334,7 +1374,21 @@ def association( 'weight_ns': 'uncertainty_ns' }) ) - + nan_indices = weighted_df.query("ra.isnull()", engine='python').index + nan_sources = weighted_df.iloc[nan_indices].source.values + + logger.debug("Temp sources column info:") + for source, group in tmp_srcs_df: + if source in nan_sources: + logger.debug(source) + + for column in group.columns: + logger.debug(group[column]) + + logger.debug("Weighted df column info:") + for column in weighted_df.columns: + logger.debug(weighted_df.iloc[nan_indices][column]) + # correct the RA wrapping ra_wrap_mask = weighted_df.ra >= 360. weighted_df.loc[ @@ -1368,6 +1422,10 @@ def association( 'uncertainty_ns_skyc2' ], axis=1 ) + + logger.debug("skyc1_srcs column info:") + for column in skyc1_srcs.columns: + logger.debug(skyc1_srcs.iloc[nan_indices][column]) # generate new sky coord ready for next iteration skyc1 = SkyCoord( @@ -1587,7 +1645,7 @@ def parallel_association( # getting duplicates in the result laater id_incr_par_assoc = max(done_source_ids) if add_mode else 0 - n_cpu = cpu_count() - 1 + n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu # pass each skyreg_group through the normal association process. results = ( diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index fd9b10911..5fcb306fe 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -395,7 +395,7 @@ def image_data_func(image_name: str) -> Dict[str, Any]: del col_to_drop #n_cpu = cpu_count() - 1 # this doesn't work because cpu_count returns the number of CPUs in the machine, not the container. 
- n_cpu = 6 + n_cpu = 10 bags = db.from_sequence(list_to_map, npartitions=len(list_to_map)) forced_dfs = ( bags.map(lambda x: extract_from_image( @@ -482,7 +482,7 @@ def get_fname(n): return os.path.join( 'forced_measurements_' + n.replace('.', '_') + '.parquet' ) dfs = list(map(lambda x: (df[df['image'] == x], get_fname(x)), images)) - n_cpu = cpu_count() - 1 + n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu # writing parquets using Dask bag bags = db.from_sequence(dfs) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index 55d54288c..f26c67ea4 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -231,7 +231,7 @@ def parallel_get_rms_measurements( 'img_diff_true_rms': 'f', } - n_cpu = cpu_count() - 1 + n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu out = ( dd.from_pandas(out, n_cpu) diff --git a/vast_pipeline/pipeline/pairs.py b/vast_pipeline/pipeline/pairs.py index 470cbab3b..24ca76a97 100644 --- a/vast_pipeline/pipeline/pairs.py +++ b/vast_pipeline/pipeline/pairs.py @@ -64,7 +64,7 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame: vs_peak, vs_int - variability t-statistic m_peak, m_int - variability modulation index """ - n_cpu = cpu_count() - 1 + n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu """Create a DataFrame containing all measurement ID combinations per source. Resultant DataFrame will have a MultiIndex(["source", RangeIndex]) where "source" is diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index 52d600f57..e2116d480 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -704,7 +704,7 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame: 'eta_peak': 'f', 'related_list': 'O' } - n_cpu = cpu_count() - 1 + n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu out = dd.from_pandas(df, n_cpu) out = ( out.groupby('source') @@ -763,7 +763,7 @@ def parallel_groupby_coord(df: pd.DataFrame) -> pd.DataFrame: 'wavg_ra': 'f', 'wavg_dec': 'f', } - n_cpu = cpu_count() - 1 + n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu out = dd.from_pandas(df, n_cpu) out = ( out.groupby('source') From 72d1aeb07f6c5f4b8ec84111a71329c7ed9d3cfb Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Wed, 14 Feb 2024 10:19:44 +1100 Subject: [PATCH 24/48] Added some more logging --- vast_pipeline/pipeline/finalise.py | 6 +++++- vast_pipeline/pipeline/utils.py | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/finalise.py b/vast_pipeline/pipeline/finalise.py index c253090d6..9a1014801 100644 --- a/vast_pipeline/pipeline/finalise.py +++ b/vast_pipeline/pipeline/finalise.py @@ -132,11 +132,15 @@ def final_operations( 'Calculating statistics for %i sources...', sources_df.source.unique().shape[0] ) + mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"sources_df memory: {mem_usage}MB") + srcs_df = parallel_groupby(sources_df) + + logger.info('Groupby-apply time: %.2f seconds', timer.reset()) mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"Initial srcs_df memory: {mem_usage}MB") logger.debug(get_memory_usage()) - logger.info('Groupby-apply time: %.2f seconds', timer.reset()) # add new sources srcs_df["new"] = srcs_df.index.isin(new_sources_df.index) diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index e2116d480..02901e59d 100644 --- a/vast_pipeline/pipeline/utils.py +++ 
b/vast_pipeline/pipeline/utils.py @@ -705,6 +705,7 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame: 'related_list': 'O' } n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu + logger.debug(f"Running parallel_groupby with {n_cpu} CPUs....") out = dd.from_pandas(df, n_cpu) out = ( out.groupby('source') From b3e217b836ed481a5438853e054c3b70b3df9b73 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Wed, 14 Feb 2024 10:36:36 +1100 Subject: [PATCH 25/48] Attempt to limit memory usage --- vast_pipeline/pipeline/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index 02901e59d..49d3130cc 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -705,6 +705,9 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame: 'related_list': 'O' } n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu + from dask.distributed import Client + client = Client(n_workers=n_cpu, memory_limit="5GB") + logger.debug(f"Running parallel_groupby with {n_cpu} CPUs....") out = dd.from_pandas(df, n_cpu) out = ( From 41bd24086b0d533bfa8c47e2203f501da88227d5 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Wed, 14 Feb 2024 10:37:19 +1100 Subject: [PATCH 26/48] Attempt to limit memory usage --- vast_pipeline/pipeline/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index 49d3130cc..22f1867bf 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -706,7 +706,7 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame: } n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu from dask.distributed import Client - client = Client(n_workers=n_cpu, memory_limit="5GB") + client = Client(n_workers=n_cpu, memory_limit="3GB") logger.debug(f"Running parallel_groupby with {n_cpu} CPUs....") out = dd.from_pandas(df, n_cpu) From 83b60cfcae2cc443465938c14ee74532d89e1c24 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 15 Feb 2024 09:30:55 +1100 Subject: [PATCH 27/48] Update parallel_grouppby to use chunksize --- vast_pipeline/pipeline/utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index 22f1867bf..5db7b8dfb 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -705,11 +705,12 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame: 'related_list': 'O' } n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu - from dask.distributed import Client - client = Client(n_workers=n_cpu, memory_limit="3GB") + #from dask.distributed import Client + #client = Client(n_workers=n_cpu, memory_limit="3GB") + chunksize=1e5 logger.debug(f"Running parallel_groupby with {n_cpu} CPUs....") - out = dd.from_pandas(df, n_cpu) + out = dd.from_pandas(df, chunksize=chunksize) out = ( out.groupby('source') .apply( From 9102656325e5f1ed5882841990f8d5692e20b52a Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 15 Feb 2024 22:01:43 +1100 Subject: [PATCH 28/48] Correctly set chunk size --- vast_pipeline/pipeline/utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index 5db7b8dfb..c25a1ffce 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -707,10 +707,13 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame: n_cpu = 10 #cpu_count() - 1 # temporarily 
hardcode n_cpu #from dask.distributed import Client #client = Client(n_workers=n_cpu, memory_limit="3GB") - chunksize=1e5 + chunksize=10000 logger.debug(f"Running parallel_groupby with {n_cpu} CPUs....") + logger.debug(f"Running parallel_groupby with {chunksize} chunks....") out = dd.from_pandas(df, chunksize=chunksize) + mem_per_partition = out.memory_usage_per_partition(index=True, deep=True) + logger.debug(f"Memory per partition: {mem_per_partition/(1024**2)}MB") out = ( out.groupby('source') .apply( From 19741bad44467f63ba5a9e7470151e53870b3495 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Fri, 16 Feb 2024 14:16:41 +1100 Subject: [PATCH 29/48] Remove superfluous logging --- vast_pipeline/image/main.py | 8 +++- vast_pipeline/pipeline/association.py | 58 +-------------------------- 2 files changed, 8 insertions(+), 58 deletions(-) diff --git a/vast_pipeline/image/main.py b/vast_pipeline/image/main.py index 2a8edaf5b..24a65ee1f 100644 --- a/vast_pipeline/image/main.py +++ b/vast_pipeline/image/main.py @@ -436,6 +436,10 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame: df['ew_sys_err'] = self.config["ra_uncertainty"] / 3600. df['ns_sys_err'] = self.config["dec_uncertainty"] / 3600. + + logger.debug(df['ra_err']) + logger.debug(df['ra_err'].mean()) + df['error_radius'] = calc_error_radius( df['ra'].values, df['ra_err'].values, @@ -446,6 +450,9 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame: df['uncertainty_ew'] = np.hypot( df['ew_sys_err'].values, df['error_radius'].values ) + + logger.debug(df['uncertainty_ew']) + logger.debug(df['uncertainty_ew'].mean()) df['uncertainty_ns'] = np.hypot( df['ns_sys_err'].values, df['error_radius'].values @@ -454,7 +461,6 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame: # weight calculations to use later df['weight_ew'] = 1. / df['uncertainty_ew'].values**2 df['weight_ns'] = 1. / df['uncertainty_ns'].values**2 - logger.debug('Positional errors done.') # Initialise the forced column as False diff --git a/vast_pipeline/pipeline/association.py b/vast_pipeline/pipeline/association.py index 565c26ae7..8db3c2c38 100644 --- a/vast_pipeline/pipeline/association.py +++ b/vast_pipeline/pipeline/association.py @@ -934,15 +934,7 @@ def advanced_association( association. ''' # read the needed sources fields - # Step 1: get matches within semimajor axis of image. - logger.debug(f"bw_max: {bw_max}") - logger.debug(f"skyc2: {skyc2}") - logger.debug(f"skyc1: {skyc1}") - logger.debug(f"skyc1 RA NaN: {np.isnan(skyc1.ra.deg).sum()}") - logger.debug(f"skyc1 Dec NaN: {np.isnan(skyc1.dec.deg).sum()}") - logger.debug(f"skyc2 RA NaN: {np.isnan(skyc2.ra.deg).sum()}") - logger.debug(f"skyc2 Dec NaN: {np.isnan(skyc2.dec.deg).sum()}") - + # Step 1: get matches within semimajor axis of image. 
idx_skyc1, idx_skyc2, d2d, d3d = skyc2.search_around_sky( skyc1, bw_max ) @@ -1229,13 +1221,6 @@ def association( skyc1_srcs['dec'].values, unit=(u.deg, u.deg) ) - - logger.debug(skyc1_srcs) - logger.debug(skyc1_srcs.query('ra.isnull() | dec.isnull()', engine='python')) - logger.debug(skyc1_srcs.query('ra.isnull()', engine='python')) - logger.debug(f"skyc1: {skyc1}") - logger.debug(f"skyc1: {skyc1[np.isnan(skyc1.ra.deg)]}") - logger.debug("\n\n\n\n") for it, epoch in enumerate(unique_epochs[start_epoch:]): logger.info('Association iteration: #%i%s', it + 1, skyreg_tag) @@ -1260,16 +1245,6 @@ def association( skyc2_srcs['dec'].values, unit=(u.deg, u.deg) ) - - logger.debug("skyc2_srcs:") - logger.debug(skyc2_srcs) - logger.debug(skyc2_srcs.query('ra.isnull() | dec.isnull()', engine='python')) - logger.debug(f"skyc2: {skyc2}") - - logger.debug("skyc1_srcs:") - logger.debug(skyc1_srcs) - logger.debug(skyc1_srcs.query('ra.isnull() | dec.isnull()', engine='python')) - logger.debug(f"skyc1: {skyc1}") if method == 'basic': sources_df, skyc1_srcs = basic_association( @@ -1324,18 +1299,6 @@ def association( sources_df = sources_df.drop(['ra_wrap'], axis=1) - logger.debug(f"weight_ew NaN counts: {sources_df['weight_ew'].isna().sum()}") - logger.debug(f"weight_ew inf counts: {np.isinf(sources_df['weight_ew']).sum()}") - logger.debug(f"weight_ns NaN counts: {sources_df['weight_ns'].isna().sum()}") - logger.debug(f"interim_ew NaN counts: {sources_df['interim_ew'].isna().sum()}") - logger.debug(f"interim_ew inf counts: {np.isinf(sources_df['interim_ew']).sum()}") - logger.debug(f"interim_ns NaN counts: {sources_df['interim_ns'].isna().sum()}") - logger.debug(f"interim_ns inf counts: {np.isinf(sources_df['interim_ns']).sum()}") - logger.debug(f"weight_ew zero counts: {(sources_df['weight_ew'] == 0.0).sum()}") - logger.debug(f"weight_ns zero counts: {(sources_df['weight_ns'] == 0.0).sum()}") - - logger.debug(f"uncertainty ew: {sources_df['uncertainty_ew']}") - tmp_srcs_df = ( sources_df.loc[ (sources_df['source'] != -1) & (sources_df['forced'] == False), @@ -1356,9 +1319,6 @@ def association( wm_dec = tmp_srcs_df['interim_ns'].sum() / tmp_srcs_df['weight_ns'].sum() wm_uncertainty_ns = 1. / np.sqrt(tmp_srcs_df['weight_ns'].sum()) - logger.debug(f"wm_ra NaN counts: {wm_ra.isna().sum()}") - logger.debug(f"wm_dec NaN counts: {wm_dec.isna().sum()}") - weighted_df = ( pd.concat( [wm_ra, wm_uncertainty_ew, wm_dec, wm_uncertainty_ns], @@ -1376,18 +1336,6 @@ def association( ) nan_indices = weighted_df.query("ra.isnull()", engine='python').index nan_sources = weighted_df.iloc[nan_indices].source.values - - logger.debug("Temp sources column info:") - for source, group in tmp_srcs_df: - if source in nan_sources: - logger.debug(source) - - for column in group.columns: - logger.debug(group[column]) - - logger.debug("Weighted df column info:") - for column in weighted_df.columns: - logger.debug(weighted_df.iloc[nan_indices][column]) # correct the RA wrapping ra_wrap_mask = weighted_df.ra >= 360. 
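The association code above computes the per-source position update with inverse-variance weighting: weights are 1/uncertainty**2 (set in read_selavy), the weighted mean is sum(interim)/sum(weight), and the combined uncertainty is 1/sqrt(sum(weight)). A toy illustration of that calculation, assuming interim_ns is dec premultiplied by weight_ns (which the sum(interim)/sum(weight) form implies; this is a sketch, not the pipeline's actual helper):

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "source": [1, 1, 2],
    "dec": [-30.0001, -30.0003, 12.5],
    "uncertainty_ns": [0.0002, 0.0004, 0.0003],
})
df["weight_ns"] = 1.0 / df["uncertainty_ns"] ** 2   # inverse-variance weights
df["interim_ns"] = df["dec"] * df["weight_ns"]      # value premultiplied by its weight

grouped = df.groupby("source")
wm_dec = grouped["interim_ns"].sum() / grouped["weight_ns"].sum()
wm_uncertainty_ns = 1.0 / np.sqrt(grouped["weight_ns"].sum())
print(pd.concat([wm_dec.rename("wavg_dec"),
                 wm_uncertainty_ns.rename("wavg_uncertainty_ns")], axis=1))
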
@@ -1422,10 +1370,6 @@ def association( 'uncertainty_ns_skyc2' ], axis=1 ) - - logger.debug("skyc1_srcs column info:") - for column in skyc1_srcs.columns: - logger.debug(skyc1_srcs.iloc[nan_indices][column]) # generate new sky coord ready for next iteration skyc1 = SkyCoord( From 159816940e7afe745bbd4d39e9c808ca8e8f727e Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Fri, 16 Feb 2024 14:45:17 +1100 Subject: [PATCH 30/48] Further dask groupby fixes --- vast_pipeline/pipeline/utils.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index c25a1ffce..a522dc039 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -707,13 +707,18 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame: n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu #from dask.distributed import Client #client = Client(n_workers=n_cpu, memory_limit="3GB") - chunksize=10000 + #chunksize=10000 + partition_size_mb=100 + mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6 + npartitions = int(np.ceil(mem_usage_mb/partition_size_mb)) + if npartitions < n_cpu: + npartitions=n_cpu logger.debug(f"Running parallel_groupby with {n_cpu} CPUs....") - logger.debug(f"Running parallel_groupby with {chunksize} chunks....") - out = dd.from_pandas(df, chunksize=chunksize) - mem_per_partition = out.memory_usage_per_partition(index=True, deep=True) - logger.debug(f"Memory per partition: {mem_per_partition/(1024**2)}MB") + logger.debug(f"and using {npartitions} partions of {partition_size_mb}MB...") + + out = dd.from_pandas(df.set_index('source'), npartitions=npartitions) + out = ( out.groupby('source') .apply( From a3e41351865c13bb48e6dd79a7431bb2bf3dca41 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 20 Feb 2024 11:35:00 +1100 Subject: [PATCH 31/48] Added django.db.reset_queries --- vast_pipeline/pipeline/loading.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index c8aa692f1..8ce4983e4 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -5,7 +5,7 @@ from typing import List, Optional, Dict, Tuple, Generator, Iterable from itertools import islice -from django.db import transaction, connection, models +from django.db import transaction, connection, models, reset_queries from vast_pipeline.image.main import SelavyImage from vast_pipeline.pipeline.model_generator import ( @@ -53,6 +53,8 @@ def bulk_upload_model( None or a list of the database IDs of the uploaded objects. 
''' + reset_queries() + bulk_ids = [] while True: items = list(islice(generator, batch_size)) From f3fb2b874651551125345db93c33835c9d5cd117 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 20 Feb 2024 11:38:51 +1100 Subject: [PATCH 32/48] Added chunking for associations upload --- vast_pipeline/pipeline/loading.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index 8ce4983e4..bc6c0fcf5 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -238,12 +238,15 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None: mem_usage = associations_df.memory_usage(deep=True).sum() / 1e6 logger.debug(f"associations_df memory usage: {mem_usage}MB") logger.debug(get_memory_usage()) - bulk_upload_model( - Association, - association_models_generator(associations_df), - batch_size=10000, - log_mem_usage=True - ) + + assoc_chunk_size = 100000 + for i in range(0,len(df),assoc_chunk_size) + bulk_upload_model( + Association, + association_models_generator(associations_df[i:i+assoc_chunk_size]), + batch_size=10000, + log_mem_usage=True + ) def make_upload_measurements(measurements_df: pd.DataFrame) -> pd.DataFrame: From 868d518f117c44b592be5175ac60af6fb05830d7 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 20 Feb 2024 13:14:05 +1100 Subject: [PATCH 33/48] Fixes --- vast_pipeline/pipeline/loading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index bc6c0fcf5..eb52e178b 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -240,7 +240,7 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None: logger.debug(get_memory_usage()) assoc_chunk_size = 100000 - for i in range(0,len(df),assoc_chunk_size) + for i in range(0,len(df),assoc_chunk_size): bulk_upload_model( Association, association_models_generator(associations_df[i:i+assoc_chunk_size]), From c06b2e52b3c9b160854a501bf90228ac5a3cc463 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 20 Feb 2024 15:20:47 +1100 Subject: [PATCH 34/48] calculate measurement pairs with sensible partitions --- vast_pipeline/pipeline/pairs.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/pairs.py b/vast_pipeline/pipeline/pairs.py index 24ca76a97..f5bd69e7f 100644 --- a/vast_pipeline/pipeline/pairs.py +++ b/vast_pipeline/pipeline/pairs.py @@ -65,6 +65,17 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame: m_peak, m_int - variability modulation index """ n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu + + partition_size_mb=100 + mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6 + npartitions = int(np.ceil(mem_usage_mb/partition_size_mb)) + + if npartitions < n_cpu: + npartitions=n_cpu + logger.debug(f"Running calculate_measurement_pair_metrics with {n_cpu} CPUs....") + logger.debug(f"and using {npartitions} partions of {partition_size_mb}MB...") + + out = dd.from_pandas(df.set_index('source'), npartitions=npartitions) """Create a DataFrame containing all measurement ID combinations per source. 
Resultant DataFrame will have a MultiIndex(["source", RangeIndex]) where "source" is @@ -85,8 +96,10 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame: 2 12929 21994 11128 0 6216 23534 """ + + measurement_combinations = ( - dd.from_pandas(df, n_cpu) + dd.from_pandas(df.set_index("source"), npartitions=npartitions) .groupby("source")["id"] .apply( lambda x: pd.DataFrame(list(combinations(x, 2))), meta={0: "i", 1: "i"},) From 55b8a886d9475f6e9617ba63e2c1e002c9a67eef Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 20 Feb 2024 15:22:19 +1100 Subject: [PATCH 35/48] parallel_groupby_coord with sensible partitions --- vast_pipeline/pipeline/utils.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index a522dc039..ff41e394f 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -777,7 +777,16 @@ def parallel_groupby_coord(df: pd.DataFrame) -> pd.DataFrame: 'wavg_dec': 'f', } n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu - out = dd.from_pandas(df, n_cpu) + partition_size_mb=100 + mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6 + npartitions = int(np.ceil(mem_usage_mb/partition_size_mb)) + + if npartitions < n_cpu: + npartitions=n_cpu + logger.debug(f"Running parallel_groupby with {n_cpu} CPUs....") + logger.debug(f"and using {npartitions} partions of {partition_size_mb}MB...") + + out = dd.from_pandas(df.set_index('source'), npartitions=npartitions) out = ( out.groupby('source') .apply(calc_ave_coord, meta=col_dtype) From 6b0455ca11e33925916707db89f945f778b45bb0 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 20 Feb 2024 15:22:31 +1100 Subject: [PATCH 36/48] parallel_groupby_coord with sensible partitions --- vast_pipeline/pipeline/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index ff41e394f..5cae2e22a 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -783,7 +783,7 @@ def parallel_groupby_coord(df: pd.DataFrame) -> pd.DataFrame: if npartitions < n_cpu: npartitions=n_cpu - logger.debug(f"Running parallel_groupby with {n_cpu} CPUs....") + logger.debug(f"Running parallel_groupby_coord with {n_cpu} CPUs....") logger.debug(f"and using {npartitions} partions of {partition_size_mb}MB...") out = dd.from_pandas(df.set_index('source'), npartitions=npartitions) From 75d2fb02cc679211fb33762105abb4e5c0caae8a Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 20 Feb 2024 15:24:30 +1100 Subject: [PATCH 37/48] parallel_association with sensible partitions --- vast_pipeline/pipeline/association.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/association.py b/vast_pipeline/pipeline/association.py index 8db3c2c38..448968847 100644 --- a/vast_pipeline/pipeline/association.py +++ b/vast_pipeline/pipeline/association.py @@ -1590,10 +1590,18 @@ def parallel_association( id_incr_par_assoc = max(done_source_ids) if add_mode else 0 n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu + partition_size_mb=100 + mem_usage_mb = images_df.memory_usage(deep=True).sum() / 1e6 + npartitions = int(np.ceil(mem_usage_mb/partition_size_mb)) + + if npartitions < n_cpu: + npartitions=n_cpu + logger.debug(f"Running parallel_association with {n_cpu} CPUs....") + logger.debug(f"and using {npartitions} partions of {partition_size_mb}MB...") # pass each skyreg_group 
through the normal association process. results = ( - dd.from_pandas(images_df, n_cpu) + dd.from_pandas(images_df.set_index('skyreg_group'), npartitions=npartitions) .groupby('skyreg_group') .apply( association, From 9da2404472baf5aff2ce4b3cd96c31a847b7465e Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 20 Feb 2024 15:26:14 +1100 Subject: [PATCH 38/48] parallel_get_rms_measurements with sensible partitions --- vast_pipeline/pipeline/new_sources.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index f26c67ea4..41234c853 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -232,9 +232,17 @@ def parallel_get_rms_measurements( } n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu - + partition_size_mb=100 + mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6 + npartitions = int(np.ceil(mem_usage_mb/partition_size_mb)) + + if npartitions < n_cpu: + npartitions=n_cpu + logger.debug(f"Running parallel_get_rms_measurements with {n_cpu} CPUs....") + logger.debug(f"and using {npartitions} partions of {partition_size_mb}MB...") + out = ( - dd.from_pandas(out, n_cpu) + dd.from_pandas(out.set_index('img_diff_rms_path'), npartitions=npartitions) .groupby('img_diff_rms_path') .apply( get_image_rms_measurements, From 1af0a9f726073a27ceacb5395b2d78c53f0ae953 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Wed, 21 Feb 2024 14:57:18 +1100 Subject: [PATCH 39/48] Fixed new_sources issue --- vast_pipeline/pipeline/new_sources.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index 41234c853..1892c85b4 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -240,9 +240,11 @@ def parallel_get_rms_measurements( npartitions=n_cpu logger.debug(f"Running parallel_get_rms_measurements with {n_cpu} CPUs....") logger.debug(f"and using {npartitions} partions of {partition_size_mb}MB...") - + #out = out.set_index('img_diff_rms_path') + #logger.debug(out) + #logger.debug(out['img_diff_rms_path']) out = ( - dd.from_pandas(out.set_index('img_diff_rms_path'), npartitions=npartitions) + dd.from_pandas(out, npartitions=npartitions) .groupby('img_diff_rms_path') .apply( get_image_rms_measurements, From 7d93b31658a1b349b9bfa9583784135c11ae1592 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Sun, 25 Feb 2024 22:10:28 +1100 Subject: [PATCH 40/48] commit all changes --- vast_pipeline/pipeline/loading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index eb52e178b..d51720577 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -240,7 +240,7 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None: logger.debug(get_memory_usage()) assoc_chunk_size = 100000 - for i in range(0,len(df),assoc_chunk_size): + for i in range(0,len(associations_df),assoc_chunk_size): bulk_upload_model( Association, association_models_generator(associations_df[i:i+assoc_chunk_size]), From 40283e1f6365f8e2b0b32aa7abc7b63c1e776062 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Wed, 22 May 2024 17:14:39 +1000 Subject: [PATCH 41/48] Remove unnecessary debug logging statements --- vast_pipeline/pipeline/new_sources.py | 9 --------- 1 file changed, 9 deletions(-) diff --git 
a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index 1892c85b4..5490240d9 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -112,10 +112,7 @@ def get_image_rms_measurements( coords = SkyCoord( group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg) ) - logger.debug(f"{image} - Time to generate SkyCoord: {get_rms_timer.reset()}s") - array_coords = gen_array_coords_from_wcs(coords, wcs) - logger.debug(f"{image} - Time to generate array_coords: {get_rms_timer.reset()}s") # check for pixel wrapping x_valid = np.logical_or( @@ -135,8 +132,6 @@ def get_image_rms_measurements( valid_indexes = group[valid].index.values group = group.loc[valid_indexes] - - logger.debug(f"{image} - Time to get valid indices: {get_rms_timer.reset()}s") if group.empty: # early return if all sources failed range check @@ -153,10 +148,8 @@ def get_image_rms_measurements( coords = SkyCoord( group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg) ) - logger.debug(f"{image} - Time to generate second SkyCoord: {get_rms_timer.reset()}s") array_coords = gen_array_coords_from_wcs(coords, wcs) - logger.debug(f"{image} - Time to generate second array_coords: {get_rms_timer.reset()}s") acceptable_no_nan_dist = int( round(bmaj.to('arcsec').value / 2. / pixelscale.value) @@ -177,8 +170,6 @@ def get_image_rms_measurements( nan_valid.append(True) valid_indexes = group[nan_valid].index.values - - logger.debug(f"{image} - Time to get second valid indices: {get_rms_timer.reset()}s") if np.any(nan_valid): # only run if there are actual values to measure From 2355d9ee372c34f6649f10f276d9bf11e04cd6f2 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Wed, 22 May 2024 20:00:35 +1000 Subject: [PATCH 42/48] Remove superfluous debug statements --- vast_pipeline/image/main.py | 10 +--------- vast_pipeline/pipeline/forced_extraction.py | 9 --------- vast_pipeline/pipeline/new_sources.py | 5 +---- 3 files changed, 2 insertions(+), 22 deletions(-) diff --git a/vast_pipeline/image/main.py b/vast_pipeline/image/main.py index 24a65ee1f..d50d25a49 100644 --- a/vast_pipeline/image/main.py +++ b/vast_pipeline/image/main.py @@ -431,15 +431,11 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame: logger.debug("Condon errors done.") - logger.debug("Calculating positional errors...") # TODO: avoid extra column given that it is a single value df['ew_sys_err'] = self.config["ra_uncertainty"] / 3600. df['ns_sys_err'] = self.config["dec_uncertainty"] / 3600. - logger.debug(df['ra_err']) - logger.debug(df['ra_err'].mean()) - df['error_radius'] = calc_error_radius( df['ra'].values, df['ra_err'].values, @@ -451,9 +447,6 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame: df['ew_sys_err'].values, df['error_radius'].values ) - logger.debug(df['uncertainty_ew']) - logger.debug(df['uncertainty_ew'].mean()) - df['uncertainty_ns'] = np.hypot( df['ns_sys_err'].values, df['error_radius'].values ) @@ -461,8 +454,7 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame: # weight calculations to use later df['weight_ew'] = 1. / df['uncertainty_ew'].values**2 df['weight_ns'] = 1. 
/ df['uncertainty_ns'].values**2 - logger.debug('Positional errors done.') - + # Initialise the forced column as False df['forced'] = False diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py index 5fcb306fe..747e25c70 100644 --- a/vast_pipeline/pipeline/forced_extraction.py +++ b/vast_pipeline/pipeline/forced_extraction.py @@ -687,15 +687,6 @@ def forced_extraction( long_comps = extr_df.loc[extr_df['component_id'].str.len() > 63] long_isls = extr_df.loc[extr_df['island_id'].str.len() > 63] - if len(long_names) > 0: - logger.debug("Entries with long names:") - logger.debug(long_names) - if len(long_comps) > 0: - logger.debug("Entries with long component ids:") - logger.debug(long_comps) - if len(long_isls) > 0: - logger.debug("Entries with long island ids:") - logger.debug(long_isls) extr_df = make_upload_measurements(extr_df) extr_df = extr_df.rename(columns={'source_tmp_id': 'source'}) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index 5490240d9..407bbe46a 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -344,8 +344,6 @@ def new_sources( ).values(*tuple(cols)) )).set_index('name') - logger.debug(f"Time to make images_df: {debug_timer.reset()}s") - # Get rid of sources that are not 'new', i.e. sources which the # first sky region image is not in the image list new_sources_df = missing_sources_df[ @@ -353,8 +351,7 @@ def new_sources( ].drop( columns=['in_primary'] ) - logger.debug(f"Time to make new_sources_df: {debug_timer.reset()}s") - + # Check if the previous sources would have actually been seen # i.e. are the previous images sensitive enough From 4f4eb8b5864977cd74c72ec82271726cd3d92d95 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 23 May 2024 12:05:21 +1000 Subject: [PATCH 43/48] Lots of debugging statements --- vast_pipeline/pipeline/new_sources.py | 52 +++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index 407bbe46a..e1190f8c0 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -79,10 +79,17 @@ def get_image_rms_measurements( The group dataframe with the 'img_diff_true_rms' column added. The column will contain 'NaN' entires for sources that fail. """ + if len(group) == 0: # input dataframe is empty, nothing to do + logger.debug(f"No image RMS measurements to get, returning") return group + image = group.iloc[0]['img_diff_rms_path'] + logger.debug(f"{image} - num. meas. 
to get: {len(group)}") + partition_mem = group.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"{image} - partition memory usage: {partition_mem}MB") + get_rms_timer = StopWatch() with open_fits(image) as hdul: header = hdul[0].header @@ -229,8 +236,9 @@ def parallel_get_rms_measurements( if npartitions < n_cpu: npartitions=n_cpu - logger.debug(f"Running parallel_get_rms_measurements with {n_cpu} CPUs....") - logger.debug(f"and using {npartitions} partions of {partition_size_mb}MB...") + logger.debug(f"df mem usage: {mem_usage_mb}MB") + logger.debug(f"Applying get_rms_measurements with {n_cpu} CPUs....") + logger.debug(f"and using {npartitions} partitions of {partition_size_mb}MB...") #out = out.set_index('img_diff_rms_path') #logger.debug(out) #logger.debug(out['img_diff_rms_path']) @@ -243,12 +251,49 @@ def parallel_get_rms_measurements( meta=col_dtype ).compute(num_workers=n_cpu, scheduler='processes') ) - + logger.debug("Finished applying get_image_rms_measurements") + + mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"df length: {len(df)}") + logger.debug(f"df mem usage: {mem_usage_mb}MB") + + mem_usage_mb = out.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"out length: {len(out)}") + logger.debug(f"out mem usage: {mem_usage_mb}MB") + + logger.debug(f"Number source NaN: {out.source.isna().sum()}") + logger.debug(f"Number rms NaN: {out.img_diff_true_rms.isna().sum()}") + + logger.debug("Starting df merge...") + + #df_to_merge = (df.sort_values( + # by=['source', 'flux_peak'] + # ) + # .drop_duplicates('source') + # .drop(['img_diff_rms_path'], axis=1) + #) + #logger.debug(df_to_merge.columns) + logger.debug(df.columns) + #logger.debug(f"Length df to merge: {len(df_to_merge)}") df = df.merge( out[['source', 'img_diff_true_rms']], left_on='source', right_on='source', how='left' ) + logger.debug("Finished df merge...") + + mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6 + logger.debug(f"Merged df length: {len(df)}") + logger.debug(f"Merged df mem usage: {mem_usage_mb}MB") + logger.debug(df) + logger.debug(df.columns) + + + df_readable = df + df_readable['name'] = df.img_diff_rms_path.str.split('/').str[-1] + logger.debug(df_readable[['source', 'name', 'img_diff_true_rms']]) + logger.debug(df_readable.source.sort_values()) + logger.debug(df_readable.img_diff_true_rms.sort_values()) return df @@ -431,6 +476,7 @@ def new_sources( # measure the actual rms in the previous images at # the source location. 
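parallel_get_rms_measurements, called just below, applies the same memory-based partitioning heuristic that the earlier patches introduce for parallel_groupby, calculate_measurement_pair_metrics, parallel_association and parallel_groupby_coord: target roughly 100 MB per partition, but never fewer partitions than workers. A standalone sketch of that heuristic (illustrative helper name and toy data, not the pipeline functions):

import numpy as np
import pandas as pd
import dask.dataframe as dd

def to_dask_by_memory(df: pd.DataFrame, n_cpu: int = 10,
                      partition_size_mb: float = 100.0) -> dd.DataFrame:
    # size the partition count from the in-memory footprint of the frame,
    # falling back to one partition per worker for small inputs
    mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6
    npartitions = max(int(np.ceil(mem_usage_mb / partition_size_mb)), n_cpu)
    return dd.from_pandas(df, npartitions=npartitions)

ddf = to_dask_by_memory(pd.DataFrame({"source": range(1000), "flux": 1.0}))
print(ddf.npartitions)  # at least n_cpu, even for a tiny frame
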
+ logger.debug("Getting rms measurements...") new_sources_df = parallel_get_rms_measurements( new_sources_df, edge_buffer=edge_buffer ) From edae5f802eb6b97de19955aeb9c5b616a12ca9e0 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 23 May 2024 14:08:42 +1000 Subject: [PATCH 44/48] Correctly estimate partition size --- vast_pipeline/pipeline/new_sources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index e1190f8c0..95b2a8158 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -231,7 +231,7 @@ def parallel_get_rms_measurements( n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu partition_size_mb=100 - mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6 + mem_usage_mb = out.memory_usage(deep=True).sum() / 1e6 npartitions = int(np.ceil(mem_usage_mb/partition_size_mb)) if npartitions < n_cpu: From f0cffcc2a44f9049462606712e1d18536c7e966a Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 23 May 2024 15:31:17 +1000 Subject: [PATCH 45/48] More playing --- vast_pipeline/pipeline/new_sources.py | 32 +++++++++++++++++++-------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index 95b2a8158..a948d6021 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -75,7 +75,7 @@ def get_image_rms_measurements( edge_buffer: Multiplicative factor applied to nbeam to act as a buffer. - Returns: + Returns:F The group dataframe with the 'img_diff_true_rms' column added. The column will contain 'NaN' entires for sources that fail. """ @@ -216,6 +216,8 @@ def parallel_get_rms_measurements( The original input dataframe with the 'img_diff_true_rms' column added. The column will contain 'NaN' entires for sources that fail. """ + df.to_csv('parallel_get_rms_measurements_input.csv') + out = df[[ 'source', 'wavg_ra', 'wavg_dec', 'img_diff_rms_path' @@ -266,14 +268,20 @@ def parallel_get_rms_measurements( logger.debug("Starting df merge...") - #df_to_merge = (df.sort_values( - # by=['source', 'flux_peak'] - # ) - # .drop_duplicates('source') - # .drop(['img_diff_rms_path'], axis=1) - #) + df_to_merge = (df.sort_values( + by=['source', 'flux_peak'] + ) + .drop_duplicates('source') + .drop(['img_diff_rms_path'], axis=1) + ) + out_to_merge = (out.sort_values( + by=['source','img_diff_true_rms'] + ) + .drop_duplicates('source') + ) + #logger.debug(df_to_merge.columns) - logger.debug(df.columns) + #logger.debug(df.columns) #logger.debug(f"Length df to merge: {len(df_to_merge)}") df = df.merge( out[['source', 'img_diff_true_rms']], @@ -498,8 +506,9 @@ def new_sources( # We only care about the highest true sigma new_sources_df = new_sources_df.sort_values( - by=['source', 'true_sigma'] + by=['source', 'true_sigma'], ascending=False ) + new_sources_df.to_csv('new_sources_df_before_drop.csv') # keep only the highest for each source, rename for the daatabase new_sources_df = ( @@ -507,16 +516,21 @@ def new_sources( .drop_duplicates('source') .set_index('source') .rename(columns={'true_sigma': 'new_high_sigma'}) + .sort_values('source') ) # moving forward only the new_high_sigma columns is needed, drop all # others. 
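The descending sort on ['source', 'true_sigma'] plus drop_duplicates('source') added above is what keeps only the highest true_sigma row per source before the single-column selection that follows. A toy illustration of that keep-the-maximum-per-group pattern (made-up values):

import pandas as pd

new_sources_df = pd.DataFrame({
    "source": [1, 1, 2, 2],
    "true_sigma": [3.2, 7.5, 4.1, 4.0],
})

new_high_sigma = (
    new_sources_df.sort_values(by=["source", "true_sigma"], ascending=False)
    .drop_duplicates("source")          # first row per source = largest true_sigma
    .set_index("source")
    .rename(columns={"true_sigma": "new_high_sigma"})
    .sort_values("source")              # sort by the index level name
)
print(new_high_sigma)  # source 1 -> 7.5, source 2 -> 4.1
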
new_sources_df = new_sources_df[['new_high_sigma']] + new_sources_df.to_csv('new_high_sigma_orig_corrected.csv') + logger.debug(f"Time to to do final cleanup steps {debug_timer.reset()}s") logger.info( 'Total new source analysis time: %.2f seconds', timer.reset_init() ) + + raise Exception("End of new source calc") return new_sources_df From 108259603ec94e6f4eddec86427df75a4a03f1a2 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 23 May 2024 15:31:36 +1000 Subject: [PATCH 46/48] Stash? --- vast_pipeline/pipeline/finalise.py | 1 + vast_pipeline/pipeline/main.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/vast_pipeline/pipeline/finalise.py b/vast_pipeline/pipeline/finalise.py index 9a1014801..e5a7cdb85 100644 --- a/vast_pipeline/pipeline/finalise.py +++ b/vast_pipeline/pipeline/finalise.py @@ -175,6 +175,7 @@ def final_operations( # create measurement pairs, aka 2-epoch metrics if calculate_pairs: + sources_df.to_parquet('calcalate_measurement_pair_metrics_input_df.parquet') timer.reset() measurement_pairs_df = calculate_measurement_pair_metrics(sources_df) logger.info('Measurement pair metrics time: %.2f seconds', timer.reset()) diff --git a/vast_pipeline/pipeline/main.py b/vast_pipeline/pipeline/main.py index 1abf16069..fcecaa43e 100644 --- a/vast_pipeline/pipeline/main.py +++ b/vast_pipeline/pipeline/main.py @@ -294,6 +294,10 @@ def process_pipeline(self, p_run: Run) -> None: del missing_sources_df logger.debug(get_memory_usage()) + + sources_df.to_parquet('final_operations_input_sources_df.parquet') + new_sources_df.to_parquet('final_operations_input_new_sources_df.parquet') + # STEP #6: finalise the df getting unique sources, calculating # metrics and upload data to database From ec6630075258de097772a7991b6928fab4b87cc4 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Thu, 23 May 2024 17:08:37 +1000 Subject: [PATCH 47/48] More changes to make this work --- vast_pipeline/pipeline/new_sources.py | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py index a948d6021..32b63b75b 100644 --- a/vast_pipeline/pipeline/new_sources.py +++ b/vast_pipeline/pipeline/new_sources.py @@ -216,7 +216,7 @@ def parallel_get_rms_measurements( The original input dataframe with the 'img_diff_true_rms' column added. The column will contain 'NaN' entires for sources that fail. 
""" - df.to_csv('parallel_get_rms_measurements_input.csv') + #df.to_csv('parallel_get_rms_measurements_input.csv') out = df[[ 'source', 'wavg_ra', 'wavg_dec', @@ -269,7 +269,8 @@ def parallel_get_rms_measurements( logger.debug("Starting df merge...") df_to_merge = (df.sort_values( - by=['source', 'flux_peak'] + by=['source', 'flux_peak'], + ascending=False ) .drop_duplicates('source') .drop(['img_diff_rms_path'], axis=1) @@ -280,11 +281,8 @@ def parallel_get_rms_measurements( .drop_duplicates('source') ) - #logger.debug(df_to_merge.columns) - #logger.debug(df.columns) - #logger.debug(f"Length df to merge: {len(df_to_merge)}") - df = df.merge( - out[['source', 'img_diff_true_rms']], + df = df_to_merge.merge( + out_to_merge[['source', 'img_diff_true_rms']], left_on='source', right_on='source', how='left' ) @@ -294,15 +292,7 @@ def parallel_get_rms_measurements( logger.debug(f"Merged df length: {len(df)}") logger.debug(f"Merged df mem usage: {mem_usage_mb}MB") logger.debug(df) - logger.debug(df.columns) - - df_readable = df - df_readable['name'] = df.img_diff_rms_path.str.split('/').str[-1] - logger.debug(df_readable[['source', 'name', 'img_diff_true_rms']]) - logger.debug(df_readable.source.sort_values()) - logger.debug(df_readable.img_diff_true_rms.sort_values()) - return df @@ -508,7 +498,7 @@ def new_sources( new_sources_df = new_sources_df.sort_values( by=['source', 'true_sigma'], ascending=False ) - new_sources_df.to_csv('new_sources_df_before_drop.csv') + #new_sources_df.to_csv('new_sources_df_before_drop.csv') # keep only the highest for each source, rename for the daatabase new_sources_df = ( @@ -523,14 +513,12 @@ def new_sources( # others. new_sources_df = new_sources_df[['new_high_sigma']] - new_sources_df.to_csv('new_high_sigma_orig_corrected.csv') + #new_sources_df.to_csv('new_high_sigma_newcalc2.csv') logger.debug(f"Time to to do final cleanup steps {debug_timer.reset()}s") logger.info( 'Total new source analysis time: %.2f seconds', timer.reset_init() ) - - raise Exception("End of new source calc") return new_sources_df From a7f6f4b41b7d16c7a77a061a87d3e733d6549930 Mon Sep 17 00:00:00 2001 From: Dougal Dobie Date: Tue, 16 Jul 2024 17:28:32 +1000 Subject: [PATCH 48/48] Fix merge --- vast_pipeline/pipeline/loading.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py index 3b68c968e..bdbf451fb 100644 --- a/vast_pipeline/pipeline/loading.py +++ b/vast_pipeline/pipeline/loading.py @@ -235,24 +235,12 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None: None. """ logger.info('Upload associations...') -<<<<<<< HEAD - mem_usage = associations_df.memory_usage(deep=True).sum() / 1e6 - logger.debug(f"associations_df memory usage: {mem_usage}MB") - logger.debug(get_memory_usage()) - -======= ->>>>>>> dev + assoc_chunk_size = 100000 for i in range(0,len(associations_df),assoc_chunk_size): bulk_upload_model( Association, -<<<<<<< HEAD - association_models_generator(associations_df[i:i+assoc_chunk_size]), - batch_size=10000, - log_mem_usage=True -======= association_models_generator(associations_df[i:i+assoc_chunk_size]) ->>>>>>> dev )