Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
Signed-off-by: Ankita Katiyar <[email protected]>
  • Loading branch information
ankatiyar authored Sep 26, 2024
2 parents 3ea21b6 + 06cf752 commit 85964c4
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 21 deletions.
3 changes: 3 additions & 0 deletions kedro-datasets/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
## Bug fixes and other changes
* Refactored all datasets to set `fs_args` defaults in the same way as `load_args` and `save_args` and not have hardcoded values in the save methods.
* Fixed bug related to loading/saving models from/to remote storage using `TensorFlowModelDataset`.
* Fixed deprecated load and save approaches of GBQTableDataset and GBQQueryDataset by invoking save and load directly over `pandas-gbq` lib

## Breaking Changes
## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:
* [Brandon Meek](https://github.com/bpmeek)
* [yury-fedotov](https://github.com/yury-fedotov)
* [gitgud5000](https://github.com/gitgud5000)
* [janickspirig](https://github.com/janickspirig)


# Release 4.1.0
## Major features and improvements
Expand Down
20 changes: 11 additions & 9 deletions kedro-datasets/kedro_datasets/pandas/gbq_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import fsspec
import pandas as pd
import pandas_gbq as pd_gbq
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2.credentials import Credentials
Expand Down Expand Up @@ -138,16 +139,17 @@ def _describe(self) -> dict[str, Any]:

def _load(self) -> pd.DataFrame:
sql = f"select * from {self._dataset}.{self._table_name}" # nosec
self._load_args.setdefault("query", sql)
return pd.read_gbq(
self._load_args.setdefault("query_or_table", sql)
return pd_gbq.read_gbq(
project_id=self._project_id,
credentials=self._credentials,
**self._load_args,
)

def _save(self, data: pd.DataFrame) -> None:
data.to_gbq(
f"{self._dataset}.{self._table_name}",
pd_gbq.to_gbq(
dataframe=data,
destination_table=f"{self._dataset}.{self._table_name}",
project_id=self._project_id,
credentials=self._credentials,
**self._save_args,
Expand Down Expand Up @@ -176,7 +178,7 @@ def _validate_location(self):

class GBQQueryDataset(AbstractDataset[None, pd.DataFrame]):
"""``GBQQueryDataset`` loads data from a provided SQL query from Google
BigQuery. It uses ``pandas.read_gbq`` which itself uses ``pandas-gbq``
BigQuery. It uses ``pandas_gbq.read_gbq`` which itself uses ``pandas-gbq``
internally to read from BigQuery table. Therefore it supports all allowed
pandas options on ``read_gbq``.
Expand Down Expand Up @@ -274,7 +276,7 @@ def __init__( # noqa: PLR0913

# load sql query from arg or from file
if sql:
self._load_args["query"] = sql
self._load_args["query_or_table"] = sql
self._filepath = None
else:
# filesystem for loading sql file
Expand All @@ -291,7 +293,7 @@ def __init__( # noqa: PLR0913
def _describe(self) -> dict[str, Any]:
load_args = copy.deepcopy(self._load_args)
desc = {}
desc["sql"] = str(load_args.pop("query", None))
desc["sql"] = str(load_args.pop("query_or_table", None))
desc["filepath"] = str(self._filepath)
desc["load_args"] = str(load_args)

Expand All @@ -303,9 +305,9 @@ def _load(self) -> pd.DataFrame:
if self._filepath:
load_path = get_filepath_str(PurePosixPath(self._filepath), self._protocol)
with self._fs.open(load_path, mode="r") as fs_file:
load_args["query"] = fs_file.read()
load_args["query_or_table"] = fs_file.read()

return pd.read_gbq(
return pd_gbq.read_gbq(
project_id=self._project_id,
credentials=self._credentials,
**load_args,
Expand Down
43 changes: 31 additions & 12 deletions kedro-datasets/tests/pandas/test_gbq_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ def test_save_extra_params(self, gbq_dataset, save_args):
def test_load_missing_file(self, gbq_dataset, mocker):
"""Check the error when trying to load missing table."""
pattern = r"Failed while loading data from data set GBQTableDataset\(.*\)"
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.side_effect = ValueError
with pytest.raises(DatasetError, match=pattern):
gbq_dataset.load()
Expand Down Expand Up @@ -133,30 +135,43 @@ def test_save_load_data(self, gbq_dataset, dummy_dataframe, mocker):
"""Test saving and reloading the data set."""
sql = f"select * from {DATASET}.{TABLE_NAME}"
table_id = f"{DATASET}.{TABLE_NAME}"
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_to_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd_gbq.to_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.return_value = dummy_dataframe
mocked_df = mocker.Mock()

gbq_dataset.save(mocked_df)
loaded_data = gbq_dataset.load()

mocked_df.to_gbq.assert_called_once_with(
table_id, project_id=PROJECT, credentials=None, progress_bar=False
mocked_to_gbq.assert_called_once_with(
dataframe=mocked_df,
destination_table=table_id,
project_id=PROJECT,
credentials=None,
progress_bar=False,
)
mocked_read_gbq.assert_called_once_with(
project_id=PROJECT, credentials=None, query=sql
project_id=PROJECT, credentials=None, query_or_table=sql
)
assert_frame_equal(dummy_dataframe, loaded_data)

@pytest.mark.parametrize("load_args", [{"query": "Select 1"}], indirect=True)
@pytest.mark.parametrize(
"load_args", [{"query_or_table": "Select 1"}], indirect=True
)
def test_read_gbq_with_query(self, gbq_dataset, dummy_dataframe, mocker, load_args):
"""Test loading data set with query in the argument."""
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.return_value = dummy_dataframe
loaded_data = gbq_dataset.load()

mocked_read_gbq.assert_called_once_with(
project_id=PROJECT, credentials=None, query=load_args["query"]
project_id=PROJECT,
credentials=None,
query_or_table=load_args["query_or_table"],
)

assert_frame_equal(dummy_dataframe, loaded_data)
Expand Down Expand Up @@ -239,26 +254,30 @@ def test_credentials_propagation(self, mocker):

def test_load(self, mocker, gbq_sql_dataset, dummy_dataframe):
"""Test `load` method invocation"""
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.return_value = dummy_dataframe

loaded_data = gbq_sql_dataset.load()

mocked_read_gbq.assert_called_once_with(
project_id=PROJECT, credentials=None, query=SQL_QUERY
project_id=PROJECT, credentials=None, query_or_table=SQL_QUERY
)

assert_frame_equal(dummy_dataframe, loaded_data)

def test_load_query_file(self, mocker, gbq_sql_file_dataset, dummy_dataframe):
"""Test `load` method invocation using a file as input query"""
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.return_value = dummy_dataframe

loaded_data = gbq_sql_file_dataset.load()

mocked_read_gbq.assert_called_once_with(
project_id=PROJECT, credentials=None, query=SQL_QUERY
project_id=PROJECT, credentials=None, query_or_table=SQL_QUERY
)

assert_frame_equal(dummy_dataframe, loaded_data)
Expand Down

0 comments on commit 85964c4

Please sign in to comment.