Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[c++] Refactor metadata and create to respect timestamps #2180

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 21 additions & 26 deletions apis/python/src/tiledbsoma/soma_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ namespace py = pybind11;
using namespace py::literals;
using namespace tiledbsoma;

/**
 * Convert the metadata of a SOMAArray into a Python dict.
 *
 * Each entry returned by `array.get_metadata()` is unpacked into its TileDB
 * datatype, element count and raw value pointer, and stored under its key:
 *  - TILEDB_STRING_UTF8 / TILEDB_STRING_ASCII values are wrapped as a NumPy
 *    "|S1" array of `value_num` bytes and decoded as UTF-8 into a Python str.
 *  - Every other datatype is wrapped as a NumPy array of the dtype given by
 *    `tdb_to_np_dtype(tdb_type, 1)` and reduced to a Python scalar.
 *
 * @param array Array whose metadata is exported.
 * @return Dict mapping metadata key (str) to the converted value.
 */
py::dict meta(SOMAArray& array) {
    py::dict results;

    // Bind by const reference so neither the key string nor the value tuple
    // is copied per entry; the map returned by get_metadata() stays alive for
    // the whole loop because it is the range expression.
    for (const auto& [key, val] : array.get_metadata()) {
        const auto& [tdb_type, value_num, value] = val;

        const bool is_string = tdb_type == TILEDB_STRING_UTF8 ||
                               tdb_type == TILEDB_STRING_ASCII;

        if (is_string) {
            // View the raw bytes as single-character items, then join and
            // decode them on the Python side.
            auto py_buf = py::array(py::dtype("|S1"), value_num, value);
            auto res = py_buf.attr("tobytes")().attr("decode")("UTF-8");
            results[py::str(key)] = res;
        } else {
            py::dtype value_type = tdb_to_np_dtype(tdb_type, 1);
            // NOTE(review): only element 0 is returned, so a value with
            // value_num > 1 is truncated to its first element -- confirm
            // that multi-valued numeric metadata is not expected here.
            auto res = py::array(value_type, value_num, value).attr("item")(0);
            results[py::str(key)] = res;
        }
    }
    return results;
}

py::tuple get_enum(SOMAArray& sr, std::string attr_name) {
auto attr_to_enmrs = sr.get_attr_to_enum_mapping();
if (attr_to_enmrs.count(attr_name) == 0)
Expand Down Expand Up @@ -644,34 +663,10 @@ void load_soma_array(py::module& m) {
"get_metadata",
py::overload_cast<const std::string&>(&SOMAArray::get_metadata))

.def_property_readonly(
"meta",
[](SOMAArray& soma_dataframe) -> py::dict {
py::dict results;

for (auto const& [key, val] : soma_dataframe.get_metadata()) {
tiledb_datatype_t tdb_type = std::get<MetadataInfo::dtype>(
val);
uint32_t value_num = std::get<MetadataInfo::num>(val);
const void* value = std::get<MetadataInfo::value>(val);

if (tdb_type == TILEDB_STRING_UTF8) {
results[py::str(key)] = py::str(
std::string((const char*)value, value_num));
} else if (tdb_type == TILEDB_STRING_ASCII) {
results[py::str(key)] = py::bytes(
std::string((const char*)value, value_num));
} else {
py::dtype value_type = tdb_to_np_dtype(tdb_type, 1);
results[py::str(key)] = py::array(
value_type, value_num, value);
}
}
return results;
})
.def_property_readonly("meta", meta)

.def("has_metadata", &SOMAArray::has_metadata)

.def("metadata_num", &SOMAArray::metadata_num);
}
} // namespace libtiledbsomacpp
} // namespace libtiledbsomacpp
144 changes: 92 additions & 52 deletions libtiledbsoma/src/soma/soma_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,40 @@ using namespace tiledb;
//= public static
//===================================================================

void SOMAArray::create(
std::unique_ptr<SOMAArray> SOMAArray::create(
std::shared_ptr<SOMAContext> ctx,
std::string_view uri,
ArraySchema schema,
std::string soma_type) {
std::string soma_type,
std::optional<TimestampRange> timestamp) {
Array::create(std::string(uri), schema);
auto array = Array(*ctx->tiledb_ctx(), std::string(uri), TILEDB_WRITE);
array.put_metadata(
"soma_object_type",

std::shared_ptr<Array> array;
if (timestamp) {
array = std::make_shared<Array>(
*ctx->tiledb_ctx(),
std::string(uri),
TILEDB_WRITE,
TemporalPolicy(
TimestampStartEnd, timestamp->first, timestamp->second));
} else {
array = std::make_shared<Array>(
*ctx->tiledb_ctx(), std::string(uri), TILEDB_WRITE);
}

array->put_metadata(
SOMA_OBJECT_TYPE_KEY,
TILEDB_STRING_UTF8,
static_cast<uint32_t>(soma_type.length()),
soma_type.c_str());
array.close();

array->put_metadata(
ENCODING_VERSION_KEY,
TILEDB_STRING_UTF8,
static_cast<uint32_t>(ENCODING_VERSION_VAL.length()),
ENCODING_VERSION_VAL.c_str());

return std::make_unique<SOMAArray>(ctx, array, timestamp);
}

std::unique_ptr<SOMAArray> SOMAArray::open(
Expand All @@ -64,7 +85,7 @@ std::unique_ptr<SOMAArray> SOMAArray::open(
std::vector<std::string> column_names,
std::string_view batch_size,
ResultOrder result_order,
std::optional<std::pair<uint64_t, uint64_t>> timestamp) {
std::optional<TimestampRange> timestamp) {
LOG_DEBUG(
fmt::format("[SOMAArray] static method 'cfg' opening array '{}'", uri));
return std::make_unique<SOMAArray>(
Expand All @@ -86,7 +107,7 @@ std::unique_ptr<SOMAArray> SOMAArray::open(
std::vector<std::string> column_names,
std::string_view batch_size,
ResultOrder result_order,
std::optional<std::pair<uint64_t, uint64_t>> timestamp) {
std::optional<TimestampRange> timestamp) {
LOG_DEBUG(
fmt::format("[SOMAArray] static method 'ctx' opening array '{}'", uri));
return std::make_unique<SOMAArray>(
Expand All @@ -112,7 +133,7 @@ SOMAArray::SOMAArray(
std::vector<std::string> column_names,
std::string_view batch_size,
ResultOrder result_order,
std::optional<std::pair<uint64_t, uint64_t>> timestamp)
std::optional<TimestampRange> timestamp)
: uri_(util::rstrip_uri(uri))
, result_order_(result_order)
, timestamp_(timestamp) {
Expand All @@ -130,7 +151,7 @@ SOMAArray::SOMAArray(
std::vector<std::string> column_names,
std::string_view batch_size,
ResultOrder result_order,
std::optional<std::pair<uint64_t, uint64_t>> timestamp)
std::optional<TimestampRange> timestamp)
: uri_(util::rstrip_uri(uri))
, ctx_(ctx)
, result_order_(result_order)
Expand All @@ -140,20 +161,39 @@ SOMAArray::SOMAArray(
fill_metadata_cache();
}

/**
 * Construct a SOMAArray around an existing TileDB Array handle.
 *
 * The URI is taken from the handle itself (passed through
 * util::rstrip_uri), the batch size is set to "auto" and the result order
 * to ResultOrder::automatic. A ManagedQuery is created on the same handle,
 * then the query state is reset with an empty column list and the metadata
 * cache is filled.
 *
 * @param ctx SOMA context stored in ctx_ and used to build the managed query.
 * @param arr TileDB array handle stored in arr_ and shared with the managed
 * query.
 * @param timestamp Optional timestamp range stored in timestamp_.
 */
SOMAArray::SOMAArray(
std::shared_ptr<SOMAContext> ctx,
std::shared_ptr<Array> arr,
std::optional<TimestampRange> timestamp)
: uri_(util::rstrip_uri(arr->uri()))
, ctx_(ctx)
, batch_size_("auto")
, result_order_(ResultOrder::automatic)
, timestamp_(timestamp)
// NOTE(review): this initializer reads ctx_ and name_. Members are
// initialized in declaration order, so it relies on both being declared
// before mq_ in the class, and on name_ having a default value since it is
// not set in this list -- confirm against the header.
, mq_(std::make_unique<ManagedQuery>(arr, ctx_->tiledb_ctx(), name_))
, arr_(arr) {
// Empty column list; batch size and result order are the values set above.
reset({}, batch_size_, result_order_);
fill_metadata_cache();
}

void SOMAArray::fill_metadata_cache() {
std::shared_ptr<Array> array;
if (arr_->query_type() == TILEDB_WRITE) {
array = std::make_shared<Array>(*ctx_->tiledb_ctx(), uri_, TILEDB_READ);
meta_cache_arr_ = std::make_shared<Array>(
*ctx_->tiledb_ctx(),
uri_,
TILEDB_READ,
TemporalPolicy(
TimestampStartEnd, timestamp()->first, timestamp()->second));
} else {
array = arr_;
meta_cache_arr_ = arr_;
}

for (uint64_t idx = 0; idx < array->metadata_num(); ++idx) {
for (uint64_t idx = 0; idx < meta_cache_arr_->metadata_num(); ++idx) {
std::string key;
tiledb_datatype_t value_type;
uint32_t value_num;
const void* value;
array->get_metadata_from_index(
meta_cache_arr_->get_metadata_from_index(
idx, &key, &value_type, &value_num, &value);
MetadataValue mdval(value_type, value_num, value);
std::pair<std::string, const MetadataValue> mdpair(key, mdval);
Expand All @@ -169,26 +209,22 @@ std::shared_ptr<SOMAContext> SOMAArray::ctx() {
return ctx_;
};

void SOMAArray::open(
OpenMode mode, std::optional<std::pair<uint64_t, uint64_t>> timestamp) {
auto tdb_mode = mode == OpenMode::read ? TILEDB_READ : TILEDB_WRITE;
arr_->open(tdb_mode);
if (timestamp) {
if (timestamp->first > timestamp->second) {
throw std::invalid_argument("timestamp start > end");
}
arr_->set_open_timestamp_start(timestamp->first);
arr_->set_open_timestamp_end(timestamp->second);
arr_->close();
arr_->open(tdb_mode);
}
void SOMAArray::open(OpenMode mode, std::optional<TimestampRange> timestamp) {
timestamp_ = timestamp;

validate(mode, name_, timestamp);
reset(column_names(), batch_size_, result_order_);
fill_metadata_cache();
}

/**
 * Close the array and discard the cached metadata.
 */
void SOMAArray::close() {
    // The metadata-cache handle is only closed here when the main array was
    // opened for writing.
    const bool opened_for_write = (arr_->query_type() == TILEDB_WRITE);
    if (opened_for_write) {
        meta_cache_arr_->close();
    }

    // Closing goes through the managed query rather than the array handle
    // directly, so that any pending queries are completed first.
    mq_->close();

    metadata_.clear();
}

void SOMAArray::reset(
Expand Down Expand Up @@ -511,35 +547,44 @@ void SOMAArray::set_metadata(
tiledb_datatype_t value_type,
uint32_t value_num,
const void* value) {
if (key.compare("soma_object_type") == 0) {
throw TileDBSOMAError("soma_object_type cannot be modified.");
}
if (key.compare(SOMA_OBJECT_TYPE_KEY) == 0)
throw TileDBSOMAError(SOMA_OBJECT_TYPE_KEY + " cannot be modified.");

if (key.compare(ENCODING_VERSION_KEY) == 0)
throw TileDBSOMAError(ENCODING_VERSION_KEY + " cannot be modified.");

arr_->put_metadata(key, value_type, value_num, value);

MetadataValue mdval(value_type, value_num, value);
std::pair<std::string, const MetadataValue> mdpair(key, mdval);
metadata_.insert(mdpair);
}

void SOMAArray::delete_metadata(const std::string& key) {
if (key.compare("soma_object_type") == 0) {
throw TileDBSOMAError("soma_object_type cannot be deleted.");
if (key.compare(SOMA_OBJECT_TYPE_KEY) == 0) {
throw TileDBSOMAError(SOMA_OBJECT_TYPE_KEY + " cannot be deleted.");
}

if (key.compare(ENCODING_VERSION_KEY) == 0) {
throw TileDBSOMAError(ENCODING_VERSION_KEY + " cannot be deleted.");
}

arr_->delete_metadata(key);
metadata_.erase(key);
}

std::map<std::string, MetadataValue> SOMAArray::get_metadata() {
return metadata_;
}

/**
 * Look up a single entry in the metadata cache.
 *
 * @param key Metadata key to look up.
 * @return The cached value for `key`, or std::nullopt when the key is not
 * present in the cache.
 */
std::optional<MetadataValue> SOMAArray::get_metadata(const std::string& key) {
    // One find() replaces the count() + operator[] pair: a single lookup,
    // and no use of the inserting accessor on a read path.
    const auto it = metadata_.find(key);
    if (it == metadata_.end()) {
        return std::nullopt;
    }

    return it->second;
}

std::map<std::string, MetadataValue> SOMAArray::get_metadata() {
return metadata_;
}

/**
 * Report whether the metadata cache holds an entry for `key`.
 *
 * @param key Metadata key to test.
 * @return true when the key is present in the cache, false otherwise.
 */
bool SOMAArray::has_metadata(const std::string& key) {
    const bool present = metadata_.find(key) != metadata_.end();
    return present;
}
Expand All @@ -551,26 +596,21 @@ uint64_t SOMAArray::metadata_num() const {
void SOMAArray::validate(
OpenMode mode,
std::string_view name,
std::optional<std::pair<uint64_t, uint64_t>> timestamp) {
std::optional<TimestampRange> timestamp) {
// Validate parameters
auto tdb_mode = mode == OpenMode::read ? TILEDB_READ : TILEDB_WRITE;

try {
LOG_DEBUG(fmt::format("[SOMAArray] opening array '{}'", uri_));
arr_ = std::make_shared<Array>(*ctx_->tiledb_ctx(), uri_, tdb_mode);
if (timestamp) {
if (timestamp->first > timestamp->second) {
throw std::invalid_argument("timestamp start > end");
}
arr_->set_open_timestamp_start(timestamp->first);
arr_->set_open_timestamp_end(timestamp->second);
arr_->close();
arr_->open(tdb_mode);
LOG_DEBUG(fmt::format(
"[SOMAArray] timestamp_start = {}",
arr_->open_timestamp_start()));
LOG_DEBUG(fmt::format(
"[SOMAArray] timestamp_end = {}", arr_->open_timestamp_end()));
arr_ = std::make_shared<Array>(
*ctx_->tiledb_ctx(),
uri_,
tdb_mode,
TemporalPolicy(
TimestampStartEnd, timestamp->first, timestamp->second));
} else {
arr_ = std::make_shared<Array>(*ctx_->tiledb_ctx(), uri_, tdb_mode);
}
LOG_TRACE(fmt::format("[SOMAArray] loading enumerations"));
ArrayExperimental::load_all_enumerations(
Expand All @@ -582,7 +622,7 @@ void SOMAArray::validate(
}
}

std::optional<std::pair<uint64_t, uint64_t>> SOMAArray::timestamp() {
std::optional<TimestampRange> SOMAArray::timestamp() {
return timestamp_;
}

Expand Down
Loading
Loading