Merge branch 'main' into feature/add_geopandas_parquet_dataset
Signed-off-by: Harm Matthias Harms <[email protected]>
harm-matthias-harms authored Oct 1, 2024
2 parents 978ad6c + ca881f1 commit ff621af
Showing 35 changed files with 854 additions and 31 deletions.
13 changes: 10 additions & 3 deletions kedro-airflow/kedro_airflow/grouping.py
@@ -4,14 +4,21 @@
from kedro.pipeline.node import Node
from kedro.pipeline.pipeline import Pipeline

try:
from kedro.io import CatalogProtocol
except ImportError: # pragma: no cover
pass


def _is_memory_dataset(catalog, dataset_name: str) -> bool:
if dataset_name not in catalog:
return True
return False


def get_memory_datasets(catalog: DataCatalog, pipeline: Pipeline) -> set[str]:
def get_memory_datasets(
catalog: CatalogProtocol | DataCatalog, pipeline: Pipeline
) -> set[str]:
"""Gather all datasets in the pipeline that are of type MemoryDataset, excluding 'parameters'."""
return {
dataset_name
@@ -21,7 +28,7 @@ def get_memory_datasets(catalog: DataCatalog, pipeline: Pipeline) -> set[str]:


def create_adjacency_list(
catalog: DataCatalog, pipeline: Pipeline
catalog: CatalogProtocol | DataCatalog, pipeline: Pipeline
) -> tuple[dict[str, set], dict[str, set]]:
"""
Builds adjacency list (adj_list) to search connected components - undirected graph,
@@ -48,7 +55,7 @@


def group_memory_nodes(
catalog: DataCatalog, pipeline: Pipeline
catalog: CatalogProtocol | DataCatalog, pipeline: Pipeline
) -> tuple[dict[str, list[Node]], dict[str, list[str]]]:
"""
Nodes that are connected through MemoryDatasets cannot be distributed across
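The type-hint change above widens `get_memory_datasets`, `create_adjacency_list` and `group_memory_nodes` to accept either the newer `CatalogProtocol` or the classic `DataCatalog`, with the `try`/`except ImportError` guard keeping the module importable on older Kedro versions. A minimal sketch of the grouping rule these helpers implement, assuming a recent Kedro and the `kedro-airflow` plugin as changed here (node functions and names are illustrative):

```python
from kedro.io import DataCatalog
from kedro.pipeline import node, pipeline

from kedro_airflow.grouping import group_memory_nodes


def produce() -> int:
    return 1


def consume(x: int) -> int:
    return x


# Nothing is declared in the catalog, so the intermediate dataset "a" is
# treated as a MemoryDataset by _is_memory_dataset, and the two nodes must
# run inside the same Airflow task group.
catalog = DataCatalog()
pipe = pipeline(
    [
        node(produce, inputs=None, outputs="a", name="first"),
        node(consume, inputs="a", outputs="b", name="second"),
    ]
)

nodes, dependencies = group_memory_nodes(catalog, pipe)
print(nodes)  # expected: a single group containing both nodes, since "a" never hits disk
```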
7 changes: 7 additions & 0 deletions kedro-datasets/RELEASE.md
@@ -5,6 +5,8 @@
| Type | Description | Location |
|-------------------------------------|-----------------------------------------------------------|-----------------------------------------|
| `pytorch.PyTorchDataset` | A dataset for securely saving and loading PyTorch models | `kedro_datasets_experimental.pytorch` |
| `prophet.ProphetModelDataset` | A dataset for Meta's Prophet model for time series forecasting | `kedro_datasets_experimental.prophet` |


* Added the following new core datasets:

@@ -14,6 +16,8 @@

## Bug fixes and other changes
* Refactored all datasets to set `fs_args` defaults in the same way as `load_args` and `save_args`, rather than hardcoding values in the save methods (see the sketch after this list).
* Fixed a bug related to loading/saving models from/to remote storage using `TensorFlowModelDataset`.
* Fixed the deprecated load and save approaches of `GBQTableDataset` and `GBQQueryDataset` by invoking save and load directly through the `pandas-gbq` library.
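For context on the first bullet, a hedged, illustrative sketch of the defaults-merging pattern; the class name and default values here are made up, not the exact kedro-datasets code:

```python
from copy import deepcopy
from typing import Any


class ExampleDataset:
    # Class-level defaults, overridden by whatever the user passes in fs_args,
    # instead of hardcoding e.g. mode="w" inside _save().
    DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}}

    def __init__(self, fs_args: dict[str, Any] | None = None) -> None:
        _fs_args = deepcopy(fs_args) or {}
        self._fs_open_args_save = {
            **self.DEFAULT_FS_ARGS.get("open_args_save", {}),
            **_fs_args.pop("open_args_save", {}),
        }
        self._fs_open_args_load = {
            **self.DEFAULT_FS_ARGS.get("open_args_load", {}),
            **_fs_args.pop("open_args_load", {}),
        }
```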

## Breaking Changes
* Replaced the `geopandas.GeoJSONDataset` with `geopandas.GenericDataset` to support parquet and feather file formats.
@@ -22,6 +26,9 @@
Many thanks to the following Kedroids for contributing PRs to this release:
* [Brandon Meek](https://github.com/bpmeek)
* [yury-fedotov](https://github.com/yury-fedotov)
* [gitgud5000](https://github.com/gitgud5000)
* [janickspirig](https://github.com/janickspirig)
* [Galen Seilis](https://github.com/galenseilis)
* [harm-matthias-harms](https://github.com/harm-matthias-harms)


@@ -16,5 +16,6 @@ kedro_datasets_experimental
langchain.ChatOpenAIDataset
langchain.OpenAIEmbeddingsDataset
netcdf.NetCDFDataset
prophet.ProphetModelDataset
pytorch.PyTorchDataset
rioxarray.GeoTIFFDataset
2 changes: 2 additions & 0 deletions kedro-datasets/docs/source/conf.py
@@ -140,6 +140,8 @@
"xarray.core.dataset.Dataset",
"xarray.core.dataarray.DataArray",
"torch.nn.modules.module.Module",
"prophet.forecaster.Prophet",
"Prophet",
),
"py:data": (
"typing.Any",
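The two new entries above register Prophet's class names so Sphinx stops warning about cross-references it cannot resolve. A generic sketch of the mechanism, assuming these strings feed Sphinx's `nitpick_ignore` setting rather than showing the exact kedro-datasets `conf.py` structure:

```python
# docs/source/conf.py (illustrative): each entry is a (domain:role, target) pair
# that Sphinx skips when checking cross-references in nitpicky mode.
nitpick_ignore = [
    ("py:class", "prophet.forecaster.Prophet"),
    ("py:class", "Prophet"),
]
```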
20 changes: 11 additions & 9 deletions kedro-datasets/kedro_datasets/pandas/gbq_dataset.py
@@ -10,6 +10,7 @@

import fsspec
import pandas as pd
import pandas_gbq as pd_gbq
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2.credentials import Credentials
@@ -138,16 +139,17 @@ def _describe(self) -> dict[str, Any]:

def _load(self) -> pd.DataFrame:
sql = f"select * from {self._dataset}.{self._table_name}" # nosec
self._load_args.setdefault("query", sql)
return pd.read_gbq(
self._load_args.setdefault("query_or_table", sql)
return pd_gbq.read_gbq(
project_id=self._project_id,
credentials=self._credentials,
**self._load_args,
)

def _save(self, data: pd.DataFrame) -> None:
data.to_gbq(
f"{self._dataset}.{self._table_name}",
pd_gbq.to_gbq(
dataframe=data,
destination_table=f"{self._dataset}.{self._table_name}",
project_id=self._project_id,
credentials=self._credentials,
**self._save_args,
@@ -176,7 +178,7 @@ def _validate_location(self):

class GBQQueryDataset(AbstractDataset[None, pd.DataFrame]):
"""``GBQQueryDataset`` loads data from a provided SQL query from Google
BigQuery. It uses ``pandas.read_gbq`` which itself uses ``pandas-gbq``
BigQuery. It uses ``pandas_gbq.read_gbq`` which itself uses ``pandas-gbq``
internally to read from BigQuery table. Therefore it supports all allowed
pandas options on ``read_gbq``.
@@ -274,7 +276,7 @@ def __init__( # noqa: PLR0913

# load sql query from arg or from file
if sql:
self._load_args["query"] = sql
self._load_args["query_or_table"] = sql
self._filepath = None
else:
# filesystem for loading sql file
@@ -291,7 +293,7 @@
def _describe(self) -> dict[str, Any]:
load_args = copy.deepcopy(self._load_args)
desc = {}
desc["sql"] = str(load_args.pop("query", None))
desc["sql"] = str(load_args.pop("query_or_table", None))
desc["filepath"] = str(self._filepath)
desc["load_args"] = str(load_args)

@@ -303,9 +305,9 @@ def _load(self) -> pd.DataFrame:
if self._filepath:
load_path = get_filepath_str(PurePosixPath(self._filepath), self._protocol)
with self._fs.open(load_path, mode="r") as fs_file:
load_args["query"] = fs_file.read()
load_args["query_or_table"] = fs_file.read()

return pd.read_gbq(
return pd_gbq.read_gbq(
project_id=self._project_id,
credentials=self._credentials,
**load_args,
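The switch from `pd.read_gbq`/`DataFrame.to_gbq` (deprecated wrappers) to direct `pandas_gbq` calls also renames the load argument from `query` to `query_or_table`. A minimal sketch of the underlying calls, with a placeholder project id:

```python
import pandas_gbq

# Load: the SQL (or a fully-qualified table name) is passed as query_or_table.
df = pandas_gbq.read_gbq(
    "select * from my_dataset.my_table",
    project_id="my-project",  # placeholder
)

# Save: to_gbq takes the DataFrame and destination table explicitly.
pandas_gbq.to_gbq(
    dataframe=df,
    destination_table="my_dataset.my_table",
    project_id="my-project",
    if_exists="replace",
)
```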
@@ -144,7 +144,7 @@ def _load(self) -> tf.keras.Model:
# We assume .keras
path = str(PurePath(tempdir) / TEMPORARY_KERAS_FILE) # noqa: PLW2901

self._fs.copy(load_path, path)
self._fs.get(load_path, path)

# Pass the local temporary directory/file path to keras.load_model
device_name = self._load_args.pop("tf_device", None)
@@ -169,7 +169,7 @@

# Use fsspec to take from local tempfile directory/file and
# put in ArbitraryFileSystem
self._fs.copy(path, save_path)
self._fs.put(path, save_path)

def _exists(self) -> bool:
try:
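The fix above matters because, in fsspec, `copy()` duplicates a file within a single filesystem, whereas `get()` downloads from the remote filesystem to the local machine and `put()` uploads in the other direction, which is what the temporary-directory round trip needs. A small sketch; bucket and paths are placeholders:

```python
import fsspec

fs = fsspec.filesystem("s3")  # placeholder remote filesystem

# download remote -> local temp file, as _load now does
fs.get("my-bucket/models/model.keras", "/tmp/model.keras")

# upload local temp file -> remote, as _save now does
fs.put("/tmp/model.keras", "my-bucket/models/model.keras")
```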
11 changes: 11 additions & 0 deletions kedro-datasets/kedro_datasets_experimental/prophet/__init__.py
@@ -0,0 +1,11 @@
"""``JSONDataset`` implementation to load/save data from/to a Prophet model file."""

from typing import Any

import lazy_loader as lazy

ProphetModelDataset: Any

__getattr__, __dir__, __all__ = lazy.attach(
__name__, submod_attrs={"prophet_dataset": ["ProphetModelDataset"]}
)
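`lazy_loader.attach` wires module-level `__getattr__` so that `prophet_dataset` (and its heavy `prophet` import) is only loaded when the attribute is first accessed. A short sketch of the behaviour this gives users:

```python
# Importing the package is cheap: prophet itself has not been imported yet.
from kedro_datasets_experimental import prophet

# The first attribute access triggers the deferred import of
# kedro_datasets_experimental.prophet.prophet_dataset.
dataset_cls = prophet.ProphetModelDataset
```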
121 changes: 121 additions & 0 deletions kedro-datasets/kedro_datasets_experimental/prophet/prophet_dataset.py
@@ -0,0 +1,121 @@
from __future__ import annotations

from typing import Any

from kedro.io.core import Version, get_filepath_str
from prophet import Prophet
from prophet.serialize import model_from_json, model_to_json

from kedro_datasets.json import JSONDataset


class ProphetModelDataset(JSONDataset):
"""``ProphetModelDataset`` loads/saves Facebook Prophet models to a JSON file using an
underlying filesystem (e.g., local, S3, GCS). It uses Prophet's built-in
serialization to handle the JSON file.
Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog_yaml_examples.html>`_:
.. code-block:: yaml
model:
type: custom_datasets.ProphetModelDataset
filepath: gcs://your_bucket/model.json
fs_args:
project: my-project
credentials: my_gcp_credentials
Example usage for the
`Python API <https://kedro.readthedocs.io/en/stable/data/\
advanced_data_catalog_usage.html>`_:
.. code-block:: pycon
>>> from kedro_datasets_experimental.prophet import ProphetModelDataset
>>> from prophet import Prophet
>>> import pandas as pd
>>>
>>> df = pd.DataFrame({
>>> "ds": ["2024-01-01", "2024-01-02", "2024-01-03"],
>>> "y": [100, 200, 300]
>>> })
>>>
>>> model = Prophet()
>>> model.fit(df)
>>> dataset = ProphetModelDataset(filepath="path/to/model.json")
>>> dataset.save(model)
>>> reloaded_model = dataset.load()
"""

def __init__( # noqa: PLR0913
self,
*,
filepath: str,
save_args: dict[str, Any] | None = None,
version: Version | None = None,
credentials: dict[str, Any] | None = None,
fs_args: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
) -> None:
"""Creates a new instance of ``ProphetModelDataset`` pointing to a concrete JSON file
on a specific filesystem.
Args:
filepath: Filepath in POSIX format to a JSON file prefixed with a protocol like `s3://`.
If prefix is not provided, `file` protocol (local filesystem) will be used.
The prefix should be any protocol supported by ``fsspec``.
Note: `http(s)` doesn't support versioning.
save_args: json options for saving JSON files (arguments passed
into ``json.dump``). Here you can find all available arguments:
https://docs.python.org/3/library/json.html
All defaults are preserved.
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as
to pass to the filesystem's `open` method through nested keys
`open_args_load` and `open_args_save`.
Here you can find all available arguments for `open`:
https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
"""
super().__init__(
filepath=filepath,
save_args=save_args,
version=version,
credentials=credentials,
fs_args=fs_args,
metadata=metadata,
)

def _load(self) -> Prophet:
"""Loads a Prophet model from a JSON file.
Returns:
Prophet: A deserialized Prophet model.
"""
load_path = get_filepath_str(self._get_load_path(), self._protocol)

with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
return model_from_json(fs_file.read())

def _save(self, data: Prophet) -> None:
"""Saves a Prophet model to a JSON file.
Args:
data: The Prophet model instance to be serialized and saved.
"""
save_path = get_filepath_str(self._get_save_path(), self._protocol)

with self._fs.open(save_path, **self._fs_open_args_save) as fs_file:
fs_file.write(model_to_json(data))

self._invalidate_cache()
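`ProphetModelDataset` delegates serialization to Prophet's own `model_to_json`/`model_from_json`, which round-trip a fitted model through a JSON string. A minimal sketch of that round trip on the local filesystem; the toy data mirrors the docstring example:

```python
import pandas as pd
from prophet import Prophet
from prophet.serialize import model_from_json, model_to_json

df = pd.DataFrame(
    {"ds": ["2024-01-01", "2024-01-02", "2024-01-03"], "y": [100, 200, 300]}
)
model = Prophet().fit(df)

# model_to_json returns a JSON string; the dataset writes it through fsspec.
with open("model.json", "w") as fh:
    fh.write(model_to_json(model))

with open("model.json") as fh:
    reloaded = model_from_json(fh.read())
```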
@@ -96,11 +96,11 @@ def _describe(self) -> dict[str, Any]:

def _load(self) -> Any:
load_path = get_filepath_str(self._get_load_path(), self._protocol)
return torch.load(load_path, **self._fs_open_args_load)
return torch.load(load_path, **self._fs_open_args_load) #nosec: B614

def _save(self, data: torch.nn.Module) -> None:
save_path = get_filepath_str(self._get_save_path(), self._protocol)
torch.save(data.state_dict(), save_path, **self._fs_open_args_save)
torch.save(data.state_dict(), save_path, **self._fs_open_args_save) #nosec: B614

self._invalidate_cache()

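The `#nosec: B614` markers acknowledge Bandit's warning that `torch.save`/`torch.load` rely on pickle; the dataset itself persists only the `state_dict`, not the full model object. A short sketch of the round trip a user performs with this dataset's output; the model architecture is illustrative:

```python
import torch
import torch.nn as nn

model = nn.Linear(4, 2)
torch.save(model.state_dict(), "weights.pt")  # what PyTorchDataset._save stores

# _load returns the state dict; the caller applies it to a matching architecture.
restored = nn.Linear(4, 2)
restored.load_state_dict(torch.load("weights.pt"))
```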
34 changes: 34 additions & 0 deletions kedro-datasets/kedro_datasets_experimental/tests/conftest.py
@@ -0,0 +1,34 @@
"""
This file contains the fixtures that are reusable by any tests within
this directory. You don't need to import the fixtures as pytest will
discover them automatically. More info here:
https://docs.pytest.org/en/latest/fixture.html
"""

from kedro.io.core import generate_timestamp
from pytest import fixture


@fixture(params=[None])
def load_version(request):
return request.param


@fixture(params=[None])
def save_version(request):
return request.param or generate_timestamp()


@fixture(params=[None])
def load_args(request):
return request.param


@fixture(params=[None])
def save_args(request):
return request.param


@fixture(params=[None])
def fs_args(request):
return request.param
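These fixtures default to `None`, but because they are parametrized, individual tests can override them through `indirect` parametrization, which is standard pytest behaviour. A small sketch:

```python
import pytest


@pytest.mark.parametrize("save_args", [{"indent": 2}], indirect=True)
def test_save_args_override(save_args):
    # The fixture returns request.param, so the override is visible here.
    assert save_args == {"indent": 2}
```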
