Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(python): Use polars parquet reader for delta scan #19103

Merged
58 changes: 23 additions & 35 deletions py-polars/polars/io/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

from polars._utils.deprecation import issue_deprecation_warning
from polars.convert import from_arrow
from polars.datatypes import Null, Time
from polars.datatypes.convert import unpack_dtypes
Expand All @@ -26,9 +25,10 @@ def read_delta(
*,
version: int | str | datetime | None = None,
columns: list[str] | None = None,
rechunk: bool | None = None,
rechunk: bool = False,
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
storage_options: dict[str, Any] | None = None,
delta_table_options: dict[str, Any] | None = None,
use_pyarrow: bool = False,
pyarrow_options: dict[str, Any] | None = None,
) -> DataFrame:
"""
Expand Down Expand Up @@ -62,12 +62,11 @@ def read_delta(
<https://delta-io.github.io/delta-rs/usage/loading-table/>`__.
delta_table_options
Additional keyword arguments while reading a Delta lake Table.
use_pyarrow
Flag to enable pyarrow dataset reads.
pyarrow_options
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
Keyword arguments while converting a Delta lake Table to pyarrow table.

.. deprecated:: 1.10.0
Remove pyarrow_options and use native polars filter, selection.

Returns
-------
DataFrame
Expand Down Expand Up @@ -138,33 +137,14 @@ def read_delta(
... table_path, delta_table_options=delta_table_options
... ) # doctest: +SKIP
"""
if pyarrow_options is not None:
issue_deprecation_warning(
message="`pyarrow_options` are deprecated, polars native parquet reader is used when not passing pyarrow options.",
version="1.13",
)
dl_tbl = _get_delta_lake_table(
table_path=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
)
if rechunk is None:
rechunk = False
return from_arrow(
dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk
) # type: ignore[return-value]

if rechunk is not None:
issue_deprecation_warning(
message="`rechunk` is deprecated, this is automatically done now.",
version="1.13",
)
df = scan_delta(
source=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
use_pyarrow=use_pyarrow,
pyarrow_options=pyarrow_options,
rechunk=rechunk,
)

if columns is not None:
Expand All @@ -178,7 +158,9 @@ def scan_delta(
version: int | str | datetime | None = None,
storage_options: dict[str, Any] | None = None,
delta_table_options: dict[str, Any] | None = None,
use_pyarrow: bool = False,
pyarrow_options: dict[str, Any] | None = None,
rechunk: bool = False,
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
) -> LazyFrame:
"""
Lazily read from a Delta lake table.
Expand All @@ -203,13 +185,15 @@ def scan_delta(
<https://delta-io.github.io/delta-rs/usage/loading-table/>`__.
delta_table_options
Additional keyword arguments while reading a Delta lake Table.
use_pyarrow
Flag to enable pyarrow dataset reads.
pyarrow_options
Keyword arguments passed when converting the Delta lake Table to a pyarrow dataset.
Use this parameter when filtering on partitioned columns or to read
from a 'fsspec' supported filesystem.

.. deprecated:: 1.10.0
Remove pyarrow_options and use native polars filter, selection.
rechunk
Make sure that all columns are contiguous in memory by
aggregating the chunks into a single array.

Returns
-------
Expand Down Expand Up @@ -294,14 +278,15 @@ def scan_delta(
delta_table_options=delta_table_options,
)

if pyarrow_options is not None:
issue_deprecation_warning(
message="PyArrow options are deprecated, polars native parquet scanner is used when not passing pyarrow options.",
version="1.13",
)
if use_pyarrow:
pyarrow_options = pyarrow_options or {}
pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
return scan_pyarrow_dataset(pa_ds)

if pyarrow_options is not None:
msg = "To make use of pyarrow_options, set use_pyarrow to True"
raise ValueError(msg)

import pyarrow as pa
from deltalake.exceptions import DeltaProtocolError
from deltalake.table import (
Expand Down Expand Up @@ -331,6 +316,8 @@ def scan_delta(
msg = f"The table has set these reader features: {missing_features} but these are not yet supported by the polars delta scanner."
raise DeltaProtocolError(msg)

# The schema is converted via an empty pyarrow table because there is not yet a
# direct way to convert a delta schema into a polars schema.
delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True)
polars_schema = from_arrow(pa.Table.from_pylist([], delta_schema)).schema # type: ignore[union-attr]
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
partition_columns = dl_tbl.metadata().partition_columns
Expand Down Expand Up @@ -361,6 +348,7 @@ def _split_schema(
allow_missing_columns=True,
hive_partitioning=len(partition_columns) > 0,
storage_options=storage_options,
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
rechunk=rechunk,
)


Expand Down
Loading