Skip to content

Commit

Permalink
[python] Add timestamp slots in SOMATileDBContext and apply through…
Browse files Browse the repository at this point in the history
…out (#892)

#540

Add `read_timestamp` and `write_timestamp` slots to `SOMATileDBContext`. Then, feed these through to all read and write operations (including the C++ `SOMAReader`). `read_timestamp` defaults to the moment when the `SOMATileDBContext` is initialized.
  • Loading branch information
mlin authored Feb 8, 2023
1 parent 662a592 commit a6ec3d1
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 7 deletions.
69 changes: 68 additions & 1 deletion apis/python/src/tiledbsoma/options/soma_tiledb_context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict, Union
import time
from typing import Any, Dict, Optional, Union

import attrs
import tiledb
Expand Down Expand Up @@ -29,3 +30,69 @@ class SOMATileDBContext:
"""

tiledb_ctx: tiledb.Ctx = _build_default_tiledb_ctx()

read_timestamp: int = attrs.field(factory=lambda: int(time.time() * 1000))
"""
Timestamp for operations on SOMA objects open in read mode, in milliseconds since Unix epoch.
Defaults to the time of context initialization. Set to 0xFFFFFFFFFFFFFFFF (UINT64_MAX) to get
the latest revision as of when *each* object is opened.
SOMA objects opened in *write* mode ignore any read timestamp.
"""

read_timestamp_start: int = 0
"""
Timestamp range start for operations on SOMA objects opened in read mode. This is usually zero
except for specific, unusual query requirements.
"""

write_timestamp: Optional[int] = None
"""
Timestamp applied to all SOMA object write operations. If unset, each individual write
operation receives the timestamp as of when the operation executes.
Caution: overlapping writes (of overlapping array slices, or setting the same collection key)
should be avoided when write_timestamp is set. Distinct, overlapping write operations given the
same timestamp may be applied in any order.
"""

@read_timestamp.validator
def _validate_timestamps(self, _: Any, __: Any) -> None:
if not (
self.read_timestamp_start >= 0
and self.read_timestamp >= self.read_timestamp_start
):
raise ValueError("SOMATileDBContext: invalid read timestamp range")
if not (self.write_timestamp is None or self.write_timestamp >= 0):
raise ValueError("SOMATileDBContext: invalid write timestamp")

# (internal) tiledb.Ctx specifically for tiledb.Group operations; unlike arrays, tiledb.Group
# needs its timestamps set in the tiledb.Ctx. We'd like to get rid of this in the future,
# if/when tiledb.Group() takes a timestamp argument like tiledb.Array().
_group_read_tiledb_ctx: tiledb.Ctx = attrs.field(init=False)
_group_write_tiledb_ctx: tiledb.Ctx = attrs.field(init=False)

def __attrs_post_init__(self) -> None:
"""
initialization hook invoked by the attrs-generated __init__; prepares the pair of
timestamped tiledb.Ctx for groups
"""

group_read_config = self.tiledb_ctx.config().dict()
group_read_config["sm.group.timestamp_start"] = self.read_timestamp_start
group_read_config["sm.group.timestamp_end"] = self.read_timestamp
object.__setattr__(
self, "_group_read_tiledb_ctx", tiledb.Ctx(group_read_config)
)
assert isinstance(self._group_read_tiledb_ctx, tiledb.Ctx)

if self.write_timestamp is not None:
group_write_config = self.tiledb_ctx.config().dict()
group_write_config["sm.group.timestamp_start"] = self.write_timestamp
group_write_config["sm.group.timestamp_end"] = self.write_timestamp
object.__setattr__(
self, "_group_write_tiledb_ctx", tiledb.Ctx(group_write_config)
)
else:
object.__setattr__(self, "_group_write_tiledb_ctx", self.tiledb_ctx)
assert isinstance(self._group_write_tiledb_ctx, tiledb.Ctx)
51 changes: 45 additions & 6 deletions apis/python/src/tiledbsoma/tdb_handles.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ def open(
tdb = cls._opener(uri, mode, context)
handle = cls(uri, mode, context, tdb)
if mode == "w":
with cls._opener(uri, "r", context) as auxiliary_reader:
# Briefly open a read-mode handle to populate metadata/schema/group contents/etc.,
# ignoring any read_timestamp set in the context to get an up-to-date view in
# preparation for writing.
with cls._opener(
uri, "r", context, use_latest_read_timestamp=True
) as auxiliary_reader:
handle._do_initial_reads(auxiliary_reader)
else:
handle._do_initial_reads(tdb)
Expand All @@ -80,7 +85,11 @@ def open(
@classmethod
@abc.abstractmethod
def _opener(
cls, uri: str, mode: options.OpenMode, context: SOMATileDBContext
cls,
uri: str,
mode: options.OpenMode,
context: SOMATileDBContext,
use_latest_read_timestamp: bool = False,
) -> _RawHdl_co:
"""Opens and returns a TileDB object specific to this type."""
raise NotImplementedError()
Expand Down Expand Up @@ -152,9 +161,23 @@ class ArrayWrapper(Wrapper[tiledb.Array]):

@classmethod
def _opener(
cls, uri: str, mode: options.OpenMode, context: SOMATileDBContext
cls,
uri: str,
mode: options.OpenMode,
context: SOMATileDBContext,
use_latest_read_timestamp: bool = False,
) -> tiledb.Array:
return tiledb.open(uri, mode, ctx=context.tiledb_ctx)
if not use_latest_read_timestamp:
timestamp_arg = (
context.write_timestamp
if mode == "w"
else (context.read_timestamp_start, context.read_timestamp)
)
else:
# array opened in write mode should initialize with latest metadata
assert mode == "r"
timestamp_arg = None
return tiledb.open(uri, mode, timestamp=timestamp_arg, ctx=context.tiledb_ctx)

@property
def schema(self) -> tiledb.ArraySchema:
Expand All @@ -180,9 +203,25 @@ class GroupWrapper(Wrapper[tiledb.Group]):

@classmethod
def _opener(
cls, uri: str, mode: options.OpenMode, context: SOMATileDBContext
cls,
uri: str,
mode: options.OpenMode,
context: SOMATileDBContext,
use_latest_read_timestamp: bool = False,
) -> tiledb.Group:
return tiledb.Group(uri, mode, ctx=context.tiledb_ctx)
if not use_latest_read_timestamp:
# As of Feb 2023, tiledb.Group() has no timestamp arg; instead its timestamps must be
# set in the tiledb.Ctx config. SOMATileDBContext prepares the suitable tiledb.Ctx.
ctx_arg = (
context._group_write_tiledb_ctx
if mode == "w"
else context._group_read_tiledb_ctx
)
else:
# Group opened in write mode should initialize with latest contents & metadata
assert mode == "r"
ctx_arg = context.tiledb_ctx
return tiledb.Group(uri, mode, ctx=ctx_arg)

def _do_initial_reads(self, reader: tiledb.Group) -> None:
super()._do_initial_reads(reader)
Expand Down
4 changes: 4 additions & 0 deletions apis/python/src/tiledbsoma/tiledb_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def _soma_reader(
kwargs = {
"name": self.__class__.__name__,
"platform_config": self._ctx.config().dict(),
"timestamp": (
self.context.read_timestamp_start,
self.context.read_timestamp,
),
}
# Leave empty arguments out of kwargs to allow C++ constructor defaults to apply, as
# they're not all wrapped in std::optional<>.
Expand Down
62 changes: 62 additions & 0 deletions apis/python/tests/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import tiledbsoma as soma
from tiledbsoma import collection, factory, tiledb_object
from tiledbsoma.exception import DoesNotExistError
from tiledbsoma.options import SOMATileDBContext


# ----------------------------------------------------------------
Expand Down Expand Up @@ -339,3 +340,64 @@ def test_real_class_fail(in_type):
)
def test_sanitize_for_path(key, want):
assert collection._sanitize_for_path(key) == want


def test_timestamped_ops(tmp_path):
"""
When we specify read/write timestamps in SOMATileDBContext supplied to collection, those are
inherited by elements accessed via the collection.
"""

# create collection @ t=10
with soma.Collection.create(
tmp_path.as_uri(), context=SOMATileDBContext(write_timestamp=10)
):
pass

# add array A to it @ t=20
with soma.Collection.open(
tmp_path.as_uri(), mode="w", context=SOMATileDBContext(write_timestamp=20)
) as sc:
sc.add_new_dense_ndarray("A", type=pa.uint8(), shape=(2, 2)).write(
(slice(0, 2), slice(0, 2)),
pa.Tensor.from_numpy(np.zeros((2, 2), dtype=np.uint8)),
)

# access A via collection @ t=30 and write something into it
with soma.Collection.open(
tmp_path.as_uri(), mode="w", context=SOMATileDBContext(write_timestamp=30)
) as sc:
sc["A"].write(
(slice(0, 1), slice(0, 1)),
pa.Tensor.from_numpy(np.ones((1, 1), dtype=np.uint8)),
)

# open A via collection with no timestamp => A should reflect both writes
with soma.Collection.open(tmp_path.as_uri()) as sc:
assert sc["A"].read((slice(None), slice(None))).to_numpy().tolist() == [
[1, 0],
[0, 0],
]

# open A via collection @ t=25 => A should reflect first write only
with soma.Collection.open(
tmp_path.as_uri(), context=SOMATileDBContext(read_timestamp=25)
) as sc:
assert sc["A"].read((slice(None), slice(None))).to_numpy().tolist() == [
[0, 0],
[0, 0],
]

# open collection @ t=15 => A should not even be there
with soma.Collection.open(
tmp_path.as_uri(), context=SOMATileDBContext(read_timestamp=15)
) as sc:
assert "A" not in sc

# confirm timestamp validation in SOMATileDBContext
with pytest.raises(ValueError):
SOMATileDBContext(read_timestamp=-1)
with pytest.raises(ValueError):
SOMATileDBContext(read_timestamp_start=2, read_timestamp=1)
with pytest.raises(ValueError):
SOMATileDBContext(write_timestamp=-1)
49 changes: 49 additions & 0 deletions apis/python/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tiledb

import tiledbsoma as soma
from tiledbsoma.options import SOMATileDBContext


@pytest.fixture
Expand Down Expand Up @@ -857,3 +858,51 @@ def test_create_platform_config_overrides(
with tiledb.open(uri) as D:
for k, v in expected_schema_fields.items():
assert getattr(D.schema, k) == v


def test_timestamped_ops(tmp_path):
schema = pa.schema(
[
("soma_joinid", pa.int64()),
("B", pa.float64()),
("C", pa.large_string()),
]
)
with soma.DataFrame.create(
tmp_path.as_posix(),
schema=schema,
index_column_names=["soma_joinid"],
context=SOMATileDBContext(write_timestamp=10),
) as sidf:
data = {
"soma_joinid": [0],
"B": [100.1],
"C": ["foo"],
}
sidf.write(pa.Table.from_pydict(data))

with soma.DataFrame.open(
uri=tmp_path.as_posix(), mode="w", context=SOMATileDBContext(write_timestamp=20)
) as sidf:
data = {
"soma_joinid": [0, 1],
"B": [200.2, 300.3],
"C": ["bar", "bas"],
}
sidf.write(pa.Table.from_pydict(data))

# read without timestamp & see final image
with soma.DataFrame.open(tmp_path.as_posix()) as sidf:
tab = sidf.read().concat()
assert list(x.as_py() for x in tab["soma_joinid"]) == [0, 1]
assert list(x.as_py() for x in tab["B"]) == [200.2, 300.3]
assert list(x.as_py() for x in tab["C"]) == ["bar", "bas"]

# read at t=15 & see only the first write
with soma.DataFrame.open(
tmp_path.as_posix(), context=SOMATileDBContext(read_timestamp=15)
) as sidf:
tab = sidf.read().concat()
assert list(x.as_py() for x in tab["soma_joinid"]) == [0]
assert list(x.as_py() for x in tab["B"]) == [100.1]
assert list(x.as_py() for x in tab["C"]) == ["foo"]
59 changes: 59 additions & 0 deletions apis/python/tests/test_dense_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,62 @@ def test_tile_extents(tmp_path):
with tiledb.open(tmp_path.as_posix()) as A:
assert A.schema.domain.dim(0).tile == 100
assert A.schema.domain.dim(1).tile == 2048


def test_timestamped_ops(tmp_path):
# 2x2 array
with soma.DenseNDArray.create(
tmp_path.as_posix(),
type=pa.uint8(),
shape=(2, 2),
context=SOMATileDBContext(write_timestamp=1),
) as a:
a.write(
(slice(0, 2), slice(0, 2)),
pa.Tensor.from_numpy(np.zeros((2, 2), dtype=np.uint8)),
)

# write 1 into top-left entry @ t=10
with soma.DenseNDArray.open(
tmp_path.as_posix(), mode="w", context=SOMATileDBContext(write_timestamp=10)
) as a:
a.write(
(slice(0, 1), slice(0, 1)),
pa.Tensor.from_numpy(np.ones((1, 1), dtype=np.uint8)),
)

# write 1 into bottom-right entry @ t=20
with soma.DenseNDArray.open(
uri=tmp_path.as_posix(), mode="w", context=SOMATileDBContext(write_timestamp=20)
) as a:
a.write(
(slice(1, 2), slice(1, 2)),
pa.Tensor.from_numpy(np.ones((1, 1), dtype=np.uint8)),
)

# read with no timestamp args & see both 1s
with soma.DenseNDArray.open(tmp_path.as_posix()) as a:
assert a.read((slice(None), slice(None))).to_numpy().tolist() == [
[1, 0],
[0, 1],
]

# read @ t=15 & see only the writes up til then
with soma.DenseNDArray.open(
tmp_path.as_posix(), context=SOMATileDBContext(read_timestamp=15)
) as a:
assert a.read((slice(0, 1), slice(0, 1))).to_numpy().tolist() == [
[1, 0],
[0, 0],
]

# read with (timestamp_start, timestamp_end) = (15, 25) & see only the t=20 write
with soma.DenseNDArray.open(
tmp_path.as_posix(),
context=SOMATileDBContext(read_timestamp_start=15, read_timestamp=25),
) as a:
F = 255 # fill value
assert a.read((slice(0, 1), slice(0, 1))).to_numpy().tolist() == [
[F, F],
[F, 1],
]
Loading

0 comments on commit a6ec3d1

Please sign in to comment.