Skip to content

Commit

Permalink
FIX-modin-project#5091: Handle pd.Grouper objects correctly (modin-pr…
Browse files Browse the repository at this point in the history
…oject#6174)

Signed-off-by: Karthik Velayutham <[email protected]>
  • Loading branch information
Karthik Velayutham committed May 22, 2023
1 parent 941a49b commit 161b49a
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 14 deletions.
7 changes: 6 additions & 1 deletion modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2851,7 +2851,12 @@ def _groupby_internal_columns(self, by, drop):
else:
if not isinstance(by, list):
by = [by] if by is not None else []
internal_by = [o for o in by if hashable(o) and o in self.columns]
internal_by = []
for o in by:
if isinstance(o, pandas.Grouper) and o.key in self.columns:
internal_by.append(o.key)
elif hashable(o) and o in self.columns:
internal_by.append(o)
internal_qc = (
[self.getitem_column_array(internal_by)] if len(internal_by) else []
)
Expand Down
29 changes: 18 additions & 11 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,35 +494,42 @@ def groupby(
drop = by._parent is self
idx_name = by.name
by = by._query_compiler
elif isinstance(by, pandas.Grouper):
drop = by.key in self
elif is_list_like(by):
# fastpath for multi column groupby
if axis == 0 and all(
(
(hashable(o) and (o in self))
or isinstance(o, Series)
or (isinstance(o, pandas.Grouper) and o.key in self)
or (is_list_like(o) and len(o) == len(self.axes[axis]))
)
for o in by
):
# We want to split 'by's into those that belongs to the self (internal_by)
# and those that doesn't (external_by)
internal_by, external_by = [], []
has_external = False
processed_by = []

for current_by in by:
if hashable(current_by):
internal_by.append(current_by)
if isinstance(current_by, pandas.Grouper):
processed_by.append(current_by)
has_external = True
elif hashable(current_by):
processed_by.append(current_by)
elif isinstance(current_by, Series):
if current_by._parent is self:
internal_by.append(current_by.name)
processed_by.append(current_by.name)
else:
external_by.append(current_by._query_compiler)
processed_by.append(current_by._query_compiler)
has_external = True
else:
external_by.append(current_by)
has_external = True
processed_by.append(current_by)

by = internal_by + external_by
by = processed_by

if len(external_by) == 0:
by = self[internal_by]._query_compiler
if not has_external:
by = self[processed_by]._query_compiler

drop = True
else:
Expand Down
12 changes: 10 additions & 2 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,11 +584,19 @@ def _internal_by(self):
internal_by = tuple()
if self._drop:
if is_list_like(self._by):
internal_by = tuple(by for by in self._by if isinstance(by, str))
internal_by_list = []
for by in self._by:
if isinstance(by, str):
internal_by_list.append(by)
elif isinstance(by, pandas.Grouper):
internal_by_list.append(by.key)
internal_by = tuple(internal_by_list)
elif isinstance(self._by, pandas.Grouper):
internal_by = tuple([self._by.key])
else:
ErrorMessage.catch_bugs_and_request_email(
failure_condition=not isinstance(self._by, BaseQueryCompiler),
extra_log=f"When 'drop' is True, 'by' must be either list-like or a QueryCompiler, met: {type(self._by)}.",
extra_log=f"When 'drop' is True, 'by' must be either list-like, Grouper, or a QueryCompiler, met: {type(self._by)}.",
)
internal_by = tuple(self._by.columns)

Expand Down
37 changes: 37 additions & 0 deletions modin/pandas/test/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pandas
import numpy as np
from unittest import mock
import datetime

from modin.config.envvars import Engine
from modin.core.dataframe.pandas.partitioning.axis_partition import (
Expand Down Expand Up @@ -2330,3 +2331,39 @@ def test_skew_corner_cases():
# https://github.com/modin-project/modin/issues/5545
modin_df, pandas_df = create_test_dfs({"col0": [1, 1], "col1": [171, 137]})
eval_general(modin_df, pandas_df, lambda df: df.groupby("col0").skew())


@pytest.mark.parametrize(
"by",
[
pandas.Grouper(key="time_stamp", freq="3D"),
[pandas.Grouper(key="time_stamp", freq="1M"), "count"],
],
)
def test_groupby_with_grouper(by):
# See https://github.com/modin-project/modin/issues/5091 for more details
# Generate larger data so that it can handle partitioning cases
data = {
"id": [i for i in range(200)],
"time_stamp": [
pd.Timestamp("2000-01-02") + datetime.timedelta(days=x) for x in range(200)
],
}
for i in range(200):
data[f"count_{i}"] = [i, i + 1] * 100

modin_df, pandas_df = create_test_dfs(data)
eval_general(
modin_df,
pandas_df,
lambda df: df.groupby(by).mean(),
)


def test_groupby_preserves_by_order():
modin_df, pandas_df = create_test_dfs({"col0": [1, 1, 1], "col1": [10, 10, 10]})

modin_res = modin_df.groupby([pd.Series([100, 100, 100]), "col0"]).mean()
pandas_res = pandas_df.groupby([pandas.Series([100, 100, 100]), "col0"]).mean()

df_equals(modin_res, pandas_res)

0 comments on commit 161b49a

Please sign in to comment.