Skip to content

Commit

Permalink
Merge branch 'main' into SNOW-1794373-api-coverage-data-frame-writer-…
Browse files Browse the repository at this point in the history
…insert-into
  • Loading branch information
sfc-gh-aling authored Jan 9, 2025
2 parents 6d41a99 + 98330fa commit 96f0abf
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
39 changes: 37 additions & 2 deletions src/snowflake/snowpark/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
)

import snowflake.snowpark
from snowflake.connector.constants import FIELD_ID_TO_NAME
from snowflake.connector.cursor import ResultMetadata, SnowflakeCursor
from snowflake.connector.description import OPERATING_SYSTEM, PLATFORM
from snowflake.connector.options import MissingOptionalDependency, ModuleLikeObject
from snowflake.connector.version import VERSION as connector_version
from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages
from snowflake.snowpark.context import _should_use_structured_type_semantics
from snowflake.snowpark.row import Row
from snowflake.snowpark.version import VERSION as snowpark_version

Expand Down Expand Up @@ -698,19 +700,50 @@ def column_to_bool(col_):
return bool(col_)


def _parse_result_meta(
result_meta: Union[List[ResultMetadata], List["ResultMetadataV2"]]
) -> Tuple[Optional[List[str]], Optional[List[Callable]]]:
"""
Takes a list of result metadata objects and returns a list containing the names of all fields as
well as a list of functions that wrap specific columns.
A column type may need to be wrapped if the connector is unable to provide the columns data in
an expected format. For example StructType columns are returned as dict objects, but are better
represented as Row objects.
"""
if not result_meta:
return None, None
col_names = []
wrappers = []
for col in result_meta:
col_names.append(col.name)
if (
_should_use_structured_type_semantics()
and FIELD_ID_TO_NAME[col.type_code] == "OBJECT"
and col.fields is not None
):
wrappers.append(lambda x: Row(**x))
else:
wrappers.append(None)
return col_names, wrappers


def result_set_to_rows(
result_set: List[Any],
result_meta: Optional[Union[List[ResultMetadata], List["ResultMetadataV2"]]] = None,
case_sensitive: bool = True,
) -> List[Row]:
col_names = [col.name for col in result_meta] if result_meta else None
col_names, wrappers = _parse_result_meta(result_meta or [])
rows = []
row_struct = Row
if col_names:
row_struct = (
Row._builder.build(*col_names).set_case_sensitive(case_sensitive).to_row()
)
for data in result_set:
if wrappers:
data = [wrap(d) if wrap else d for wrap, d in zip(wrappers, data)]

if data is None:
raise ValueError("Result returned from Python connector is None")
row = row_struct(*data)
Expand All @@ -723,7 +756,7 @@ def result_set_to_iter(
result_meta: Optional[List[ResultMetadata]] = None,
case_sensitive: bool = True,
) -> Iterator[Row]:
col_names = [col.name for col in result_meta] if result_meta else None
col_names, wrappers = _parse_result_meta(result_meta)
row_struct = Row
if col_names:
row_struct = (
Expand All @@ -732,6 +765,8 @@ def result_set_to_iter(
for data in result_set:
if data is None:
raise ValueError("Result returned from Python connector is None")
if wrappers:
data = [wrap(d) if wrap else d for wrap, d in zip(wrappers, data)]
row = row_struct(*data)
yield row

Expand Down
6 changes: 3 additions & 3 deletions tests/integ/scala/test_datatype_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,8 +876,8 @@ def test_structured_dtypes_iceberg_create_from_values(
_, __, expected_schema = _create_example(True)
table_name = f"snowpark_structured_dtypes_{uuid.uuid4().hex[:5]}"
data = [
({"x": 1}, {"A": "a", "b": 1}, [1, 1, 1]),
({"x": 2}, {"A": "b", "b": 2}, [2, 2, 2]),
({"x": 1}, Row(A="a", b=1), [1, 1, 1]),
({"x": 2}, Row(A="b", b=2), [2, 2, 2]),
]
try:
create_df = structured_type_session.create_dataframe(
Expand Down Expand Up @@ -1043,7 +1043,7 @@ def test_structured_dtypes_cast(structured_type_session, structured_type_support
)
assert cast_df.schema == expected_structured_schema
assert cast_df.collect() == [
Row([1, 2, 3], {"k1": 1, "k2": 2}, {"A": 1.0, "B": "foobar"})
Row([1, 2, 3], {"k1": 1, "k2": 2}, Row(A=1.0, B="foobar"))
]


Expand Down

0 comments on commit 96f0abf

Please sign in to comment.