Skip to content

Commit

Permalink
Add support for upsert_rows
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanFauble committed Jan 10, 2025
1 parent 9b3cd5d commit b18580c
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 15 deletions.
1 change: 1 addition & 0 deletions docs/reference/oop/table_refactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ client.
- delete
- query
- store_rows
- upsert_rows
- delete_rows
- snapshot
- delete_column
Expand Down
96 changes: 96 additions & 0 deletions synapseclient/models/protocols/table_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,102 @@ def store(
"""
return self

def upsert_rows(
    self,
    values: pd.DataFrame,
    upsert_columns: List[str],
    *,
    synapse_client: Optional[Synapse] = None,
) -> None:
    """
    This method allows you to perform an `upsert` (Update and Insert) for a row.
    This means that you may update a row with only the data that you want to change.
    When supplied with a row that does not match the given `upsert_columns` a new
    row will be inserted. If you want to replace a row entirely you may use the
    `.store_rows()` method. See that method for more information.

    Using the `upsert_columns` argument you may specify which columns to use to
    determine if a row already exists. If a row exists with the same values in the
    columns specified in this list the row will be updated. If a row does not exist
    it will be inserted.

    Limitations:

    - The `upsert_columns` argument must contain at least one column.
    - The `upsert_columns` argument must contain columns that are not a LIST type.
    - The values used as the `upsert_columns` must be unique in the table. If there
        are multiple rows with the same values in the `upsert_columns` the behavior
        is that an exception will be raised.
    - The columns used in `upsert_columns` cannot contain updated values. Since
        the values in these columns are used to determine if a row exists, they
        cannot be updated in the same transaction.

    Arguments:
        values: A [Pandas DataFrame](http://pandas.pydata.org/pandas-docs/stable/api.html#dataframe)
            holding the rows to upsert. The signature only accepts a DataFrame;
            other sources (CSV path, list of rows, dictionary) are not part of
            the annotated type.
        upsert_columns: The columns to use to determine if a row already exists. If
            a row exists with the same values in the columns specified in this list
            the row will be updated. If a row does not exist it will be inserted.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor

    Returns:
        None

    TODO: Add an example for deleting data out of a cell
    TODO: Add an example/support for skipping over cells in the table. Suppose I want to update row 1 `col2`, and row 2 `col3`, but I don't want to specify the data for row 1 `col3` and row 2 `col2`. Should this be supported?

    Example: Updating 2 rows and inserting 1 row
        In this given example we have a table with the following data:

        | col1 | col2 | col3 |
        |------|------| -----|
        | A    | 1    | 1    |
        | B    | 2    | 2    |

        The following code will update the first row's `col2` to `22`, update the
        second row's `col3` to `33`, and insert a new row:

        ```python
        from synapseclient import Synapse
        from synapseclient.models import Table
        import pandas as pd

        syn = Synapse()
        syn.login()

        table = Table(id="syn123").get(include_columns=True)

        df = pd.DataFrame({
            'col1': ['A', 'B', 'C'],
            'col2': [22, 2, 3],
            'col3': [1, 33, 3],
        })

        table.upsert_rows(values=df, upsert_columns=["col1"])
        ```

        The resulting table will look like this:

        | col1 | col2 | col3 |
        |------|------| -----|
        | A    | 22   | 1    |
        | B    | 2    | 33   |
        | C    | 3    | 3    |
    """
    # Protocol stub: it carries the documentation and signature only and
    # performs no work itself.
    return None

def store_rows(
self,
values: Union[str, List[Dict[str, Any]], Dict[str, Any], pd.DataFrame],
Expand Down
228 changes: 223 additions & 5 deletions synapseclient/models/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from synapseclient.models.services.storable_entity_components import (
store_entity_components,
)
from synapseclient.table import delete_rows
from synapseclient.table import PartialRowset, cast_value, delete_rows


class FacetType(str, Enum):
Expand Down Expand Up @@ -452,7 +452,7 @@ class Table(TableSynchronousProtocol, AccessControllable):
import pandas as pd
from synapseclient import Synapse
from synapseclient.models import Table
from synapseclient.models import Table, SchemaStorageStrategy
syn = Synapse()
syn.login()
Expand All @@ -473,7 +473,7 @@ class Table(TableSynchronousProtocol, AccessControllable):
# The call to `store_rows` will also call `.store()` on the table too,
# meaning if the table does not exist it will be created.
table.store_rows(values=my_data)
table.store_rows(values=my_data, schema_storage_strategy=SchemaStorageStrategy.INFER_FROM_DATA)
# Prints out the stored data about this specific column
print(table.columns["my_string_column"])
Expand Down Expand Up @@ -844,6 +844,203 @@ def fill_from_dict(
)
return self

async def upsert_rows_async(
    self,
    values: pd.DataFrame,
    upsert_columns: List[str],
    *,
    synapse_client: Optional[Synapse] = None,
) -> None:
    """
    This method allows you to perform an `upsert` (Update and Insert) for a row.
    This means that you may update a row with only the data that you want to change.
    When supplied with a row that does not match the given `upsert_columns` a new
    row will be inserted. If you want to replace a row entirely you may use the
    `.store_rows()` method. See that method for more information.

    Using the `upsert_columns` argument you may specify which columns to use to
    determine if a row already exists. If a row exists with the same values in the
    columns specified in this list the row will be updated. If a row does not exist
    it will be inserted. A row that already exists and already holds the supplied
    values is left untouched (it is neither updated nor inserted again).

    Limitations:

    - The `upsert_columns` argument must contain at least one column.
    - The `upsert_columns` argument must contain columns that are not a LIST type.
    - The values used as the `upsert_columns` must be unique in the table. If there
        are multiple rows with the same values in the `upsert_columns` the behavior
        is that an exception will be raised.
    - The columns used in `upsert_columns` cannot contain updated values. Since
        the values in these columns are used to determine if a row exists, they
        cannot be updated in the same transaction.

    Arguments:
        values: A [Pandas DataFrame](http://pandas.pydata.org/pandas-docs/stable/api.html#dataframe)
            holding the rows to upsert. Only a DataFrame is supported by this
            implementation. An empty DataFrame is a no-op.
        upsert_columns: The columns to use to determine if a row already exists. If
            a row exists with the same values in the columns specified in this list
            the row will be updated. If a row does not exist it will be inserted.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor

    Returns:
        None

    Raises:
        ValueError: If `upsert_columns` is empty, if more than one row of `values`
            shares the same `upsert_columns` values, or if more than one row of
            the table matches the same row of `values`.

    TODO: Add an example for deleting data out of a cell
    TODO: Add an example/support for skipping over cells in the table. Suppose I want to update row 1 `col2`, and row 2 `col3`, but I don't want to specify the data for row 1 `col3` and row 2 `col2`. Should this be supported?

    Example: Updating 2 rows and inserting 1 row
        In this given example we have a table with the following data:

        | col1 | col2 | col3 |
        |------|------| -----|
        | A    | 1    | 1    |
        | B    | 2    | 2    |

        The following code will update the first row's `col2` to `22`, update the
        second row's `col3` to `33`, and insert a new row:

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import Table
        import pandas as pd

        syn = Synapse()
        syn.login()

        async def main():
            table = await Table(id="syn123").get_async(include_columns=True)

            df = pd.DataFrame({
                'col1': ['A', 'B', 'C'],
                'col2': [22, 2, 3],
                'col3': [1, 33, 3],
            })

            await table.upsert_rows_async(values=df, upsert_columns=["col1"])

        asyncio.run(main())
        ```

        The resulting table will look like this:

        | col1 | col2 | col3 |
        |------|------| -----|
        | A    | 22   | 1    |
        | B    | 2    | 33   |
        | C    | 3    | 3    |
    """
    if not upsert_columns:
        raise ValueError(
            "The `upsert_columns` argument must contain at least one column."
        )

    # Nothing to update and nothing to insert.
    if values.empty:
        return None

    all_columns_from_df = values.columns

    # Build one `<column> IN (...)` filter per key column so that only the rows
    # that could possibly match are downloaded from the table.
    where_statements = []
    for upsert_column in upsert_columns:
        column_model = self.columns[upsert_column]
        # `dropna()` removes NaN as well as None; a NaN key can never match a
        # stored row and must not be rendered into the query as `nan`.
        key_values = values[upsert_column].dropna()
        # TODO: Determine what column types that we need to wrap in quotes
        if column_model.column_type == ColumnType.STRING:
            # Double embedded single quotes so that the value cannot terminate
            # the SQL string literal early.
            literals = [
                "'" + str(value).replace("'", "''") + "'" for value in key_values
            ]
        else:
            literals = [str(value) for value in key_values]
        # Drop repeated literals while keeping first-seen order.
        literals = list(dict.fromkeys(literals))
        if not literals:
            continue
        where_statements.append(f"{upsert_column} IN ({', '.join(literals)})")

    results = None
    partial_changes = {}
    # Index labels of the rows in `values` that already exist in the table,
    # whether or not any of their cells actually changed.
    matched_indexes = set()

    # Without any usable key value no stored row can match, so the query is
    # skipped entirely and every supplied row is treated as an insert.
    if where_statements:
        select_statement = (
            f"SELECT ROW_ID, {', '.join(all_columns_from_df)} FROM {self.id} WHERE "
            + " AND ".join(where_statements)
        )
        results = Table.query(query=select_statement, synapse_client=synapse_client)

        for _, row in results.iterrows():
            # Find the row in `values` whose key columns equal this stored row.
            matching_conditions = values[upsert_columns[0]] == row[upsert_columns[0]]
            for col in upsert_columns[1:]:
                matching_conditions &= values[col] == row[col]
            matching_row = values.loc[matching_conditions]

            # The IN filters are combined per column, so the query may return
            # rows that match no single supplied row; those are ignored.
            if matching_row.empty:
                continue

            if len(matching_row) > 1:
                raise ValueError(
                    f"The values for the keys being upserted must be unique in the table: [{matching_row}]"
                )

            source_index = matching_row.index[0]
            if source_index in matched_indexes:
                raise ValueError(
                    "Multiple rows in the table match the same `upsert_columns` "
                    "values. The values used as the `upsert_columns` must be "
                    f"unique in the table: [{matching_row}]"
                )
            matched_indexes.add(source_index)

            # Only the cells whose value differs are sent as a partial update.
            partial_change = {}
            for column in all_columns_from_df:
                new_value = matching_row[column].values[0]
                if new_value != row[column]:
                    partial_change[column] = cast_value(
                        value=new_value,
                        column_type=self.columns[column].column_type,
                    )

            if partial_change:
                partial_changes[row["ROW_ID"]] = partial_change

    if partial_changes:
        name_to_column_id = {column.name: column.id for column in self.columns.values()}
        row_etags = {
            row["ROW_ID"]: row["ROW_ETAG"]
            for _, row in results.iterrows()
            if row["ROW_ID"] in partial_changes
            and "ROW_ETAG" in row
            and row["ROW_ETAG"]
        }

        partial_rowset = PartialRowset.from_mapping(
            partial_changes, results, name_to_column_id, row_etags, self.id
        )
        client = Synapse.get_client(synapse_client=synapse_client)
        client.store(partial_rowset)

    # Every supplied row that did not match a stored row is a new row. Rows that
    # matched but had no differences are deliberately excluded here, otherwise
    # they would be inserted a second time.
    rows_to_insert_df = values.loc[~values.index.isin(list(matched_indexes))]

    if not rows_to_insert_df.empty:
        await self.store_rows_async(
            values=rows_to_insert_df, synapse_client=synapse_client
        )

# TODO: Finish implementation
async def store_rows_async(
self,
Expand All @@ -854,7 +1051,14 @@ async def store_rows_async(
synapse_client: Optional[Synapse] = None,
) -> None:
"""
Takes in values from the sources defined below and stores the rows to Synapse.
Add or update rows in Synapse from the sources defined below. This method
works on a full row replacement in the case of an update. What this means is
that you may not do a partial update of a row. If you want to update a row
you must pass in all the data for that row, or the data for the columns not
provided will be set to null.
If you'd like to perform an `upsert` or partial update of a row you may use
the `.upsert_rows()` method. See that method for more information.
Arguments:
values: Supports storing data from the following sources:
Expand Down Expand Up @@ -898,6 +1102,10 @@ async def store_rows_async(
Returns:
None
# TODO: Add example for creating new rows
# TODO: Add example for updating rows
# TODO: Add example for creating and updating rows in the same call
"""
client = Synapse.get_client(synapse_client=synapse_client)

Expand Down Expand Up @@ -1679,7 +1887,17 @@ async def main():
# TODO: Additionally - the logic present in synapseclient/table.py::CsvFileTable::asDataFrame should be considered and implemented as well
# TODO: Lastly - When a query is executed both single-threaded and multi-threaded downloads of the CSV result should not write a file to disk, instead write the bytes to a BytesIO object and then use that object instead of a filepath
# TODO: Also support writing the CSV to disk if the user wants to do that
raise NotImplementedError("This method is not yet implemented")
loop = asyncio.get_event_loop()

# TODO: Implementation should not download as CSV, but left as a placeholder for now
results = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).tableQuery(
query=query,
),
)
return results.asDataFrame(rowIdAndVersionInIndex=False)
# raise NotImplementedError("This method is not yet implemented")

async def snapshot_async(
self,
Expand Down
Loading

0 comments on commit b18580c

Please sign in to comment.