Add further debug logging #691

Status: Closed. Wants to merge 50 commits.

Commits (changes from all commits):
192289f
Initial changes
ddobie Jun 29, 2023
93c4e42
Bump to 1.0.1 for reference
ddobie Jul 4, 2023
1a0eebb
Added forced photometry timer debug statements
ddobie Jul 4, 2023
a05bb86
Fixed previous commit
ddobie Jul 4, 2023
fa0bda6
Log number of sources being forced fit
ddobie Jul 4, 2023
1f5c7c6
Updated forced fitting scheduler to processes
ddobie Jul 4, 2023
84c4423
Added timing debug logging to new_sources.py
ddobie Jul 5, 2023
9b41f7b
Fixed f-strings
ddobie Jul 5, 2023
63e80c3
Added association_models_generator debug logging
ddobie Jul 7, 2023
8ab0f67
Drop associations batch size to 100
ddobie Jul 7, 2023
d7f95d1
Added get_memory_usage function
ddobie Jul 7, 2023
ba4f91b
Added get_memory_usage to finalise.py
ddobie Jul 7, 2023
0e6bf3a
Added get_memory_usage to loading.py
ddobie Jul 7, 2023
1cc2f0e
Added get_memory_usage to main.py
ddobie Jul 7, 2023
b181cd7
Cleaned things up
ddobie Jul 7, 2023
43d9d2f
Hardcode n_cpu
ddobie Jul 13, 2023
9c30c43
Shorten forced fit measurement name
ddobie Jul 30, 2023
029418e
Add some further logging
ddobie Jul 30, 2023
2cf9970
Only log if long
ddobie Jul 30, 2023
8faefdf
Fix merge conflicts
Dec 14, 2023
e95351f
Fixed indent
ddobie Dec 14, 2023
ea30f62
Fix syntax error
ddobie Dec 14, 2023
cb22d33
Revert association size to 1e5
ddobie Feb 6, 2024
e77d538
Temporarily hardcode n_cpu=10
ddobie Feb 13, 2024
72d1aeb
Added some more logging
ddobie Feb 13, 2024
b3e217b
Attempt to limit memory usage
ddobie Feb 13, 2024
41bd240
Attempt to limit memory usage
ddobie Feb 13, 2024
83b60cf
Update parallel_grouppby to use chunksize
ddobie Feb 14, 2024
9102656
Correctly set chunk size
ddobie Feb 15, 2024
19741ba
Remove superfluous logging
ddobie Feb 16, 2024
1598169
Further dask groupby fixes
ddobie Feb 16, 2024
a3e4135
Added django.db.reset_queries
ddobie Feb 20, 2024
f3fb2b8
Added chunking for associations upload
ddobie Feb 20, 2024
868d518
Fixes
ddobie Feb 20, 2024
c06b2e5
calculate measurement pairs with sensible partitions
ddobie Feb 20, 2024
55b8a88
parallel_groupby_coord with sensible partitions
ddobie Feb 20, 2024
6b0455c
parallel_groupby_coord with sensible partitions
ddobie Feb 20, 2024
75d2fb0
parallel_association with sensible partitions
ddobie Feb 20, 2024
9da2404
parallel_get_rms_measurements with sensible partitions
ddobie Feb 20, 2024
1af0a9f
Fixed new_sources issue
ddobie Feb 21, 2024
7d93b31
commit all changes
ddobie Feb 25, 2024
40283e1
Remove unnecessary debug logging statements
ddobie May 22, 2024
2355d9e
Remove superfluous debug statements
ddobie May 22, 2024
4f4eb8b
Lots of debugging statements
ddobie May 23, 2024
edae5f8
Correctly estimate partition size
ddobie May 23, 2024
f0cffcc
More playing
ddobie May 23, 2024
1082596
Stash?
ddobie May 23, 2024
ec66300
More changes to make this work
ddobie May 23, 2024
d4bc1a5
Resolve conflicts
ddobie Jul 16, 2024
a7f6f4b
Fix merge
ddobie Jul 16, 2024
2 changes: 1 addition & 1 deletion vast_pipeline/_version.py
@@ -1 +1 @@
__version__ = '1.0.0dev'
__version__ = '1.0.1dev'
8 changes: 3 additions & 5 deletions vast_pipeline/image/main.py
@@ -431,11 +431,11 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame:

logger.debug("Condon errors done.")

logger.debug("Calculating positional errors...")
# TODO: avoid extra column given that it is a single value
df['ew_sys_err'] = self.config["ra_uncertainty"] / 3600.
df['ns_sys_err'] = self.config["dec_uncertainty"] / 3600.


df['error_radius'] = calc_error_radius(
df['ra'].values,
df['ra_err'].values,
@@ -446,17 +446,15 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame:
df['uncertainty_ew'] = np.hypot(
df['ew_sys_err'].values, df['error_radius'].values
)

df['uncertainty_ns'] = np.hypot(
df['ns_sys_err'].values, df['error_radius'].values
)

# weight calculations to use later
df['weight_ew'] = 1. / df['uncertainty_ew'].values**2
df['weight_ns'] = 1. / df['uncertainty_ns'].values**2

logger.debug('Positional errors done.')


# Initialise the forced column as False
df['forced'] = False

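As context for the hunk above: the positional uncertainties are built by adding the configured systematic error and the fitted error radius in quadrature, and the weights used later by association are simply the inverse variances. A minimal standalone sketch of that arithmetic (illustrative column values only, not pipeline code):

```python
import numpy as np
import pandas as pd

# Illustrative catalogue fragment: systematic errors and fitted error radii, in degrees.
df = pd.DataFrame({
    "ew_sys_err": [1.0 / 3600.0] * 3,  # 1 arcsec systematic, converted to degrees
    "ns_sys_err": [1.0 / 3600.0] * 3,
    "error_radius": [0.5 / 3600.0, 1.2 / 3600.0, 2.0 / 3600.0],
})

# Combine systematic and fitted errors in quadrature, as read_selavy() does above.
df["uncertainty_ew"] = np.hypot(df["ew_sys_err"], df["error_radius"])
df["uncertainty_ns"] = np.hypot(df["ns_sys_err"], df["error_radius"])

# Inverse-variance weights used later by the weighted-position calculations.
df["weight_ew"] = 1.0 / df["uncertainty_ew"] ** 2
df["weight_ns"] = 1.0 / df["uncertainty_ns"] ** 2

print(df[["uncertainty_ew", "weight_ew"]])
```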
8 changes: 5 additions & 3 deletions vast_pipeline/pipeline/association.py
@@ -934,7 +934,7 @@ def advanced_association(
association.
'''
# read the needed sources fields
# Step 1: get matches within semimajor axis of image.
# Step 1: get matches within semimajor axis of image.
idx_skyc1, idx_skyc2, d2d, d3d = skyc2.search_around_sky(
skyc1, bw_max
)
@@ -1298,7 +1298,7 @@ def association(
)

sources_df = sources_df.drop(['ra_wrap'], axis=1)

tmp_srcs_df = (
sources_df.loc[
(sources_df['source'] != -1) & (sources_df['forced'] == False),
@@ -1334,7 +1334,9 @@ def association(
'weight_ns': 'uncertainty_ns'
})
)

nan_indices = weighted_df.query("ra.isnull()", engine='python').index
nan_sources = weighted_df.iloc[nan_indices].source.values

# correct the RA wrapping
ra_wrap_mask = weighted_df.ra >= 360.
weighted_df.loc[
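The two lines added to association() above record which sources end up with a NaN weighted position, presumably as a debugging aid. A small self-contained sketch of the same check (hypothetical data; note the sketch uses label-based .loc, whereas the diff uses .iloc, which only coincides with it when the index is a default RangeIndex):

```python
import numpy as np
import pandas as pd

# Hypothetical weighted-average output where one source has lost its position.
weighted_df = pd.DataFrame({
    "source": [101, 102, 103],
    "ra": [12.34, np.nan, 359.99],
})

# Find rows whose RA is NaN and record the affected source ids for logging.
nan_indices = weighted_df.query("ra.isnull()", engine="python").index
nan_sources = weighted_df.loc[nan_indices, "source"].values

print(nan_sources)  # -> [102]
```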
56 changes: 55 additions & 1 deletion vast_pipeline/pipeline/finalise.py
@@ -14,7 +14,7 @@
update_sources
)
from vast_pipeline.pipeline.pairs import calculate_measurement_pair_metrics
from vast_pipeline.pipeline.utils import parallel_groupby
from vast_pipeline.pipeline.utils import parallel_groupby, get_memory_usage


logger = logging.getLogger(__name__)
@@ -132,8 +132,15 @@ def final_operations(
'Calculating statistics for %i sources...',
sources_df.source.unique().shape[0]
)
mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"sources_df memory: {mem_usage}MB")

srcs_df = parallel_groupby(sources_df)

logger.info('Groupby-apply time: %.2f seconds', timer.reset())
mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"Initial srcs_df memory: {mem_usage}MB")
logger.debug(get_memory_usage())

# add new sources
srcs_df["new"] = srcs_df.index.isin(new_sources_df.index)
@@ -145,6 +152,9 @@
how="left",
)
srcs_df["new_high_sigma"] = srcs_df["new_high_sigma"].fillna(0.0)
mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df memory after adding new sources: {mem_usage}MB")
logger.debug(get_memory_usage())

# calculate nearest neighbour
srcs_skycoord = SkyCoord(
@@ -159,12 +169,19 @@

# add the separation distance in degrees
srcs_df['n_neighbour_dist'] = d2d.deg
mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df memory after nearest-neighbour: {mem_usage}MB")
logger.debug(get_memory_usage())

# create measurement pairs, aka 2-epoch metrics
if calculate_pairs:
sources_df.to_parquet('calcalate_measurement_pair_metrics_input_df.parquet')
timer.reset()
measurement_pairs_df = calculate_measurement_pair_metrics(sources_df)
logger.info('Measurement pair metrics time: %.2f seconds', timer.reset())
mem_usage = measurement_pairs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"measurement_pairs_df memory: {mem_usage}MB")
logger.debug(get_memory_usage())

# calculate measurement pair metric aggregates for sources by finding the row indices
# of the aggregate max of the abs(m) metric for each flux type.
@@ -189,6 +206,9 @@
"m_abs_significant_max_int": 0.0,
})
logger.info("Measurement pair aggregate metrics time: %.2f seconds", timer.reset())
mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df memory after calculate_pairs: {mem_usage}MB")
logger.debug(get_memory_usage())
else:
logger.info(
"Skipping measurement pair metric calculation as specified in the run configuration."
@@ -201,18 +221,36 @@
# upload new ones first (new id's are fetched)
src_done_mask = srcs_df.index.isin(done_source_ids)
srcs_df_upload = srcs_df.loc[~src_done_mask].copy()
mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df_upload initial memory: {mem_usage}MB")
logger.debug(get_memory_usage())

srcs_df_upload = make_upload_sources(srcs_df_upload, p_run, add_mode)
mem_usage = srcs_df_upload.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df_upload memory after upload: {mem_usage}MB")
logger.debug(get_memory_usage())
# And now update
srcs_df_update = srcs_df.loc[src_done_mask].copy()
mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df_update memory: {mem_usage}MB")
logger.debug(get_memory_usage())
logger.info(
f"Updating {srcs_df_update.shape[0]} sources with new metrics.")

srcs_df = update_sources(srcs_df_update, batch_size=1000)
mem_usage = srcs_df_update.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df_update memory after update: {mem_usage}MB")
logger.debug(get_memory_usage())
# Add back together
if not srcs_df_upload.empty:
srcs_df = pd.concat([srcs_df, srcs_df_upload])
else:
srcs_df = make_upload_sources(srcs_df, p_run, add_mode)

mem_usage = srcs_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"srcs_df memory after upload_sources: {mem_usage}MB")
logger.debug(get_memory_usage())

# gather the related df, upload to db and save to parquet file
# the df will look like
#
@@ -230,11 +268,17 @@
.explode("related_list")
.rename(columns={"id": "from_source_id", "related_list": "to_source_id"})
)
mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory: {mem_usage}MB")
logger.debug(get_memory_usage())

# for the column 'from_source_id', replace relation source ids with db id
related_df["to_source_id"] = related_df["to_source_id"].map(srcs_df["id"].to_dict())
# drop relationships with the same source
related_df = related_df[related_df["from_source_id"] != related_df["to_source_id"]]
mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory after calcs: {mem_usage}MB")
logger.debug(get_memory_usage())

# write symmetrical relations to parquet
related_df.to_parquet(
@@ -256,7 +300,14 @@
)
logger.debug(f'Add mode: #{related_df.shape[0]} relations to upload.')

mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory after partitioning: {mem_usage}MB")
logger.debug(get_memory_usage())

make_upload_related_sources(related_df)
mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory after upload: {mem_usage}MB")
logger.debug(get_memory_usage())

del related_df

@@ -272,6 +323,9 @@
sources_df.drop('related', axis=1)
.merge(srcs_df.rename(columns={'id': 'source_id'}), on='source')
)
mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"sources_df memory after srcs_df merge: {mem_usage}MB")
logger.debug(get_memory_usage())

if add_mode:
# Load old associations so the already uploaded ones can be removed
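The hunks in this file import get_memory_usage from vast_pipeline.pipeline.utils, but its definition is not shown in this PR view. A plausible sketch of such a helper built on psutil (an assumption; the real implementation may report different fields or use another library entirely):

```python
import os

import psutil  # assumed dependency; the actual helper may use a different mechanism


def get_memory_usage() -> str:
    """Return a short, human-readable summary of the current process RSS.

    Sketch only: the real get_memory_usage() in vast_pipeline.pipeline.utils
    may format its output differently.
    """
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return f"Current memory usage: {rss_bytes / 1e6:.1f}MB"


# Usage mirrors the calls added above:
# logger.debug(get_memory_usage())
print(get_memory_usage())
```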
19 changes: 14 additions & 5 deletions vast_pipeline/pipeline/forced_extraction.py
@@ -166,13 +166,15 @@ def extract_from_image(
Dictionary with input dataframe with added columns (flux_int,
flux_int_err, chi_squared_fit) and image name.
"""
timer = StopWatch()
# create the skycoord obj to pass to the forced extraction
# see usage https://github.com/dlakaplan/forced_phot
P_islands = SkyCoord(
df['wavg_ra'].values,
df['wavg_dec'].values,
unit=(u.deg, u.deg)
)

# load the image, background and noisemaps into memory
# a dedicated function may seem unneccesary, but will be useful if we
# split the load to a separate thread.
@@ -183,12 +185,14 @@
)
FP = ForcedPhot(*forcedphot_input)


flux, flux_err, chisq, DOF, cluster_id = FP.measure(
P_islands,
cluster_threshold=cluster_threshold,
allow_nan=allow_nan,
edge_buffer=edge_buffer
)
logger.debug(f"Time to measure FP for {image}: {timer.reset()}s")
df['flux_int'] = flux * 1.e3
df['flux_int_err'] = flux_err * 1.e3
df['chi_squared_fit'] = chisq
@@ -238,7 +242,7 @@ def finalise_forced_dfs(
df['component_id'] = df['island_id'].str.replace(
'island', 'component'
) + 'a'
img_prefix = image.split('.')[0] + '_'
img_prefix = ""#image.split('.')[0] + '_'
df['name'] = img_prefix + df['component_id']
# assign all the other columns
# convert fluxes to mJy
@@ -390,7 +394,8 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
)
del col_to_drop

n_cpu = cpu_count() - 1
#n_cpu = cpu_count() - 1 # this doesn't work because cpu_count returns the number of CPUs in the machine, not the container.
n_cpu = 10
bags = db.from_sequence(list_to_map, npartitions=len(list_to_map))
forced_dfs = (
bags.map(lambda x: extract_from_image(
@@ -399,7 +404,7 @@
allow_nan=allow_nan,
**x
))
.compute()
.compute(scheduler='processes', num_workers=n_cpu)
)
del bags
# create intermediates dfs combining the mapping data and the forced
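Two related changes appear in this hunk: the worker count is hardcoded (the inline comment notes that cpu_count() reports the host's CPUs rather than a container's CPU quota) and the Dask bag is computed with an explicit process-based scheduler. A minimal sketch of that pattern with a stand-in task (hypothetical work items, not the pipeline's extract_from_image):

```python
import dask.bag as db


def forced_fit(item: dict) -> dict:
    # Stand-in for extract_from_image(); the real task runs the forced photometry.
    return {**item, "fitted": True}


if __name__ == "__main__":
    # Hypothetical work list; the pipeline maps one item per image.
    list_to_map = [{"image": f"image_{i:02d}.fits"} for i in range(20)]

    # Fixed worker count rather than cpu_count() - 1, and an explicit
    # multiprocessing scheduler, mirroring the change above.
    n_cpu = 10
    bags = db.from_sequence(list_to_map, npartitions=len(list_to_map))
    forced_dfs = bags.map(forced_fit).compute(scheduler="processes", num_workers=n_cpu)

    print(len(forced_dfs))
```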
@@ -477,7 +482,7 @@ def get_fname(n): return os.path.join(
'forced_measurements_' + n.replace('.', '_') + '.parquet'
)
dfs = list(map(lambda x: (df[df['image'] == x], get_fname(x)), images))
n_cpu = cpu_count() - 1
n_cpu = 10 #cpu_count() - 1 # temporarily hardcode n_cpu

# writing parquets using Dask bag
bags = db.from_sequence(dfs)
@@ -610,7 +615,7 @@ def forced_extraction(
)

# make measurement names unique for db constraint
extr_df['name'] = extr_df['name'] + f'_f_run{p_run.id:06d}'
extr_df['name'] = extr_df['name'] + f'_f_run{p_run.id:03d}'

# select sensible flux values and set the columns with fix values
values = {
@@ -678,6 +683,10 @@
extr_df = extr_df[col_order + remaining]

# upload the measurements, a column 'id' is returned with the DB id
long_names = extr_df.loc[extr_df['name'].str.len() > 63]
long_comps = extr_df.loc[extr_df['component_id'].str.len() > 63]
long_isls = extr_df.loc[extr_df['island_id'].str.len() > 63]

extr_df = make_upload_measurements(extr_df)

extr_df = extr_df.rename(columns={'source_tmp_id': 'source'})
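The debug block added just before the measurement upload collects rows whose generated identifiers exceed 63 characters, which looks like a guard against a typical varchar limit on the name columns (the diff computes the frames but does not appear to log them in this view). A sketch of the same check on a throwaway frame (all values made up):

```python
import pandas as pd

# Made-up measurement identifiers; the real frame is extr_df in forced_extraction().
extr_df = pd.DataFrame({
    "name": ["SB9667_component_100a_f_run001", "x" * 70],
    "component_id": ["component_100a", "component_101a"],
    "island_id": ["island_100", "island_101"],
})

# Flag identifiers longer than 63 characters, which would break a typical
# varchar(64)-style column constraint on upload.
MAX_LEN = 63
long_names = extr_df.loc[extr_df["name"].str.len() > MAX_LEN]
long_comps = extr_df.loc[extr_df["component_id"].str.len() > MAX_LEN]
long_isls = extr_df.loc[extr_df["island_id"].str.len() > MAX_LEN]

print(len(long_names), len(long_comps), len(long_isls))  # -> 1 0 0
```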
19 changes: 18 additions & 1 deletion vast_pipeline/pipeline/loading.py
@@ -18,7 +18,9 @@
Association, Band, Measurement, SkyRegion, Source, RelatedSource,
Run, Image
)
from vast_pipeline.pipeline.utils import get_create_img, get_create_img_band
from vast_pipeline.pipeline.utils import (
get_create_img, get_create_img_band, get_memory_usage
)
from vast_pipeline.utils.utils import StopWatch


Expand All @@ -31,6 +33,7 @@ def bulk_upload_model(
generator: Iterable[Generator[models.Model, None, None]],
batch_size: int=10_000,
return_ids: bool=False,
log_mem_usage: bool=False,
) -> List[int]:
'''
Bulk upload a list of generator objects of django models to db.
@@ -55,6 +58,8 @@
bulk_ids = []
while True:
items = list(islice(generator, batch_size))
if log_mem_usage:
logger.debug(get_memory_usage())
if not items:
break
out_bulk = djmodel.objects.bulk_create(items)
@@ -168,6 +173,10 @@ def make_upload_sources(
Returns:
The input dataframe with the 'id' column added.
'''
logger.debug("Uploading sources...")
mem_usage = sources_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"sources_df memory usage: {mem_usage}MB")
logger.debug(get_memory_usage())
# create sources in DB
with transaction.atomic():
if (add_mode is False and
@@ -207,6 +216,9 @@ def make_upload_related_sources(related_df: pd.DataFrame) -> None:
None.
"""
logger.info('Populate "related" field of sources...')
mem_usage = related_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"related_df memory usage: {mem_usage}MB")
logger.debug(get_memory_usage())
bulk_upload_model(RelatedSource, related_models_generator(related_df))


@@ -223,6 +235,7 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None:
None.
"""
logger.info('Upload associations...')

assoc_chunk_size = 100000
for i in range(0,len(associations_df),assoc_chunk_size):
bulk_upload_model(
@@ -243,6 +256,10 @@ def make_upload_measurements(measurements_df: pd.DataFrame) -> pd.DataFrame:
Returns:
Original DataFrame with the database ID attached to each row.
"""
logger.info("Upload measurements...")
mem_usage = measurements_df.memory_usage(deep=True).sum() / 1e6
logger.debug(f"measurements_df memory usage: {mem_usage}MB")
logger.debug(get_memory_usage())
meas_dj_ids = bulk_upload_model(
Measurement,
measurement_models_generator(measurements_df),
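bulk_upload_model() above consumes a model generator in fixed-size slices via islice, and make_upload_associations() now feeds it in explicit 100,000-row chunks. A Django-free sketch of the same batching logic (the helper name is made up):

```python
from itertools import islice
from typing import Any, Iterable, List


def upload_in_batches(items: Iterable[Any], batch_size: int = 10_000) -> List[Any]:
    """Consume an iterable in fixed-size batches, mirroring the while/islice
    loop in bulk_upload_model() (sketch only: no Django bulk_create here)."""
    it = iter(items)
    uploaded: List[Any] = []
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        # In the pipeline this is djmodel.objects.bulk_create(batch); here the
        # batch is simply collected so the function has something to return.
        uploaded.extend(batch)
    return uploaded


# Chunked upload of 250k dummy rows in 100k chunks, echoing the
# assoc_chunk_size loop added to make_upload_associations().
rows = range(250_000)
print(len(upload_in_batches(rows, batch_size=100_000)))
```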