Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[python] Ingestion performance #2434

Merged
merged 16 commits into from
Apr 12, 2024
3 changes: 2 additions & 1 deletion apis/python/src/tiledbsoma/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
from ._constants import SOMA_JOINID
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import DoesNotExistError, SOMAError
from ._exception import AlreadyExistsError, DoesNotExistError, SOMAError
from ._experiment import Experiment
from ._factory import open
from ._general_utilities import (
Expand All @@ -171,6 +171,7 @@
__version__ = get_implementation_version()

__all__ = [
"AlreadyExistsError",
"AxisColumnNames",
"AxisQuery",
"Collection",
Expand Down
26 changes: 18 additions & 8 deletions apis/python/src/tiledbsoma/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
from ._common_nd_array import NDArray
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import SOMAError, is_does_not_exist_error
from ._exception import (
AlreadyExistsError,
SOMAError,
is_already_exists_error,
is_does_not_exist_error,
)
from ._funcs import typeguard_ignore
from ._sparse_nd_array import SparseNDArray
from ._tiledb_object import AnyTileDBObject, TileDBObject
Expand Down Expand Up @@ -119,13 +124,18 @@
Experimental.
"""
context = _validate_soma_tiledb_context(context)
tiledb.group_create(uri=uri, ctx=context.tiledb_ctx)
handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp)
cls._set_create_metadata(handle)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
tiledb.group_create(uri=uri, ctx=context.tiledb_ctx)
handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp)
cls._set_create_metadata(handle)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

Check warning on line 138 in apis/python/src/tiledbsoma/_collection.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/_collection.py#L138

Added line #L138 was not covered by tests

@classmethod
def open(
Expand Down
16 changes: 11 additions & 5 deletions apis/python/src/tiledbsoma/_common_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tiledb

from . import _arrow_types, _util
from ._exception import AlreadyExistsError, is_already_exists_error
from ._tiledb_array import TileDBArray
from ._types import OpenTimestamp
from .options._soma_tiledb_context import (
Expand Down Expand Up @@ -91,11 +92,16 @@
context,
is_sparse=cls.is_sparse,
)
handle = cls._create_internal(uri, schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
handle = cls._create_internal(uri, schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

Check warning on line 104 in apis/python/src/tiledbsoma/_common_nd_array.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/_common_nd_array.py#L104

Added line #L104 was not covered by tests

@property
def shape(self) -> Tuple[int, ...]:
Expand Down
16 changes: 11 additions & 5 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from . import _arrow_types, _util
from . import pytiledbsoma as clib
from ._constants import SOMA_JOINID
from ._exception import AlreadyExistsError, is_already_exists_error
from ._query_condition import QueryCondition
from ._read_iters import TableReadIter
from ._tdb_handles import DataFrameWrapper
Expand Down Expand Up @@ -217,11 +218,16 @@
TileDBCreateOptions.from_platform_config(platform_config),
context,
)
handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

Check warning on line 230 in apis/python/src/tiledbsoma/_dataframe.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/_dataframe.py#L230

Added line #L230 was not covered by tests

def keys(self) -> Tuple[str, ...]:
"""Returns the names of the columns when read back as a dataframe.
Expand Down
40 changes: 40 additions & 0 deletions apis/python/src/tiledbsoma/_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,46 @@
return False


class AlreadyExistsError(SOMAError):
"""Raised when attempting to create an already existing SOMA object.

Lifecycle: experimental
"""

pass


def is_already_exists_error(e: tiledb.TileDBError) -> bool:
"""Given a TileDBError, return true if it indicates the object already exists

Lifecycle: experimental

Example:
XXX EDIT ME
johnkerl marked this conversation as resolved.
Show resolved Hide resolved
try:
with tiledb.open(uri):
...
except tiledb.TileDBError as e:
if is_does_not_exist_error(e):
...
raise e
"""
stre = str(e)
# Local-disk/S3 does-not-exist exceptions say 'Group does not exist'; TileDB Cloud
# does-not-exist exceptions are worded less clearly.
if (
"lready exists"
johnkerl marked this conversation as resolved.
Show resolved Hide resolved
in stre
# XXX
# or "Unrecognized array" in stre
# or "HTTP code 401" in stre
# or "HTTP code 404" in stre
johnkerl marked this conversation as resolved.
Show resolved Hide resolved
):
return True

return False

Check warning on line 95 in apis/python/src/tiledbsoma/_exception.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/_exception.py#L95

Added line #L95 was not covered by tests


def is_duplicate_group_key_error(e: tiledb.TileDBError) -> bool:
"""Given a TileDBError, return try if it indicates a duplicate member
add request in a tiledb.Group.
Expand Down
40 changes: 20 additions & 20 deletions apis/python/src/tiledbsoma/io/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@
from .._collection import AnyTileDBCollection, CollectionBase
from .._common_nd_array import NDArray
from .._constants import SOMA_JOINID
from .._exception import DoesNotExistError, SOMAError
from .._exception import (
AlreadyExistsError,
DoesNotExistError,
SOMAError,
)
from .._tdb_handles import RawHandle
from .._tiledb_array import TileDBArray
from .._tiledb_object import AnyTileDBObject, TileDBObject
Expand Down Expand Up @@ -984,17 +988,13 @@
additional_metadata: AdditionalMetadata = None,
) -> CollectionBase[_TDBO]:
try:
thing = cls.open(uri, "w", context=context)
except DoesNotExistError:
pass # This is always OK; make a new one.
else:
coll = cls.create(uri, context=context)
except AlreadyExistsError:
# It already exists. Are we resuming?
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{uri} already exists")
add_metadata(thing, additional_metadata)
return thing
coll = cls.open(uri, "w", context=context)

coll = cls.create(uri, context=context)
add_metadata(coll, additional_metadata)
return coll

Expand Down Expand Up @@ -1194,15 +1194,18 @@
)

try:
soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context)
except DoesNotExistError:
soma_df = DataFrame.create(
df_uri,
schema=arrow_table.schema,
platform_config=platform_config,
context=context,
)
else:
except AlreadyExistsError:
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_df.uri} already exists")

Check warning on line 1205 in apis/python/src/tiledbsoma/io/ingest.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/io/ingest.py#L1205

Added line #L1205 was not covered by tests

soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context)

if ingestion_params.skip_existing_nonempty_domain:
storage_ned = _read_nonempty_domain(soma_df)
dim_range = ((int(df.index.min()), int(df.index.max())),)
Expand All @@ -1212,8 +1215,6 @@
_util.format_elapsed(s, f"SKIPPED {soma_df.uri}"),
)
return soma_df
elif ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_df.uri} already exists")

if ingestion_params.write_schema_no_data:
logging.log_io(
Expand Down Expand Up @@ -1291,10 +1292,6 @@
logging.log_io(None, f"START WRITING {uri}")

try:
soma_ndarray = cls.open(
uri, "w", platform_config=platform_config, context=context
)
except DoesNotExistError:
# A SparseNDArray must be appendable in soma.io.
shape = [None for _ in matrix.shape] if cls.is_sparse else matrix.shape
soma_ndarray = cls.create(
Expand All @@ -1304,9 +1301,12 @@
platform_config=platform_config,
context=context,
)
else:
except AlreadyExistsError:
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_ndarray.uri} already exists")
soma_ndarray = cls.open(
uri, "w", platform_config=platform_config, context=context
)

if ingestion_params.write_schema_no_data:
logging.log_io(
Expand Down Expand Up @@ -2749,15 +2749,15 @@
logging.log_io(msg, msg)
return
try:
soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context)
except DoesNotExistError:
soma_arr = DenseNDArray.create(
arr_uri,
type=pa_dtype,
shape=value.shape,
platform_config=platform_config,
context=context,
)
except AlreadyExistsError:
soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context)

Check warning on line 2760 in apis/python/src/tiledbsoma/io/ingest.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/io/ingest.py#L2759-L2760

Added lines #L2759 - L2760 were not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can actually just directly call DenseNDArray.open here (and same with DataFrame.open above).


# If resume mode: don't re-write existing data. This is the user's explicit request
# that we not re-write things that have already been written.
Expand Down
Loading