Skip to content

Commit

Permalink
Forbid Collection operations unsupported by TileDB Groups.
Browse files Browse the repository at this point in the history
The hack we were previously using, where we would immediately write out
group contents changes to disk and use delete-then-add to effect
replacing a member, meant that the actual results on disk were not
guaranteed to be consistent. Instead, we now simply forbid operations
unsupported (for now) by TileDB Groups.

This doesn't use the same workaround as metadata, since metadata *does*
support replacement, meaning that all dictionary operations work for
metadata. There is (currently) no consistent way to make all operations
work for Groups.
  • Loading branch information
thetorpedodog committed Feb 17, 2023
1 parent 67442d7 commit b66e9eb
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 74 deletions.
58 changes: 16 additions & 42 deletions apis/python/src/tiledbsoma/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import itertools
import re
import time
from typing import (
Any,
Callable,
Expand All @@ -11,6 +10,7 @@
Iterable,
Iterator,
Optional,
Set,
Tuple,
Type,
TypeVar,
Expand All @@ -29,7 +29,7 @@
from ._common_nd_array import NDArray
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import is_does_not_exist_error, is_duplicate_group_key_error
from ._exception import SOMAError, is_does_not_exist_error
from ._sparse_nd_array import SparseNDArray
from ._tiledb_object import AnyTileDBObject, TileDBObject
from ._util import is_relative_uri, make_relative_path, uri_joinpath
Expand Down Expand Up @@ -61,7 +61,7 @@ class CollectionBase(
``DataFrame``, ``DenseNDArray``, ``SparseNDArray`` or ``Experiment``.
"""

__slots__ = ("_contents",)
__slots__ = ("_contents", "_mutated_keys")
_wrapper_type = _tdb_handles.GroupWrapper

# TODO: Implement additional creation of members on collection subclasses.
Expand Down Expand Up @@ -116,6 +116,7 @@ def __init__(
This is loaded at startup when we have a read handle.
"""
self._mutated_keys: Set[str] = set()

# Overloads to allow type inference to work when doing:
#
Expand Down Expand Up @@ -172,9 +173,7 @@ def add_new_collection(
creating this sub-collection. This is passed directly to
``[CurrentCollectionType].create()``.
"""
child_cls: Type[AnyTileDBCollection] = (
cls or Collection # type: ignore[assignment]
)
child_cls: Type[AnyTileDBCollection] = cls or Collection
return self._add_new_element(
key,
child_cls,
Expand Down Expand Up @@ -416,53 +415,28 @@ def _set_element(

self._check_allows_child(key, type(soma_object))

# Set has update semantics. Add if missing, delete/add if not. The TileDB Group
# API only has add/delete. Assume add will succeed, and deal with delete/retry
# if we get an error on add.

for retry in [True, False]:
try:
self._handle.writer.add(name=key, uri=uri, relative=relative)
break
except tiledb.TileDBError as e:
if not is_duplicate_group_key_error(e):
raise
if retry:
self._del_element(key)

# There can be timestamp overlap in a very-rapid-fire unit-test environment. When
# that happens, we effectively fall back to filesystem file order, which will be the
# lexical ordering of the group-metadata filenames. Since the timestamp components
# are the same, that will be the lexical order of the UUIDs. So if the new metadata
# file is sorted before the old one, the group will look like the old state.
#
# The standard solution is a negligible but non-zero delay.
time.sleep(0.001)
# HACK: There is no way to change a group entry without deleting it and
# re-adding it, but you can't do both of those in the same transaction.
# You get a "member already set for removal" error.
#
# This also means that if, in one transaction, you do
# grp["x"] = y
# del grp["x"]
# you would also get an error without this hack.
self._handle._flush_hack()

if key in self._mutated_keys.union(self._contents):
# TileDB groups currently do not support replacing elements.
# If we use a hack to flush writes, corruption is possible.
raise SOMAError(f"replacing key {key!r} is unsupported")
self._handle.writer.add(name=key, uri=uri, relative=relative)
self._contents[key] = _CachedElement(
entry=_tdb_handles.GroupEntry(soma_object.uri, soma_object._wrapper_type),
soma=soma_object,
)
self._mutated_keys.add(key)

def _del_element(self, key: str) -> None:
if key in self._mutated_keys:
raise SOMAError(f"cannot delete previously-mutated key {key!r}")
try:
self._handle.writer.remove(key)
# HACK: see note above
self._handle._flush_hack()
self._contents.pop(key, None)
except tiledb.TileDBError as tdbe:
if is_does_not_exist_error(tdbe):
raise KeyError(f"{key!r} does not exist in {self}") from tdbe
raise
self._contents.pop(key, None)
self._mutated_keys.add(key)

def _new_child_uri(self, *, key: str, user_uri: Optional[str]) -> "_ChildURI":
maybe_relative_uri = user_uri or _sanitize_for_path(key)
Expand Down Expand Up @@ -502,7 +476,7 @@ def _check_allows_child(cls, key: str, child_cls: type) -> None:
)


AnyTileDBCollection = CollectionBase[AnyTileDBObject]
AnyTileDBCollection = CollectionBase[Any]


class Collection(
Expand Down
4 changes: 3 additions & 1 deletion apis/python/src/tiledbsoma/_tiledb_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ def _set_create_metadata(cls, handle: _tdb_handles.AnyWrapper) -> None:
)
# Semi-hack: flush the metadata immediately upon creation so that the
# backing storage isn't half-created (i.e., there is a tiledb object
# on disk, but its type is not stored). This is immutable, so it's fine.
# on disk, but its type is not stored).
# TODO: We should probably write this metadata at time 0.
# Doing so would eliminate this last _flush_hack call.
handle._flush_hack()

def _check_open_read(self) -> None:
Expand Down
89 changes: 65 additions & 24 deletions apis/python/src/tiledbsoma/io/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@
eta,
logging,
)
from .._collection import AnyTileDBCollection
from .._common_nd_array import NDArray
from .._constants import SOMA_JOINID
from .._exception import DoesNotExistError, SOMAError
from .._funcs import typeguard_ignore
from .._tdb_handles import RawHandle
from .._tiledb_array import TileDBArray
from .._tiledb_object import TileDBObject
from .._tiledb_object import AnyTileDBObject, TileDBObject
from .._types import INGEST_MODES, IngestMode, NPNDArray, Path
from ..options import SOMATileDBContext
from ..options.tiledb_create_options import TileDBCreateOptions
Expand Down Expand Up @@ -175,21 +176,23 @@ def from_anndata(
platform_config=platform_config,
ingest_mode=ingest_mode,
) as obs:
experiment.set("obs", obs, use_relative_uri=use_relative_uri)
_maybe_set(experiment, "obs", obs, use_relative_uri=use_relative_uri)

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# MS
with _create_or_open_coll(
Collection[Measurement], _util.uri_joinpath(experiment.uri, "ms"), ingest_mode
) as ms:
experiment.set("ms", ms, use_relative_uri=use_relative_uri)
_maybe_set(experiment, "ms", ms, use_relative_uri=use_relative_uri)

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# MS/meas
with _create_or_open_coll(
Measurement, f"{experiment.ms.uri}/{measurement_name}", ingest_mode
) as measurement:
ms.set(measurement_name, measurement, use_relative_uri=use_relative_uri)
_maybe_set(
ms, measurement_name, measurement, use_relative_uri=use_relative_uri
)

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# MS/meas/VAR
Expand All @@ -200,15 +203,15 @@ def from_anndata(
platform_config=platform_config,
ingest_mode=ingest_mode,
) as var:
measurement.set("var", var, use_relative_uri=use_relative_uri)
_maybe_set(measurement, "var", var, use_relative_uri=use_relative_uri)

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# MS/meas/X/DATA

with _create_or_open_coll(
Collection, _util.uri_joinpath(measurement.uri, "X"), ingest_mode
) as x:
measurement.set("X", x, use_relative_uri=use_relative_uri)
_maybe_set(measurement, "X", x, use_relative_uri=use_relative_uri)

# Since we did `anndata = ad.read_h5ad(path_to_h5ad, "r")` with the "r":
# * If we do `anndata.X[:]` we're loading all of a CSR/CSC/etc into memory.
Expand All @@ -227,7 +230,7 @@ def from_anndata(
platform_config,
ingest_mode,
) as data:
x.set("data", data, use_relative_uri=use_relative_uri)
_maybe_set(x, "data", data, use_relative_uri=use_relative_uri)

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# MS/meas/OBSM,VARM,OBSP,VARP
Expand All @@ -237,7 +240,9 @@ def from_anndata(
_util.uri_joinpath(measurement.uri, "obsm"),
ingest_mode,
) as obsm:
measurement.set("obsm", obsm, use_relative_uri=use_relative_uri)
_maybe_set(
measurement, "obsm", obsm, use_relative_uri=use_relative_uri
)
for key in anndata.obsm.keys():
with create_from_matrix(
DenseNDArray,
Expand All @@ -248,7 +253,9 @@ def from_anndata(
platform_config,
ingest_mode,
) as arr:
obsm.set(key, arr, use_relative_uri=use_relative_uri)
_maybe_set(
obsm, key, arr, use_relative_uri=use_relative_uri
)
arr.close()
measurement.obsm.close()

Expand All @@ -258,7 +265,9 @@ def from_anndata(
_util.uri_joinpath(measurement.uri, "varm"),
ingest_mode,
) as varm:
measurement.set("varm", varm, use_relative_uri=use_relative_uri)
_maybe_set(
measurement, "varm", varm, use_relative_uri=use_relative_uri
)
for key in anndata.varm.keys():
with create_from_matrix(
DenseNDArray,
Expand All @@ -269,7 +278,8 @@ def from_anndata(
platform_config,
ingest_mode,
) as darr:
varm.set(
_maybe_set(
varm,
key,
darr,
use_relative_uri=use_relative_uri,
Expand All @@ -281,7 +291,9 @@ def from_anndata(
_util.uri_joinpath(measurement.uri, "obsp"),
ingest_mode,
) as obsp:
measurement.set("obsp", obsp, use_relative_uri=use_relative_uri)
_maybe_set(
measurement, "obsp", obsp, use_relative_uri=use_relative_uri
)
for key in anndata.obsp.keys():
with create_from_matrix(
SparseNDArray,
Expand All @@ -292,7 +304,8 @@ def from_anndata(
platform_config,
ingest_mode,
) as sarr:
obsp.set(
_maybe_set(
obsp,
key,
sarr,
use_relative_uri=use_relative_uri,
Expand All @@ -304,7 +317,9 @@ def from_anndata(
_util.uri_joinpath(measurement.uri, "varp"),
ingest_mode,
) as varp:
measurement.set("varp", varp, use_relative_uri=use_relative_uri)
_maybe_set(
measurement, "varp", varp, use_relative_uri=use_relative_uri
)
for key in anndata.varp.keys():
with create_from_matrix(
SparseNDArray,
Expand All @@ -315,7 +330,8 @@ def from_anndata(
platform_config,
ingest_mode,
) as sarr:
varp.set(
_maybe_set(
varp,
key,
sarr,
use_relative_uri=use_relative_uri,
Expand All @@ -329,7 +345,8 @@ def from_anndata(
_util.uri_joinpath(experiment.ms.uri, "raw"),
ingest_mode,
) as raw_measurement:
ms.set(
_maybe_set(
ms,
"raw",
raw_measurement,
use_relative_uri=use_relative_uri,
Expand All @@ -342,17 +359,23 @@ def from_anndata(
platform_config=platform_config,
ingest_mode=ingest_mode,
) as var:
raw_measurement.set(
"var", var, use_relative_uri=use_relative_uri
_maybe_set(
raw_measurement,
"var",
var,
use_relative_uri=use_relative_uri,
)

with _create_or_open_coll(
Collection,
_util.uri_joinpath(raw_measurement.uri, "X"),
ingest_mode,
) as rm_x:
raw_measurement.set(
"X", rm_x, use_relative_uri=use_relative_uri
_maybe_set(
raw_measurement,
"X",
rm_x,
use_relative_uri=use_relative_uri,
)

with create_from_matrix(
Expand All @@ -362,7 +385,8 @@ def from_anndata(
platform_config,
ingest_mode,
) as rm_x_data:
rm_x.set(
_maybe_set(
rm_x,
"data",
rm_x_data,
use_relative_uri=use_relative_uri,
Expand All @@ -375,6 +399,20 @@ def from_anndata(
return experiment


def _maybe_set(
    coll: AnyTileDBCollection,
    key: str,
    value: AnyTileDBObject,
    *,
    use_relative_uri: Optional[bool],
) -> None:
    """Sets ``key`` in ``coll``, silently skipping if it is already set."""
    try:
        coll.set(key, value, use_relative_uri=use_relative_uri)
    except SOMAError:
        pass  # The collection already has this member; leave it as-is.


@overload
def _create_or_open_coll(
cls: Type[Experiment], uri: str, ingest_mode: str
Expand Down Expand Up @@ -586,14 +624,17 @@ def add_matrix_to_collection(
coll = _create_or_open_coll(
Collection, f"{meas.uri}/{collection_name}", ingest_mode
)
meas.set(collection_name, coll, use_relative_uri=use_relative_uri)
_maybe_set(meas, collection_name, coll, use_relative_uri=use_relative_uri)
with coll:
uri = f"{coll.uri}/{matrix_name}"
with create_from_matrix(
SparseNDArray, uri, matrix_data, ingest_mode=ingest_mode
) as sparse_nd_array:
coll.set(
matrix_name, sparse_nd_array, use_relative_uri=use_relative_uri
_maybe_set(
coll,
matrix_name,
sparse_nd_array,
use_relative_uri=use_relative_uri,
)


Expand Down
Loading

0 comments on commit b66e9eb

Please sign in to comment.