Skip to content

Commit

Permalink
[SYNPY-1548] Patch sync code for durability of downloads and log mess…
Browse files Browse the repository at this point in the history
…age format along with tqdm progress bar formatting (#1147)

* Patch sync code for durability of downloads and log message format along with tqdm progress bar formatting
  • Loading branch information
BryanFauble authored Dec 17, 2024
1 parent b784b85 commit be4c78a
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 106 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:

strategy:
matrix:
os: [ubuntu-20.04, macos-12, windows-2022]
os: [ubuntu-20.04, macos-13, windows-2022]

# if changing the below change the run-integration-tests versions and the check-deploy versions
# Make sure that we are running the integration tests on the first and last versions of the matrix
Expand Down Expand Up @@ -399,7 +399,7 @@ jobs:

strategy:
matrix:
os: [ubuntu-20.04, macos-12, windows-2022]
os: [ubuntu-20.04, macos-13, windows-2022]

# python versions should be consistent with the strategy matrix and the runs-integration-tests versions
python: ['3.9', '3.10', '3.11', '3.12']
Expand Down
76 changes: 24 additions & 52 deletions synapseclient/core/download/download_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

import httpx

from synapseclient.api.file_services import (
get_file_handle_for_download,
get_file_handle_for_download_async,
)
from synapseclient.api.file_services import get_file_handle_for_download
from synapseclient.core.exceptions import (
SynapseDownloadAbortedException,
_raise_for_status_httpx,
Expand All @@ -29,6 +26,7 @@
RETRYABLE_CONNECTION_ERRORS,
RETRYABLE_CONNECTION_EXCEPTIONS,
with_retry_time_based,
with_retry_time_based_async,
)
from synapseclient.core.transfer_bar import get_or_create_download_progress_bar

Expand Down Expand Up @@ -110,24 +108,6 @@ class PresignedUrlProvider:
# offset parameter used to buffer url expiration checks, time in seconds
_TIME_BUFFER: datetime.timedelta = datetime.timedelta(seconds=5)

async def get_info_async(self) -> PresignedUrlInfo:
    """
    Async variant of `get_info`: serve the cached pre-signed URL info when it
    is still valid, otherwise fetch a fresh one and cache it.

    Returns:
        Information about a retrieved presigned-url, from either the cache or
        a new request.
    """
    cached = self._cached_info
    # Treat the URL as expired slightly early (_TIME_BUFFER) so a request
    # started now does not race the actual expiration.
    cutoff = (
        datetime.datetime.now(tz=datetime.timezone.utc)
        + PresignedUrlProvider._TIME_BUFFER
    )
    if not cached or cutoff >= cached.expiration_utc:
        self._cached_info = await self._get_pre_signed_info_async()
    return self._cached_info

def get_info(self) -> PresignedUrlInfo:
"""
Using a thread lock, returns the cached info if it's not expired, otherwise
Expand Down Expand Up @@ -168,27 +148,6 @@ def _get_pre_signed_info(self) -> PresignedUrlInfo:
expiration_utc=_pre_signed_url_expiration_time(pre_signed_url),
)

async def _get_pre_signed_info_async(self) -> PresignedUrlInfo:
    """
    Make an HTTP request to get a pre-signed url to download a file.

    Returns:
        Information about a retrieved presigned-url from a new request.
    """
    response = await get_file_handle_for_download_async(
        file_handle_id=self.request.file_handle_id,
        synapse_id=self.request.object_id,
        entity_type=self.request.object_type,
        synapse_client=self.client,
    )
    pre_signed_url = response["preSignedURL"]
    # Expiration is encoded in the signed URL itself, not in the response body.
    return PresignedUrlInfo(
        file_name=response["fileHandle"]["fileName"],
        url=pre_signed_url,
        expiration_utc=_pre_signed_url_expiration_time(pre_signed_url),
    )


def _generate_chunk_ranges(
file_size: int,
Expand Down Expand Up @@ -232,40 +191,47 @@ def _pre_signed_url_expiration_time(url: str) -> datetime:
return return_data


async def _get_file_size_wrapper(
    syn: "Synapse", url_provider: PresignedUrlProvider, debug: bool
) -> int:
    """
    Gets the size of the file located at the presigned URL.

    Arguments:
        syn: The synapseclient
        url_provider: A URL provider for the presigned urls
        debug: A boolean to specify if debug mode is on

    Returns:
        The size of the file in bytes
    """
    # The size probe is a blocking HTTP call, so run it on the client's thread
    # pool instead of blocking the event loop. Passing the provider (rather
    # than a pre-resolved URL) lets the worker obtain a fresh, unexpired URL.
    event_loop = asyncio.get_running_loop()
    executor = syn._get_thread_pool_executor(asyncio_event_loop=event_loop)
    return await event_loop.run_in_executor(
        executor, _get_file_size, syn, url_provider, debug
    )


def _get_file_size(syn: "Synapse", url: str, debug: bool) -> int:
def _get_file_size(
syn: "Synapse", presigned_url_provider: PresignedUrlProvider, debug: bool
) -> int:
"""
Gets the size of the file located at url
Arguments:
url: The pre-signed url of the file
url_provider: A URL provider for the presigned urls
debug: A boolean to specify if debug mode is on
Returns:
The size of the file in bytes
"""
with syn._requests_session_storage.stream("GET", url) as response:
with syn._requests_session_storage.stream(
method="GET", url=presigned_url_provider.get_info().url
) as response:
_raise_for_status_httpx(
response=response,
logger=syn.logger,
Expand Down Expand Up @@ -306,9 +272,15 @@ async def download_file(self) -> None:
"""
url_provider = PresignedUrlProvider(self._syn, request=self._download_request)

url_info = await url_provider.get_info_async()
file_size = await _get_file_size_wrapper(
syn=self._syn, url=url_info.url, debug=self._download_request.debug
file_size = await with_retry_time_based_async(
function=lambda: _get_file_size_wrapper(
syn=self._syn,
url_provider=url_provider,
debug=self._download_request.debug,
),
retry_status_codes=[403],
retry_max_wait_before_failure=30,
read_response_content=False,
)
self._progress_bar = get_or_create_download_progress_bar(
file_size=file_size,
Expand Down
38 changes: 29 additions & 9 deletions synapseclient/core/download/download_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ async def download_file_entity(
if_collision=if_collision,
synapse_cache_location=synapse_cache_location,
cached_file_path=cached_file_path,
entity_id=getattr(entity, "id", None),
synapse_client=client,
)
if download_path is None:
Expand All @@ -157,9 +158,15 @@ async def download_file_entity(
if not os.path.exists(download_location):
os.makedirs(download_location)
client.logger.info(
f"Copying existing file from {cached_file_path} to {download_path}"
f"[{getattr(entity, 'id', None)}:{file_name}]: Copying existing "
f"file from {cached_file_path} to {download_path}"
)
shutil.copy(cached_file_path, download_path)
else:
client.logger.info(
f"[{getattr(entity, 'id', None)}:{file_name}]: Found existing file "
f"at {download_path}, skipping download."
)

else: # download the file from URL (could be a local file)
object_type = "FileEntity" if submission is None else "SubmissionAttachment"
Expand Down Expand Up @@ -257,6 +264,7 @@ async def download_file_entity_model(
if_collision=if_collision,
synapse_cache_location=synapse_cache_location,
cached_file_path=cached_file_path,
entity_id=file.id,
synapse_client=client,
)
if download_path is None:
Expand All @@ -268,9 +276,13 @@ async def download_file_entity_model(
if not os.path.exists(download_location):
os.makedirs(download_location)
client.logger.info(
f"Copying existing file from {cached_file_path} to {download_path}"
f"[{file.id}:{file_name}]: Copying existing file from {cached_file_path} to {download_path}"
)
shutil.copy(cached_file_path, download_path)
else:
client.logger.info(
f"[{file.id}:{file_name}]: Found existing file at {download_path}, skipping download."
)

else: # download the file from URL (could be a local file)
object_type = "FileEntity" if submission is None else "SubmissionAttachment"
Expand Down Expand Up @@ -526,7 +538,7 @@ def download_fn(
),
)

syn.logger.info(f"Downloaded {synapse_id} to {downloaded_path}")
syn.logger.info(f"[{synapse_id}]: Downloaded to {downloaded_path}")
syn.cache.add(
file_handle["id"], downloaded_path, file_handle.get("contentMd5", None)
)
Expand All @@ -541,7 +553,8 @@ def download_fn(
exc_info = sys.exc_info()
ex.progress = 0 if not hasattr(ex, "progress") else ex.progress
syn.logger.debug(
f"\nRetrying download on error: [{exc_info[0]}] after progressing {ex.progress} bytes",
f"\n[{synapse_id}]: Retrying "
f"download on error: [{exc_info[0]}] after progressing {ex.progress} bytes",
exc_info=True,
) # this will include stack trace
if ex.progress == 0: # No progress was made reduce remaining retries.
Expand Down Expand Up @@ -669,7 +682,7 @@ def download_from_url(
actual_md5 = None
redirect_count = 0
delete_on_md5_mismatch = True
client.logger.debug(f"Downloading from {url} to {destination}")
client.logger.debug(f"[{entity_id}]: Downloading from {url} to {destination}")
while redirect_count < REDIRECT_LIMIT:
redirect_count += 1
scheme = urllib_urlparse.urlparse(url).scheme
Expand Down Expand Up @@ -854,7 +867,8 @@ def _ftp_report_hook(
)
increment_progress_bar(n=transferred, progress_bar=progress_bar)
client.logger.debug(
f"Resuming partial download to {temp_destination}. "
f"[{entity_id}]: Resuming "
f"partial download to {temp_destination}. "
f"{previously_transferred}/{to_be_transferred} bytes already "
"transferred."
)
Expand Down Expand Up @@ -894,7 +908,8 @@ def _ftp_report_hook(
# verify that the file was completely downloaded and retry if it is not complete
if to_be_transferred > 0 and transferred < to_be_transferred:
client.logger.warning(
"\nRetrying download because the connection ended early.\n"
f"\n[{entity_id}]: "
"Retrying download because the connection ended early.\n"
)
continue

Expand All @@ -903,7 +918,9 @@ def _ftp_report_hook(
shutil.move(temp_destination, destination)
break
else:
client.logger.error(f"Unable to download URLs of type {scheme}")
client.logger.error(
f"[{entity_id}]: Unable to download URLs of type {scheme}"
)
return None

else: # didn't break out of loop
Expand Down Expand Up @@ -949,6 +966,7 @@ def resolve_download_path_collisions(
if_collision: str,
synapse_cache_location: str,
cached_file_path: str,
entity_id: str,
*,
synapse_client: Optional["Synapse"] = None,
) -> Union[str, None]:
Expand All @@ -964,6 +982,7 @@ def resolve_download_path_collisions(
May be "overwrite.local", "keep.local", or "keep.both".
synapse_cache_location: The location in .synapseCache where the file would be
corresponding to its FileHandleId.
entity_id: The entity id
cached_file_path: The file path of the cached copy
Raises:
Expand Down Expand Up @@ -1000,7 +1019,8 @@ def resolve_download_path_collisions(
pass # Let the download proceed and overwrite the local file.
elif if_collision == COLLISION_KEEP_LOCAL:
client.logger.info(
f"Found existing file at {download_path}, skipping download."
f"[{entity_id}]: Found existing "
f"file at {download_path}, skipping download."
)

# Don't want to overwrite the local file.
Expand Down
42 changes: 30 additions & 12 deletions synapseclient/core/transfer_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,17 @@ def increment_progress_bar(n: int, progress_bar: Union[tqdm, None]) -> None:

@contextmanager
def shared_download_progress_bar(
file_size: int, *, synapse_client: Optional["Synapse"] = None
file_size: int,
custom_message: str = None,
*,
synapse_client: Optional["Synapse"] = None,
):
"""An outside process that will eventually trigger a download through this module
can configure a shared Progress Bar by running its code within this context manager.
Arguments:
file_size: The size of the file being downloaded.
custom_message: A custom message to display on the progress bar instead of default.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Expand All @@ -86,22 +90,28 @@ def shared_download_progress_bar(

syn = Synapse.get_client(synapse_client=synapse_client)
with logging_redirect_tqdm(loggers=[syn.logger]):
get_or_create_download_progress_bar(file_size=file_size, synapse_client=syn)
get_or_create_download_progress_bar(
file_size=file_size, custom_message=custom_message, synapse_client=syn
)
try:
yield
finally:
_thread_local.progress_bar_download_context_managed = False
if _thread_local.progress_bar_download:
_thread_local.progress_bar_download.close()
_thread_local.progress_bar_download.refresh()
del _thread_local.progress_bar_download
close_download_progress_bar()


def close_download_progress_bar(force_close: bool = False) -> None:
    """Handle closing the download progress bar if it is not context managed. This will
    also only close the progress bar if there are no other downloads sharing it.

    Arguments:
        force_close: When True, close regardless of whether an enclosing
            context manager owns the bar.
    """
    if not force_close and _is_context_managed_download_bar():
        return
    progress_bar: tqdm = getattr(_thread_local, "progress_bar_download", None)
    # One transfer is finishing; never let the shared counter go negative.
    remaining = max(getattr(_thread_local, "transfer_count", 0) - 1, 0)
    _thread_local.transfer_count = remaining
    # Only the last remaining transfer actually tears the bar down.
    if progress_bar is not None and not remaining:
        progress_bar.close()
        progress_bar.refresh()
        del _thread_local.progress_bar_download
Expand All @@ -113,7 +123,11 @@ def _is_context_managed_download_bar() -> bool:


def get_or_create_download_progress_bar(
file_size: int, postfix: str = None, *, synapse_client: Optional["Synapse"] = None
file_size: int,
postfix: str = None,
custom_message: str = None,
*,
synapse_client: Optional["Synapse"] = None,
) -> Union[tqdm, None]:
"""Return the existing progress bar if it exists, otherwise create a new one.
Expand All @@ -132,11 +146,15 @@ def get_or_create_download_progress_bar(
if syn.silent:
return None

transfer_count: int = getattr(_thread_local, "transfer_count", 0)
transfer_count += 1
_thread_local.transfer_count = transfer_count

progress_bar: tqdm = getattr(_thread_local, "progress_bar_download", None)
if progress_bar is None:
progress_bar = tqdm(
total=file_size,
desc="Downloading files",
desc=custom_message or "Downloading files",
unit="B",
unit_scale=True,
smoothing=0,
Expand Down
Loading

0 comments on commit be4c78a

Please sign in to comment.