From a6ec3d1b89670282009a9223332b800cdc956f4b Mon Sep 17 00:00:00 2001 From: Mike Lin Date: Tue, 7 Feb 2023 19:20:32 -1000 Subject: [PATCH] [python] Add timestamp slots in `SOMATileDBContext` and apply throughout (#892) #540 Add `read_timestamp` and `write_timestamp` slots to `SOMATileDBContext`. Then, feed these through to all read and write operations (including the C++ `SOMAReader`). `read_timestamp` defaults to the moment when the `SOMATileDBContext` is initialized. --- .../tiledbsoma/options/soma_tiledb_context.py | 69 ++++++++++++++++++- apis/python/src/tiledbsoma/tdb_handles.py | 51 ++++++++++++-- apis/python/src/tiledbsoma/tiledb_array.py | 4 ++ apis/python/tests/test_collection.py | 62 +++++++++++++++++ apis/python/tests/test_dataframe.py | 49 +++++++++++++ apis/python/tests/test_dense_nd_array.py | 59 ++++++++++++++++ apis/python/tests/test_sparse_nd_array.py | 56 +++++++++++++++ 7 files changed, 343 insertions(+), 7 deletions(-) diff --git a/apis/python/src/tiledbsoma/options/soma_tiledb_context.py b/apis/python/src/tiledbsoma/options/soma_tiledb_context.py index af1341caf9..c8f49c8393 100644 --- a/apis/python/src/tiledbsoma/options/soma_tiledb_context.py +++ b/apis/python/src/tiledbsoma/options/soma_tiledb_context.py @@ -1,4 +1,5 @@ -from typing import Dict, Union +import time +from typing import Any, Dict, Optional, Union import attrs import tiledb @@ -29,3 +30,69 @@ class SOMATileDBContext: """ tiledb_ctx: tiledb.Ctx = _build_default_tiledb_ctx() + + read_timestamp: int = attrs.field(factory=lambda: int(time.time() * 1000)) + """ + Timestamp for operations on SOMA objects open in read mode, in milliseconds since Unix epoch. + Defaults to the time of context initialization. Set to 0xFFFFFFFFFFFFFFFF (UINT64_MAX) to get + the latest revision as of when *each* object is opened. + + SOMA objects opened in *write* mode ignore any read timestamp. + """ + + read_timestamp_start: int = 0 + """ + Timestamp range start for operations on SOMA objects opened in read mode. This is usually zero + except for specific, unusual query requirements. + """ + + write_timestamp: Optional[int] = None + """ + Timestamp applied to all SOMA object write operations. If unset, each individual write + operation receives the timestamp as of when the operation executes. + + Caution: overlapping writes (of overlapping array slices, or setting the same collection key) + should be avoided when write_timestamp is set. Distinct, overlapping write operations given the + same timestamp may be applied in any order. + """ + + @read_timestamp.validator + def _validate_timestamps(self, _: Any, __: Any) -> None: + if not ( + self.read_timestamp_start >= 0 + and self.read_timestamp >= self.read_timestamp_start + ): + raise ValueError("SOMATileDBContext: invalid read timestamp range") + if not (self.write_timestamp is None or self.write_timestamp >= 0): + raise ValueError("SOMATileDBContext: invalid write timestamp") + + # (internal) tiledb.Ctx specifically for tiledb.Group operations; unlike arrays, tiledb.Group + # needs its timestamps set in the tiledb.Ctx. We'd like to get rid of this in the future, + # if/when tiledb.Group() takes a timestamp argument like tiledb.Array(). + _group_read_tiledb_ctx: tiledb.Ctx = attrs.field(init=False) + _group_write_tiledb_ctx: tiledb.Ctx = attrs.field(init=False) + + def __attrs_post_init__(self) -> None: + """ + initialization hook invoked by the attrs-generated __init__; prepares the pair of + timestamped tiledb.Ctx for groups + """ + + group_read_config = self.tiledb_ctx.config().dict() + group_read_config["sm.group.timestamp_start"] = self.read_timestamp_start + group_read_config["sm.group.timestamp_end"] = self.read_timestamp + object.__setattr__( + self, "_group_read_tiledb_ctx", tiledb.Ctx(group_read_config) + ) + assert isinstance(self._group_read_tiledb_ctx, tiledb.Ctx) + + if self.write_timestamp is not None: + group_write_config = self.tiledb_ctx.config().dict() + group_write_config["sm.group.timestamp_start"] = self.write_timestamp + group_write_config["sm.group.timestamp_end"] = self.write_timestamp + object.__setattr__( + self, "_group_write_tiledb_ctx", tiledb.Ctx(group_write_config) + ) + else: + object.__setattr__(self, "_group_write_tiledb_ctx", self.tiledb_ctx) + assert isinstance(self._group_write_tiledb_ctx, tiledb.Ctx) diff --git a/apis/python/src/tiledbsoma/tdb_handles.py b/apis/python/src/tiledbsoma/tdb_handles.py index 5fda47a611..fbed94ede5 100644 --- a/apis/python/src/tiledbsoma/tdb_handles.py +++ b/apis/python/src/tiledbsoma/tdb_handles.py @@ -67,7 +67,12 @@ def open( tdb = cls._opener(uri, mode, context) handle = cls(uri, mode, context, tdb) if mode == "w": - with cls._opener(uri, "r", context) as auxiliary_reader: + # Briefly open a read-mode handle to populate metadata/schema/group contents/etc., + # ignoring any read_timestamp set in the context to get an up-to-date view in + # preparation for writing. + with cls._opener( + uri, "r", context, use_latest_read_timestamp=True + ) as auxiliary_reader: handle._do_initial_reads(auxiliary_reader) else: handle._do_initial_reads(tdb) @@ -80,7 +85,11 @@ def open( @classmethod @abc.abstractmethod def _opener( - cls, uri: str, mode: options.OpenMode, context: SOMATileDBContext + cls, + uri: str, + mode: options.OpenMode, + context: SOMATileDBContext, + use_latest_read_timestamp: bool = False, ) -> _RawHdl_co: """Opens and returns a TileDB object specific to this type.""" raise NotImplementedError() @@ -152,9 +161,23 @@ class ArrayWrapper(Wrapper[tiledb.Array]): @classmethod def _opener( - cls, uri: str, mode: options.OpenMode, context: SOMATileDBContext + cls, + uri: str, + mode: options.OpenMode, + context: SOMATileDBContext, + use_latest_read_timestamp: bool = False, ) -> tiledb.Array: - return tiledb.open(uri, mode, ctx=context.tiledb_ctx) + if not use_latest_read_timestamp: + timestamp_arg = ( + context.write_timestamp + if mode == "w" + else (context.read_timestamp_start, context.read_timestamp) + ) + else: + # array opened in write mode should initialize with latest metadata + assert mode == "r" + timestamp_arg = None + return tiledb.open(uri, mode, timestamp=timestamp_arg, ctx=context.tiledb_ctx) @property def schema(self) -> tiledb.ArraySchema: @@ -180,9 +203,25 @@ class GroupWrapper(Wrapper[tiledb.Group]): @classmethod def _opener( - cls, uri: str, mode: options.OpenMode, context: SOMATileDBContext + cls, + uri: str, + mode: options.OpenMode, + context: SOMATileDBContext, + use_latest_read_timestamp: bool = False, ) -> tiledb.Group: - return tiledb.Group(uri, mode, ctx=context.tiledb_ctx) + if not use_latest_read_timestamp: + # As of Feb 2023, tiledb.Group() has no timestamp arg; instead its timestamps must be + # set in the tiledb.Ctx config. SOMATileDBContext prepares the suitable tiledb.Ctx. + ctx_arg = ( + context._group_write_tiledb_ctx + if mode == "w" + else context._group_read_tiledb_ctx + ) + else: + # Group opened in write mode should initialize with latest contents & metadata + assert mode == "r" + ctx_arg = context.tiledb_ctx + return tiledb.Group(uri, mode, ctx=ctx_arg) def _do_initial_reads(self, reader: tiledb.Group) -> None: super()._do_initial_reads(reader) diff --git a/apis/python/src/tiledbsoma/tiledb_array.py b/apis/python/src/tiledbsoma/tiledb_array.py index ea4e348466..a4967b488d 100644 --- a/apis/python/src/tiledbsoma/tiledb_array.py +++ b/apis/python/src/tiledbsoma/tiledb_array.py @@ -67,6 +67,10 @@ def _soma_reader( kwargs = { "name": self.__class__.__name__, "platform_config": self._ctx.config().dict(), + "timestamp": ( + self.context.read_timestamp_start, + self.context.read_timestamp, + ), } # Leave empty arguments out of kwargs to allow C++ constructor defaults to apply, as # they're not all wrapped in std::optional<>. diff --git a/apis/python/tests/test_collection.py b/apis/python/tests/test_collection.py index 1f1761984c..1f2107e346 100644 --- a/apis/python/tests/test_collection.py +++ b/apis/python/tests/test_collection.py @@ -10,6 +10,7 @@ import tiledbsoma as soma from tiledbsoma import collection, factory, tiledb_object from tiledbsoma.exception import DoesNotExistError +from tiledbsoma.options import SOMATileDBContext # ---------------------------------------------------------------- @@ -339,3 +340,64 @@ def test_real_class_fail(in_type): ) def test_sanitize_for_path(key, want): assert collection._sanitize_for_path(key) == want + + +def test_timestamped_ops(tmp_path): + """ + When we specify read/write timestamps in SOMATileDBContext supplied to collection, those are + inherited by elements accessed via the collection. + """ + + # create collection @ t=10 + with soma.Collection.create( + tmp_path.as_uri(), context=SOMATileDBContext(write_timestamp=10) + ): + pass + + # add array A to it @ t=20 + with soma.Collection.open( + tmp_path.as_uri(), mode="w", context=SOMATileDBContext(write_timestamp=20) + ) as sc: + sc.add_new_dense_ndarray("A", type=pa.uint8(), shape=(2, 2)).write( + (slice(0, 2), slice(0, 2)), + pa.Tensor.from_numpy(np.zeros((2, 2), dtype=np.uint8)), + ) + + # access A via collection @ t=30 and write something into it + with soma.Collection.open( + tmp_path.as_uri(), mode="w", context=SOMATileDBContext(write_timestamp=30) + ) as sc: + sc["A"].write( + (slice(0, 1), slice(0, 1)), + pa.Tensor.from_numpy(np.ones((1, 1), dtype=np.uint8)), + ) + + # open A via collection with no timestamp => A should reflect both writes + with soma.Collection.open(tmp_path.as_uri()) as sc: + assert sc["A"].read((slice(None), slice(None))).to_numpy().tolist() == [ + [1, 0], + [0, 0], + ] + + # open A via collection @ t=25 => A should reflect first write only + with soma.Collection.open( + tmp_path.as_uri(), context=SOMATileDBContext(read_timestamp=25) + ) as sc: + assert sc["A"].read((slice(None), slice(None))).to_numpy().tolist() == [ + [0, 0], + [0, 0], + ] + + # open collection @ t=15 => A should not even be there + with soma.Collection.open( + tmp_path.as_uri(), context=SOMATileDBContext(read_timestamp=15) + ) as sc: + assert "A" not in sc + + # confirm timestamp validation in SOMATileDBContext + with pytest.raises(ValueError): + SOMATileDBContext(read_timestamp=-1) + with pytest.raises(ValueError): + SOMATileDBContext(read_timestamp_start=2, read_timestamp=1) + with pytest.raises(ValueError): + SOMATileDBContext(write_timestamp=-1) diff --git a/apis/python/tests/test_dataframe.py b/apis/python/tests/test_dataframe.py index 21df4a762c..a93eb87d8c 100644 --- a/apis/python/tests/test_dataframe.py +++ b/apis/python/tests/test_dataframe.py @@ -7,6 +7,7 @@ import tiledb import tiledbsoma as soma +from tiledbsoma.options import SOMATileDBContext @pytest.fixture @@ -857,3 +858,51 @@ def test_create_platform_config_overrides( with tiledb.open(uri) as D: for k, v in expected_schema_fields.items(): assert getattr(D.schema, k) == v + + +def test_timestamped_ops(tmp_path): + schema = pa.schema( + [ + ("soma_joinid", pa.int64()), + ("B", pa.float64()), + ("C", pa.large_string()), + ] + ) + with soma.DataFrame.create( + tmp_path.as_posix(), + schema=schema, + index_column_names=["soma_joinid"], + context=SOMATileDBContext(write_timestamp=10), + ) as sidf: + data = { + "soma_joinid": [0], + "B": [100.1], + "C": ["foo"], + } + sidf.write(pa.Table.from_pydict(data)) + + with soma.DataFrame.open( + uri=tmp_path.as_posix(), mode="w", context=SOMATileDBContext(write_timestamp=20) + ) as sidf: + data = { + "soma_joinid": [0, 1], + "B": [200.2, 300.3], + "C": ["bar", "bas"], + } + sidf.write(pa.Table.from_pydict(data)) + + # read without timestamp & see final image + with soma.DataFrame.open(tmp_path.as_posix()) as sidf: + tab = sidf.read().concat() + assert list(x.as_py() for x in tab["soma_joinid"]) == [0, 1] + assert list(x.as_py() for x in tab["B"]) == [200.2, 300.3] + assert list(x.as_py() for x in tab["C"]) == ["bar", "bas"] + + # read at t=15 & see only the first write + with soma.DataFrame.open( + tmp_path.as_posix(), context=SOMATileDBContext(read_timestamp=15) + ) as sidf: + tab = sidf.read().concat() + assert list(x.as_py() for x in tab["soma_joinid"]) == [0] + assert list(x.as_py() for x in tab["B"]) == [100.1] + assert list(x.as_py() for x in tab["C"]) == ["foo"] diff --git a/apis/python/tests/test_dense_nd_array.py b/apis/python/tests/test_dense_nd_array.py index e335c5851e..1e2ec6b1c8 100644 --- a/apis/python/tests/test_dense_nd_array.py +++ b/apis/python/tests/test_dense_nd_array.py @@ -285,3 +285,62 @@ def test_tile_extents(tmp_path): with tiledb.open(tmp_path.as_posix()) as A: assert A.schema.domain.dim(0).tile == 100 assert A.schema.domain.dim(1).tile == 2048 + + +def test_timestamped_ops(tmp_path): + # 2x2 array + with soma.DenseNDArray.create( + tmp_path.as_posix(), + type=pa.uint8(), + shape=(2, 2), + context=SOMATileDBContext(write_timestamp=1), + ) as a: + a.write( + (slice(0, 2), slice(0, 2)), + pa.Tensor.from_numpy(np.zeros((2, 2), dtype=np.uint8)), + ) + + # write 1 into top-left entry @ t=10 + with soma.DenseNDArray.open( + tmp_path.as_posix(), mode="w", context=SOMATileDBContext(write_timestamp=10) + ) as a: + a.write( + (slice(0, 1), slice(0, 1)), + pa.Tensor.from_numpy(np.ones((1, 1), dtype=np.uint8)), + ) + + # write 1 into bottom-right entry @ t=20 + with soma.DenseNDArray.open( + uri=tmp_path.as_posix(), mode="w", context=SOMATileDBContext(write_timestamp=20) + ) as a: + a.write( + (slice(1, 2), slice(1, 2)), + pa.Tensor.from_numpy(np.ones((1, 1), dtype=np.uint8)), + ) + + # read with no timestamp args & see both 1s + with soma.DenseNDArray.open(tmp_path.as_posix()) as a: + assert a.read((slice(None), slice(None))).to_numpy().tolist() == [ + [1, 0], + [0, 1], + ] + + # read @ t=15 & see only the writes up til then + with soma.DenseNDArray.open( + tmp_path.as_posix(), context=SOMATileDBContext(read_timestamp=15) + ) as a: + assert a.read((slice(0, 1), slice(0, 1))).to_numpy().tolist() == [ + [1, 0], + [0, 0], + ] + + # read with (timestamp_start, timestamp_end) = (15, 25) & see only the t=20 write + with soma.DenseNDArray.open( + tmp_path.as_posix(), + context=SOMATileDBContext(read_timestamp_start=15, read_timestamp=25), + ) as a: + F = 255 # fill value + assert a.read((slice(0, 1), slice(0, 1))).to_numpy().tolist() == [ + [F, F], + [F, 1], + ] diff --git a/apis/python/tests/test_sparse_nd_array.py b/apis/python/tests/test_sparse_nd_array.py index 1c1c1cb00c..a844b3ba76 100644 --- a/apis/python/tests/test_sparse_nd_array.py +++ b/apis/python/tests/test_sparse_nd_array.py @@ -9,6 +9,7 @@ import tiledbsoma as soma from tiledbsoma import factory +from tiledbsoma.options import SOMATileDBContext from . import NDARRAY_ARROW_TYPES_NOT_SUPPORTED, NDARRAY_ARROW_TYPES_SUPPORTED @@ -930,3 +931,58 @@ def test_create_platform_config_overrides( with tiledb.open(uri) as D: for k, v in expected_schema_fields.items(): assert getattr(D.schema, k) == v + + +def test_timestamped_ops(tmp_path): + # 2x2 array + with soma.SparseNDArray.create( + tmp_path.as_posix(), + type=pa.uint16(), + shape=(2, 2), + context=SOMATileDBContext(write_timestamp=10), + ) as a: + # write 1 into top-left entry @ t=10 + a.write( + pa.SparseCOOTensor.from_scipy( + sparse.coo_matrix(([1], ([0], [0])), shape=a.shape) + ) + ) + + # write 1 into bottom-right entry @ t=20 + with soma.SparseNDArray.open( + tmp_path.as_posix(), mode="w", context=SOMATileDBContext(write_timestamp=20) + ) as a: + a.write( + pa.SparseCOOTensor.from_scipy( + sparse.coo_matrix(([1], ([1], [1])), shape=a.shape) + ) + ) + + # read with no timestamp args & see both 1s + with soma.SparseNDArray.open(tmp_path.as_posix()) as a: + assert a.read().coos().concat().to_scipy().todense().tolist() == [ + [1, 0], + [0, 1], + ] + assert a.nnz == 2 + + # read @ t=15 & see only the first write + with soma.SparseNDArray.open( + tmp_path.as_posix(), context=SOMATileDBContext(read_timestamp=15) + ) as a: + assert a.read().coos().concat().to_scipy().todense().tolist() == [ + [1, 0], + [0, 0], + ] + assert a.nnz == 1 + + # read with (timestamp_start, timestamp_end) = (15, 25) & see only the second write + with soma.SparseNDArray.open( + tmp_path.as_posix(), + context=SOMATileDBContext(read_timestamp_start=15, read_timestamp=25), + ) as a: + assert a.read().coos().concat().to_scipy().todense().tolist() == [ + [0, 0], + [0, 1], + ] + assert a.nnz == 1