Skip to content

Commit

Permalink
Patch sync code for durability of downloads and log message format al…
Browse files Browse the repository at this point in the history
…ong with tqdm progress bar formatting
  • Loading branch information
BryanFauble committed Dec 16, 2024
1 parent d34bc35 commit 50cebee
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 57 deletions.
41 changes: 32 additions & 9 deletions synapseclient/core/download/download_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ async def download_file_entity(
if_collision=if_collision,
synapse_cache_location=synapse_cache_location,
cached_file_path=cached_file_path,
entity_id=entity.id,
synapse_client=client,
)
if download_path is None:
Expand All @@ -157,9 +158,13 @@ async def download_file_entity(
if not os.path.exists(download_location):
os.makedirs(download_location)
client.logger.info(
f"Copying existing file from {cached_file_path} to {download_path}"
f"[{entity.id}:{file_name}]: Copying existing file from {cached_file_path} to {download_path}"
)
shutil.copy(cached_file_path, download_path)
else:
client.logger.info(
f"[{entity.id}:{file_name}]: Found existing file at {download_path}, skipping download."
)

else: # download the file from URL (could be a local file)
object_type = "FileEntity" if submission is None else "SubmissionAttachment"
Expand Down Expand Up @@ -257,6 +262,7 @@ async def download_file_entity_model(
if_collision=if_collision,
synapse_cache_location=synapse_cache_location,
cached_file_path=cached_file_path,
entity_id=file.id,
synapse_client=client,
)
if download_path is None:
Expand All @@ -268,9 +274,13 @@ async def download_file_entity_model(
if not os.path.exists(download_location):
os.makedirs(download_location)
client.logger.info(
f"Copying existing file from {cached_file_path} to {download_path}"
f"[{file.id}:{file_name}]: Copying existing file from {cached_file_path} to {download_path}"
)
shutil.copy(cached_file_path, download_path)
else:
client.logger.info(
f"[{file.id}:{file_name}]: Found existing file at {download_path}, skipping download."
)

else: # download the file from URL (could be a local file)
object_type = "FileEntity" if submission is None else "SubmissionAttachment"
Expand Down Expand Up @@ -526,7 +536,9 @@ def download_fn(
),
)

syn.logger.info(f"Downloaded {synapse_id} to {downloaded_path}")
syn.logger.info(
f"[{synapse_id}:{os.path.basename(downloaded_path)}]: Downloaded to {downloaded_path}"
)
syn.cache.add(
file_handle["id"], downloaded_path, file_handle.get("contentMd5", None)
)
Expand All @@ -541,7 +553,8 @@ def download_fn(
exc_info = sys.exc_info()
ex.progress = 0 if not hasattr(ex, "progress") else ex.progress
syn.logger.debug(
f"\nRetrying download on error: [{exc_info[0]}] after progressing {ex.progress} bytes",
f"\n[{synapse_id}:{os.path.basename(downloaded_path)}]: Retrying "
f"download on error: [{exc_info[0]}] after progressing {ex.progress} bytes",
exc_info=True,
) # this will include stack trace
if ex.progress == 0: # No progress was made reduce remaining retries.
Expand Down Expand Up @@ -669,7 +682,9 @@ def download_from_url(
actual_md5 = None
redirect_count = 0
delete_on_md5_mismatch = True
client.logger.debug(f"Downloading from {url} to {destination}")
client.logger.debug(
f"[{entity_id}:{os.path.basename(destination)}]: Downloading from {url} to {destination}"
)
while redirect_count < REDIRECT_LIMIT:
redirect_count += 1
scheme = urllib_urlparse.urlparse(url).scheme
Expand Down Expand Up @@ -854,7 +869,8 @@ def _ftp_report_hook(
)
increment_progress_bar(n=transferred, progress_bar=progress_bar)
client.logger.debug(
f"Resuming partial download to {temp_destination}. "
f"[{entity_id}:{os.path.basename(destination)}]: Resuming "
f"partial download to {temp_destination}. "
f"{previously_transferred}/{to_be_transferred} bytes already "
"transferred."
)
Expand Down Expand Up @@ -894,7 +910,8 @@ def _ftp_report_hook(
# verify that the file was completely downloaded and retry if it is not complete
if to_be_transferred > 0 and transferred < to_be_transferred:
client.logger.warning(
"\nRetrying download because the connection ended early.\n"
f"\n[{entity_id}:{os.path.basename(destination)}]: "
"Retrying download because the connection ended early.\n"
)
continue

Expand All @@ -903,7 +920,10 @@ def _ftp_report_hook(
shutil.move(temp_destination, destination)
break
else:
client.logger.error(f"Unable to download URLs of type {scheme}")
client.logger.error(
f"[{entity_id}:{os.path.basename(destination)}]: "
f"Unable to download URLs of type {scheme}"
)
return None

else: # didn't break out of loop
Expand Down Expand Up @@ -949,6 +969,7 @@ def resolve_download_path_collisions(
if_collision: str,
synapse_cache_location: str,
cached_file_path: str,
entity_id: str,
*,
synapse_client: Optional["Synapse"] = None,
) -> Union[str, None]:
Expand All @@ -964,6 +985,7 @@ def resolve_download_path_collisions(
May be "overwrite.local", "keep.local", or "keep.both".
synapse_cache_location: The location in .synapseCache where the file would be
corresponding to its FileHandleId.
entity_id: The entity id
cached_file_path: The file path of the cached copy
Raises:
Expand Down Expand Up @@ -1000,7 +1022,8 @@ def resolve_download_path_collisions(
pass # Let the download proceed and overwrite the local file.
elif if_collision == COLLISION_KEEP_LOCAL:
client.logger.info(
f"Found existing file at {download_path}, skipping download."
f"[{entity_id}:{os.path.basename(download_path)}]: Found existing "
f"file at {download_path}, skipping download."
)

# Don't want to overwrite the local file.
Expand Down
42 changes: 30 additions & 12 deletions synapseclient/core/transfer_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,17 @@ def increment_progress_bar(n: int, progress_bar: Union[tqdm, None]) -> None:

@contextmanager
def shared_download_progress_bar(
file_size: int, *, synapse_client: Optional["Synapse"] = None
file_size: int,
custom_message: str = None,
*,
synapse_client: Optional["Synapse"] = None,
):
"""An outside process that will eventually trigger a download through this module
can configure a shared Progress Bar by running its code within this context manager.
Arguments:
file_size: The size of the file being downloaded.
custom_message: A custom message to display on the progress bar instead of default.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Expand All @@ -86,22 +90,28 @@ def shared_download_progress_bar(

syn = Synapse.get_client(synapse_client=synapse_client)
with logging_redirect_tqdm(loggers=[syn.logger]):
get_or_create_download_progress_bar(file_size=file_size, synapse_client=syn)
get_or_create_download_progress_bar(
file_size=file_size, custom_message=custom_message, synapse_client=syn
)
try:
yield
finally:
_thread_local.progress_bar_download_context_managed = False
if _thread_local.progress_bar_download:
_thread_local.progress_bar_download.close()
_thread_local.progress_bar_download.refresh()
del _thread_local.progress_bar_download
close_download_progress_bar()


def close_download_progress_bar() -> None:
"""Handle closing the download progress bar if it is not context managed."""
if not _is_context_managed_download_bar():
def close_download_progress_bar(force_close: bool = False) -> None:
"""Handle closing the download progress bar if it is not context managed. This will
also only close the progress bar if there are no other downloads sharing it."""
if force_close or not _is_context_managed_download_bar():
progress_bar: tqdm = getattr(_thread_local, "progress_bar_download", None)
if progress_bar is not None:
transfer_count: int = getattr(_thread_local, "transfer_count", 0)
transfer_count -= 1
if transfer_count < 0:
transfer_count = 0

_thread_local.transfer_count = transfer_count
if progress_bar is not None and not transfer_count:
progress_bar.close()
progress_bar.refresh()
del _thread_local.progress_bar_download
Expand All @@ -113,7 +123,11 @@ def _is_context_managed_download_bar() -> bool:


def get_or_create_download_progress_bar(
file_size: int, postfix: str = None, *, synapse_client: Optional["Synapse"] = None
file_size: int,
postfix: str = None,
custom_message: str = None,
*,
synapse_client: Optional["Synapse"] = None,
) -> Union[tqdm, None]:
"""Return the existing progress bar if it exists, otherwise create a new one.
Expand All @@ -132,11 +146,15 @@ def get_or_create_download_progress_bar(
if syn.silent:
return None

transfer_count: int = getattr(_thread_local, "transfer_count", 0)
transfer_count += 1
_thread_local.transfer_count = transfer_count

progress_bar: tqdm = getattr(_thread_local, "progress_bar_download", None)
if progress_bar is None:
progress_bar = tqdm(
total=file_size,
desc="Downloading files",
desc=custom_message or "Downloading files",
unit="B",
unit_scale=True,
smoothing=0,
Expand Down
Loading

0 comments on commit 50cebee

Please sign in to comment.