Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport release-1.9] [python] Ingestion performance #2439

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apis/python/src/tiledbsoma/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
from ._constants import SOMA_JOINID
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import DoesNotExistError, SOMAError
from ._exception import AlreadyExistsError, DoesNotExistError, SOMAError
from ._experiment import Experiment
from ._factory import open
from ._general_utilities import (
Expand All @@ -171,6 +171,7 @@
__version__ = get_implementation_version()

__all__ = [
"AlreadyExistsError",
"AxisColumnNames",
"AxisQuery",
"Collection",
Expand Down
28 changes: 20 additions & 8 deletions apis/python/src/tiledbsoma/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
from ._common_nd_array import NDArray
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import SOMAError, is_does_not_exist_error
from ._exception import (
AlreadyExistsError,
SOMAError,
is_already_exists_error,
is_does_not_exist_error,
)
from ._funcs import typeguard_ignore
from ._sparse_nd_array import SparseNDArray
from ._tiledb_object import AnyTileDBObject, TileDBObject
Expand Down Expand Up @@ -112,20 +117,27 @@ def create(
the context.

Raises:
tiledbsoma.AlreadyExistsError:
If the underlying object already exists at the given URI.
TileDBError:
If unable to create the underlying object.

Lifecycle:
Experimental.
"""
context = _validate_soma_tiledb_context(context)
tiledb.group_create(uri=uri, ctx=context.tiledb_ctx)
handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp)
cls._set_create_metadata(handle)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
tiledb.group_create(uri=uri, ctx=context.tiledb_ctx)
handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp)
cls._set_create_metadata(handle)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

@classmethod
def open(
Expand Down
18 changes: 13 additions & 5 deletions apis/python/src/tiledbsoma/_common_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tiledb

from . import _arrow_types, _util
from ._exception import AlreadyExistsError, is_already_exists_error
from ._tiledb_array import TileDBArray
from ._types import OpenTimestamp
from .options._soma_tiledb_context import (
Expand Down Expand Up @@ -77,6 +78,8 @@ def create(
If the ``type`` is unsupported.
ValueError:
If the ``shape`` is unsupported.
tiledbsoma.AlreadyExistsError:
If the underlying object already exists at the given URI.
TileDBError:
If unable to create the underlying object.

Expand All @@ -91,11 +94,16 @@ def create(
context,
is_sparse=cls.is_sparse,
)
handle = cls._create_internal(uri, schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
handle = cls._create_internal(uri, schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

@property
def shape(self) -> Tuple[int, ...]:
Expand Down
18 changes: 13 additions & 5 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from . import _arrow_types, _util
from . import pytiledbsoma as clib
from ._constants import SOMA_JOINID
from ._exception import AlreadyExistsError, is_already_exists_error
from ._query_condition import QueryCondition
from ._read_iters import TableReadIter
from ._tdb_handles import DataFrameWrapper
Expand Down Expand Up @@ -187,6 +188,8 @@ def create(
an undefined column name.
ValueError:
If the ``schema`` specifies illegal column names.
tiledbsoma.AlreadyExistsError:
If the underlying object already exists at the given URI.
TileDBError:
If unable to create the underlying object.

Expand Down Expand Up @@ -217,11 +220,16 @@ def create(
TileDBCreateOptions.from_platform_config(platform_config),
context,
)
handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

def keys(self) -> Tuple[str, ...]:
"""Returns the names of the columns when read back as a dataframe.
Expand Down
30 changes: 30 additions & 0 deletions apis/python/src/tiledbsoma/_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@ def is_does_not_exist_error(e: tiledb.TileDBError) -> bool:
return False


class AlreadyExistsError(SOMAError):
"""Raised when attempting to create an already existing SOMA object.

Lifecycle: experimental
"""

pass


def is_already_exists_error(e: tiledb.TileDBError) -> bool:
"""Given a TileDBError, return true if it indicates the object already exists

Lifecycle: experimental

Example:
try:
tiledb.Array.create(uri, schema, ctx=ctx)
...
except tiledb.TileDBError as e:
if is_already_exists_error(e):
...
raise e
"""
stre = str(e)
# Local-disk, S3, and TileDB Cloud exceptions all have the substring
# "already exists". Here we lower-case the exception message just
# in case someone ever uppercases it on the other end.
return "already exists" in stre.lower()


def is_duplicate_group_key_error(e: tiledb.TileDBError) -> bool:
"""Given a TileDBError, return try if it indicates a duplicate member
add request in a tiledb.Group.
Expand Down
40 changes: 20 additions & 20 deletions apis/python/src/tiledbsoma/io/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@
from .._collection import AnyTileDBCollection, CollectionBase
from .._common_nd_array import NDArray
from .._constants import SOMA_JOINID
from .._exception import DoesNotExistError, SOMAError
from .._exception import (
AlreadyExistsError,
DoesNotExistError,
SOMAError,
)
from .._tdb_handles import RawHandle
from .._tiledb_array import TileDBArray
from .._tiledb_object import AnyTileDBObject, TileDBObject
Expand Down Expand Up @@ -984,17 +988,13 @@ def _create_or_open_collection(
additional_metadata: AdditionalMetadata = None,
) -> CollectionBase[_TDBO]:
try:
thing = cls.open(uri, "w", context=context)
except DoesNotExistError:
pass # This is always OK; make a new one.
else:
coll = cls.create(uri, context=context)
except AlreadyExistsError:
# It already exists. Are we resuming?
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{uri} already exists")
add_metadata(thing, additional_metadata)
return thing
coll = cls.open(uri, "w", context=context)

coll = cls.create(uri, context=context)
add_metadata(coll, additional_metadata)
return coll

Expand Down Expand Up @@ -1194,15 +1194,18 @@ def _write_dataframe_impl(
)

try:
soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context)
except DoesNotExistError:
soma_df = DataFrame.create(
df_uri,
schema=arrow_table.schema,
platform_config=platform_config,
context=context,
)
else:
except AlreadyExistsError:
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_df.uri} already exists")

soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context)

if ingestion_params.skip_existing_nonempty_domain:
storage_ned = _read_nonempty_domain(soma_df)
dim_range = ((int(df.index.min()), int(df.index.max())),)
Expand All @@ -1212,8 +1215,6 @@ def _write_dataframe_impl(
_util.format_elapsed(s, f"SKIPPED {soma_df.uri}"),
)
return soma_df
elif ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_df.uri} already exists")

if ingestion_params.write_schema_no_data:
logging.log_io(
Expand Down Expand Up @@ -1291,10 +1292,6 @@ def _create_from_matrix(
logging.log_io(None, f"START WRITING {uri}")

try:
soma_ndarray = cls.open(
uri, "w", platform_config=platform_config, context=context
)
except DoesNotExistError:
# A SparseNDArray must be appendable in soma.io.
shape = [None for _ in matrix.shape] if cls.is_sparse else matrix.shape
soma_ndarray = cls.create(
Expand All @@ -1304,9 +1301,12 @@ def _create_from_matrix(
platform_config=platform_config,
context=context,
)
else:
except AlreadyExistsError:
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_ndarray.uri} already exists")
soma_ndarray = cls.open(
uri, "w", platform_config=platform_config, context=context
)

if ingestion_params.write_schema_no_data:
logging.log_io(
Expand Down Expand Up @@ -2749,15 +2749,15 @@ def _ingest_uns_ndarray(
logging.log_io(msg, msg)
return
try:
soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context)
except DoesNotExistError:
soma_arr = DenseNDArray.create(
arr_uri,
type=pa_dtype,
shape=value.shape,
platform_config=platform_config,
context=context,
)
except AlreadyExistsError:
soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context)

# If resume mode: don't re-write existing data. This is the user's explicit request
# that we not re-write things that have already been written.
Expand Down
Loading