-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SYNPY-1548] Swap to a FIFO queue #1147
Changes from all commits
d34bc35
50cebee
734241f
f4cd5f0
15eea60
51b8249
f4cb200
6c96cf6
a31f338
d49cb07
b3b3054
c51d732
1ef88e6
59319a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -146,6 +146,7 @@ async def download_file_entity( | |
if_collision=if_collision, | ||
synapse_cache_location=synapse_cache_location, | ||
cached_file_path=cached_file_path, | ||
entity_id=getattr(entity, "id", None), | ||
synapse_client=client, | ||
) | ||
if download_path is None: | ||
|
@@ -157,9 +158,15 @@ async def download_file_entity( | |
if not os.path.exists(download_location): | ||
os.makedirs(download_location) | ||
client.logger.info( | ||
f"Copying existing file from {cached_file_path} to {download_path}" | ||
f"[{getattr(entity, 'id', None)}:{file_name}]: Copying existing " | ||
f"file from {cached_file_path} to {download_path}" | ||
) | ||
shutil.copy(cached_file_path, download_path) | ||
else: | ||
client.logger.info( | ||
f"[{getattr(entity, 'id', None)}:{file_name}]: Found existing file " | ||
f"at {download_path}, skipping download." | ||
) | ||
|
||
else: # download the file from URL (could be a local file) | ||
object_type = "FileEntity" if submission is None else "SubmissionAttachment" | ||
|
@@ -257,6 +264,7 @@ async def download_file_entity_model( | |
if_collision=if_collision, | ||
synapse_cache_location=synapse_cache_location, | ||
cached_file_path=cached_file_path, | ||
entity_id=file.id, | ||
synapse_client=client, | ||
) | ||
if download_path is None: | ||
|
@@ -268,9 +276,13 @@ async def download_file_entity_model( | |
if not os.path.exists(download_location): | ||
os.makedirs(download_location) | ||
client.logger.info( | ||
f"Copying existing file from {cached_file_path} to {download_path}" | ||
f"[{file.id}:{file_name}]: Copying existing file from {cached_file_path} to {download_path}" | ||
) | ||
shutil.copy(cached_file_path, download_path) | ||
else: | ||
client.logger.info( | ||
f"[{file.id}:{file_name}]: Found existing file at {download_path}, skipping download." | ||
) | ||
Comment on lines
+282
to
+285
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In lieu of structured logging setup this is at least a start to get to the point where logging is clearer within the client. I updated a bunch of the messages around the download process so that it's clear what Synapse ID/file/entity produced the message. |
||
|
||
else: # download the file from URL (could be a local file) | ||
object_type = "FileEntity" if submission is None else "SubmissionAttachment" | ||
|
@@ -526,7 +538,7 @@ def download_fn( | |
), | ||
) | ||
|
||
syn.logger.info(f"Downloaded {synapse_id} to {downloaded_path}") | ||
syn.logger.info(f"[{synapse_id}]: Downloaded to {downloaded_path}") | ||
syn.cache.add( | ||
file_handle["id"], downloaded_path, file_handle.get("contentMd5", None) | ||
) | ||
|
@@ -541,7 +553,8 @@ def download_fn( | |
exc_info = sys.exc_info() | ||
ex.progress = 0 if not hasattr(ex, "progress") else ex.progress | ||
syn.logger.debug( | ||
f"\nRetrying download on error: [{exc_info[0]}] after progressing {ex.progress} bytes", | ||
f"\n[{synapse_id}]: Retrying " | ||
f"download on error: [{exc_info[0]}] after progressing {ex.progress} bytes", | ||
exc_info=True, | ||
) # this will include stack trace | ||
if ex.progress == 0: # No progress was made reduce remaining retries. | ||
|
@@ -669,7 +682,7 @@ def download_from_url( | |
actual_md5 = None | ||
redirect_count = 0 | ||
delete_on_md5_mismatch = True | ||
client.logger.debug(f"Downloading from {url} to {destination}") | ||
client.logger.debug(f"[{entity_id}]: Downloading from {url} to {destination}") | ||
while redirect_count < REDIRECT_LIMIT: | ||
redirect_count += 1 | ||
scheme = urllib_urlparse.urlparse(url).scheme | ||
|
@@ -854,7 +867,8 @@ def _ftp_report_hook( | |
) | ||
increment_progress_bar(n=transferred, progress_bar=progress_bar) | ||
client.logger.debug( | ||
f"Resuming partial download to {temp_destination}. " | ||
f"[{entity_id}]: Resuming " | ||
f"partial download to {temp_destination}. " | ||
f"{previously_transferred}/{to_be_transferred} bytes already " | ||
"transferred." | ||
) | ||
|
@@ -894,7 +908,8 @@ def _ftp_report_hook( | |
# verify that the file was completely downloaded and retry if it is not complete | ||
if to_be_transferred > 0 and transferred < to_be_transferred: | ||
client.logger.warning( | ||
"\nRetrying download because the connection ended early.\n" | ||
f"\n[{entity_id}]: " | ||
"Retrying download because the connection ended early.\n" | ||
) | ||
continue | ||
|
||
|
@@ -903,7 +918,9 @@ def _ftp_report_hook( | |
shutil.move(temp_destination, destination) | ||
break | ||
else: | ||
client.logger.error(f"Unable to download URLs of type {scheme}") | ||
client.logger.error( | ||
f"[{entity_id}]: Unable to download URLs of type {scheme}" | ||
) | ||
return None | ||
|
||
else: # didn't break out of loop | ||
|
@@ -949,6 +966,7 @@ def resolve_download_path_collisions( | |
if_collision: str, | ||
synapse_cache_location: str, | ||
cached_file_path: str, | ||
entity_id: str, | ||
*, | ||
synapse_client: Optional["Synapse"] = None, | ||
) -> Union[str, None]: | ||
|
@@ -964,6 +982,7 @@ def resolve_download_path_collisions( | |
May be "overwrite.local", "keep.local", or "keep.both". | ||
synapse_cache_location: The location in .synapseCache where the file would be | ||
corresponding to its FileHandleId. | ||
entity_id: The entity id | ||
cached_file_path: The file path of the cached copy | ||
|
||
Raises: | ||
|
@@ -1000,7 +1019,8 @@ def resolve_download_path_collisions( | |
pass # Let the download proceed and overwrite the local file. | ||
elif if_collision == COLLISION_KEEP_LOCAL: | ||
client.logger.info( | ||
f"Found existing file at {download_path}, skipping download." | ||
f"[{entity_id}]: Found existing " | ||
f"file at {download_path}, skipping download." | ||
) | ||
|
||
# Don't want to overwrite the local file. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,13 +66,17 @@ def increment_progress_bar(n: int, progress_bar: Union[tqdm, None]) -> None: | |
|
||
@contextmanager | ||
def shared_download_progress_bar( | ||
file_size: int, *, synapse_client: Optional["Synapse"] = None | ||
file_size: int, | ||
custom_message: str = None, | ||
*, | ||
synapse_client: Optional["Synapse"] = None, | ||
): | ||
"""An outside process that will eventually trigger a download through this module | ||
can configure a shared Progress Bar by running its code within this context manager. | ||
|
||
Arguments: | ||
file_size: The size of the file being downloaded. | ||
custom_message: A custom message to display on the progress bar instead of default. | ||
synapse_client: If not passed in and caching was not disabled by | ||
`Synapse.allow_client_caching(False)` this will use the last created | ||
instance from the Synapse class constructor. | ||
|
@@ -86,22 +90,28 @@ def shared_download_progress_bar( | |
|
||
syn = Synapse.get_client(synapse_client=synapse_client) | ||
with logging_redirect_tqdm(loggers=[syn.logger]): | ||
get_or_create_download_progress_bar(file_size=file_size, synapse_client=syn) | ||
get_or_create_download_progress_bar( | ||
file_size=file_size, custom_message=custom_message, synapse_client=syn | ||
) | ||
try: | ||
yield | ||
finally: | ||
_thread_local.progress_bar_download_context_managed = False | ||
if _thread_local.progress_bar_download: | ||
_thread_local.progress_bar_download.close() | ||
_thread_local.progress_bar_download.refresh() | ||
del _thread_local.progress_bar_download | ||
close_download_progress_bar() | ||
|
||
|
||
def close_download_progress_bar() -> None: | ||
"""Handle closing the download progress bar if it is not context managed.""" | ||
if not _is_context_managed_download_bar(): | ||
def close_download_progress_bar(force_close: bool = False) -> None: | ||
"""Handle closing the download progress bar if it is not context managed. This will | ||
also only close the progress bar if there are no other downloads sharing it.""" | ||
if force_close or not _is_context_managed_download_bar(): | ||
progress_bar: tqdm = getattr(_thread_local, "progress_bar_download", None) | ||
if progress_bar is not None: | ||
transfer_count: int = getattr(_thread_local, "transfer_count", 0) | ||
transfer_count -= 1 | ||
if transfer_count < 0: | ||
transfer_count = 0 | ||
|
||
_thread_local.transfer_count = transfer_count | ||
if progress_bar is not None and not transfer_count: | ||
Comment on lines
+108
to
+114
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See https://sagebionetworks.jira.com/browse/SYNPY-1507 - There was an issue where using |
||
progress_bar.close() | ||
progress_bar.refresh() | ||
del _thread_local.progress_bar_download | ||
|
@@ -113,7 +123,11 @@ def _is_context_managed_download_bar() -> bool: | |
|
||
|
||
def get_or_create_download_progress_bar( | ||
file_size: int, postfix: str = None, *, synapse_client: Optional["Synapse"] = None | ||
file_size: int, | ||
postfix: str = None, | ||
custom_message: str = None, | ||
*, | ||
synapse_client: Optional["Synapse"] = None, | ||
) -> Union[tqdm, None]: | ||
"""Return the existing progress bar if it exists, otherwise create a new one. | ||
|
||
|
@@ -132,11 +146,15 @@ def get_or_create_download_progress_bar( | |
if syn.silent: | ||
return None | ||
|
||
transfer_count: int = getattr(_thread_local, "transfer_count", 0) | ||
transfer_count += 1 | ||
_thread_local.transfer_count = transfer_count | ||
|
||
progress_bar: tqdm = getattr(_thread_local, "progress_bar_download", None) | ||
if progress_bar is None: | ||
progress_bar = tqdm( | ||
total=file_size, | ||
desc="Downloading files", | ||
desc=custom_message or "Downloading files", | ||
unit="B", | ||
unit_scale=True, | ||
smoothing=0, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving the logic to here aligns with how the logic works when streaming each individual part of the files. This is executing in a different thread so the presigned URL should be instantly used.
I also wrapped this in a retry block as well to ensure it'll remain functional.