From b18580c5a44150f4598139a68c396b96c3ab3561 Mon Sep 17 00:00:00 2001
From: BryanFauble <17128019+BryanFauble@users.noreply.github.com>
Date: Fri, 10 Jan 2025 15:35:40 -0700
Subject: [PATCH] Add support for upsert_rows

---
 docs/reference/oop/table_refactor.md          |   1 +
 .../models/protocols/table_protocol.py        |  96 ++++++++
 synapseclient/models/table.py                 | 228 +++++++++++++++++-
 synapseclient/table.py                        |  64 ++++-
 4 files changed, 374 insertions(+), 15 deletions(-)

diff --git a/docs/reference/oop/table_refactor.md b/docs/reference/oop/table_refactor.md
index 5cd14346f..5b0ddb73b 100644
--- a/docs/reference/oop/table_refactor.md
+++ b/docs/reference/oop/table_refactor.md
@@ -12,6 +12,7 @@ client.
   - delete
   - query
   - store_rows
+  - upsert_rows
   - delete_rows
   - snapshot
   - delete_column
diff --git a/synapseclient/models/protocols/table_protocol.py b/synapseclient/models/protocols/table_protocol.py
index d5700458d..a2769b2db 100644
--- a/synapseclient/models/protocols/table_protocol.py
+++ b/synapseclient/models/protocols/table_protocol.py
@@ -55,6 +55,102 @@ def store(
         """
         return self
 
+    def upsert_rows(
+        self,
+        values: pd.DataFrame,
+        upsert_columns: List[str],
+        *,
+        synapse_client: Optional[Synapse] = None,
+    ) -> None:
+        """
+        This method allows you to perform an `upsert` (Update and Insert) of rows.
+        This means that you may update a row with only the data that you want to
+        change. When a supplied row does not match the given `upsert_columns`, a
+        new row will be inserted. If you want to replace a row entirely you may
+        use the `.store_rows()` method. See that method for more information.
+
+
+        Using the `upsert_columns` argument you may specify which columns are used
+        to determine whether a row already exists. If a row exists with the same
+        values in the columns specified in this list, the row will be updated. If
+        no such row exists, it will be inserted.
+
+
+        Limitations:
+
+        - The `upsert_columns` argument must contain at least one column.
+        - The `upsert_columns` argument must not contain columns of a LIST type.
+        - The values in the `upsert_columns` must be unique in the table. If
+            multiple rows share the same values in the `upsert_columns`, an
+            exception will be raised.
+        - The columns used in `upsert_columns` cannot contain updated values.
+            Since the values in these columns are used to determine if a row
+            exists, they cannot be updated in the same transaction.
+
+
+        Arguments:
+            values: A [Pandas DataFrame](http://pandas.pydata.org/pandas-docs/stable/api.html#dataframe)
+                holding the rows to upsert. (Support for the other sources accepted
+                by `.store_rows()` - a CSV file path, a list of lists, or a
+                dictionary of column names to values - is not yet implemented.)
+
+            upsert_columns: The columns used to determine if a row already exists.
+                If a row exists with the same values in the columns specified in
+                this list, the row will be updated. If no such row exists, it will
+                be inserted.
+
+            synapse_client: If not passed in and caching was not disabled by
+                `Synapse.allow_client_caching(False)` this will use the last created
+                instance from the Synapse class constructor.
+
+
+        TODO: Add an example for deleting data out of a cell
+
+        TODO: Add an example/support for skipping over cells in the table.
+            Suppose I want to update row 1 `col2`, and row 2 `col3`, but I don't
+            want to specify the data for row 1 `col3` and row 2 `col2`. Should
+            this be supported?
+
+
+        Example: Updating 2 rows and inserting 1 row
+            In this example we have a table with the following data:
+
+            | col1 | col2 | col3 |
+            |------|------|------|
+            | A    | 1    | 1    |
+            | B    | 2    | 2    |
+
+            The following code will update the first row's `col2` to `22`, update
+            the second row's `col3` to `33`, and insert a new row:
+
+                from synapseclient import Synapse
+                from synapseclient.models import Table
+                import pandas as pd
+
+                syn = Synapse()
+                syn.login()
+
+                table = Table(id="syn123").get(include_columns=True)
+
+                df = pd.DataFrame({
+                    'col1': ['A', 'B', 'C'],
+                    'col2': [22, 2, 3],
+                    'col3': [1, 33, 3],
+                })
+
+                table.upsert_rows(values=df, upsert_columns=["col1"])
+
+            The resulting table will look like this:
+
+            | col1 | col2 | col3 |
+            |------|------|------|
+            | A    | 22   | 1    |
+            | B    | 2    | 33   |
+            | C    | 3    | 3    |
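+
+        Example: Upserting on more than one column (illustrative sketch)
+            `upsert_columns` may list several columns; a row is only treated as
+            a match when all of the listed columns match. A hypothetical table
+            `syn123` keyed by the combination of `col1` and `col2` could be
+            upserted like this:
+
+                from synapseclient import Synapse
+                from synapseclient.models import Table
+                import pandas as pd
+
+                syn = Synapse()
+                syn.login()
+
+                table = Table(id="syn123").get(include_columns=True)
+
+                df = pd.DataFrame({
+                    'col1': ['A', 'A'],
+                    'col2': [1, 2],
+                    'col3': [10, 20],
+                })
+
+                # Rows matching on both `col1` and `col2` have `col3` updated;
+                # rows without a match are inserted.
+                table.upsert_rows(values=df, upsert_columns=["col1", "col2"])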
+
+        """
+        return None
+
     def store_rows(
         self,
         values: Union[str, List[Dict[str, Any]], Dict[str, Any], pd.DataFrame],
diff --git a/synapseclient/models/table.py b/synapseclient/models/table.py
index 4bdd58dbc..50bed4c7d 100644
--- a/synapseclient/models/table.py
+++ b/synapseclient/models/table.py
@@ -27,7 +27,7 @@
 from synapseclient.models.services.storable_entity_components import (
     store_entity_components,
 )
-from synapseclient.table import delete_rows
+from synapseclient.table import PartialRowset, cast_value, delete_rows
 
 
 class FacetType(str, Enum):
@@ -452,7 +452,7 @@ class Table(TableSynchronousProtocol, AccessControllable):
         import pandas as pd
 
         from synapseclient import Synapse
-        from synapseclient.models import Table
+        from synapseclient.models import Table, SchemaStorageStrategy
 
         syn = Synapse()
         syn.login()
@@ -473,7 +473,7 @@ class Table(TableSynchronousProtocol, AccessControllable):
 
         # The call to `store_rows` will also call `.store()` on the table too,
         # meaning if the table does not exist it will be created.
-        table.store_rows(values=my_data)
+        table.store_rows(values=my_data, schema_storage_strategy=SchemaStorageStrategy.INFER_FROM_DATA)
 
         # Prints out the stored data about this specific column
         print(table.columns["my_string_column"])
@@ -844,6 +844,203 @@ def fill_from_dict(
         )
         return self
 
+    async def upsert_rows_async(
+        self,
+        values: pd.DataFrame,
+        upsert_columns: List[str],
+        *,
+        synapse_client: Optional[Synapse] = None,
+    ) -> None:
+        """
+        This method allows you to perform an `upsert` (Update and Insert) of rows.
+        This means that you may update a row with only the data that you want to
+        change. When a supplied row does not match the given `upsert_columns`, a
+        new row will be inserted. If you want to replace a row entirely you may
+        use the `.store_rows()` method. See that method for more information.
+
+
+        Using the `upsert_columns` argument you may specify which columns are used
+        to determine whether a row already exists. If a row exists with the same
+        values in the columns specified in this list, the row will be updated. If
+        no such row exists, it will be inserted.
+
+
+        Limitations:
+
+        - The `upsert_columns` argument must contain at least one column.
+        - The `upsert_columns` argument must not contain columns of a LIST type.
+        - The values in the `upsert_columns` must be unique in the table. If
+            multiple rows share the same values in the `upsert_columns`, an
+            exception will be raised.
+        - The columns used in `upsert_columns` cannot contain updated values.
+            Since the values in these columns are used to determine if a row
+            exists, they cannot be updated in the same transaction.
+
+
+        Arguments:
+            values: A [Pandas DataFrame](http://pandas.pydata.org/pandas-docs/stable/api.html#dataframe)
+                holding the rows to upsert. (Support for the other sources accepted
+                by `.store_rows()` - a CSV file path, a list of lists, or a
+                dictionary of column names to values - is not yet implemented.)
+
+            upsert_columns: The columns used to determine if a row already exists.
+                If a row exists with the same values in the columns specified in
+                this list, the row will be updated. If no such row exists, it will
+                be inserted.
+
+            synapse_client: If not passed in and caching was not disabled by
+                `Synapse.allow_client_caching(False)` this will use the last created
+                instance from the Synapse class constructor.
+
+
+        TODO: Add an example for deleting data out of a cell
+
+        TODO: Add an example/support for skipping over cells in the table.
+            Suppose I want to update row 1 `col2`, and row 2 `col3`, but I don't
+            want to specify the data for row 1 `col3` and row 2 `col2`. Should
+            this be supported?
+
+
+        Example: Updating 2 rows and inserting 1 row
+            In this example we have a table with the following data:
+
+            | col1 | col2 | col3 |
+            |------|------|------|
+            | A    | 1    | 1    |
+            | B    | 2    | 2    |
+
+            The following code will update the first row's `col2` to `22`, update
+            the second row's `col3` to `33`, and insert a new row:
+
+                import asyncio
+                from synapseclient import Synapse
+                from synapseclient.models import Table
+                import pandas as pd
+
+                syn = Synapse()
+                syn.login()
+
+
+                async def main():
+                    table = await Table(id="syn123").get_async(include_columns=True)
+
+                    df = pd.DataFrame({
+                        'col1': ['A', 'B', 'C'],
+                        'col2': [22, 2, 3],
+                        'col3': [1, 33, 3],
+                    })
+
+                    await table.upsert_rows_async(values=df, upsert_columns=["col1"])
+
+                asyncio.run(main())
+
+            The resulting table will look like this:
+
+            | col1 | col2 | col3 |
+            |------|------|------|
+            | A    | 22   | 1    |
+            | B    | 2    | 33   |
+            | C    | 3    | 3    |
+        """
+        # The following is an example of the current implementation for a partial
+        # row update (similar to upserting rows):
+        #
+        # results = syn.tableQuery("SELECT * FROM syn64574780")
+        #
+        # df = results.asDataFrame(rowIdAndVersionInIndex=False)
+        #
+        # partial_changes = {df['ROW_ID'][0]: {'col1': 'A_1'},
+        #                    df['ROW_ID'][1]: {'col2': '22'}}
+        # partial_rowset = PartialRowset.from_mapping(partial_changes, results)
+        # syn.store(partial_rowset)
+
+        # Let's first assume this can only be used via a DataFrame
+        all_columns_from_df = values.columns
+
+        # Create the select statement:
+        select_statement = (
+            f"SELECT ROW_ID, {', '.join(all_columns_from_df)} FROM {self.id} WHERE "
+        )
+        where_statements = []
+        for upsert_column in upsert_columns:
+            column_model = self.columns[upsert_column]
+            # TODO: Determine which column types need to be wrapped in quotes
+            if column_model.column_type == ColumnType.STRING:
+                non_null_values = [
+                    f"'{value}'" for value in values[upsert_column] if value is not None
+                ]
+            else:
+                non_null_values = [
+                    str(value) for value in values[upsert_column] if value is not None
+                ]
+            if not non_null_values:
+                continue
+            where_statements.append(
+                f"{upsert_column} IN ({', '.join(non_null_values)})"
+            )
+
+        where_statement = " AND ".join(where_statements)
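+
+        # Illustrative only - with the docstring example's data and
+        # upsert_columns=["col1"] (a STRING column), the final statement below
+        # becomes:
+        #   SELECT ROW_ID, col1, col2, col3 FROM syn123 WHERE col1 IN ('A', 'B', 'C')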
+        select_statement += where_statement
+
+        print(f"Select statement: {select_statement}")
+
+        results = Table.query(query=select_statement, synapse_client=synapse_client)
+        partial_changes = {}
+        indices_of_original_df_with_changes = []
+        for _, row in results.iterrows():
+            row_id = row["ROW_ID"]
+            partial_change = {}
+
+            # Find the row in `values` that matches this row in `results` on all
+            # of the upsert_columns
+            matching_conditions = values[upsert_columns[0]] == row[upsert_columns[0]]
+            for col in upsert_columns[1:]:
+                matching_conditions &= values[col] == row[col]
+            matching_row = values.loc[matching_conditions]
+
+            for column in all_columns_from_df:
+                if len(matching_row[column].values) > 1:
+                    raise ValueError(
+                        f"The values for the keys being upserted must be unique in the table: [{matching_row}]"
+                    )
+
+                if len(matching_row[column].values) == 0:
+                    continue
+
+                if matching_row[column].values[0] != row[column]:
+                    partial_change[column] = cast_value(
+                        value=matching_row[column].values[0],
+                        column_type=self.columns[column].column_type,
+                    )
+
+            if partial_change != {}:
+                partial_changes[row_id] = partial_change
+                indices_of_original_df_with_changes.append(matching_row.index[0])
+
+        rows_to_insert_df = values.loc[
+            ~values.index.isin(indices_of_original_df_with_changes)
+        ]
+
+        print(f"Partial changes: {partial_changes}")
+        print(f"Rows to insert: {rows_to_insert_df}")
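+
+        # Illustrative only - for the docstring example (assuming the two
+        # existing rows have ROW_IDs 1 and 2), the loop above produces:
+        #   partial_changes = {1: {'col2': 22}, 2: {'col3': 33}}
+        # and rows_to_insert_df contains only the unmatched 'C' row.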
+
+        name_to_column_id = {column.name: column.id for column in self.columns.values()}
+        row_etags = {
+            row["ROW_ID"]: row["ROW_ETAG"]
+            for _, row in results.iterrows()
+            if row["ROW_ID"] in partial_changes
+            and "ROW_ETAG" in row
+            and row["ROW_ETAG"]
+        }
+
+        partial_rowset = PartialRowset.from_mapping(
+            partial_changes, results, name_to_column_id, row_etags, self.id
+        )
+        client = Synapse.get_client(synapse_client=synapse_client)
+        client.store(partial_rowset)
+
+        if not rows_to_insert_df.empty:
+            await self.store_rows_async(
+                values=rows_to_insert_df, synapse_client=synapse_client
+            )
+        # TODO: Finish implementation
 
     async def store_rows_async(
         self,
         values: Union[str, List[Dict[str, Any]], Dict[str, Any], pd.DataFrame],
@@ -854,7 +1051,14 @@ async def store_rows_async(
         synapse_client: Optional[Synapse] = None,
     ) -> None:
         """
-        Takes in values from the sources defined below and stores the rows to Synapse.
+        Add or update rows in Synapse from the sources defined below. This method
+        performs a full row replacement in the case of an update: partial updates
+        of a row are not supported. To update a row you must pass in all of its
+        data; any columns that are not provided will be set to null.
+
+        If you'd like to perform an `upsert` or a partial update of a row, you
+        may use the `.upsert_rows()` method. See that method for more
+        information.
 
         Arguments:
             values: Supports storing data from the following sources:
 
                 - A string holding the path to a CSV file
                 - A list of lists (or tuples) where each element is a row
                 - A dictionary where the key is the column name and the value is one or more values. The values will be wrapped into a [Pandas DataFrame](http://pandas.pydata.org/pandas-docs/stable/api.html#dataframe).
                 - A [Pandas DataFrame](http://pandas.pydata.org/pandas-docs/stable/api.html#dataframe)
@@ -898,6 +1102,10 @@ async def store_rows_async(
 
         Returns:
             None
+
+        # TODO: Add example for creating new rows
+        # TODO: Add example for updating rows
+        # TODO: Add example for creating and updating rows in the same call
         """
         client = Synapse.get_client(synapse_client=synapse_client)
@@ -1679,7 +1887,17 @@ async def main():
         # TODO: Additionally - the logic present in synapseclient/table.py::CsvFileTable::asDataFrame should be considered and implemented as well
         # TODO: Lastly - When a query is executed both single-threaded and multi-threaded downloads of the CSV result should not write a file to disk, instead write the bytes to a BytesIO object and then use that object instead of a filepath
         # TODO: Also support writing the CSV to disk if the user wants to do that
-        raise NotImplementedError("This method is not yet implemented")
+        loop = asyncio.get_event_loop()
+
+        # TODO: The implementation should not download as CSV; this is left as a
+        # placeholder for now
+        results = await loop.run_in_executor(
+            None,
+            lambda: Synapse.get_client(synapse_client=synapse_client).tableQuery(
+                query=query,
+            ),
+        )
+        return results.asDataFrame(rowIdAndVersionInIndex=False)
+        # raise NotImplementedError("This method is not yet implemented")
diff --git a/synapseclient/table.py b/synapseclient/table.py
index 7a35caa8a..70f13ce53 100644
--- a/synapseclient/table.py
+++ b/synapseclient/table.py
@@ -34,7 +34,7 @@
 import re
 import tempfile
 from builtins import zip
-from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeVar, Union
 
 from synapseclient.core.constants import concrete_types
 from synapseclient.core.exceptions import SynapseError
@@ -44,6 +44,9 @@
 from .entity import Entity, Folder, Project, entity_type_to_class
 from .evaluation import Evaluation
 
+if TYPE_CHECKING:
+    from synapseclient import Synapse
+
 aggregate_pattern = re.compile(r"(count|max|min|avg|sum)\((.+)\)")
 
 # default is STRING, only need to put the non-STRING keys in here
@@ -272,6 +275,41 @@ def row_labels_from_rows(rows):
     )
 
 
+def cast_value(value: Any, column_type: str):
+    """Cast a single value from a table query result to the Python type
+    matching the given Synapse column type."""
+    if value is None or value == "":
+        return None
+    elif column_type in {
+        "STRING",
+        "ENTITYID",
+        "FILEHANDLEID",
+        "LARGETEXT",
+        "USERID",
+        "LINK",
+    }:
+        return value
+    elif column_type == "DOUBLE":
+        return float(value)
+    elif column_type == "INTEGER":
+        return int(value)
+    elif column_type == "BOOLEAN":
+        return to_boolean(value)
+    elif column_type == "DATE":
+        return from_unix_epoch_time(value)
+    elif column_type in {
+        "STRING_LIST",
+        "INTEGER_LIST",
+        "BOOLEAN_LIST",
+        "ENTITYID_LIST",
+        "USERID_LIST",
+    }:
+        return json.loads(value)
+    elif column_type == "DATE_LIST":
+        return json.loads(value, parse_int=from_unix_epoch_time)
+    else:
+        # Return the value unchanged for unknown column types
+        return value
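+
+
+# Illustrative conversions (query/CSV results arrive as strings):
+#   cast_value("3.14", "DOUBLE")            -> 3.14
+#   cast_value("true", "BOOLEAN")           -> True
+#   cast_value("", "INTEGER")               -> None
+#   cast_value('["a", "b"]', "STRING_LIST") -> ['a', 'b']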
+
+
 def cast_values(values, headers):
     """
     Convert a row of table query results from strings to the correct column type.
@@ -1525,7 +1563,9 @@ class PartialRowset(AppendableRowset):
     """
 
     @classmethod
-    def from_mapping(cls, mapping, originalQueryResult):
+    def from_mapping(
+        cls, mapping, originalQueryResult, name_to_column_id=None, row_etags=None, table_id=None
+    ):
         """
         Creates a PartialRowset
 
@@ -1540,7 +1580,7 @@ def from_mapping(cls, mapping, originalQueryResult):
             raise ValueError("mapping must be a supported Mapping type such as 'dict'")
 
         try:
-            name_to_column_id = {
+            name_to_column_id = name_to_column_id or {
                 col.name: col.id for col in originalQueryResult.headers if "id" in col
             }
         except AttributeError:
@@ -1552,11 +1592,15 @@ def from_mapping(cls, mapping, originalQueryResult):
 
         # row_ids in the originalQueryResult are not guaranteed to be in ascending order
         # iterate over all etags but only map the row_ids used for this partial update to their etags
-        row_etags = {
-            row_id: etag
-            for row_id, row_version, etag in originalQueryResult.iter_row_metadata()
-            if row_id in row_ids and etag is not None
-        }
+        row_etags = (
+            {
+                row_id: etag
+                for row_id, row_version, etag in originalQueryResult.iter_row_metadata()
+                if row_id in row_ids and etag is not None
+            }
+            if row_etags is None
+            else row_etags
+        )
 
         partial_rows = [
             PartialRow(
                 row_changes,
                 row_id,
                 etag=row_etags.get(int(row_id)),
                 name_to_column_id=name_to_column_id,
             )
             for row_id, row_changes in mapping.items()
         ]
 
-        return cls(originalQueryResult.tableId, partial_rows)
+        return cls(table_id or originalQueryResult.tableId, partial_rows)
 
     def __init__(self, schema, rows):
         super(PartialRowset, self).__init__(schema)
@@ -2155,7 +2199,7 @@ class CsvFileTable(TableAbstractBaseClass):
     @classmethod
     def from_table_query(
         cls,
-        synapse,
+        synapse: "Synapse",
         query,
         quoteCharacter='"',
         escapeCharacter="\\",