Skip to content

Commit

Permalink
sync APIDataSet from kedro's develop (kedro-org#184)
Browse files Browse the repository at this point in the history
* Update APIDataSet

Signed-off-by: Nok Chan <[email protected]>

* Sync ParquetDataSet

Signed-off-by: Nok Chan <[email protected]>

* Sync Test

Signed-off-by: Nok Chan <[email protected]>

* Linting

Signed-off-by: Nok Chan <[email protected]>

* Revert Unnecessary ParquetDataSet Changes

Signed-off-by: Nok Chan <[email protected]>

* Sync release notes

Signed-off-by: Nok Chan <[email protected]>

---------

Signed-off-by: Nok Chan <[email protected]>
Signed-off-by: jmcdonnell <[email protected]>
  • Loading branch information
noklam authored and jmcdonnell committed May 11, 2023
1 parent 245c63b commit 7ce0bbe
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 186 deletions.
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Added SQLAlchemy 2.0 support (and dropped support for versions below 1.4).
* Added a save method to the APIDataSet

* Reduced constructor arguments for `APIDataSet` by replacing most arguments with a single constructor argument `load_args`. This makes it more consistent with other Kedro DataSets and the underlying `requests` API, and automatically enables the full configuration domain: stream, certificates, proxies, and more.
## Bug fixes and other changes
* Relaxed `delta-spark` upper bound to allow compatibility with Spark 3.1.x and 3.2.x.

Expand Down
159 changes: 57 additions & 102 deletions kedro-datasets/kedro_datasets/api/api_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
"""
import json as json_ # make pylint happy
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Union
from typing import Any, Dict, List, Tuple, Union

import requests
from kedro.io.core import AbstractDataSet, DataSetError
from requests import Session, sessions
from requests.auth import AuthBase

# NOTE: kedro.extras.datasets will be removed in Kedro 0.19.0.
# Any contribution to datasets should be made in kedro-datasets
# in kedro-plugins (https://github.com/kedro-org/kedro-plugins)


class APIDataSet(AbstractDataSet[None, requests.Response]):
"""``APIDataSet`` loads the data from HTTP(S) APIs.
Expand All @@ -27,19 +32,22 @@ class APIDataSet(AbstractDataSet[None, requests.Response]):
Example usage for the `Python API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-code-api>`_: ::
>>> from kedro_datasets.api import APIDataSet
>>> from kedro.extras.datasets.api import APIDataSet
>>>
>>>
>>> data_set = APIDataSet(
>>> url="https://quickstats.nass.usda.gov",
>>> params={
>>> "key": "SOME_TOKEN",
>>> "format": "JSON",
>>> "commodity_desc": "CORN",
>>> "statisticcat_des": "YIELD",
>>> "agg_level_desc": "STATE",
>>> "year": 2000
>>> }
>>> load_args={
>>> "params": {
>>> "key": "SOME_TOKEN",
>>> "format": "JSON",
>>> "commodity_desc": "CORN",
>>> "statisticcat_des": "YIELD",
>>> "agg_level_desc": "STATE",
>>> "year": 2000
>>> }
>>> },
>>> credentials=("username", "password")
>>> )
>>> data = data_set.load()
Expand Down Expand Up @@ -89,76 +97,65 @@ def __init__(
self,
url: str,
method: str = "GET",
data: Any = None,
params: Dict[str, Any] = None,
headers: Dict[str, Any] = None,
auth: Union[Iterable[str], AuthBase] = None,
json: Union[List, Dict[str, Any]] = None,
timeout: int = 60,
credentials: Union[Iterable[str], AuthBase] = None,
save_args: Dict[str, Any] = None,
load_args: Dict[str, Any] = None,
credentials: Union[Tuple[str, str], List[str], AuthBase] = None,
) -> None:
"""Creates a new instance of ``APIDataSet`` to fetch data from an API endpoint.
Args:
url: The API URL endpoint.
method: The Method of the request, GET, POST, PUT,
DELETE, HEAD, etc... data: The request payload, used for POST, PUT, etc
requests
https://requests.readthedocs.io/en/latest/user/quickstart/#more-complicated-post-requests
params: The url parameters of the API.
https://requests.readthedocs.io/en/latest/user/quickstart/#passing-parameters-in-urls
headers: The HTTP headers.
https://requests.readthedocs.io/en/latest/user/quickstart/#custom-headers
auth: Anything ``requests`` accepts. Normally it's either ``('login',
'password')``,
or ``AuthBase``, ``HTTPBasicAuth`` instance for more complex cases. Any
iterable will be cast to a tuple.
json: The request payload, used for POST, PUT, etc requests, passed in
to the json kwarg in the requests object.
https://requests.readthedocs.io/en/latest/user/quickstart/#more-complicated-post-requests
timeout: The wait time in seconds for a response, defaults to 1 minute.
https://requests.readthedocs.io/en/latest/user/quickstart/#timeouts
credentials: same as ``auth``. Allows specifying ``auth`` secrets in
credentials.yml.
save_args: Options for saving data on server. Includes all parameters used
during load method. Adds an optional parameter, ``chunk_size`` which determines the
size of the package sent at each request.
method: The Method of the request, GET, POST, PUT, DELETE, HEAD, etc...
load_args: Additional parameters to be fed to requests.request.
https://requests.readthedocs.io/en/latest/api/#requests.request
credentials: Allows specifying secrets in credentials.yml.
Expected format is ``('login', 'password')`` if given as a tuple or list.
An ``AuthBase`` instance can be provided for more complex cases.
Raises:
ValueError: if both ``credentials`` and ``auth`` are specified.
ValueError: if both ``auth`` in ``load_args`` and ``credentials`` are specified.
"""
super().__init__()

self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)

if save_args is not None:
self._save_args.update(save_args)
self._load_args = load_args or {}
self._load_args_auth = self._load_args.pop("auth", None)

if credentials is not None and auth is not None:
if credentials is not None and self._load_args_auth is not None:
raise ValueError("Cannot specify both auth and credentials.")

auth = credentials or auth
self._auth = credentials or self._load_args_auth

if "cert" in self._load_args:
self._load_args["cert"] = self._convert_type(self._load_args["cert"])

if isinstance(auth, Iterable):
auth = tuple(auth)
if "timeout" in self._load_args:
self._load_args["timeout"] = self._convert_type(self._load_args["timeout"])

self._request_args: Dict[str, Any] = {
"url": url,
"method": method,
"data": data,
"params": params,
"headers": headers,
"auth": auth,
"json": json,
"timeout": timeout,
"auth": self._convert_type(self._auth),
**self._load_args,
}

@staticmethod
def _convert_type(value: Any):
"""
From the Data Catalog, iterables are provided as Lists.
However, for some parameters in the Python requests library,
only Tuples are allowed.
"""
if isinstance(value, List):
return tuple(value)
return value

def _describe(self) -> Dict[str, Any]:
return {**self._request_args}
# prevent auth from logging
request_args_cp = self._request_args.copy()
request_args_cp.pop("auth", None)
return request_args_cp

def _execute_request(self) -> requests.Response:
def _execute_request(self, session: Session) -> requests.Response:
try:
response = requests.request(**self._request_args)
response = session.request(**self._request_args)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to fetch data", exc) from exc
Expand All @@ -170,50 +167,8 @@ def _execute_request(self) -> requests.Response:
def _load(self) -> requests.Response:
return self._execute_request()

def _execute_save_with_chunks(
self,
json_data: List[Dict[str, Any]],
) -> requests.Response:
chunk_size = self._save_args["chunk_size"]
n_chunks = len(json_data) // chunk_size + 1

for i in range(n_chunks):
send_data = json_data[i * chunk_size : (i + 1) * chunk_size]

self._save_args["json"] = send_data
try:
response = requests.request(**self._request_args)
response.raise_for_status()

except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to send data", exc) from exc

except OSError as exc:
raise DataSetError("Failed to connect to the remote server") from exc
return response

def _execute_save_request(self, json_data: Any) -> requests.Response:
self._save_args["json"] = json_data
try:
response = requests.request(**self._request_args)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to send data", exc) from exc

except OSError as exc:
raise DataSetError("Failed to connect to the remote server") from exc
return response

def _save(self, data: Any) -> requests.Response:
# case where we have a list of json data
if isinstance(data, list):
return self._execute_save_with_chunks(json_data=data)
try:
json_.loads(data)
except TypeError:
data = json_.dumps(data)

return self._execute_save_request(json_data=data)
def _save(self, data: None) -> NoReturn:
raise DataSetError(f"{self.__class__.__name__} is a read only data set type")

def _exists(self) -> bool:
response = self._execute_request()
Expand Down
Loading

0 comments on commit 7ce0bbe

Please sign in to comment.