Add further debug logging #691

Closed · wants to merge 50 commits
Changes from 1 commit
Commits (50)
192289f
Initial changes
ddobie Jun 29, 2023
93c4e42
Bump to 1.0.1 for reference
ddobie Jul 4, 2023
1a0eebb
Added forced photometry timer debug statements
ddobie Jul 4, 2023
a05bb86
Fixed previous commit
ddobie Jul 4, 2023
fa0bda6
Log number of sources being forced fit
ddobie Jul 4, 2023
1f5c7c6
Updated forced fitting scheduler to processes
ddobie Jul 4, 2023
84c4423
Added timing debug logging to new_sources.py
ddobie Jul 5, 2023
9b41f7b
Fixed f-strings
ddobie Jul 5, 2023
63e80c3
Added association_models_generator debug logging
ddobie Jul 7, 2023
8ab0f67
Drop associations batch size to 100
ddobie Jul 7, 2023
d7f95d1
Added get_memory_usage function
ddobie Jul 7, 2023
ba4f91b
Added get_memory_usage to finalise.py
ddobie Jul 7, 2023
0e6bf3a
Added get_memory_usage to loading.py
ddobie Jul 7, 2023
1cc2f0e
Added get_memory_usage to main.py
ddobie Jul 7, 2023
b181cd7
Cleaned things up
ddobie Jul 7, 2023
43d9d2f
Hardcode n_cpu
ddobie Jul 13, 2023
9c30c43
Shorten forced fit measurement name
ddobie Jul 30, 2023
029418e
Add some further logging
ddobie Jul 30, 2023
2cf9970
Only log if long
ddobie Jul 30, 2023
8faefdf
Fix merge conflicts
Dec 14, 2023
e95351f
Fixed indent
ddobie Dec 14, 2023
ea30f62
Fix syntax error
ddobie Dec 14, 2023
cb22d33
Revert association size to 1e5
ddobie Feb 6, 2024
e77d538
Temporarily hardcode n_cpu=10
ddobie Feb 13, 2024
72d1aeb
Added some more logging
ddobie Feb 13, 2024
b3e217b
Attempt to limit memory usage
ddobie Feb 13, 2024
41bd240
Attempt to limit memory usage
ddobie Feb 13, 2024
83b60cf
Update parallel_groupby to use chunksize
ddobie Feb 14, 2024
9102656
Correctly set chunk size
ddobie Feb 15, 2024
19741ba
Remove superfluous logging
ddobie Feb 16, 2024
1598169
Further dask groupby fixes
ddobie Feb 16, 2024
a3e4135
Added django.db.reset_queries
ddobie Feb 20, 2024
f3fb2b8
Added chunking for associations upload
ddobie Feb 20, 2024
868d518
Fixes
ddobie Feb 20, 2024
c06b2e5
calculate measurement pairs with sensible partitions
ddobie Feb 20, 2024
55b8a88
parallel_groupby_coord with sensible partitions
ddobie Feb 20, 2024
6b0455c
parallel_groupby_coord with sensible partitions
ddobie Feb 20, 2024
75d2fb0
parallel_association with sensible partitions
ddobie Feb 20, 2024
9da2404
parallel_get_rms_measurements with sensible partitions
ddobie Feb 20, 2024
1af0a9f
Fixed new_sources issue
ddobie Feb 21, 2024
7d93b31
commit all changes
ddobie Feb 25, 2024
40283e1
Remove unnecessary debug logging statements
ddobie May 22, 2024
2355d9e
Remove superfluous debug statements
ddobie May 22, 2024
4f4eb8b
Lots of debugging statements
ddobie May 23, 2024
edae5f8
Correctly estimate partition size
ddobie May 23, 2024
f0cffcc
More playing
ddobie May 23, 2024
1082596
Stash?
ddobie May 23, 2024
ec66300
More changes to make this work
ddobie May 23, 2024
d4bc1a5
Resolve conflicts
ddobie Jul 16, 2024
a7f6f4b
Fix merge
ddobie Jul 16, 2024
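
Several of the later commits (83b60cf, 9102656, the "sensible partitions" series from Feb 20, 2024, and edae5f8) revolve around sizing Dask partitions from the measured DataFrame footprint rather than a fixed partition count. The PR's own implementation is not reproduced on this page; the sketch below only illustrates the general idea, and the function name and the 100 MB partition target are assumptions.

```python
import math

import dask.dataframe as dd
import pandas as pd


def to_sensibly_partitioned_ddf(
    df: pd.DataFrame, target_mb: float = 100.0
) -> dd.DataFrame:
    """Convert a pandas DataFrame to Dask with partitions of roughly target_mb MB."""
    # Measure the real in-memory footprint (deep=True includes object columns),
    # then derive a partition count, keeping at least one partition.
    total_mb = df.memory_usage(deep=True).sum() / 1e6
    npartitions = max(1, math.ceil(total_mb / target_mb))
    return dd.from_pandas(df, npartitions=npartitions)
```

Commits 83b60cf and 9102656 appear to apply the same estimate through a chunk size (rows per partition) rather than a partition count.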
vast_pipeline/pipeline/finalise.py (34 additions, 0 deletions)
@@ -133,6 +133,8 @@ def final_operations(
sources_df.source.unique().shape[0]
)
srcs_df = parallel_groupby(sources_df)
mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"Initial srcs_df memory: {mem_usage}MB")
logger.info('Groupby-apply time: %.2f seconds', timer.reset())

# add new sources
@@ -145,6 +147,8 @@
how="left",
)
srcs_df["new_high_sigma"] = srcs_df["new_high_sigma"].fillna(0.0)
mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df memory after adding new sources: {mem_usage}MB")

# calculate nearest neighbour
srcs_skycoord = SkyCoord(
@@ -159,12 +163,16 @@

# add the separation distance in degrees
srcs_df['n_neighbour_dist'] = d2d.deg
mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df memory after nearest-neighbour: {mem_usage}MB")

# create measurement pairs, aka 2-epoch metrics
if calculate_pairs:
timer.reset()
measurement_pairs_df = calculate_measurement_pair_metrics(sources_df)
logger.info('Measurement pair metrics time: %.2f seconds', timer.reset())
mem_usage = measurement_pairs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"measurement_pairs_df memory: {mem_usage}MB")

# calculate measurement pair metric aggregates for sources by finding the row indices
# of the aggregate max of the abs(m) metric for each flux type.
@@ -189,6 +197,8 @@
"m_abs_significant_max_int": 0.0,
})
logger.info("Measurement pair aggregate metrics time: %.2f seconds", timer.reset())
mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df memory after calculate_pairs: {mem_usage}MB")
else:
logger.info(
"Skipping measurement pair metric calculation as specified in the run configuration."
@@ -201,18 +211,31 @@
# upload new ones first (new id's are fetched)
src_done_mask = srcs_df.index.isin(done_source_ids)
srcs_df_upload = srcs_df.loc[~src_done_mask].copy()
mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df_upload initial memory: {mem_usage}MB")

srcs_df_upload = make_upload_sources(srcs_df_upload, p_run, add_mode)
mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df_upload memory after upload: {mem_usage}MB")
# And now update
srcs_df_update = srcs_df.loc[src_done_mask].copy()
mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df_update memory: {mem_usage}MB")
logger.info(
f"Updating {srcs_df_update.shape[0]} sources with new metrics.")

srcs_df = update_sources(srcs_df_update, batch_size=1000)
mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df_update memory after update: {mem_usage}MB")
# Add back together
if not srcs_df_upload.empty:
srcs_df = pd.concat([srcs_df, srcs_df_upload])
else:
srcs_df = make_upload_sources(srcs_df, p_run, add_mode)

mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df memory after upload_sources: {mem_usage}MB")

# gather the related df, upload to db and save to parquet file
# the df will look like
#
@@ -230,11 +253,15 @@
.explode("related_list")
.rename(columns={"id": "from_source_id", "related_list": "to_source_id"})
)
mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory: {mem_usage}MB")

# for the column 'from_source_id', replace relation source ids with db id
related_df["to_source_id"] = related_df["to_source_id"].map(srcs_df["id"].to_dict())
# drop relationships with the same source
related_df = related_df[related_df["from_source_id"] != related_df["to_source_id"]]
mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory after calcs: {mem_usage}MB")

# write symmetrical relations to parquet
related_df.to_parquet(
@@ -256,7 +283,12 @@
)
logger.debug(f'Add mode: #{related_df.shape[0]} relations to upload.')

mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory after partitioning: {mem_usage}MB")

make_upload_related_sources(related_df)
mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory after upload: {mem_usage}MB")

del related_df

@@ -272,6 +304,8 @@
sources_df.drop('related', axis=1)
.merge(srcs_df.rename(columns={'id': 'source_id'}), on='source')
)
mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"sources_df memory after srcs_df merge: {mem_usage}MB")

if add_mode:
# Load old associations so the already uploaded ones can be removed
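The hunks above repeat the same two-line pattern for every DataFrame: compute `memory_usage(deep=True).sum() / 1e6`, then emit a `logger.debug` line. Commit d7f95d1 mentions a `get_memory_usage` helper that is not visible in this single-commit view; a minimal sketch of what such a helper might look like follows, where the name, signature, and the DEBUG-level guard are assumptions rather than the PR's actual code.

```python
import logging

import pandas as pd

logger = logging.getLogger(__name__)


def get_memory_usage(df: pd.DataFrame, name: str) -> None:
    # Hypothetical helper, not taken from the PR. deep=True walks object-dtype
    # columns, which can be slow, so only run the scan when DEBUG is active.
    if logger.isEnabledFor(logging.DEBUG):
        mem_usage = df.memory_usage(deep=True).sum() / 1e6
        logger.debug(f"{name} memory: {mem_usage}MB")
```

With a helper along these lines, each inline pair in `finalise.py` could collapse to a single call such as `get_memory_usage(srcs_df, "srcs_df")`.
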
vast_pipeline/pipeline/loading.py (10 additions, 0 deletions)
@@ -165,6 +165,9 @@ def make_upload_sources(
Returns:
The input dataframe with the 'id' column added.
'''
logger.debug("Uploading sources...")
mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"sources_df memory usage: {mem_usage}MB")
# create sources in DB
with transaction.atomic():
if (add_mode is False and
@@ -204,6 +207,8 @@ def make_upload_related_sources(related_df: pd.DataFrame) -> None:
None.
"""
logger.info('Populate "related" field of sources...')
mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory usage: {mem_usage}MB")
bulk_upload_model(RelatedSource, related_models_generator(related_df))


@@ -220,6 +225,8 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None:
None.
"""
logger.info('Upload associations...')
mem_usage = associations_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"associations_df memory usage: {mem_usage}MB")
bulk_upload_model(
Association, association_models_generator(associations_df)
)
@@ -237,6 +244,9 @@ def make_upload_measurements(measurements_df: pd.DataFrame) -> pd.DataFrame:
Returns:
Original DataFrame with the database ID attached to each row.
"""
logger.info("Upload measurements...")
mem_usage = measurements_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"measurements_df memory usage: {mem_usage}MB")
meas_dj_ids = bulk_upload_model(
Measurement,
measurement_models_generator(measurements_df),
vast_pipeline/pipeline/main.py (4 additions, 1 deletion)
@@ -232,7 +232,8 @@ def process_pipeline(self, p_run: Run) -> None:
self.previous_parquets,
done_images_df
)

mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"Step 2: sources_df memory usage: {mem_usage}MB")
# Obtain the number of selavy measurements for the run
# n_selavy_measurements = sources_df.
nr_selavy_measurements = sources_df['id'].unique().shape[0]
@@ -285,6 +286,8 @@ def process_pipeline(self, p_run: Run) -> None:
done_images_df,
done_source_ids
)
mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"Step 5: sources_df memory usage: {mem_usage}MB")

del missing_sources_df

Expand Down