From fb39324e9fd251447f85ca50dae9a8299f3ae923 Mon Sep 17 00:00:00 2001 From: Labanya Mukhopadhyay Date: Mon, 28 Oct 2024 16:46:41 -0700 Subject: [PATCH] SNOW-1692064: Implement DataFrame/Series align for axis = 0 (#2483) 1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR. Fixes SNOW-1692064 2. Fill out the following pre-review checklist: - [x] I am adding a new automated test(s) to verify correctness of my new code - [ ] If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing - [ ] I am adding new logging messages - [ ] I am adding a new telemetry message - [ ] I am adding new credentials - [ ] I am adding a new dependency - [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes. - [x] I acknowledge that I have ensured my changes to be thread-safe. Follow the link for more information: [Thread-safe Developer Guidelines](https://docs.google.com/document/d/162d_i4zZ2AfcGRXojj0jByt8EUq-DrSHPPnTa4QvwbA/edit#bookmark=id.e82u4nekq80k) 3. Please describe how your code solves the related issue. Support for DataFrame/Series align for axis = 0 and default ``fill_value`` of np.nan. 
--------- Signed-off-by: Labanya Mukhopadhyay --- CHANGELOG.md | 1 + .../modin/supported/dataframe_supported.rst | 5 +- .../modin/supported/series_supported.rst | 5 +- .../modin/plugin/_internal/align_utils.py | 95 ++++++ .../modin/plugin/_internal/join_utils.py | 4 +- .../plugin/_internal/ordered_dataframe.py | 15 +- .../snowpark/modin/plugin/_typing.py | 4 + .../compiler/snowflake_query_compiler.py | 115 +++++++ .../snowpark/modin/plugin/docstrings/base.py | 59 ++++ .../modin/plugin/extensions/base_overrides.py | 52 +++- tests/integ/modin/frame/test_align.py | 290 ++++++++++++++++++ tests/integ/modin/series/test_align.py | 167 ++++++++++ tests/integ/modin/test_ordered_dataframe.py | 31 +- tests/unit/modin/test_unsupported.py | 2 - 14 files changed, 820 insertions(+), 25 deletions(-) create mode 100644 src/snowflake/snowpark/modin/plugin/_internal/align_utils.py create mode 100644 tests/integ/modin/frame/test_align.py create mode 100644 tests/integ/modin/series/test_align.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c6af0ccad4b..1c6261cc223 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ - Added support for `DataFrame.attrs` and `Series.attrs`. - Added support for `DataFrame.style`. - Added support for `Index.to_numpy`. +- Added support for `DataFrame.align` and `Series.align` for `axis=0`. 
#### Improvements diff --git a/docs/source/modin/supported/dataframe_supported.rst b/docs/source/modin/supported/dataframe_supported.rst index 59ae57124bb..7603d15fff2 100644 --- a/docs/source/modin/supported/dataframe_supported.rst +++ b/docs/source/modin/supported/dataframe_supported.rst @@ -78,7 +78,10 @@ Methods | ``aggregate`` | P | ``margins``, ``observed``, | See ``agg`` | | | | ``sort`` | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ -| ``align`` | N | | | +| ``align`` | P | ``copy``, ``level``, | ``N`` for MultiIndex, for deprecated parameters | +| | | ``fill_value`` | ``method``, ``limit``, ``fill_axis``, | +| | | | ``broadcast_axis``, if ``axis`` == 1 or None, or | +| | | | if ``fill_value`` is not default of np.nan | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``all`` | P | | ``N`` for non-integer/boolean types | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ diff --git a/docs/source/modin/supported/series_supported.rst b/docs/source/modin/supported/series_supported.rst index c5cf6d78fdf..f5995805b6a 100644 --- a/docs/source/modin/supported/series_supported.rst +++ b/docs/source/modin/supported/series_supported.rst @@ -84,7 +84,10 @@ Methods +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``aggregate`` | P | | See ``agg`` | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ -| ``align`` | N | | | +| ``align`` | P | ``copy``, ``level``, | ``N`` for MultiIndex, for deprecated parameters | +| | | ``fill_value`` | ``method``, 
``limit``, ``fill_axis``, | +| | | | ``broadcast_axis``, if ``axis`` == 1 or None, or | +| | | | if ``fill_value`` is not default of np.nan | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``all`` | P | | ``N`` for non-integer/boolean types | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ diff --git a/src/snowflake/snowpark/modin/plugin/_internal/align_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/align_utils.py new file mode 100644 index 00000000000..fbf2df133e9 --- /dev/null +++ b/src/snowflake/snowpark/modin/plugin/_internal/align_utils.py @@ -0,0 +1,95 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + +from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame +from snowflake.snowpark.modin.plugin._internal.join_utils import align_on_index + + +def align_axis_0_left( + frame: InternalFrame, other_frame: InternalFrame, join: str +) -> tuple[InternalFrame, InternalFrame, list[str], list[str]]: + """ + Gets the left align results. + + Args: + frame: original frame + other_frame: other frame + join: type of alignment to be performed. 
+ + Returns: + Tuple containing: + InternalFrame result of join_utils.align_on_index, + final left_frame, + list of left_frame_data_ids, + list of left_index_ids + """ + if join == "right": + left_result, left_column_mapper = align_on_index(other_frame, frame, how="left") + left_frame_data_ids = left_column_mapper.map_right_quoted_identifiers( + frame.data_column_snowflake_quoted_identifiers + ) + left_index_ids = left_result.index_column_snowflake_quoted_identifiers + left_frame = left_result.ordered_dataframe.select( + left_frame_data_ids + left_index_ids + ) + else: + left_result, left_column_mapper = align_on_index(frame, other_frame, how=join) + left_frame_data_ids = left_column_mapper.map_left_quoted_identifiers( + frame.data_column_snowflake_quoted_identifiers + ) + left_index_ids = left_result.index_column_snowflake_quoted_identifiers + left_frame = left_result.ordered_dataframe.select( + left_frame_data_ids + left_index_ids + ) + return left_result, left_frame, left_frame_data_ids, left_index_ids + + +def align_axis_0_right( + frame: InternalFrame, other_frame: InternalFrame, join: str +) -> tuple[InternalFrame, InternalFrame, list[str], list[str]]: + """ + Gets the right align results. + + Args: + frame: original frame + other_frame: other frame + join: type of alignment to be performed. 
+ + Returns: + Tuple containing: + InternalFrame result of join_utils.align_on_index, + final right_frame, + list of right_frame_data_ids, + list of right_index_ids + """ + if join == "left": + right_result, right_column_mapper = align_on_index(frame, other_frame, how=join) + right_frame_data_ids = right_column_mapper.map_right_quoted_identifiers( + other_frame.data_column_snowflake_quoted_identifiers + ) + right_index_ids = right_result.index_column_snowflake_quoted_identifiers + right_frame = right_result.ordered_dataframe.select( + right_frame_data_ids + right_index_ids + ) + elif join == "right": + right_result, right_column_mapper = align_on_index( + other_frame, frame, how="left" + ) + right_frame_data_ids = right_column_mapper.map_left_quoted_identifiers( + other_frame.data_column_snowflake_quoted_identifiers + ) + right_index_ids = right_result.index_column_snowflake_quoted_identifiers + right_frame = right_result.ordered_dataframe.select( + right_frame_data_ids + right_index_ids + ) + else: + right_result, right_column_mapper = align_on_index(other_frame, frame, how=join) + right_frame_data_ids = right_column_mapper.map_left_quoted_identifiers( + other_frame.data_column_snowflake_quoted_identifiers + ) + right_index_ids = right_result.index_column_snowflake_quoted_identifiers + right_frame = right_result.ordered_dataframe.select( + right_frame_data_ids + right_index_ids + ) + return right_result, right_frame, right_frame_data_ids, right_index_ids diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 10f1f9a4426..db01e62747c 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -1247,6 +1247,7 @@ def align( * coalesce: use only index from left frame, preserve left order. If left frame is empty left_on columns are coalesced with right_on columns. 
* outer: use union of index from both frames, sort index lexicographically. + * inner: use intersection of index from both frames, preserve left order. Returns: New aligned InternalFrame by aligning left frame with right frame. """ @@ -1330,12 +1331,13 @@ def align_on_index( Args: left: Left DataFrame. right: right DataFrame. - how: the align method {{'left', 'coalesce', 'outer'}}, by default is outer + how: the align method {{'left', 'coalesce', 'outer', 'inner'}}, by default is outer * left: use only index from left frame, preserve left order. * coalesce: if left frame has non-zero rows use only index from left frame, preserve left order otherwise use only right index and preserver right order. * outer: use union of index from both frames, sort index lexicographically. + * inner: use intersection of index from both frames, preserve left order. Returns: An InternalFrame for the aligned result. A JoinOrAlignResultColumnMapper that provides quoted identifiers mapping from the diff --git a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py index faa2b2c8b34..b1ba815e5a6 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py @@ -1377,6 +1377,7 @@ def align( how: We support the following align/join types: - "outer": Full outer align (default value) - "left": Left outer align + - "inner": Inner align - "coalesce": If left frame is not empty perform left outer align otherwise perform right outer align. When left frame is empty, the left_on column is replaced with the right_on column in the result. 
@@ -1672,9 +1673,19 @@ def align( elif how == "left": filter_expression = filter_expression & left_row_pos.is_not_null() select_list = result_projected_column_snowflake_quoted_identifiers - else: # outer + elif how == "inner": + filter_expression = ( + filter_expression + & left_row_pos.is_not_null() + & right_row_pos.is_not_null() + ) select_list = result_projected_column_snowflake_quoted_identifiers - + elif how == "outer": + select_list = result_projected_column_snowflake_quoted_identifiers + else: + raise ValueError( + f"how={how} is not valid argument for ordered_dataframe.align." + ) joined_ordered_frame = joined_ordered_frame.filter(filter_expression).sort( ordering_columns ) diff --git a/src/snowflake/snowpark/modin/plugin/_typing.py b/src/snowflake/snowpark/modin/plugin/_typing.py index 71c70315dc6..da2d52b9efd 100644 --- a/src/snowflake/snowpark/modin/plugin/_typing.py +++ b/src/snowflake/snowpark/modin/plugin/_typing.py @@ -44,6 +44,10 @@ class LabelIdentifierPair(NamedTuple): # align columns. 
"outer", # If align column values matches exactly, merge frames line by line (this is + # equivalent to joining on row position) otherwise perform INNER JOIN on + # align columns + "inner", + # If align column values matches exactly, merge frames line by line (this is # equivalent to joining on row position) otherwise # - perform LEFT OUTER JOIN if left frame is non-empty # - perform RIGHT OUTER JOIN if left frame is empty diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index 8483a414582..3de09328e0c 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -171,6 +171,10 @@ repr_aggregate_function, using_named_aggregations_for_func, ) +from snowflake.snowpark.modin.plugin._internal.align_utils import ( + align_axis_0_left, + align_axis_0_right, +) from snowflake.snowpark.modin.plugin._internal.apply_utils import ( APPLY_LABEL_COLUMN_QUOTED_IDENTIFIER, APPLY_VALUE_COLUMN_QUOTED_IDENTIFIER, @@ -8400,6 +8404,117 @@ def vectorized_udf(df: pandas.DataFrame) -> pandas.Series: # pragma: no cover ) return SnowflakeQueryCompiler(new_frame) + def align( + self, + other: SnowparkDataFrame = None, + join: str = "outer", + axis: int = 0, + level: Level = None, + copy: bool = True, + fill_value: Scalar = None, + ) -> tuple["SnowflakeQueryCompiler", "SnowflakeQueryCompiler"]: + """ + Align two objects on their axes with the specified join method. + + Join method is specified for each axis Index. + + Args: + other: DataFrame or Series + join: {‘outer’, ‘inner’, ‘left’, ‘right’}, default ‘outer’ + Type of alignment to be performed. + left: use only keys from left frame, preserve key order. + right: use only keys from right frame, preserve key order. + outer: use union of keys from both frames, sort keys lexicographically. 
+ inner: use intersection of keys from both frames, preserve the order of the left keys. + axis: allowed axis of the other object, default None + Align on index (0), columns (1), or both (None). + level: int or level name, default None + Broadcast across a level, matching Index values on the passed MultiIndex level. + copy: bool, default True + Always returns new objects. If copy=False and no reindexing is required then original objects are returned. + fill_value: scalar, default np.nan + Value to use for missing values. Defaults to NaN, but can be any compatible value. + + Returns: + tuple of SnowflakeQueryCompilers + Aligned objects. + + """ + if copy is not True: + ErrorMessage.not_implemented( + "Snowpark pandas 'align' method doesn't support 'copy=False'" + ) + if level is not None: + ErrorMessage.not_implemented( + "Snowpark pandas 'align' method doesn't support 'level'" + ) + if fill_value is not None: + # TODO: SNOW-1752860 + ErrorMessage.not_implemented( + "Snowpark pandas 'align' method doesn't support 'fill_value'" + ) + if axis != 0: + # TODO: SNOW-1752856 + ErrorMessage.not_implemented( + f"Snowpark pandas 'align' method doesn't support 'axis={axis}'" + ) + frame = self._modin_frame + other_frame = other._query_compiler._modin_frame + + if self.is_multiindex(axis=axis) or other._query_compiler.is_multiindex( + axis=axis + ): + raise NotImplementedError( + "Snowpark pandas doesn't support `align` with MultiIndex" + ) + + # convert frames to variant type if index is incompatible for join + frame, other_frame = join_utils.convert_incompatible_types_to_variant( + frame, + other_frame, + frame.index_column_snowflake_quoted_identifiers, + other_frame.index_column_snowflake_quoted_identifiers, + ) + + ( + left_result, + left_frame, + left_frame_data_ids, + left_index_ids, + ) = align_axis_0_left(frame, other_frame, join) + ( + right_result, + right_frame, + right_frame_data_ids, + right_index_ids, + ) = align_axis_0_right(frame, 
other_frame, join) + + left_qc = SnowflakeQueryCompiler( + InternalFrame.create( + ordered_dataframe=left_frame, + data_column_snowflake_quoted_identifiers=left_frame_data_ids, + data_column_pandas_labels=frame.data_column_pandas_labels, + data_column_pandas_index_names=frame.data_column_pandas_index_names, + data_column_types=frame.cached_data_column_snowpark_pandas_types, + index_column_snowflake_quoted_identifiers=left_index_ids, + index_column_pandas_labels=left_result.index_column_pandas_labels, + index_column_types=left_result.cached_index_column_snowpark_pandas_types, + ) + ) + right_qc = SnowflakeQueryCompiler( + InternalFrame.create( + ordered_dataframe=right_frame, + data_column_snowflake_quoted_identifiers=right_frame_data_ids, + data_column_pandas_labels=other_frame.data_column_pandas_labels, + data_column_pandas_index_names=other_frame.data_column_pandas_index_names, + data_column_types=other_frame.cached_data_column_snowpark_pandas_types, + index_column_snowflake_quoted_identifiers=right_index_ids, + index_column_pandas_labels=right_result.index_column_pandas_labels, + index_column_types=right_result.cached_index_column_snowpark_pandas_types, + ) + ) + return left_qc, right_qc + def apply( self, func: Union[AggFuncType, UserDefinedFunction], diff --git a/src/snowflake/snowpark/modin/plugin/docstrings/base.py b/src/snowflake/snowpark/modin/plugin/docstrings/base.py index 59f0da9e475..e4608ad48ce 100644 --- a/src/snowflake/snowpark/modin/plugin/docstrings/base.py +++ b/src/snowflake/snowpark/modin/plugin/docstrings/base.py @@ -443,6 +443,65 @@ def aggregate(): def align(): """ Align two objects on their axes with the specified join method. + + Join method is specified for each axis Index. + + Args: + other: DataFrame or Series + join: {‘outer’, ‘inner’, ‘left’, ‘right’}, default ‘outer’ + Type of alignment to be performed. + left: use only keys from left frame, preserve key order. + right: use only keys from right frame, preserve key order. 
+ outer: use union of keys from both frames, sort keys lexicographically. + axis: allowed axis of the other object, default None + Align on index (0), columns (1), or both (None). + level: int or level name, default None + Broadcast across a level, matching Index values on the passed MultiIndex level. + copy: bool, default True + Always returns new objects. If copy=False and no reindexing is required then original objects are returned. + fill_value: scalar, default np.nan + Value to use for missing values. Defaults to NaN, but can be any compatible value. + + Returns: + tuple of (Series/DataFrame, type of other) + + Notes + ----- + Snowpark pandas DataFrame/Series.align currently does not support `axis = 1 or None`, non-default `fill_value`, + `copy`, `level`, and MultiIndex. + + Examples:: + + >>> df = pd.DataFrame( + ... [[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"], index=[1, 2] + ... ) + >>> other = pd.DataFrame( + ... [[10, 20, 30, 40], [60, 70, 80, 90], [600, 700, 800, 900]], + ... columns=["A", "B", "C", "D"], + ... index=[2, 3, 4], + ... ) + >>> df + D B E A + 1 1 2 3 4 + 2 6 7 8 9 + >>> other + A B C D + 2 10 20 30 40 + 3 60 70 80 90 + 4 600 700 800 900 + >>> left, right = df.align(other, join="outer", axis=0) + >>> left + D B E A + 1 1.0 2.0 3.0 4.0 + 2 6.0 7.0 8.0 9.0 + 3 NaN NaN NaN NaN + 4 NaN NaN NaN NaN + >>> right + A B C D + 1 NaN NaN NaN NaN + 2 10.0 20.0 30.0 40.0 + 3 60.0 70.0 80.0 90.0 + 4 600.0 700.0 800.0 900.0 """ @doc( diff --git a/src/snowflake/snowpark/modin/plugin/extensions/base_overrides.py b/src/snowflake/snowpark/modin/plugin/extensions/base_overrides.py index a472e131370..0cb31fd9e03 100644 --- a/src/snowflake/snowpark/modin/plugin/extensions/base_overrides.py +++ b/src/snowflake/snowpark/modin/plugin/extensions/base_overrides.py @@ -143,23 +143,6 @@ def decorator(base_method: Any): # 4. Performs operations on a native pandas Index object that are nontrivial for Snowpark pandas to manage. 
-@register_base_not_implemented() -def align( - self, - other, - join="outer", - axis=None, - level=None, - copy=None, - fill_value=None, - method=lib.no_default, - limit=lib.no_default, - fill_axis=lib.no_default, - broadcast_axis=lib.no_default, -): # noqa: PR01, RT01, D200 - pass # pragma: no cover - - @register_base_not_implemented() def asof(self, where, subset=None): # noqa: PR01, RT01, D200 pass # pragma: no cover @@ -882,6 +865,41 @@ def _get_attrs(self) -> dict: # noqa: RT01, D200 register_base_override("attrs")(property(_get_attrs, _set_attrs)) +@register_base_override("align") +def align( + self, + other: BasePandasDataset, + join: str = "outer", + axis: Axis = None, + level: Level = None, + copy: bool = True, + fill_value: Scalar = None, + method: str = None, + limit: int = None, + fill_axis: Axis = 0, + broadcast_axis: Axis = None, +): # noqa: PR01, RT01, D200 + if method is not None or limit is not None or fill_axis != 0: + raise NotImplementedError( + f"The 'method', 'limit', and 'fill_axis' keywords in {self.__class__.__name__}.align are deprecated and will be removed in a future version. Call fillna directly on the returned objects instead." + ) + if broadcast_axis is not None: + raise NotImplementedError( + f"The 'broadcast_axis' keyword in {self.__class__.__name__}.align is deprecated and will be removed in a future version." + ) + if axis not in [0, 1, None]: + raise ValueError( + f"No axis named {axis} for object type {self.__class__.__name__}" + ) + query_compiler1, query_compiler2 = self._query_compiler.align( + other, join=join, axis=axis, level=level, copy=copy, fill_value=fill_value + ) + return ( + self._create_or_update_from_compiler(query_compiler1, False), + self._create_or_update_from_compiler(query_compiler2, False), + ) + + # Modin does not provide `MultiIndex` support and will default to pandas when `level` is specified, # and allows binary ops against native pandas objects that Snowpark pandas prohibits. 
@register_base_override("_binary_op") diff --git a/tests/integ/modin/frame/test_align.py b/tests/integ/modin/frame/test_align.py new file mode 100644 index 00000000000..b887f02283a --- /dev/null +++ b/tests/integ/modin/frame/test_align.py @@ -0,0 +1,290 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + + +import modin.pandas as pd +import numpy as np +import pandas as native_pd +import pytest + +import snowflake.snowpark.modin.plugin # noqa: F401 +from tests.integ.modin.utils import ( + assert_frame_equal, + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck, +) +from tests.integ.utils.sql_counter import sql_count_checker + + +@sql_count_checker(query_count=2, join_count=2) +@pytest.mark.parametrize("join", ["outer", "inner", "left", "right"]) +def test_align_basic_axis0(join): + native_df = native_pd.DataFrame( + [[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"] + ) + native_other_df = native_pd.DataFrame( + [[10, 20, 30, 40], [60, 70, 80, 90], [600, 700, 800, 900]], + columns=["A", "B", "C", "D"], + ) + native_left, native_right = native_df.align( + native_other_df, + join=join, + axis=0, + limit=None, + fill_axis=0, + broadcast_axis=None, + ) + df = pd.DataFrame(native_df) + other_df = pd.DataFrame(native_other_df) + left, right = df.align(other_df, join=join, axis=0) + assert_frame_equal(left, native_left) + assert_frame_equal(right, native_right) + + +@sql_count_checker(query_count=2, join_count=2) +@pytest.mark.parametrize("join", ["outer", "inner", "left", "right"]) +def test_align_basic_reorder_axis0(join): + native_df = native_pd.DataFrame( + [[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"], index=["R", "L"] + ) + native_other_df = native_pd.DataFrame( + [[10, 20, 30, 40], [60, 70, 80, 90], [600, 700, 800, 900]], + columns=["A", "B", "C", "D"], + index=["A", "B", "C"], + ) + native_left, native_right = native_df.align( + native_other_df, + join=join, + axis=0, + limit=None, + fill_axis=0, + 
broadcast_axis=None, + ) + df = pd.DataFrame(native_df) + other_df = pd.DataFrame(native_other_df) + left, right = df.align(other_df, join=join, axis=0) + assert_frame_equal(left, native_left) + assert_frame_equal(right, native_right) + + +@sql_count_checker(query_count=2, join_count=2) +@pytest.mark.parametrize("join", ["outer", "inner", "left", "right"]) +def test_align_basic_diff_index_axis0(join): + native_df = native_pd.DataFrame( + [[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"], index=[10, 20] + ) + native_other_df = native_pd.DataFrame( + [[10, 20, 30, 40], [60, 70, 80, 90], [600, 700, 800, 900]], + columns=["A", "B", "C", "D"], + index=["one", "two", "three"], + ) + native_left, native_right = native_df.align( + native_other_df, + join=join, + axis=0, + limit=None, + fill_axis=0, + broadcast_axis=None, + ) + df = pd.DataFrame(native_df) + other_df = pd.DataFrame(native_other_df) + left, right = df.align(other_df, join=join, axis=0) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(left, native_left) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(right, native_right) + + +@sql_count_checker(query_count=2, join_count=2) +@pytest.mark.parametrize("join", ["outer", "inner", "left", "right"]) +def test_align_basic_with_nulls_axis0(join): + native_df = native_pd.DataFrame( + [[1, 2, np.nan, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"] + ) + native_other_df = native_pd.DataFrame( + [[10, 20, 30, np.nan], [60, np.nan, 80, 90], [600, 700, 800, 900]], + columns=["A", "B", "C", "D"], + ) + native_left, native_right = native_df.align( + native_other_df, + join=join, + axis=0, + limit=None, + fill_axis=0, + broadcast_axis=None, + ) + df = pd.DataFrame(native_df) + other_df = pd.DataFrame(native_other_df) + left, right = df.align(other_df, join=join, axis=0) + assert_frame_equal(left, native_left) + assert_frame_equal(right, native_right) + + +@sql_count_checker(query_count=2, join_count=2) +@pytest.mark.parametrize("join", 
["outer", "inner", "left", "right"]) +def test_align_basic_with_all_null_row_axis0(join): + native_df = native_pd.DataFrame( + [[1, 2, np.nan, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"] + ) + native_other_df = native_pd.DataFrame( + [[10, 20, 30, np.nan], [60, np.nan, 80, 90], [np.nan, np.nan, np.nan, np.nan]], + columns=["A", "B", "C", "D"], + ) + native_left, native_right = native_df.align( + native_other_df, + join=join, + axis=0, + limit=None, + fill_axis=0, + broadcast_axis=None, + ) + df = pd.DataFrame(native_df) + other_df = pd.DataFrame(native_other_df) + left, right = df.align(other_df, join=join, axis=0) + assert_frame_equal(left, native_left) + assert_frame_equal(right, native_right) + + +@sql_count_checker(query_count=0) +def test_align_frame_with_nulls_axis_None_negative(): + df = pd.DataFrame([[1, 2, np.nan, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"]) + other_df = pd.DataFrame( + [[10, 20, 30, np.nan], [60, np.nan, 80, 90], [600, 700, 800, 900]], + columns=["A", "B", "C", "D"], + ) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas 'align' method doesn't support 'axis=None'", + ): + left, right = df.align(other_df, join="outer", axis=None) + + +@sql_count_checker(query_count=0) +def test_align_frame_fill_value_negative(): + df = pd.DataFrame([[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"]) + other_df = pd.DataFrame( + [[10, 20, 30, 40], [60, 70, 80, 90], [600, 700, 800, 900]], + columns=["A", "B", "C", "D"], + ) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas 'align' method doesn't support 'fill_value'", + ): + left, right = df.align(other_df, join="outer", axis=0, fill_value="empty") + + +@sql_count_checker(query_count=0) +def test_align_frame_axis1_negative(): + df = pd.DataFrame( + [[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"], index=[1, 2] + ) + other_df = pd.DataFrame( + [[10, 20, 30, 40], [60, 70, 80, 90], [600, 700, 800, 900]], + columns=["A", "B", "C", "D"], + index=[2, 
3, 4], + ) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas 'align' method doesn't support 'axis=1'", + ): + left, right = df.align(other_df, join="outer", axis=1) + + +@sql_count_checker(query_count=0) +@pytest.mark.parametrize("level", [0, 1]) +def test_level_negative(level): + df = pd.DataFrame( + [[1], [2]], + index=pd.MultiIndex.from_tuples( + [("foo", "bah", "ack"), ("bar", "bas", "bar")], names=["a", "b", "c"] + ), + columns=["num"], + ) + other_df = pd.DataFrame( + [[2], [3]], + index=pd.Series(["foo", "bah"], name="a"), + columns=["num"], + ) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas 'align' method doesn't support 'level'", + ): + left, right = df.align(other_df, join="outer", axis=0, level=0) + + +@sql_count_checker(query_count=0) +def test_multiindex_negative(): + df = pd.DataFrame( + [[1], [2]], + index=pd.MultiIndex.from_tuples( + [("foo", "bah", "ack"), ("bar", "bas", "bar")], names=["a", "b", "c"] + ), + columns=["num"], + ) + other_df = pd.DataFrame( + [[2], [3]], + index=pd.Series(["foo", "bah"], name="a"), + columns=["num"], + ) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas doesn't support `align` with MultiIndex", + ): + left, right = df.align(other_df, join="outer", axis=0) + + +@sql_count_checker(query_count=0) +def test_align_frame_copy_negative(): + df = pd.DataFrame([[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"]) + other_df = pd.DataFrame( + [[10, 20, 30, 40], [60, 70, 80, 90], [600, 700, 800, 900]], + columns=["A", "B", "C", "D"], + ) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas 'align' method doesn't support 'copy=False'", + ): + left, right = df.align(other_df, join="outer", axis=0, copy=False) + + +@sql_count_checker(query_count=0) +def test_align_frame_invalid_axis_negative(): + df = pd.DataFrame([[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"]) + other_df = pd.DataFrame( + [[10, 20, 30, 40], [60, 70, 80, 90], [600, 
700, 800, 900]], + columns=["A", "B", "C", "D"], + ) + axis = 2 + with pytest.raises( + ValueError, + match=f"No axis named {axis} for object type DataFrame", + ): + left, right = df.align(other_df, join="outer", axis=axis) + + +@sql_count_checker(query_count=0) +def test_align_frame_deprecated_negative(): + df = pd.DataFrame([[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"]) + other_df = pd.DataFrame( + [[10, 20, 30, 40], [60, 70, 80, 90], [600, 700, 800, 900]], + columns=["A", "B", "C", "D"], + ) + for method in ["backfill", "bfill", "pad", "ffill"]: + with pytest.raises( + NotImplementedError, + match="The 'method', 'limit', and 'fill_axis' keywords in DataFrame.align are deprecated and will be removed in a future version. Call fillna directly on the returned objects instead.", + ): + left, right = df.align(other_df, join="outer", method=method) + with pytest.raises( + NotImplementedError, + match="The 'method', 'limit', and 'fill_axis' keywords in DataFrame.align are deprecated and will be removed in a future version. Call fillna directly on the returned objects instead.", + ): + left, right = df.align(other_df, join="outer", limit=5) + with pytest.raises( + NotImplementedError, + match="The 'method', 'limit', and 'fill_axis' keywords in DataFrame.align are deprecated and will be removed in a future version. Call fillna directly on the returned objects instead.", + ): + left, right = df.align(other_df, join="outer", fill_axis=1) + with pytest.raises( + NotImplementedError, + match="The 'broadcast_axis' keyword in DataFrame.align is deprecated and will be removed in a future version.", + ): + left, right = df.align(other_df, join="outer", broadcast_axis=0) diff --git a/tests/integ/modin/series/test_align.py b/tests/integ/modin/series/test_align.py new file mode 100644 index 00000000000..e597f33c44a --- /dev/null +++ b/tests/integ/modin/series/test_align.py @@ -0,0 +1,167 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. 
+# + + +import modin.pandas as pd +import numpy as np +import pandas as native_pd +import pytest + +import snowflake.snowpark.modin.plugin # noqa: F401 +from tests.integ.modin.utils import ( + assert_series_equal, + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck, +) +from tests.integ.utils.sql_counter import sql_count_checker + + +@sql_count_checker(query_count=2, join_count=2) +@pytest.mark.parametrize("join", ["outer", "inner", "left", "right"]) +def test_align_basic_series_axis0(join): + native_ser = native_pd.Series([1, 2, 3]) + native_other_ser = native_pd.Series([60, 70, 80, 90, 100, np.nan]) + native_left, native_right = native_ser.align( + native_other_ser, + join=join, + axis=0, + limit=None, + fill_axis=0, + broadcast_axis=None, + ) + ser = pd.Series(native_ser) + other_ser = pd.Series(native_other_ser) + left, right = ser.align(other_ser, join=join, axis=0) + assert_series_equal(left, native_left) + assert_series_equal(right, native_right) + + +@sql_count_checker(query_count=2, join_count=2) +@pytest.mark.parametrize("join", ["outer", "inner", "left", "right"]) +def test_align_series_with_nulls_axis0(join): + native_ser = native_pd.Series([np.nan, np.nan, np.nan]) + native_other_ser = native_pd.Series([60, 70, 80, 90, 100, np.nan]) + native_left, native_right = native_ser.align( + native_other_ser, + join=join, + axis=0, + limit=None, + fill_axis=0, + broadcast_axis=None, + ) + ser = pd.Series(native_ser) + other_ser = pd.Series(native_other_ser) + left, right = ser.align(other_ser, join=join, axis=0) + assert_series_equal(left, native_left) + assert_series_equal(right, native_right) + + +@sql_count_checker(query_count=2, join_count=2) +@pytest.mark.parametrize("join", ["outer", "inner", "left", "right"]) +def test_align_basic_series_reorder_index_axis0(join): + native_ser = native_pd.Series([1, 2, 3], index=["Z", "V", "W"]) + native_other_ser = native_pd.Series( + [ + 60, + 70, + 80, + 90, + ], + index=["G", "H", "M", "A"], + ) + 
native_left, native_right = native_ser.align( + native_other_ser, + join=join, + axis=0, + limit=None, + fill_axis=0, + broadcast_axis=None, + ) + ser = pd.Series(native_ser) + other_ser = pd.Series(native_other_ser) + left, right = ser.align(other_ser, join=join, axis=0) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(left, native_left) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(right, native_right) + + +@sql_count_checker(query_count=0) +def test_align_series_axis_None_negative(): + ser = pd.Series([1, 2, 3]) + other_ser = pd.Series([60, 70, 80, 90, 100, np.nan]) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas 'align' method doesn't support 'axis=None'", + ): + left, right = ser.align(other_ser, join="outer", axis=None) + + +@sql_count_checker(query_count=0) +def test_align_series_fill_value_negative(): + ser = pd.Series([1, 2, 3]) + other_ser = pd.Series([60, 70, 80, 90, 100, np.nan]) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas 'align' method doesn't support 'fill_value'", + ): + left, right = ser.align(other_ser, join="outer", axis=0, fill_value="empty") + + +@sql_count_checker(query_count=0) +def test_align_series_axis_1_negative(): + ser = pd.Series([1, 2, 3]) + other_ser = pd.Series([60, 70, 80, 90, 100, np.nan]) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas 'align' method doesn't support 'axis=1'", + ): + left, right = ser.align(other_ser, join="outer", axis=1) + + +@sql_count_checker(query_count=0) +def test_align_series_copy_negative(): + ser = pd.Series([1, 2, 3]) + other_ser = pd.Series([60, 70, 80, 90, 100, np.nan]) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas 'align' method doesn't support 'copy=False'", + ): + left, right = ser.align(other_ser, join="outer", copy=False) + + +@sql_count_checker(query_count=0) +def test_align_series_invalid_axis_negative(): + ser = pd.Series([1, 2, 3]) + other_ser = pd.Series([60, 70, 80, 90, 
100, np.nan]) + axis = 2 + with pytest.raises( + ValueError, + match=f"No axis named {axis} for object type Series", + ): + left, right = ser.align(other_ser, join="outer", axis=axis) + + +@sql_count_checker(query_count=0) +def test_align_series_deprecated_negative(): + ser = pd.Series([1, 2, 3]) + other_ser = pd.Series([60, 70, 80, 90, 100, np.nan]) + for method in ["backfill", "bfill", "pad", "ffill"]: + with pytest.raises( + NotImplementedError, + match="The 'method', 'limit', and 'fill_axis' keywords in Series.align are deprecated and will be removed in a future version. Call fillna directly on the returned objects instead.", + ): + left, right = ser.align(other_ser, join="outer", method=method) + with pytest.raises( + NotImplementedError, + match="The 'method', 'limit', and 'fill_axis' keywords in Series.align are deprecated and will be removed in a future version. Call fillna directly on the returned objects instead.", + ): + left, right = ser.align(other_ser, join="outer", limit=5) + with pytest.raises( + NotImplementedError, + match="The 'method', 'limit', and 'fill_axis' keywords in Series.align are deprecated and will be removed in a future version. 
Call fillna directly on the returned objects instead.", + ): + left, right = ser.align(other_ser, join="outer", fill_axis=1) + with pytest.raises( + NotImplementedError, + match="The 'broadcast_axis' keyword in Series.align is deprecated and will be removed in a future version.", + ): + left, right = ser.align(other_ser, join="outer", broadcast_axis=0) diff --git a/tests/integ/modin/test_ordered_dataframe.py b/tests/integ/modin/test_ordered_dataframe.py index 0d1b0fba800..99db9566ec9 100644 --- a/tests/integ/modin/test_ordered_dataframe.py +++ b/tests/integ/modin/test_ordered_dataframe.py @@ -397,7 +397,7 @@ def test_join_with_column_conflict(session, df1, df2, how): @pytest.mark.parametrize( "how", - ["left", "outer"], + ["left", "outer", "inner"], ) @sql_count_checker(query_count=1, join_count=1) def test_align_on_matching_columns(session, how): @@ -499,6 +499,35 @@ def test_align_on_mismatch_columns(session, df1, df2, how): ) +@sql_count_checker(query_count=0) +def test_align_on_matching_columns_right_negative(session): + how = "right" + df1 = native_pd.DataFrame({"A": [3, 2, 3], "B": [2, 2, 1], "row_pos": [0, 1, 2]}) + df2 = native_pd.DataFrame( + {"A_2": [2, 3, 3], "B_2": [2, 2, 1], "row_pos_2": [0, 1, 2]} + ) + ordered_df1 = _create_ordered_dataframe( + session, df1, ordering_columns=['"row_pos"'], row_position_column='"row_pos"' + ) + ordered_df2 = _create_ordered_dataframe( + session, + df2, + ordering_columns=['"row_pos_2"'], + row_position_column='"row_pos_2"', + ) + + with pytest.raises( + ValueError, + match=f"how={how} is not valid argument for ordered_dataframe.align.", + ): + ordered_df1.align( + ordered_df2, + left_on_cols=['"B"', '"row_pos"'], + right_on_cols=['"B_2"', '"row_pos_2"'], + how=how, + ) + + @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"]) def test_self_join_on_row_position_column(ordered_df, how): for right in [ordered_df, ordered_df.select('"row_position"', '"a"')]: diff --git a/tests/unit/modin/test_unsupported.py 
b/tests/unit/modin/test_unsupported.py index 62acffc9ae4..219df66ef92 100644 --- a/tests/unit/modin/test_unsupported.py +++ b/tests/unit/modin/test_unsupported.py @@ -56,7 +56,6 @@ def test_unsupported_general(general_method, kwargs): @pytest.mark.parametrize( "df_method, kwargs", [ - ["align", {"other": ""}], ["asof", {"where": ""}], ["at_time", {"time": ""}], ["between_time", {"start_time": "", "end_time": ""}], @@ -133,7 +132,6 @@ def test_unsupported_df(df_method, kwargs): @pytest.mark.parametrize( "series_method, kwargs", [ - ["align", {"other": ""}], ["argmax", {}], ["argmin", {}], ["argsort", {}],