Skip to content

Commit

Permalink
Implementing logic to support snapshotting tables
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanFauble committed Jan 8, 2025
1 parent 91fd101 commit cf2c69a
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 11 deletions.
1 change: 1 addition & 0 deletions docs/reference/oop/table_refactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ client.
- query
- store_rows
- delete_rows
- snapshot
- delete_column
- add_column
- reorder_column
Expand Down
5 changes: 5 additions & 0 deletions synapseclient/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
as handling error cases for HTTP requests."""

import logging
import re
from typing import Union

import httpx
import requests

from synapseclient.core import utils

BEARER_TOKEN_PATTERN = re.compile(r"\'Bearer \S+\'")


class SynapseError(Exception):
"""Generic exception thrown by the client."""
Expand Down Expand Up @@ -175,6 +178,7 @@ def _raise_for_status(response, verbose=False):
# Append the request sent
message += f"\n\n{REQUEST_PREFIX}\n{response.request.url} {response.request.method}"
message += f"\n{HEADERS_PREFIX}{response.request.headers}"
message = re.sub(BEARER_TOKEN_PATTERN, "'Bearer <redacted>'", message)
message += f"\n{BODY_PREFIX}{response.request.body}"
except: # noqa
message += f"\n{UNABLE_TO_APPEND_REQUEST}"
Expand Down Expand Up @@ -273,6 +277,7 @@ def _raise_for_status_httpx(
# Append the request sent
message += f"\n\n{REQUEST_PREFIX}\n{response.request.url} {response.request.method}"
message += f"\n{HEADERS_PREFIX}{response.request.headers}"
message = re.sub(BEARER_TOKEN_PATTERN, "'Bearer <redacted>'", message)
message += f"\n{BODY_PREFIX}{response.request.content}"
except Exception: # noqa
logger.exception(UNABLE_TO_APPEND_REQUEST)
Expand Down
1 change: 0 additions & 1 deletion synapseclient/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from synapseclient.models.table import (
Column,
ColumnType,
CsvResultFormat,
FacetType,
Row,
RowsetResultFormat,
Expand Down
4 changes: 2 additions & 2 deletions synapseclient/models/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ async def store_async(
),
)
self.fill_from_dict(synapse_activity=saved_activity)
Synapse.get_client(synapse_client=synapse_client).logger.debug(
f"Stored activity {self.id}"
Synapse.get_client(synapse_client=synapse_client).logger.info(
f"[{parent.id}]: Stored activity"
)

return self
Expand Down
2 changes: 1 addition & 1 deletion synapseclient/models/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def store_async(
self.annotations = Annotations.from_dict(result)
self.etag = result["etag"]
Synapse.get_client(synapse_client=synapse_client).logger.debug(
f"Annotations stored for {self.id}"
f"[{self.id}]: Stored annotations"
)
return self

Expand Down
76 changes: 76 additions & 0 deletions synapseclient/models/protocols/table_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,79 @@ def query(
The results of the query as a Pandas DataFrame.
"""
return pd.DataFrame()

def snapshot(
self,
comment: str = None,
label: str = None,
include_activity: bool = True,
associate_activity_to_new_version: bool = True,
*,
synapse_client: Optional[Synapse] = None,
) -> Dict[str, Any]:
"""
Request to create a new snapshot of a table. The provided comment, label, and
activity will be applied to the current version thereby creating a snapshot
and locking the current version. After the snapshot is created a new version
will be started with an 'in-progress' label.
Arguments:
comment: Comment to add to this snapshot to the table.
label: Label to add to this snapshot to the table. The label must be unique,
if a label is not provided a unique label will be generated.
include_activity: If True the activity will be included in snapshot if it
exists. In order to include the activity, the activity must have already
been stored in Synapse by using the `activity` attribute on the Table
and calling the `store()` method on the Table instance. Adding an
activity to a snapshot of a table is meant to capture the provenance of
the data at the time of the snapshot.
associate_activity_to_new_version: If True the activity will be associated
with the new version of the table. If False the activity will not be
associated with the new version of the table.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Example: Creating a snapshot of a table
Comment and label are optional, but filled in for this example.
from synapseclient.models import Table
from synapseclient import Synapse
syn = Synapse()
syn.login()
my_table = Table(id="syn1234")
my_table.snapshot(
comment="This is a new snapshot comment",
label="This is a unique label"
)
Example: Including the activity (Provenance) in the snapshot and not pulling it forward to the new `in-progress` version of the table.
By default this method is set up to include the activity in the snapshot and
then pull the activity forward to the new version. If you do not want to
include the activity in the snapshot you can set `include_activity` to
False. If you do not want to pull the activity forward to the new version
you can set `associate_activity_to_new_version` to False.
See the [activity][synapseclient.models.Activity] attribute on the Table
class for more information on how to interact with the activity.
from synapseclient.models import Table
from synapseclient import Synapse
syn = Synapse()
syn.login()
my_table = Table(id="syn1234")
my_table.snapshot(
comment="This is a new snapshot comment",
label="This is a unique label",
include_activity=True,
associate_activity_to_new_version=False
)
Returns:
A dictionary that matches: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/SnapshotResponse.html>
"""
return {}
136 changes: 129 additions & 7 deletions synapseclient/models/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,13 @@ class Table(TableSynchronousProtocol, AccessControllable):
modified_on: The date this table was last modified.
In YYYY-MM-DD-Thh:mm:ss.sssZ format
modified_by: The ID of the user that last modified this table.
version_number: The version number issued to this version on the object.
version_label: The version label for this table
version_comment: The version comment for this table
version_number: (Read Only) The version number issued to this version on the
object. Use this the `.snapshot()` method to create a new version of the
table.
version_label: (Read Only) The version label for this table. Use the
`.snapshot()` method to create a new version of the table.
version_comment: (Read Only) The version comment for this table. Use the
`.snapshot()` method to create a new version of the table.
is_latest_version: (Read Only) If this is the latest version of the object.
is_search_enabled: When creating or updating a table or view specifies if full
text search should be enabled. Note that enabling full text search might
Expand Down Expand Up @@ -648,13 +652,16 @@ class Table(TableSynchronousProtocol, AccessControllable):
"""The ID of the user that last modified this table."""

version_number: Optional[int] = field(default=None, compare=False)
"""The version number issued to this version on the object."""
"""(Read Only) The version number issued to this version on the object. Use this the
`.snapshot()` method to create a new version of the table."""

version_label: Optional[str] = None
"""The version label for this table"""
"""(Read Only) The version label for this table. Use this the `.snapshot()` method
to create a new version of the table."""

version_comment: Optional[str] = None
"""The version comment for this table"""
"""(Read Only) The version comment for this table. Use this the `.snapshot()` method
to create a new version of the table."""

is_latest_version: Optional[bool] = field(default=None, compare=False)
"""(Read Only) If this is the latest version of the object."""
Expand Down Expand Up @@ -933,7 +940,10 @@ async def store_async(
raise NotImplementedError("This argument is not yet implemented")

if not self.has_changed:
client.logger.info(f"No changes detected for table {self.name or self.id}")
client.logger.info(
f"[{self.id}:{self.name}]: "
"No changes detected for table entity. Annotations, and activity are stored under a separate check."
)

if self.has_changed:
# TODO: Swap the storage of the table to be done in a single transaction via: https://rest-docs.synapse.org/rest/POST/entity/id/table/transaction/async/start.html
Expand Down Expand Up @@ -1270,3 +1280,115 @@ async def query_async(
# TODO: Lastly - When a query is executed both single-threaded and multi-threaded downloads of the CSV result should not write a file to disk, instead write the bytes to a BytesIO object and then use that object instead of a filepath
# TODO: Also support writing the CSV to disk if the user wants to do that
raise NotImplementedError("This method is not yet implemented")

async def snapshot_async(
self,
comment: str = None,
label: str = None,
include_activity: bool = True,
associate_activity_to_new_version: bool = True,
*,
synapse_client: Optional[Synapse] = None,
) -> Dict[str, Any]:
"""
Request to create a new snapshot of a table. The provided comment, label, and
activity will be applied to the current version thereby creating a snapshot
and locking the current version. After the snapshot is created a new version
will be started with an 'in-progress' label.
Arguments:
comment: Comment to add to this snapshot to the table.
label: Label to add to this snapshot to the table. The label must be unique,
if a label is not provided a unique label will be generated.
include_activity: If True the activity will be included in snapshot if it
exists. In order to include the activity, the activity must have already
been stored in Synapse by using the `activity` attribute on the Table
and calling the `store()` method on the Table instance. Adding an
activity to a snapshot of a table is meant to capture the provenance of
the data at the time of the snapshot.
associate_activity_to_new_version: If True the activity will be associated
with the new version of the table. If False the activity will not be
associated with the new version of the table.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Example: Creating a snapshot of a table
Comment and label are optional, but filled in for this example.
import asyncio
from synapseclient.models import Table
from synapseclient import Synapse
syn = Synapse()
syn.login()
async def main():
my_table = Table(id="syn1234")
await my_table.snapshot_async(
comment="This is a new snapshot comment",
label="3This is a unique label"
)
asyncio.run(main())
Example: Including the activity (Provenance) in the snapshot and not pulling it forward to the new `in-progress` version of the table.
By default this method is set up to include the activity in the snapshot and
then pull the activity forward to the new version. If you do not want to
include the activity in the snapshot you can set `include_activity` to
False. If you do not want to pull the activity forward to the new version
you can set `associate_activity_to_new_version` to False.
See the [activity][synapseclient.models.Activity] attribute on the Table
class for more information on how to interact with the activity.
import asyncio
from synapseclient.models import Table
from synapseclient import Synapse
syn = Synapse()
syn.login()
async def main():
my_table = Table(id="syn1234")
await my_table.snapshot_async(
comment="This is a new snapshot comment",
label="This is a unique label",
include_activity=True,
associate_activity_to_new_version=False
)
asyncio.run(main())
Returns:
A dictionary that matches: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/SnapshotResponse.html>
"""
client = Synapse.get_client(synapse_client=synapse_client)
# Ensure that we have seeded the table with the latest data
await self.get_async(include_activity=True, synapse_client=client)
client.logger.info(
f"[{self.id}:{self.name}]: Creating a snapshot of the table."
)

loop = asyncio.get_event_loop()
snapshot_response = await loop.run_in_executor(
None,
lambda: client._create_table_snapshot(
table=self.id,
comment=comment,
label=label,
activity=self.activity.id
if self.activity and include_activity
else None,
),
)

if associate_activity_to_new_version and self.activity:
self._last_persistent_instance.activity = None
await self.store_async(synapse_client=synapse_client)
else:
await self.get_async(include_activity=True, synapse_client=synapse_client)

return snapshot_response

0 comments on commit cf2c69a

Please sign in to comment.