feat: Dataset API add save method (kedro-org#180)
* [FEAT] add save method to APIDataset

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] create save_args parameter for api_dataset

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] add tests for socket + http errors

Signed-off-by: <[email protected]>
Signed-off-by: jmcdonnell <[email protected]>

* [ENH] check save data is json

Signed-off-by: <[email protected]>
Signed-off-by: jmcdonnell <[email protected]>

* [FIX] clean code

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] handle different data types

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] test coverage for exceptions

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] add examples in APIDataSet docstring

Signed-off-by: jmcdonnell <[email protected]>

* sync APIDataSet from kedro's `develop` (kedro-org#184)

* Update APIDataSet

Signed-off-by: Nok Chan <[email protected]>

* Sync ParquetDataSet

Signed-off-by: Nok Chan <[email protected]>

* Sync Test

Signed-off-by: Nok Chan <[email protected]>

* Linting

Signed-off-by: Nok Chan <[email protected]>

* Revert Unnecessary ParquetDataSet Changes

Signed-off-by: Nok Chan <[email protected]>

* Sync release notes

Signed-off-by: Nok Chan <[email protected]>

---------

Signed-off-by: Nok Chan <[email protected]>
Signed-off-by: jmcdonnell <[email protected]>

* [FIX] remove support for delete method

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] lint files

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] fix conflicts

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] remove fail save test

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] review suggestions

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] fix tests

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] reorder arguments

Signed-off-by: jmcdonnell <[email protected]>

---------

Signed-off-by: jmcdonnell <[email protected]>
Signed-off-by: <[email protected]>
Signed-off-by: Nok Chan <[email protected]>
Co-authored-by: jmcdonnell <[email protected]>
Co-authored-by: Nok Lam Chan <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>
3 people authored and kuriantom369 committed May 30, 2023
1 parent 74a211f commit 9d7820a
Showing 5 changed files with 246 additions and 43 deletions.
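
The `save_args` handling that this commit adds to the constructor can be sketched with the standard library alone. The helper name `resolve_save_params` is hypothetical and exists only for illustration; the default values and the copy/update/pop sequence follow the `api_dataset.py` diff in this commit.

```python
from copy import deepcopy
from typing import Any, Dict, Optional, Tuple

# Defaults as listed in DEFAULT_SAVE_ARGS in this commit's diff.
DEFAULT_SAVE_ARGS: Dict[str, Any] = {
    "params": None,
    "headers": None,
    "auth": None,
    "json": None,
    "timeout": 60,
    "chunk_size": 100,
}


def resolve_save_params(
    save_args: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict[str, Any], int]:
    """Hypothetical helper: merge user-supplied save_args over the defaults.

    Returns the keyword arguments intended for the HTTP request and the
    chunk size, which is split off because it is not a request argument.
    """
    params = deepcopy(DEFAULT_SAVE_ARGS)  # never mutate the shared defaults
    if save_args is not None:
        params.update(save_args)
    chunk_size = params.pop("chunk_size", 1)
    return params, chunk_size


params, chunk_size = resolve_save_params({"timeout": 10, "chunk_size": 2})
print(params["timeout"], chunk_size)  # 10 2
```

Copying the defaults before updating them keeps one dataset's `save_args` from leaking into another instance that shares the same class-level dictionary.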
10 changes: 6 additions & 4 deletions kedro-datasets/RELEASE.md
@@ -3,6 +3,8 @@
## Major features and improvements:
* Added pandas 2.0 support.
* Added SQLAlchemy 2.0 support (and dropped support for versions below 1.4).
* Added a save method to the APIDataSet

* Reduced constructor arguments for `APIDataSet` by replacing most arguments with a single constructor argument `load_args`. This makes it more consistent with other Kedro DataSets and the underlying `requests` API, and automatically enables the full configuration domain: stream, certificates, proxies, and more.
* Relaxed Kedro version pin to `>=0.16`

@@ -42,10 +44,10 @@ Many thanks to the following Kedroids for contributing PRs to this release:

* Added the following new datasets:

| Type | Description | Location |
| ------------------------------------ | -------------------------------------------------------------------------- | ----------------------------- |
| `polars.CSVDataSet` | A `CSVDataSet` backed by [polars](https://www.pola.rs/), a lighting fast dataframe package built entirely using Rust. | `kedro_datasets.polars` |
| `snowflake.SnowparkTableDataSet` | Work with [Snowpark](https://www.snowflake.com/en/data-cloud/snowpark/) DataFrames from tables in Snowflake. | `kedro_datasets.snowflake` |
| Type | Description | Location |
| -------------------------------- | --------------------------------------------------------------------------------------------------------------------- | -------------------------- |
| `polars.CSVDataSet` | A `CSVDataSet` backed by [polars](https://www.pola.rs/), a lighting fast dataframe package built entirely using Rust. | `kedro_datasets.polars` |
| `snowflake.SnowparkTableDataSet` | Work with [Snowpark](https://www.snowflake.com/en/data-cloud/snowpark/) DataFrames from tables in Snowflake. | `kedro_datasets.snowflake` |

## Bug fixes and other changes
* Add `mssql` backend to the `SQLQueryDataSet` DataSet using `pyodbc` library.
138 changes: 113 additions & 25 deletions kedro-datasets/kedro_datasets/api/api_dataset.py
@@ -1,7 +1,9 @@
"""``APIDataSet`` loads the data from HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/latest/
"""
from typing import Any, Dict, List, NoReturn, Tuple, Union
import json as json_ # make pylint happy
from copy import deepcopy
from typing import Any, Dict, List, Tuple, Union

import requests
from kedro.io.core import AbstractDataSet, DataSetError
@@ -14,11 +16,10 @@


class APIDataSet(AbstractDataSet[None, requests.Response]):
"""``APIDataSet`` loads the data from HTTP(S) APIs.
"""``APIDataSet`` loads/saves data from/to HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/latest/
Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
Example usage for the `YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:
.. code-block:: yaml
@@ -34,10 +35,8 @@ class APIDataSet(AbstractDataSet[None, requests.Response]):
agg_level_desc: STATE,
year: 2000
Example usage for the
`Python API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-code-api>`_:
::
Example usage for the `Python API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-code-api>`_: ::
>>> from kedro.extras.datasets.api import APIDataSet
>>>
@@ -57,49 +56,101 @@ class APIDataSet(AbstractDataSet[None, requests.Response]):
>>> credentials=("username", "password")
>>> )
>>> data = data_set.load()
``APIDataSet`` can also be used to save output on a remote server using HTTP(S)
methods.
>>> example_table = '{"col1":["val1", "val2"], "col2":["val3", "val4"]}'
>>> data_set = APIDataSet(
>>> method="POST",
>>> url="url_of_remote_server",
>>> save_args={"chunk_size": 1}
>>> )
>>> data_set.save(example_table)
On initialisation, all the necessary parameters can be specified in the save args
dictionary. Saving requires ``method`` to be POST or PUT; the constructor default,
GET, only supports loading. Two important parameters to keep in mind are timeout
and chunk_size. `timeout` defines how long the program waits for a response after a
request. `chunk_size` is only used if the input of the save method is a list, in
which case the request is divided into chunks of size `chunk_size`. For example,
with `chunk_size` set to 1, each element of the list is sent in its own request.
If the data passed to the save method is not a list, ``APIDataSet`` will check if it
can be loaded as JSON. If true, it will send the data unchanged in a single request.
Otherwise, the ``_save`` method will try to dump the data in JSON format and execute
the request.
"""

DEFAULT_SAVE_ARGS = {
"params": None,
"headers": None,
"auth": None,
"json": None,
"timeout": 60,
"chunk_size": 100,
}
# pylint: disable=too-many-arguments

def __init__(
self,
url: str,
method: str = "GET",
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
credentials: Union[Tuple[str, str], List[str], AuthBase] = None,
) -> None:
"""Creates a new instance of ``APIDataSet`` to fetch data from an API endpoint.
Args:
url: The API URL endpoint.
method: The Method of the request, GET, POST, PUT, DELETE, HEAD, etc...
method: The method of the request. GET, POST, PUT are the only supported
methods
load_args: Additional parameters to be fed to requests.request.
https://requests.readthedocs.io/en/latest/api/#requests.request
credentials: Allows specifying secrets in credentials.yml.
Expected format is ``('login', 'password')`` if given as a tuple or list.
An ``AuthBase`` instance can be provided for more complex cases.
Expected format is ``('login', 'password')`` if given as a tuple or
list. An ``AuthBase`` instance can be provided for more complex cases.
save_args: Options for saving data on server. Includes all parameters used
during load method. Adds an optional parameter, ``chunk_size`` which
determines the size of the package sent at each request.
Raises:
ValueError: if both ``auth`` in ``load_args`` and ``credentials`` are specified.
ValueError: if both ``auth`` in ``load_args`` and ``credentials`` are
specified.
"""
super().__init__()

self._load_args = load_args or {}
self._load_args_auth = self._load_args.pop("auth", None)
# GET method means load
if method == "GET":
self._params = load_args or {}

# PUT, POST means save
elif method in ["PUT", "POST"]:
self._params = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._params.update(save_args)
self._chunk_size = self._params.pop("chunk_size", 1)
else:
raise ValueError("Only GET, POST and PUT methods are supported")

self._param_auth = self._params.pop("auth", None)

if credentials is not None and self._load_args_auth is not None:
if credentials is not None and self._param_auth is not None:
raise ValueError("Cannot specify both auth and credentials.")

self._auth = credentials or self._load_args_auth
self._auth = credentials or self._param_auth

if "cert" in self._load_args:
self._load_args["cert"] = self._convert_type(self._load_args["cert"])
if "cert" in self._params:
self._params["cert"] = self._convert_type(self._params["cert"])

if "timeout" in self._load_args:
self._load_args["timeout"] = self._convert_type(self._load_args["timeout"])
if "timeout" in self._params:
self._params["timeout"] = self._convert_type(self._params["timeout"])

self._request_args: Dict[str, Any] = {
"url": url,
"method": method,
"auth": self._convert_type(self._auth),
**self._load_args,
**self._params,
}

@staticmethod
@@ -131,11 +182,48 @@ def _execute_request(self, session: Session) -> requests.Response:
return response

def _load(self) -> requests.Response:
with sessions.Session() as session:
return self._execute_request(session)
if self._request_args["method"] == "GET":
with sessions.Session() as session:
return self._execute_request(session)

raise DataSetError("Only GET method is supported for load")

def _execute_save_with_chunks(
self,
json_data: List[Dict[str, Any]],
) -> requests.Response:
chunk_size = self._chunk_size
n_chunks = len(json_data) // chunk_size + 1

for i in range(n_chunks):
send_data = json_data[i * chunk_size : (i + 1) * chunk_size]
response = self._execute_save_request(json_data=send_data)

return response

def _execute_save_request(self, json_data: Any) -> requests.Response:
try:
json_.loads(json_data)
except TypeError:
self._request_args["json"] = json_.dumps(json_data)
try:
response = requests.request(**self._request_args)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to send data", exc) from exc

except OSError as exc:
raise DataSetError("Failed to connect to the remote server") from exc
return response

def _save(self, data: Any) -> requests.Response:
if self._request_args["method"] in ["PUT", "POST"]:
if isinstance(data, list):
return self._execute_save_with_chunks(json_data=data)

return self._execute_save_request(json_data=data)

def _save(self, data: None) -> NoReturn:
raise DataSetError(f"{self.__class__.__name__} is a read only data set type")
raise DataSetError("Use PUT or POST methods for save")

def _exists(self) -> bool:
with sessions.Session() as session:
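
A note on `_execute_save_with_chunks` above: `len(json_data) // chunk_size + 1` produces one extra, empty slice whenever the list length is an exact multiple of `chunk_size` (for instance, 4 items with a chunk size of 2 give 3 slices, the last of which is `[]`). The following is a minimal sketch of one alternative that steps through the list by `chunk_size`. `iter_chunks` is a hypothetical helper, not part of this commit.

```python
from typing import Any, Iterator, List


def iter_chunks(items: List[Any], chunk_size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most chunk_size items, never an empty one."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    for start in range(0, len(items), chunk_size):
        yield items[start : start + chunk_size]


rows = [{"col1": "val1"}, {"col1": "val2"}, {"col1": "val3"}, {"col1": "val4"}]
print([len(chunk) for chunk in iter_chunks(rows, 2)])  # [2, 2]
print([len(chunk) for chunk in iter_chunks(rows, 3)])  # [3, 1]
```

With this approach an empty list yields no chunks at all, so a caller that must return a response would need to decide what to do in that case.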
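
On `_execute_save_request`: `json.loads` raises `TypeError` for inputs that are not `str`, `bytes` or `bytearray`, and `json.JSONDecodeError` (a subclass of `ValueError`) for strings that are not valid JSON. Only the first case is caught in the diff. The sketch below shows one possible way to normalise both cases into JSON text. `to_json_text` is a hypothetical name, and the fallback for plain strings is an assumption rather than what the commit implements.

```python
import json
from typing import Any


def to_json_text(data: Any) -> str:
    """Return data as JSON text, leaving strings that already hold JSON untouched."""
    if isinstance(data, str):
        try:
            json.loads(data)  # valid JSON text: pass it through unchanged
            return data
        except json.JSONDecodeError:
            pass  # plain string: fall through and encode it as a JSON string
    return json.dumps(data)


print(to_json_text('{"col1": ["val1", "val2"]}'))  # {"col1": ["val1", "val2"]}
print(to_json_text({"col1": 1}))  # {"col1": 1}
print(to_json_text("not json"))  # "not json"
```

Text produced this way would likely be sent through the `data` argument of `requests.request` together with a `Content-Type: application/json` header, since the `json` argument serialises whatever it receives and would encode an already serialised string a second time.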
2 changes: 0 additions & 2 deletions kedro-datasets/kedro_datasets/pandas/generic_dataset.py
@@ -181,7 +181,6 @@ def _ensure_file_system_target(self) -> None:
)

def _load(self) -> pd.DataFrame:

self._ensure_file_system_target()

load_path = get_filepath_str(self._get_load_path(), self._protocol)
@@ -196,7 +195,6 @@ def _load(self) -> pd.DataFrame:
)

def _save(self, data: pd.DataFrame) -> None:

self._ensure_file_system_target()

save_path = get_filepath_str(self._get_save_path(), self._protocol)
1 change: 0 additions & 1 deletion kedro-datasets/kedro_datasets/spark/spark_jdbc_dataset.py
@@ -126,7 +126,6 @@ def __init__(

# Update properties in load_args and save_args with credentials.
if credentials is not None:

# Check credentials for bad inputs.
for cred_key, cred_value in credentials.items():
if cred_value is None: