SNOW-1794373: Support DataFrameWriter.insertInto/insert_into #2835

Open · wants to merge 6 commits into base: main
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@
- `nullifzero`
- `snowflake_cortex_sentiment`
- Added `Catalog` class to manage Snowflake objects. It can be accessed via `Session.catalog`.
- Added support for `DataFrameWriter.insert_into/insertInto`. This method is also supported in local testing mode.

#### Improvements

2 changes: 2 additions & 0 deletions docs/source/snowpark/io.rst
@@ -44,6 +44,8 @@ Input/Output
DataFrameWriter.save
DataFrameWriter.saveAsTable
DataFrameWriter.save_as_table
DataFrameWriter.insertInto
DataFrameWriter.insert_into
FileOperation.get
FileOperation.get_stream
FileOperation.put
42 changes: 42 additions & 0 deletions src/snowflake/snowpark/dataframe_writer.py
@@ -42,6 +42,7 @@
)
from snowflake.snowpark.async_job import AsyncJob, _AsyncResultType
from snowflake.snowpark.column import Column, _to_col_if_str
from snowflake.snowpark.exceptions import SnowparkClientException
from snowflake.snowpark.functions import sql_expr
from snowflake.snowpark.mock._connection import MockServerConnection
from snowflake.snowpark.row import Row
@@ -913,4 +914,45 @@ def parquet(
**copy_options,
)

@publicapi
def insert_into(
self, table_name: Union[str, Iterable[str]], overwrite: bool = False
) -> None:
"""
Inserts the content of the DataFrame to the specified table.
It requires that the schema of the DataFrame is the same as the schema of the table.

Args:
table_name: A string or iterable of strings naming the target table.
A single string is the table name; an iterable of strings is the
fully-qualified object identifier (database name, schema name, and table name).
overwrite: If True, the existing content of the table is overwritten.
If False, the data is appended to the table. Default is False.

Example::

>>> # create a temporary table, then insert more rows into it
>>> df = session.create_dataframe([["John", "Berry"]], schema = ["FIRST_NAME", "LAST_NAME"])
>>> df.write.save_as_table("my_table", table_type="temporary")
>>> df2 = session.create_dataframe([["Rick", "Berry"]], schema = ["FIRST_NAME", "LAST_NAME"])
>>> df2.write.insert_into("my_table")
>>> session.table("my_table").collect()
[Row(FIRST_NAME='John', LAST_NAME='Berry'), Row(FIRST_NAME='Rick', LAST_NAME='Berry')]
"""
full_table_name = (
table_name if isinstance(table_name, str) else ".".join(table_name)
)
Comment on lines +942 to +944 (Contributor):

Reminder to add a TODO to build the AST for this API: https://snowflakecomputing.atlassian.net/browse/SNOW-1489960

validate_object_name(full_table_name)
qualified_table_name = (
parse_table_name(table_name) if isinstance(table_name, str) else table_name
)

target_table = self._dataframe._session.table(qualified_table_name)
Comment (Contributor):

Does this work in PySpark even when the table does not exist? I think this would fail if the table doesn't exist.
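A minimal sketch of how that failure could surface more clearly, assuming that accessing `.schema` triggers a describe query that raises SnowparkSQLException when the table is missing; the guard and its error message are hypothetical, not part of this PR:

    from snowflake.snowpark.exceptions import (
        SnowparkClientException,
        SnowparkSQLException,
    )

    try:
        target_table = self._dataframe._session.table(qualified_table_name)
        # The describe query runs here and fails if the table is missing.
        target_schema = target_table.schema
    except SnowparkSQLException as e:
        # Hypothetical: re-raise with a clearer client-side message.
        raise SnowparkClientException(
            f"Table {full_table_name} does not exist or is not authorized."
        ) from e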

if target_table.schema != self._dataframe.schema:
raise SnowparkClientException(
f"Schema of the DataFrame: {self._dataframe.schema} does not match the schema of the table {full_table_name}: {target_table.schema}."
)
Comment on lines +951 to +954 (Contributor):

I'm not sure we should do this check on the client side. It may be possible to append the data through type coercion even when the schemas are not an exact match.
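A short sketch of the reviewer's point, assuming an active session; the table name coerce_demo and the values are illustrative. Snowflake can coerce compatible types on insert, so an exact client-side schema match may be stricter than the server requires:

    # Create a table whose column X is DOUBLE.
    df_float = session.create_dataframe([[1.5]], schema=["X"])
    df_float.write.save_as_table("coerce_demo", table_type="temporary")

    # Locally this DataFrame's X column is LongType, so the strict check above
    # would reject it, yet the server may accept the append by coercing the
    # integers to DOUBLE.
    df_int = session.create_dataframe([[2]], schema=["X"])
    df_int.write.save_as_table("coerce_demo", mode="append")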

self.save_as_table(table_name, mode="truncate" if overwrite else "append")

insertInto = insert_into
saveAsTable = save_as_table
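For reference, a usage sketch of the new API mirroring the docstring example; it assumes an active session and the my_table created there:

    # insertInto is an alias of insert_into; both append by default.
    df2.write.insertInto("my_table")

    # overwrite=True truncates the table before inserting the new rows.
    df2.write.insert_into("my_table", overwrite=True)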
73 changes: 73 additions & 0 deletions tests/integ/scala/test_dataframe_writer_suite.py
@@ -8,6 +8,8 @@
import pytest

import snowflake.connector.errors

from snowflake.snowpark import Row
from snowflake.snowpark._internal.utils import TempObjectType, parse_table_name
from snowflake.snowpark.exceptions import (
    SnowparkClientException,
    SnowparkSQLException,
)
@@ -874,3 +876,74 @@ def test_writer_parquet(session, tmpdir_factory, local_testing_mode):
Utils.assert_rows_count(data4, ROWS_COUNT)
finally:
Utils.drop_stage(session, temp_stage)


def test_insert_into(session):
"""
Test the insert_into API with positive and negative test cases.
"""
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)

try:
# Create a DataFrame with initial data
df = session.create_dataframe(
[["Alice", "Smith"], ["Bob", "Brown"]],
schema=["FIRST_NAME", "LAST_NAME"],
)
df.write.save_as_table(table_name)

# Positive Test: Append data to the table
df_append = session.create_dataframe(
[["Charlie", "White"]],
schema=["FIRST_NAME", "LAST_NAME"],
)
df_append.write.insert_into(table_name)
Utils.check_answer(
session.table(table_name),
[
Row(FIRST_NAME="Alice", LAST_NAME="Smith"),
Row(FIRST_NAME="Bob", LAST_NAME="Brown"),
Row(FIRST_NAME="Charlie", LAST_NAME="White"),
],
)

# Positive Test: Overwrite data in the table
df_overwrite = session.create_dataframe(
[["David", "Green"]],
schema=["FIRST_NAME", "LAST_NAME"],
)
df_overwrite.write.insert_into(table_name, overwrite=True)
Utils.check_answer(
session.table(table_name), [Row(FIRST_NAME="David", LAST_NAME="Green")]
)

# Negative Test: Schema mismatch, more columns
df_more_columns = session.create_dataframe(
[["Extra", "Column", 123]],
schema=["FIRST_NAME", "LAST_NAME", "AGE"],
)
with pytest.raises(SnowparkClientException):
df_more_columns.write.insert_into(table_name)

# Negative Test: Schema mismatch, less columns
df_less_column = session.create_dataframe(
[["Column"]],
schema=["FIRST_NAME"],
)
with pytest.raises(SnowparkClientException):
df_less_column.write.insert_into(table_name)

# Negative Test: Schema mismatch, type
df_not_same_type = session.create_dataframe(
[[[1, 2, 3, 4], False]],
schema=["FIRST_NAME", "LAST_NAME"],
)
with pytest.raises(SnowparkClientException):
df_not_same_type.write.insert_into(table_name)

# Negative Test: Table does not exist
with pytest.raises(SnowparkClientException):
df.write.insert_into("non_existent_table")

finally:
Utils.drop_table(session, table_name)
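For reference, insert_into delegates to save_as_table, as seen in the implementation above; df and table_name here are placeholders:

    # insert_into(table_name) appends:
    df.write.save_as_table(table_name, mode="append")

    # insert_into(table_name, overwrite=True) truncates first:
    df.write.save_as_table(table_name, mode="truncate")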