diff --git a/vast_pipeline/_version.py b/vast_pipeline/_version.py
index b02bd157..053832bc 100644
--- a/vast_pipeline/_version.py
+++ b/vast_pipeline/_version.py
@@ -1 +1 @@
-__version__ = '1.0.0dev'
+__version__ = '1.0.1dev'
diff --git a/vast_pipeline/image/main.py b/vast_pipeline/image/main.py
index 2a8edaf5..d50d25a4 100644
--- a/vast_pipeline/image/main.py
+++ b/vast_pipeline/image/main.py
@@ -431,11 +431,11 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame:
 
         logger.debug("Condon errors done.")
 
-        logger.debug("Calculating positional errors...")
         # TODO: avoid extra column given that it is a single value
         df['ew_sys_err'] = self.config["ra_uncertainty"] / 3600.
         df['ns_sys_err'] = self.config["dec_uncertainty"] / 3600.
+
         df['error_radius'] = calc_error_radius(
             df['ra'].values,
             df['ra_err'].values,
@@ -446,7 +446,7 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame:
         df['uncertainty_ew'] = np.hypot(
             df['ew_sys_err'].values, df['error_radius'].values
         )
-
+
         df['uncertainty_ns'] = np.hypot(
             df['ns_sys_err'].values, df['error_radius'].values
         )
@@ -454,9 +454,7 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame:
         # weight calculations to use later
         df['weight_ew'] = 1. / df['uncertainty_ew'].values**2
         df['weight_ns'] = 1. / df['uncertainty_ns'].values**2
-
-        logger.debug('Positional errors done.')
-
+
         # Initialise the forced column as False
         df['forced'] = False
 
diff --git a/vast_pipeline/pipeline/association.py b/vast_pipeline/pipeline/association.py
index d5e46cb0..57586045 100644
--- a/vast_pipeline/pipeline/association.py
+++ b/vast_pipeline/pipeline/association.py
@@ -934,7 +934,7 @@ def advanced_association(
         association.
     '''
     # read the needed sources fields
-    # Step 1: get matches within semimajor axis of image. 
+    # Step 1: get matches within semimajor axis of image.
     idx_skyc1, idx_skyc2, d2d, d3d = skyc2.search_around_sky(
         skyc1, bw_max
     )
@@ -1298,7 +1298,7 @@ def association(
         )
 
     sources_df = sources_df.drop(['ra_wrap'], axis=1)
-
+
     tmp_srcs_df = (
         sources_df.loc[
             (sources_df['source'] != -1) & (sources_df['forced'] == False),
@@ -1334,7 +1334,9 @@ def association(
             'weight_ns': 'uncertainty_ns'
         })
     )
-
+    nan_indices = weighted_df.query("ra.isnull()", engine='python').index
+    nan_sources = weighted_df.iloc[nan_indices].source.values
+
     # correct the RA wrapping
     ra_wrap_mask = weighted_df.ra >= 360.
     weighted_df.loc[
diff --git a/vast_pipeline/pipeline/finalise.py b/vast_pipeline/pipeline/finalise.py
index b02a27d4..e5a7cdb8 100644
--- a/vast_pipeline/pipeline/finalise.py
+++ b/vast_pipeline/pipeline/finalise.py
@@ -14,7 +14,7 @@
     update_sources
 )
 from vast_pipeline.pipeline.pairs import calculate_measurement_pair_metrics
-from vast_pipeline.pipeline.utils import parallel_groupby
+from vast_pipeline.pipeline.utils import parallel_groupby, get_memory_usage
 
 logger = logging.getLogger(__name__)
 
@@ -132,8 +132,15 @@ def final_operations(
         'Calculating statistics for %i sources...',
         sources_df.source.unique().shape[0]
     )
+    mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"sources_df memory: {mem_usage}MB")
+
     srcs_df = parallel_groupby(sources_df)
+
     logger.info('Groupby-apply time: %.2f seconds', timer.reset())
+    mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"Initial srcs_df memory: {mem_usage}MB")
+    logger.debug(get_memory_usage())
 
     # add new sources
     srcs_df["new"] = srcs_df.index.isin(new_sources_df.index)
@@ -145,6 +152,9 @@ def final_operations(
         how="left",
     )
     srcs_df["new_high_sigma"] = srcs_df["new_high_sigma"].fillna(0.0)
+    mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"srcs_df memory after adding new sources: {mem_usage}MB")
+    logger.debug(get_memory_usage())
 
     # calculate nearest neighbour
     srcs_skycoord = SkyCoord(
@@ -159,12 +169,19 @@ def final_operations(
 
     # add the separation distance in degrees
     srcs_df['n_neighbour_dist'] = d2d.deg
+    mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"srcs_df memory after nearest-neighbour: {mem_usage}MB")
+    logger.debug(get_memory_usage())
 
     # create measurement pairs, aka 2-epoch metrics
     if calculate_pairs:
+        sources_df.to_parquet('calcalate_measurement_pair_metrics_input_df.parquet')
         timer.reset()
         measurement_pairs_df = calculate_measurement_pair_metrics(sources_df)
         logger.info('Measurement pair metrics time: %.2f seconds', timer.reset())
+        mem_usage = measurement_pairs_df.memory_usage(deep=True).sum() / 1e6
+        logger.debug(f"measurement_pairs_df memory: {mem_usage}MB")
+        logger.debug(get_memory_usage())
 
         # calculate measurement pair metric aggregates for sources by finding the row indices
         # of the aggregate max of the abs(m) metric for each flux type.
@@ -189,6 +206,9 @@ def final_operations(
             "m_abs_significant_max_int": 0.0,
         })
         logger.info("Measurement pair aggregate metrics time: %.2f seconds", timer.reset())
+        mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
+        logger.debug(f"srcs_df memory after calculate_pairs: {mem_usage}MB")
+        logger.debug(get_memory_usage())
     else:
         logger.info(
             "Skipping measurement pair metric calculation as specified in the run configuration."
@@ -201,18 +221,36 @@ def final_operations(
         # upload new ones first (new id's are fetched)
         src_done_mask = srcs_df.index.isin(done_source_ids)
         srcs_df_upload = srcs_df.loc[~src_done_mask].copy()
+        mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6
+        logger.debug(f"srcs_df_upload initial memory: {mem_usage}MB")
+        logger.debug(get_memory_usage())
+
         srcs_df_upload = make_upload_sources(srcs_df_upload, p_run, add_mode)
+        mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6
+        logger.debug(f"srcs_df_upload memory after upload: {mem_usage}MB")
+        logger.debug(get_memory_usage())
         # And now update
         srcs_df_update = srcs_df.loc[src_done_mask].copy()
+        mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6
+        logger.debug(f"srcs_df_update memory: {mem_usage}MB")
+        logger.debug(get_memory_usage())
         logger.info(
             f"Updating {srcs_df_update.shape[0]} sources with new metrics.")
+
         srcs_df = update_sources(srcs_df_update, batch_size=1000)
+        mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6
+        logger.debug(f"srcs_df_update memory after update: {mem_usage}MB")
+        logger.debug(get_memory_usage())
         # Add back together
         if not srcs_df_upload.empty:
             srcs_df = pd.concat([srcs_df, srcs_df_upload])
     else:
         srcs_df = make_upload_sources(srcs_df, p_run, add_mode)
+        mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
+        logger.debug(f"srcs_df memory after upload_sources: {mem_usage}MB")
+        logger.debug(get_memory_usage())
+
     # gather the related df, upload to db and save to parquet file
     # the df will look like
     #
@@ -230,11 +268,17 @@ def final_operations(
         .explode("related_list")
         .rename(columns={"id": "from_source_id", "related_list": "to_source_id"})
     )
+    mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"related_df memory: {mem_usage}MB")
+    logger.debug(get_memory_usage())
 
     # for the column 'from_source_id', replace relation source ids with db id
     related_df["to_source_id"] = related_df["to_source_id"].map(srcs_df["id"].to_dict())
     # drop relationships with the same source
     related_df = related_df[related_df["from_source_id"] != related_df["to_source_id"]]
+    mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"related_df memory after calcs: {mem_usage}MB")
+    logger.debug(get_memory_usage())
 
     # write symmetrical relations to parquet
     related_df.to_parquet(
@@ -256,7 +300,14 @@ def final_operations(
         )
         logger.debug(f'Add mode: #{related_df.shape[0]} relations to upload.')
 
+    mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"related_df memory after partitioning: {mem_usage}MB")
+    logger.debug(get_memory_usage())
+
     make_upload_related_sources(related_df)
+    mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"related_df memory after upload: {mem_usage}MB")
+    logger.debug(get_memory_usage())
 
     del related_df
 
@@ -272,6 +323,9 @@ def final_operations(
         sources_df.drop('related', axis=1)
         .merge(srcs_df.rename(columns={'id': 'source_id'}), on='source')
     )
+    mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"sources_df memory after srcs_df merge: {mem_usage}MB")
+    logger.debug(get_memory_usage())
 
     if add_mode:
         # Load old associations so the already uploaded ones can be removed
diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py
index eef7a3ed..747e25c7 100644
--- a/vast_pipeline/pipeline/forced_extraction.py
+++ b/vast_pipeline/pipeline/forced_extraction.py
@@ -166,6 +166,7 @@ def extract_from_image(
         Dictionary with input dataframe with
         added columns (flux_int, flux_int_err, chi_squared_fit) and image name.
     """
+    timer = StopWatch()
     # create the skycoord obj to pass to the forced extraction
     # see usage https://github.com/dlakaplan/forced_phot
     P_islands = SkyCoord(
@@ -173,6 +174,7 @@ def extract_from_image(
         df['wavg_dec'].values,
         unit=(u.deg, u.deg)
     )
+
     # load the image, background and noisemaps into memory
     # a dedicated function may seem unneccesary, but will be useful if we
     # split the load to a separate thread.
@@ -183,12 +185,14 @@ def extract_from_image(
     )
 
     FP = ForcedPhot(*forcedphot_input)
+
     flux, flux_err, chisq, DOF, cluster_id = FP.measure(
         P_islands,
         cluster_threshold=cluster_threshold,
         allow_nan=allow_nan,
         edge_buffer=edge_buffer
     )
+    logger.debug(f"Time to measure FP for {image}: {timer.reset()}s")
     df['flux_int'] = flux * 1.e3
     df['flux_int_err'] = flux_err * 1.e3
     df['chi_squared_fit'] = chisq
@@ -238,7 +242,7 @@ def finalise_forced_dfs(
     df['component_id'] = df['island_id'].str.replace(
         'island', 'component'
     ) + 'a'
-    img_prefix = image.split('.')[0] + '_'
+    img_prefix = ""#image.split('.')[0] + '_'
     df['name'] = img_prefix + df['component_id']
     # assign all the other columns
     # convert fluxes to mJy
@@ -390,7 +394,8 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
     )
     del col_to_drop
 
-    n_cpu = cpu_count() - 1
+    #n_cpu = cpu_count() - 1 # this doesn't work because cpu_count returns the number of CPUs in the machine, not the container.
+    n_cpu = 10
     bags = db.from_sequence(list_to_map, npartitions=len(list_to_map))
     forced_dfs = (
         bags.map(lambda x: extract_from_image(
@@ -399,7 +404,7 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
             allow_nan=allow_nan,
             **x
         ))
-        .compute()
+        .compute(scheduler='processes', num_workers=n_cpu)
     )
     del bags
     # create intermediates dfs combining the mapping data and the forced
@@ -477,7 +482,7 @@ def get_fname(n): return os.path.join(
         'forced_measurements_' + n.replace('.', '_') + '.parquet'
     )
     dfs = list(map(lambda x: (df[df['image'] == x], get_fname(x)), images))
 
-    n_cpu = cpu_count() - 1
+    n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu
 
     # writing parquets using Dask bag
     bags = db.from_sequence(dfs)
@@ -610,7 +615,7 @@ def forced_extraction(
     )
 
     # make measurement names unique for db constraint
-    extr_df['name'] = extr_df['name'] + f'_f_run{p_run.id:06d}'
+    extr_df['name'] = extr_df['name'] + f'_f_run{p_run.id:03d}'
 
     # select sensible flux values and set the columns with fix values
     values = {
@@ -678,6 +683,10 @@ def forced_extraction(
     extr_df = extr_df[col_order + remaining]
 
     # upload the measurements, a column 'id' is returned with the DB id
+    long_names = extr_df.loc[extr_df['name'].str.len() > 63]
+    long_comps = extr_df.loc[extr_df['component_id'].str.len() > 63]
+    long_isls = extr_df.loc[extr_df['island_id'].str.len() > 63]
+
     extr_df = make_upload_measurements(extr_df)
 
     extr_df = extr_df.rename(columns={'source_tmp_id': 'source'})
diff --git a/vast_pipeline/pipeline/loading.py b/vast_pipeline/pipeline/loading.py
index fe752c50..bdbf451f 100644
--- a/vast_pipeline/pipeline/loading.py
+++ b/vast_pipeline/pipeline/loading.py
@@ -18,7 +18,9 @@
     Association, Band, Measurement, SkyRegion, Source, RelatedSource, Run, Image
 )
 
-from vast_pipeline.pipeline.utils import get_create_img, get_create_img_band
+from vast_pipeline.pipeline.utils import (
+    get_create_img, get_create_img_band, get_memory_usage
+)
 from vast_pipeline.utils.utils import StopWatch
 
 
@@ -31,6 +33,7 @@ def bulk_upload_model(
     generator: Iterable[Generator[models.Model, None, None]],
     batch_size: int=10_000,
     return_ids: bool=False,
+    log_mem_usage: bool=False,
 ) -> List[int]:
     '''
     Bulk upload a list of generator objects of django models to db.
@@ -55,6 +58,8 @@ def bulk_upload_model(
     bulk_ids = []
     while True:
         items = list(islice(generator, batch_size))
+        if log_mem_usage:
+            logger.debug(get_memory_usage())
         if not items:
             break
         out_bulk = djmodel.objects.bulk_create(items)
@@ -168,6 +173,10 @@ def make_upload_sources(
     Returns:
         The input dataframe with the 'id' column added.
     '''
+    logger.debug("Uploading sources...")
+    mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"sources_df memory usage: {mem_usage}MB")
+    logger.debug(get_memory_usage())
     # create sources in DB
     with transaction.atomic():
         if (add_mode is False and
@@ -207,6 +216,9 @@ def make_upload_related_sources(related_df: pd.DataFrame) -> None:
         None.
     """
     logger.info('Populate "related" field of sources...')
+    mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"related_df memory usage: {mem_usage}MB")
+    logger.debug(get_memory_usage())
     bulk_upload_model(RelatedSource, related_models_generator(related_df))
 
 
@@ -223,6 +235,7 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None:
         None.
     """
     logger.info('Upload associations...')
+
     assoc_chunk_size = 100000
     for i in range(0,len(associations_df),assoc_chunk_size):
         bulk_upload_model(
@@ -243,6 +256,10 @@ def make_upload_measurements(measurements_df: pd.DataFrame) -> pd.DataFrame:
     Returns:
         Original DataFrame with the database ID attached to each row.
     """
+    logger.info("Upload measurements...")
+    mem_usage = measurements_df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"measurements_df memory usage: {mem_usage}MB")
+    logger.debug(get_memory_usage())
     meas_dj_ids = bulk_upload_model(
         Measurement,
         measurement_models_generator(measurements_df),
diff --git a/vast_pipeline/pipeline/main.py b/vast_pipeline/pipeline/main.py
index 6e1fe172..fcecaa43 100644
--- a/vast_pipeline/pipeline/main.py
+++ b/vast_pipeline/pipeline/main.py
@@ -28,7 +28,8 @@
     get_src_skyregion_merged_df,
     group_skyregions,
     get_parallel_assoc_image_df,
-    write_parquets
+    write_parquets,
+    get_memory_usage
 )
 
 from .errors import MaxPipelineRunsError
@@ -232,7 +233,9 @@ def process_pipeline(self, p_run: Run) -> None:
             self.previous_parquets,
             done_images_df
         )
-
+        mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
+        logger.debug(f"Step 2: sources_df memory usage: {mem_usage}MB")
+        logger.debug(get_memory_usage())
         # Obtain the number of selavy measurements for the run
         # n_selavy_measurements = sources_df.
         nr_selavy_measurements = sources_df['id'].unique().shape[0]
@@ -285,8 +288,16 @@ def process_pipeline(self, p_run: Run) -> None:
             done_images_df,
             done_source_ids
         )
+        mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
+        logger.debug(f"Step 5: sources_df memory usage: {mem_usage}MB")
+        logger.debug(get_memory_usage())
 
         del missing_sources_df
+        logger.debug(get_memory_usage())
+
+        sources_df.to_parquet('final_operations_input_sources_df.parquet')
+        new_sources_df.to_parquet('final_operations_input_new_sources_df.parquet')
+
 
         # STEP #6: finalise the df getting unique sources, calculating
         # metrics and upload data to database
@@ -300,6 +311,7 @@ def process_pipeline(self, p_run: Run) -> None:
             done_source_ids,
             self.previous_parquets
         )
+        logger.debug(get_memory_usage())
 
         # calculate number processed images
         nr_img_processed = len(images)
diff --git a/vast_pipeline/pipeline/model_generator.py b/vast_pipeline/pipeline/model_generator.py
index 310babae..c81da32b 100644
--- a/vast_pipeline/pipeline/model_generator.py
+++ b/vast_pipeline/pipeline/model_generator.py
@@ -87,6 +87,7 @@ def association_models_generator(
     Returns:
         An iterable generator object containing the yielded Association objects.
     """
+    logger.debug(f"Building {len(assoc_df)} association generators")
     for i, row in assoc_df.iterrows():
         yield Association(
             meas_id=row['id'],
@@ -94,6 +95,7 @@ def association_models_generator(
             d2d=row['d2d'],
             dr=row['dr'],
         )
+    logger.debug(f"Built {len(assoc_df)} association generators")
 
 
 def related_models_generator(
diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py
index 459f3047..5dcec5dd 100644
--- a/vast_pipeline/pipeline/new_sources.py
+++ b/vast_pipeline/pipeline/new_sources.py
@@ -75,20 +75,29 @@ def get_image_rms_measurements(
         edge_buffer:
             Multiplicative factor applied to nbeam to act as a buffer.
 
-    Returns:
+    Returns:F
        The group dataframe with the 'img_diff_true_rms' column added. The
        column will contain 'NaN' entires for sources that fail.
    """
+    if len(group) == 0: # input dataframe is empty, nothing to do
+        logger.debug(f"No image RMS measurements to get, returning")
+        return group
+
     image = group.iloc[0]['img_diff_rms_path']
-
+    logger.debug(f"{image} - num. meas. to get: {len(group)}")
+    partition_mem = group.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"{image} - partition memory usage: {partition_mem}MB")
+
+    get_rms_timer = StopWatch()
     with open_fits(image) as hdul:
         header = hdul[0].header
         wcs = WCS(header, naxis=2)
         data = hdul[0].data.squeeze()
-
+
+    logger.debug(f"{image} - Time to load fits: {get_rms_timer.reset()}s")
+
     # Here we mimic the forced fits behaviour,
     # sources within 3 half BMAJ widths of the image
     # edges are ignored. The user buffer is also
@@ -105,11 +114,11 @@ def get_image_rms_measurements(
     )
 
     npix = int(round(npix * edge_buffer))
-
+
+    get_rms_timer.reset()
     coords = SkyCoord(
         group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg)
     )
-
     array_coords = gen_array_coords_from_wcs(coords, wcs)
 
     # check for pixel wrapping
@@ -142,6 +151,7 @@ def get_image_rms_measurements(
 
     # Now we also need to check proximity to NaN values
     # as forced fits may also drop these values
+    get_rms_timer.reset()
     coords = SkyCoord(
         group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg)
     )
@@ -154,6 +164,7 @@ def get_image_rms_measurements(
 
     nan_valid = []
 
+    get_rms_timer.reset()
     # Get slices of each source and check NaN is not included.
     for i, j in zip(array_coords[0], array_coords[1]):
         sl = tuple((
@@ -205,6 +216,8 @@ def parallel_get_rms_measurements(
         The original input dataframe with the 'img_diff_true_rms' column
         added. The column will contain 'NaN' entires for sources that fail.
     """
+    #df.to_csv('parallel_get_rms_measurements_input.csv')
+
     out = df[[
         'source', 'wavg_ra', 'wavg_dec', 'img_diff_rms_path'
     ]]
@@ -231,13 +244,46 @@ def parallel_get_rms_measurements(
             meta=col_dtype
         ).compute(num_workers=n_cpu, scheduler='processes')
     )
-
-    df = df.merge(
-        out[['source', 'img_diff_true_rms']],
+    logger.debug("Finished applying get_image_rms_measurements")
+
+    mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"df length: {len(df)}")
+    logger.debug(f"df mem usage: {mem_usage_mb}MB")
+
+    mem_usage_mb = out.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"out length: {len(out)}")
+    logger.debug(f"out mem usage: {mem_usage_mb}MB")
+
+    logger.debug(f"Number source NaN: {out.source.isna().sum()}")
+    logger.debug(f"Number rms NaN: {out.img_diff_true_rms.isna().sum()}")
+
+    logger.debug("Starting df merge...")
+
+    df_to_merge = (df.sort_values(
+        by=['source', 'flux_peak'],
+        ascending=False
+    )
+        .drop_duplicates('source')
+        .drop(['img_diff_rms_path'], axis=1)
+    )
+    out_to_merge = (out.sort_values(
+        by=['source','img_diff_true_rms']
+    )
+        .drop_duplicates('source')
+    )
+
+    df = df_to_merge.merge(
+        out_to_merge[['source', 'img_diff_true_rms']],
         left_on='source', right_on='source',
         how='left'
     )
-
+    logger.debug("Finished df merge...")
+
+    mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6
+    logger.debug(f"Merged df length: {len(df)}")
+    logger.debug(f"Merged df mem usage: {mem_usage_mb}MB")
+    logger.debug(df)
+
     return df
 
 
@@ -317,6 +363,7 @@ def new_sources(
     #  ['VAST_0127-73A.EPOCH08.I.fits'] |
     # ----------------------------------+
     timer = StopWatch()
+    debug_timer = StopWatch()
 
     logger.info("Starting new source analysis.")
 
@@ -330,7 +377,7 @@ def new_sources(
             run=p_run
         ).values(*tuple(cols))
     )).set_index('name')
-
+
     # Get rid of sources that are not 'new', i.e. sources which the
     # first sky region image is not in the image list
     new_sources_df = missing_sources_df[
@@ -338,7 +385,7 @@ def new_sources(
     ].drop(
         columns=['in_primary']
     )
-
+
     # Check if the previous sources would have actually been seen
     # i.e. are the previous images sensitive enough
 
@@ -370,6 +417,8 @@ def new_sources(
         'rms_median': 'img_diff_rms_median',
         'noise_path': 'img_diff_rms_path'
     })
+
+    logger.debug(f"Time to reset & merge image info into new_sources_df: {debug_timer.reset()}s")
 
     # Select only those images that come before the detection image
     # in time.
@@ -383,6 +432,8 @@ def new_sources(
         left_on=['source', 'detection'], right_on=['source', 'image'],
         how='left'
     ).drop(columns=['image'])
+
+    logger.debug(f"Time to merge detection fluxes into new_sources_df: {debug_timer.reset()}s")
 
     # calculate the sigma of the source if it was placed in the
     # minimum rms region of the previous images
@@ -395,6 +446,8 @@ def new_sources(
     new_sources_df = new_sources_df.loc[
         new_sources_df['diff_sigma'] >= min_sigma
     ]
+
+    logger.debug(f"Time to do new_sources_df threshold calcs: {debug_timer.reset()}s")
 
     # Now have list of sources that should have been seen before given
     # previous images minimum rms values.
@@ -412,9 +465,11 @@ def new_sources(
 
     # measure the actual rms in the previous images at
     # the source location.
+ logger.debug("Getting rms measurements...") new_sources_df = parallel_get_rms_measurements( new_sources_df, edge_buffer=edge_buffer ) + logger.debug(f"Time to get rms measurements: {debug_timer.reset()}s") # this removes those that are out of range new_sources_df['img_diff_true_rms'] = ( @@ -432,8 +487,9 @@ def new_sources( # We only care about the highest true sigma new_sources_df = new_sources_df.sort_values( - by=['source', 'true_sigma'] + by=['source', 'true_sigma'], ascending=False ) + #new_sources_df.to_csv('new_sources_df_before_drop.csv') # keep only the highest for each source, rename for the daatabase new_sources_df = ( @@ -441,11 +497,16 @@ def new_sources( .drop_duplicates('source') .set_index('source') .rename(columns={'true_sigma': 'new_high_sigma'}) + .sort_values('source') ) # moving forward only the new_high_sigma columns is needed, drop all # others. new_sources_df = new_sources_df[['new_high_sigma']] + + #new_sources_df.to_csv('new_high_sigma_newcalc2.csv') + + logger.debug(f"Time to to do final cleanup steps {debug_timer.reset()}s") logger.info( 'Total new source analysis time: %.2f seconds', timer.reset_init() diff --git a/vast_pipeline/pipeline/pairs.py b/vast_pipeline/pipeline/pairs.py index a7c5eb10..fde7bbe8 100644 --- a/vast_pipeline/pipeline/pairs.py +++ b/vast_pipeline/pipeline/pairs.py @@ -89,6 +89,8 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame: 2 12929 21994 11128 0 6216 23534 """ + + measurement_combinations = ( dd.from_pandas(df, npartitions=n_partitions) .groupby("source")["id"] diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py index bca35ce4..2ff42395 100644 --- a/vast_pipeline/pipeline/utils.py +++ b/vast_pipeline/pipeline/utils.py @@ -30,6 +30,7 @@ from vast_pipeline.models import ( Band, Image, Run, SkyRegion ) +import psutil logger = logging.getLogger(__name__) @@ -766,6 +767,7 @@ def parallel_groupby_coord(df: pd.DataFrame) -> pd.DataFrame: 'wavg_ra': 'f', 'wavg_dec': 'f', } + n_cpu = cpu_count() - 1 logger.debug(f"Running association with {n_cpu} CPUs") n_partitions = calculate_n_partitions(df, n_cpu) @@ -955,6 +957,7 @@ def get_src_skyregion_merged_df( # VAST_0127-73A.EPOCH01.I.fits | True | # ------------------------------+--------------+ logger.info("Creating ideal source coverage df...") + #logger.debug(sources_df.head()) merged_timer = StopWatch() @@ -1714,3 +1717,15 @@ def write_parquets( ) return skyregs_df + +def get_memory_usage(): + """ + This function gets the current memory usage and returns a string. + + Returns: + A string containing the current resource usage. + """ + mem = psutil.virtual_memory()[3] #resource usage in bytes + mem = mem / 1024**3 #resource usage in GB + + return f"Current memory usage: {mem:.3f}GB"