From cf2c69a6c490f45d565875f67ae5130d3cbf2995 Mon Sep 17 00:00:00 2001 From: BryanFauble <17128019+BryanFauble@users.noreply.github.com> Date: Wed, 8 Jan 2025 11:28:01 -0700 Subject: [PATCH] Implementing logic to support snapshotting tables --- docs/reference/oop/table_refactor.md | 1 + synapseclient/core/exceptions.py | 5 + synapseclient/models/__init__.py | 1 - synapseclient/models/activity.py | 4 +- synapseclient/models/annotations.py | 2 +- .../models/protocols/table_protocol.py | 76 ++++++++++ synapseclient/models/table.py | 136 +++++++++++++++++- 7 files changed, 214 insertions(+), 11 deletions(-) diff --git a/docs/reference/oop/table_refactor.md b/docs/reference/oop/table_refactor.md index b8f4dd89e..febee410f 100644 --- a/docs/reference/oop/table_refactor.md +++ b/docs/reference/oop/table_refactor.md @@ -13,6 +13,7 @@ client. - query - store_rows - delete_rows + - snapshot - delete_column - add_column - reorder_column diff --git a/synapseclient/core/exceptions.py b/synapseclient/core/exceptions.py index 57d7165fc..278743df8 100644 --- a/synapseclient/core/exceptions.py +++ b/synapseclient/core/exceptions.py @@ -2,6 +2,7 @@ as handling error cases for HTTP requests.""" import logging +import re from typing import Union import httpx @@ -9,6 +10,8 @@ from synapseclient.core import utils +BEARER_TOKEN_PATTERN = re.compile(r"\'Bearer \S+\'") + class SynapseError(Exception): """Generic exception thrown by the client.""" @@ -175,6 +178,7 @@ def _raise_for_status(response, verbose=False): # Append the request sent message += f"\n\n{REQUEST_PREFIX}\n{response.request.url} {response.request.method}" message += f"\n{HEADERS_PREFIX}{response.request.headers}" + message = re.sub(BEARER_TOKEN_PATTERN, "'Bearer '", message) message += f"\n{BODY_PREFIX}{response.request.body}" except: # noqa message += f"\n{UNABLE_TO_APPEND_REQUEST}" @@ -273,6 +277,7 @@ def _raise_for_status_httpx( # Append the request sent message += f"\n\n{REQUEST_PREFIX}\n{response.request.url} {response.request.method}" message += f"\n{HEADERS_PREFIX}{response.request.headers}" + message = re.sub(BEARER_TOKEN_PATTERN, "'Bearer '", message) message += f"\n{BODY_PREFIX}{response.request.content}" except Exception: # noqa logger.exception(UNABLE_TO_APPEND_REQUEST) diff --git a/synapseclient/models/__init__.py b/synapseclient/models/__init__.py index a487a3827..b8a66f9de 100644 --- a/synapseclient/models/__init__.py +++ b/synapseclient/models/__init__.py @@ -8,7 +8,6 @@ from synapseclient.models.table import ( Column, ColumnType, - CsvResultFormat, FacetType, Row, RowsetResultFormat, diff --git a/synapseclient/models/activity.py b/synapseclient/models/activity.py index c7fb4a0bd..379038aae 100644 --- a/synapseclient/models/activity.py +++ b/synapseclient/models/activity.py @@ -319,8 +319,8 @@ async def store_async( ), ) self.fill_from_dict(synapse_activity=saved_activity) - Synapse.get_client(synapse_client=synapse_client).logger.debug( - f"Stored activity {self.id}" + Synapse.get_client(synapse_client=synapse_client).logger.info( + f"[{parent.id}]: Stored activity" ) return self diff --git a/synapseclient/models/annotations.py b/synapseclient/models/annotations.py index f6df4f20f..d6c1d609c 100644 --- a/synapseclient/models/annotations.py +++ b/synapseclient/models/annotations.py @@ -88,7 +88,7 @@ async def store_async( self.annotations = Annotations.from_dict(result) self.etag = result["etag"] Synapse.get_client(synapse_client=synapse_client).logger.debug( - f"Annotations stored for {self.id}" + f"[{self.id}]: Stored annotations" ) return self diff --git a/synapseclient/models/protocols/table_protocol.py b/synapseclient/models/protocols/table_protocol.py index 1952c077a..9dcda94f4 100644 --- a/synapseclient/models/protocols/table_protocol.py +++ b/synapseclient/models/protocols/table_protocol.py @@ -151,3 +151,79 @@ def query( The results of the query as a Pandas DataFrame. """ return pd.DataFrame() + + def snapshot( + self, + comment: str = None, + label: str = None, + include_activity: bool = True, + associate_activity_to_new_version: bool = True, + *, + synapse_client: Optional[Synapse] = None, + ) -> Dict[str, Any]: + """ + Request to create a new snapshot of a table. The provided comment, label, and + activity will be applied to the current version thereby creating a snapshot + and locking the current version. After the snapshot is created a new version + will be started with an 'in-progress' label. + + Arguments: + comment: Comment to add to this snapshot to the table. + label: Label to add to this snapshot to the table. The label must be unique, + if a label is not provided a unique label will be generated. + include_activity: If True the activity will be included in snapshot if it + exists. In order to include the activity, the activity must have already + been stored in Synapse by using the `activity` attribute on the Table + and calling the `store()` method on the Table instance. Adding an + activity to a snapshot of a table is meant to capture the provenance of + the data at the time of the snapshot. + associate_activity_to_new_version: If True the activity will be associated + with the new version of the table. If False the activity will not be + associated with the new version of the table. + synapse_client: If not passed in and caching was not disabled by + `Synapse.allow_client_caching(False)` this will use the last created + instance from the Synapse class constructor. + + Example: Creating a snapshot of a table + Comment and label are optional, but filled in for this example. + + from synapseclient.models import Table + from synapseclient import Synapse + + syn = Synapse() + syn.login() + + my_table = Table(id="syn1234") + my_table.snapshot( + comment="This is a new snapshot comment", + label="This is a unique label" + ) + + Example: Including the activity (Provenance) in the snapshot and not pulling it forward to the new `in-progress` version of the table. + By default this method is set up to include the activity in the snapshot and + then pull the activity forward to the new version. If you do not want to + include the activity in the snapshot you can set `include_activity` to + False. If you do not want to pull the activity forward to the new version + you can set `associate_activity_to_new_version` to False. + + See the [activity][synapseclient.models.Activity] attribute on the Table + class for more information on how to interact with the activity. + + from synapseclient.models import Table + from synapseclient import Synapse + + syn = Synapse() + syn.login() + + my_table = Table(id="syn1234") + my_table.snapshot( + comment="This is a new snapshot comment", + label="This is a unique label", + include_activity=True, + associate_activity_to_new_version=False + ) + + Returns: + A dictionary that matches: + """ + return {} diff --git a/synapseclient/models/table.py b/synapseclient/models/table.py index b46c13469..aadbd5eb0 100644 --- a/synapseclient/models/table.py +++ b/synapseclient/models/table.py @@ -365,9 +365,13 @@ class Table(TableSynchronousProtocol, AccessControllable): modified_on: The date this table was last modified. In YYYY-MM-DD-Thh:mm:ss.sssZ format modified_by: The ID of the user that last modified this table. - version_number: The version number issued to this version on the object. - version_label: The version label for this table - version_comment: The version comment for this table + version_number: (Read Only) The version number issued to this version on the + object. Use this the `.snapshot()` method to create a new version of the + table. + version_label: (Read Only) The version label for this table. Use the + `.snapshot()` method to create a new version of the table. + version_comment: (Read Only) The version comment for this table. Use the + `.snapshot()` method to create a new version of the table. is_latest_version: (Read Only) If this is the latest version of the object. is_search_enabled: When creating or updating a table or view specifies if full text search should be enabled. Note that enabling full text search might @@ -648,13 +652,16 @@ class Table(TableSynchronousProtocol, AccessControllable): """The ID of the user that last modified this table.""" version_number: Optional[int] = field(default=None, compare=False) - """The version number issued to this version on the object.""" + """(Read Only) The version number issued to this version on the object. Use this the + `.snapshot()` method to create a new version of the table.""" version_label: Optional[str] = None - """The version label for this table""" + """(Read Only) The version label for this table. Use this the `.snapshot()` method + to create a new version of the table.""" version_comment: Optional[str] = None - """The version comment for this table""" + """(Read Only) The version comment for this table. Use this the `.snapshot()` method + to create a new version of the table.""" is_latest_version: Optional[bool] = field(default=None, compare=False) """(Read Only) If this is the latest version of the object.""" @@ -933,7 +940,10 @@ async def store_async( raise NotImplementedError("This argument is not yet implemented") if not self.has_changed: - client.logger.info(f"No changes detected for table {self.name or self.id}") + client.logger.info( + f"[{self.id}:{self.name}]: " + "No changes detected for table entity. Annotations, and activity are stored under a separate check." + ) if self.has_changed: # TODO: Swap the storage of the table to be done in a single transaction via: https://rest-docs.synapse.org/rest/POST/entity/id/table/transaction/async/start.html @@ -1270,3 +1280,115 @@ async def query_async( # TODO: Lastly - When a query is executed both single-threaded and multi-threaded downloads of the CSV result should not write a file to disk, instead write the bytes to a BytesIO object and then use that object instead of a filepath # TODO: Also support writing the CSV to disk if the user wants to do that raise NotImplementedError("This method is not yet implemented") + + async def snapshot_async( + self, + comment: str = None, + label: str = None, + include_activity: bool = True, + associate_activity_to_new_version: bool = True, + *, + synapse_client: Optional[Synapse] = None, + ) -> Dict[str, Any]: + """ + Request to create a new snapshot of a table. The provided comment, label, and + activity will be applied to the current version thereby creating a snapshot + and locking the current version. After the snapshot is created a new version + will be started with an 'in-progress' label. + + Arguments: + comment: Comment to add to this snapshot to the table. + label: Label to add to this snapshot to the table. The label must be unique, + if a label is not provided a unique label will be generated. + include_activity: If True the activity will be included in snapshot if it + exists. In order to include the activity, the activity must have already + been stored in Synapse by using the `activity` attribute on the Table + and calling the `store()` method on the Table instance. Adding an + activity to a snapshot of a table is meant to capture the provenance of + the data at the time of the snapshot. + associate_activity_to_new_version: If True the activity will be associated + with the new version of the table. If False the activity will not be + associated with the new version of the table. + synapse_client: If not passed in and caching was not disabled by + `Synapse.allow_client_caching(False)` this will use the last created + instance from the Synapse class constructor. + + Example: Creating a snapshot of a table + Comment and label are optional, but filled in for this example. + + import asyncio + from synapseclient.models import Table + from synapseclient import Synapse + + syn = Synapse() + syn.login() + + + async def main(): + my_table = Table(id="syn1234") + await my_table.snapshot_async( + comment="This is a new snapshot comment", + label="3This is a unique label" + ) + + asyncio.run(main()) + + Example: Including the activity (Provenance) in the snapshot and not pulling it forward to the new `in-progress` version of the table. + By default this method is set up to include the activity in the snapshot and + then pull the activity forward to the new version. If you do not want to + include the activity in the snapshot you can set `include_activity` to + False. If you do not want to pull the activity forward to the new version + you can set `associate_activity_to_new_version` to False. + + See the [activity][synapseclient.models.Activity] attribute on the Table + class for more information on how to interact with the activity. + + import asyncio + from synapseclient.models import Table + from synapseclient import Synapse + + syn = Synapse() + syn.login() + + + async def main(): + my_table = Table(id="syn1234") + await my_table.snapshot_async( + comment="This is a new snapshot comment", + label="This is a unique label", + include_activity=True, + associate_activity_to_new_version=False + ) + + asyncio.run(main()) + + Returns: + A dictionary that matches: + """ + client = Synapse.get_client(synapse_client=synapse_client) + # Ensure that we have seeded the table with the latest data + await self.get_async(include_activity=True, synapse_client=client) + client.logger.info( + f"[{self.id}:{self.name}]: Creating a snapshot of the table." + ) + + loop = asyncio.get_event_loop() + snapshot_response = await loop.run_in_executor( + None, + lambda: client._create_table_snapshot( + table=self.id, + comment=comment, + label=label, + activity=self.activity.id + if self.activity and include_activity + else None, + ), + ) + + if associate_activity_to_new_version and self.activity: + self._last_persistent_instance.activity = None + await self.store_async(synapse_client=synapse_client) + else: + await self.get_async(include_activity=True, synapse_client=synapse_client) + + return snapshot_response