Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[python] Ingestion performance #2434

Merged
merged 16 commits into from
Apr 12, 2024
4 changes: 3 additions & 1 deletion .github/workflows/python-so-copying.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ jobs:
./venv-soma/bin/python -c "import tiledbsoma; print(tiledbsoma.pytiledbsoma.version())"

macos:
runs-on: macos-12
runs-on: macos-13
name: "macos TILEDB_EXISTS: ${{ matrix.TILEDB_EXISTS }} TILEDBSOMA_EXISTS: ${{ matrix.TILEDBSOMA_EXISTS }}"
strategy:
fail-fast: false
Expand All @@ -153,6 +153,8 @@ jobs:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # for setuptools-scm
- name: Check if System Integrity Protection (SIP) is enabled
run: csrutil status
- name: Install pre-built libtiledb
if: ${{ matrix.TILEDB_EXISTS == 'yes' }}
run: |
Expand Down
3 changes: 2 additions & 1 deletion apis/python/src/tiledbsoma/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
from ._constants import SOMA_JOINID
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import DoesNotExistError, SOMAError
from ._exception import AlreadyExistsError, DoesNotExistError, SOMAError
from ._experiment import Experiment
from ._factory import open
from ._general_utilities import (
Expand All @@ -171,6 +171,7 @@
__version__ = get_implementation_version()

__all__ = [
"AlreadyExistsError",
"AxisColumnNames",
"AxisQuery",
"Collection",
Expand Down
28 changes: 20 additions & 8 deletions apis/python/src/tiledbsoma/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
from ._common_nd_array import NDArray
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import SOMAError, is_does_not_exist_error
from ._exception import (
AlreadyExistsError,
SOMAError,
is_already_exists_error,
is_does_not_exist_error,
)
from ._funcs import typeguard_ignore
from ._sparse_nd_array import SparseNDArray
from ._tiledb_object import AnyTileDBObject, TileDBObject
Expand Down Expand Up @@ -112,20 +117,27 @@
the context.

Raises:
tiledbsoma.AlreadyExistsError:
If the underlying object already exists at the given URI.
TileDBError:
If unable to create the underlying object.

Lifecycle:
Experimental.
"""
context = _validate_soma_tiledb_context(context)
tiledb.group_create(uri=uri, ctx=context.tiledb_ctx)
handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp)
cls._set_create_metadata(handle)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
tiledb.group_create(uri=uri, ctx=context.tiledb_ctx)
handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp)
cls._set_create_metadata(handle)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

Check warning on line 140 in apis/python/src/tiledbsoma/_collection.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/_collection.py#L140

Added line #L140 was not covered by tests

@classmethod
def open(
Expand Down
18 changes: 13 additions & 5 deletions apis/python/src/tiledbsoma/_common_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tiledb

from . import _arrow_types, _util
from ._exception import AlreadyExistsError, is_already_exists_error
from ._tiledb_array import TileDBArray
from ._types import OpenTimestamp
from .options._soma_tiledb_context import (
Expand Down Expand Up @@ -77,6 +78,8 @@
If the ``type`` is unsupported.
ValueError:
If the ``shape`` is unsupported.
tiledbsoma.AlreadyExistsError:
If the underlying object already exists at the given URI.
TileDBError:
If unable to create the underlying object.

Expand All @@ -91,11 +94,16 @@
context,
is_sparse=cls.is_sparse,
)
handle = cls._create_internal(uri, schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
handle = cls._create_internal(uri, schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

Check warning on line 106 in apis/python/src/tiledbsoma/_common_nd_array.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/_common_nd_array.py#L106

Added line #L106 was not covered by tests

@property
def shape(self) -> Tuple[int, ...]:
Expand Down
18 changes: 13 additions & 5 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from . import _arrow_types, _util
from . import pytiledbsoma as clib
from ._constants import SOMA_JOINID
from ._exception import AlreadyExistsError, is_already_exists_error
from ._query_condition import QueryCondition
from ._read_iters import TableReadIter
from ._tdb_handles import DataFrameWrapper
Expand Down Expand Up @@ -187,6 +188,8 @@
an undefined column name.
ValueError:
If the ``schema`` specifies illegal column names.
tiledbsoma.AlreadyExistsError:
If the underlying object already exists at the given URI.
TileDBError:
If unable to create the underlying object.

Expand Down Expand Up @@ -217,11 +220,16 @@
TileDBCreateOptions.from_platform_config(platform_config),
context,
)
handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

Check warning on line 232 in apis/python/src/tiledbsoma/_dataframe.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/_dataframe.py#L232

Added line #L232 was not covered by tests

def keys(self) -> Tuple[str, ...]:
"""Returns the names of the columns when read back as a dataframe.
Expand Down
30 changes: 30 additions & 0 deletions apis/python/src/tiledbsoma/_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@ def is_does_not_exist_error(e: tiledb.TileDBError) -> bool:
return False


class AlreadyExistsError(SOMAError):
    """Signals that a SOMA object cannot be created because it already exists.

    Lifecycle: experimental
    """


def is_already_exists_error(e: tiledb.TileDBError) -> bool:
    """Report whether a TileDBError indicates that the object already exists.

    The decision is made purely from the text of the exception message.

    Lifecycle: experimental

    Example:
        try:
            tiledb.Array.create(uri, schema, ctx=ctx)
            ...
        except tiledb.TileDBError as e:
            if is_already_exists_error(e):
                ...
            raise e
    """
    # Exceptions from local disk, S3, and TileDB Cloud all carry the substring
    # "already exists". The message is lower-cased before matching in case the
    # other end ever changes the capitalization.
    return "already exists" in str(e).lower()


def is_duplicate_group_key_error(e: tiledb.TileDBError) -> bool:
"""Given a TileDBError, return true if it indicates a duplicate member
add request in a tiledb.Group.
Expand Down
40 changes: 20 additions & 20 deletions apis/python/src/tiledbsoma/io/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@
from .._collection import AnyTileDBCollection, CollectionBase
from .._common_nd_array import NDArray
from .._constants import SOMA_JOINID
from .._exception import DoesNotExistError, SOMAError
from .._exception import (
AlreadyExistsError,
DoesNotExistError,
SOMAError,
)
from .._tdb_handles import RawHandle
from .._tiledb_array import TileDBArray
from .._tiledb_object import AnyTileDBObject, TileDBObject
Expand Down Expand Up @@ -984,17 +988,13 @@
additional_metadata: AdditionalMetadata = None,
) -> CollectionBase[_TDBO]:
try:
thing = cls.open(uri, "w", context=context)
except DoesNotExistError:
pass # This is always OK; make a new one.
else:
coll = cls.create(uri, context=context)
except AlreadyExistsError:
# It already exists. Are we resuming?
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{uri} already exists")
add_metadata(thing, additional_metadata)
return thing
coll = cls.open(uri, "w", context=context)

coll = cls.create(uri, context=context)
add_metadata(coll, additional_metadata)
return coll

Expand Down Expand Up @@ -1194,15 +1194,18 @@
)

try:
soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context)
except DoesNotExistError:
soma_df = DataFrame.create(
df_uri,
schema=arrow_table.schema,
platform_config=platform_config,
context=context,
)
else:
except AlreadyExistsError:
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_df.uri} already exists")

Check warning on line 1205 in apis/python/src/tiledbsoma/io/ingest.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/io/ingest.py#L1205

Added line #L1205 was not covered by tests

soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context)

if ingestion_params.skip_existing_nonempty_domain:
storage_ned = _read_nonempty_domain(soma_df)
dim_range = ((int(df.index.min()), int(df.index.max())),)
Expand All @@ -1212,8 +1215,6 @@
_util.format_elapsed(s, f"SKIPPED {soma_df.uri}"),
)
return soma_df
elif ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_df.uri} already exists")

if ingestion_params.write_schema_no_data:
logging.log_io(
Expand Down Expand Up @@ -1291,10 +1292,6 @@
logging.log_io(None, f"START WRITING {uri}")

try:
soma_ndarray = cls.open(
uri, "w", platform_config=platform_config, context=context
)
except DoesNotExistError:
# A SparseNDArray must be appendable in soma.io.
shape = [None for _ in matrix.shape] if cls.is_sparse else matrix.shape
soma_ndarray = cls.create(
Expand All @@ -1304,9 +1301,12 @@
platform_config=platform_config,
context=context,
)
else:
except AlreadyExistsError:
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_ndarray.uri} already exists")
soma_ndarray = cls.open(
uri, "w", platform_config=platform_config, context=context
)

if ingestion_params.write_schema_no_data:
logging.log_io(
Expand Down Expand Up @@ -2749,15 +2749,15 @@
logging.log_io(msg, msg)
return
try:
soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context)
except DoesNotExistError:
soma_arr = DenseNDArray.create(
arr_uri,
type=pa_dtype,
shape=value.shape,
platform_config=platform_config,
context=context,
)
except AlreadyExistsError:
soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context)

Check warning on line 2760 in apis/python/src/tiledbsoma/io/ingest.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/io/ingest.py#L2759-L2760

Added lines #L2759 - L2760 were not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can actually just directly call DenseNDArray.open here (and same with DataFrame.open above).


# If resume mode: don't re-write existing data. This is the user's explicit request
# that we not re-write things that have already been written.
Expand Down
Loading