Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync APIDataSet from kedro's develop #184

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 61 additions & 53 deletions kedro-datasets/kedro_datasets/api/api_dataset.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
"""``APIDataSet`` loads the data from HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/latest/
"""
from typing import Any, Dict, Iterable, List, NoReturn, Union
from typing import Any, Dict, List, NoReturn, Tuple, Union

import requests
from kedro.io.core import AbstractDataSet, DataSetError
from requests import Session, sessions
from requests.auth import AuthBase

from kedro.io.core import AbstractDataSet, DataSetError

# NOTE: kedro.extras.datasets will be removed in Kedro 0.19.0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same goes for this warning

# Any contribution to datasets should be made in kedro-datasets
# in kedro-plugins (https://github.com/kedro-org/kedro-plugins)


class APIDataSet(AbstractDataSet[None, requests.Response]):
"""``APIDataSet`` loads the data from HTTP(S) APIs.
Expand Down Expand Up @@ -34,88 +40,89 @@ class APIDataSet(AbstractDataSet[None, requests.Response]):
data_catalog.html#use-the-data-catalog-with-the-code-api>`_:
::

>>> from kedro_datasets.api import APIDataSet
>>> from kedro.extras.datasets.api import APIDataSet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saw this has already been merged but I think this got copied over by mistake, I'll change it back in #189

>>>
>>>
>>> data_set = APIDataSet(
>>> url="https://quickstats.nass.usda.gov",
>>> params={
>>> "key": "SOME_TOKEN",
>>> "format": "JSON",
>>> "commodity_desc": "CORN",
>>> "statisticcat_des": "YIELD",
>>> "agg_level_desc": "STATE",
>>> "year": 2000
>>> }
>>> load_args={
>>> "params": {
>>> "key": "SOME_TOKEN",
>>> "format": "JSON",
>>> "commodity_desc": "CORN",
>>> "statisticcat_des": "YIELD",
>>> "agg_level_desc": "STATE",
>>> "year": 2000
>>> }
>>> },
>>> credentials=("username", "password")
>>> )
>>> data = data_set.load()
"""

# pylint: disable=too-many-arguments
def __init__(
self,
url: str,
method: str = "GET",
data: Any = None,
params: Dict[str, Any] = None,
headers: Dict[str, Any] = None,
auth: Union[Iterable[str], AuthBase] = None,
json: Union[List, Dict[str, Any]] = None,
timeout: int = 60,
credentials: Union[Iterable[str], AuthBase] = None,
load_args: Dict[str, Any] = None,
credentials: Union[Tuple[str, str], List[str], AuthBase] = None,
) -> None:
"""Creates a new instance of ``APIDataSet`` to fetch data from an API endpoint.

Args:
url: The API URL endpoint.
method: The Method of the request, GET, POST, PUT, DELETE, HEAD, etc...
data: The request payload, used for POST, PUT, etc requests
https://requests.readthedocs.io/en/latest/user/quickstart/#more-complicated-post-requests
params: The url parameters of the API.
https://requests.readthedocs.io/en/latest/user/quickstart/#passing-parameters-in-urls
headers: The HTTP headers.
https://requests.readthedocs.io/en/latest/user/quickstart/#custom-headers
auth: Anything ``requests`` accepts. Normally it's either ``('login', 'password')``,
or ``AuthBase``, ``HTTPBasicAuth`` instance for more complex cases. Any
iterable will be cast to a tuple.
json: The request payload, used for POST, PUT, etc requests, passed in
to the json kwarg in the requests object.
https://requests.readthedocs.io/en/latest/user/quickstart/#more-complicated-post-requests
timeout: The wait time in seconds for a response, defaults to 1 minute.
https://requests.readthedocs.io/en/latest/user/quickstart/#timeouts
credentials: same as ``auth``. Allows specifying ``auth`` secrets in
credentials.yml.

load_args: Additional parameters to be fed to requests.request.
https://requests.readthedocs.io/en/latest/api/#requests.request
credentials: Allows specifying secrets in credentials.yml.
Expected format is ``('login', 'password')`` if given as a tuple or list.
An ``AuthBase`` instance can be provided for more complex cases.
Raises:
ValueError: if both ``credentials`` and ``auth`` are specified.
ValueError: if both ``auth`` in ``load_args`` and ``credentials`` are specified.
"""
super().__init__()

if credentials is not None and auth is not None:
self._load_args = load_args or {}
self._load_args_auth = self._load_args.pop("auth", None)

if credentials is not None and self._load_args_auth is not None:
raise ValueError("Cannot specify both auth and credentials.")

auth = credentials or auth
self._auth = credentials or self._load_args_auth

if isinstance(auth, Iterable):
auth = tuple(auth)
if "cert" in self._load_args:
self._load_args["cert"] = self._convert_type(self._load_args["cert"])

if "timeout" in self._load_args:
self._load_args["timeout"] = self._convert_type(self._load_args["timeout"])

self._request_args: Dict[str, Any] = {
"url": url,
"method": method,
"data": data,
"params": params,
"headers": headers,
"auth": auth,
"json": json,
"timeout": timeout,
"auth": self._convert_type(self._auth),
**self._load_args,
}

@staticmethod
def _convert_type(value: Any):
"""
From the Data Catalog, iterables are provided as Lists.
However, for some parameters in the Python requests library,
only Tuples are allowed.
"""
if isinstance(value, List):
return tuple(value)
return value

def _describe(self) -> Dict[str, Any]:
return {**self._request_args}
# prevent auth from logging
request_args_cp = self._request_args.copy()
request_args_cp.pop("auth", None)
return request_args_cp

def _execute_request(self) -> requests.Response:
def _execute_request(self, session: Session) -> requests.Response:
try:
response = requests.request(**self._request_args)
response = session.request(**self._request_args)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to fetch data", exc) from exc
Expand All @@ -125,12 +132,13 @@ def _execute_request(self) -> requests.Response:
return response

def _load(self) -> requests.Response:
return self._execute_request()
with sessions.Session() as session:
return self._execute_request(session)

def _save(self, data: None) -> NoReturn:
raise DataSetError(f"{self.__class__.__name__} is a read only data set type")

def _exists(self) -> bool:
response = self._execute_request()

with sessions.Session() as session:
response = self._execute_request(session)
return response.ok
18 changes: 18 additions & 0 deletions kedro-datasets/kedro_datasets/pandas/parquet_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import fsspec
import pandas as pd
import pyarrow.parquet as pq

from kedro.io.core import (
PROTOCOL_DELIMITER,
AbstractVersionedDataSet,
Expand Down Expand Up @@ -157,6 +159,22 @@ def _describe(self) -> Dict[str, Any]:
}

def _load(self) -> pd.DataFrame:
load_path = get_filepath_str(self._get_load_path(), self._protocol)

if self._fs.isdir(load_path):
# It doesn't work at least on S3 if root folder was created manually
# https://issues.apache.org/jira/browse/ARROW-7867
data = (
pq.ParquetDataset(load_path, filesystem=self._fs)
.read(**self._load_args)
.to_pandas()
)
else:
data = self._load_from_pandas()

return data

def _load_from_pandas(self):
load_path = str(self._get_load_path())
if self._protocol == "file":
# file:// protocol seems to misbehave on Windows
Expand Down
Loading