Skip to content

Commit

Permalink
Added optional parameter to adapter that defines a validate shape method
Browse files Browse the repository at this point in the history
  • Loading branch information
jmaruland committed Dec 14, 2023
1 parent 8a2d873 commit 17ef7a0
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions databroker/mongo_normalized.py
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ def populate_columns(keys, min_seq_num, max_seq_num):
if expected_shape and (not is_external):
validated_column = list(
map(
lambda item: _validate_shape(
lambda item: default_validate_shape(
key, numpy.asarray(item), expected_shape
),
result[key],
Expand Down Expand Up @@ -936,7 +936,7 @@ def populate_columns(keys, min_seq_num, max_seq_num):
last_datum_id=None,
)
filled_data = filled_mock_event["data"][key]
validated_filled_data = _validate_shape(
validated_filled_data = default_validate_shape(
key, filled_data, expected_shape
)
filled_column.append(validated_filled_data)
Expand Down Expand Up @@ -1047,6 +1047,7 @@ def from_uri(
access_policy=None,
cache_ttl_complete=60, # seconds
cache_ttl_partial=2, # seconds
validate_shape=None
):
"""
Create a MongoAdapter from MongoDB with the "normalized" (original) layout.
Expand Down Expand Up @@ -1094,6 +1095,9 @@ def from_uri(
cache_ttl_complete : float
Time (in seconds) to cache a *complete* BlueskyRun before checking
the database for updates. Default 60.
validate_shape: func
function that will be used to validate that the shape of the data matches
the shape in the descriptor document
"""
metadatastore_db = _get_database(uri)
if asset_registry_uri is None:
Expand Down Expand Up @@ -1122,6 +1126,7 @@ def from_uri(
cache_of_partial_bluesky_runs=cache_of_partial_bluesky_runs,
metadata=metadata,
access_policy=access_policy,
validate_shape=validate_shape,
)

@classmethod
Expand All @@ -1135,6 +1140,7 @@ def from_mongomock(
access_policy=None,
cache_ttl_complete=60, # seconds
cache_ttl_partial=2, # seconds
validate_shape=None
):
"""
Create a transient MongoAdapter from backed by "mongomock".
Expand Down Expand Up @@ -1178,6 +1184,9 @@ def from_mongomock(
cache_ttl_complete : float
Time (in seconds) to cache a *complete* BlueskyRun before checking
the database for updates. Default 60.
validate_shape: func
function that will be used to validate that the shape of the data matches
the shape in the descriptor document
"""
import mongomock

Expand Down Expand Up @@ -1205,6 +1214,7 @@ def from_mongomock(
cache_of_partial_bluesky_runs=cache_of_partial_bluesky_runs,
metadata=metadata,
access_policy=access_policy,
validate_shape=validate_shape,
)

def __init__(
Expand All @@ -1220,6 +1230,7 @@ def __init__(
queries=None,
sorting=None,
access_policy=None,
validate_shape=None,
):
"This is not user-facing. Use MongoAdapter.from_uri."
self._run_start_collection = metadatastore_db.get_collection("run_start")
Expand Down Expand Up @@ -1249,6 +1260,11 @@ def __init__(
self._sorting = sorting
self.access_policy = access_policy
self._serializer = None
if validate_shape is None:
validate_shape = default_validate_shape
elif isinstance(validate_shape, str):
validate_shape = import_object(validate_shape)
self.validate_shape=validate_shape
super().__init__()

@property
Expand Down Expand Up @@ -2095,7 +2111,7 @@ class BadShapeMetadata(Exception):
pass


def _validate_shape(key, data, expected_shape):
def default_validate_shape(key, data, expected_shape):
"""
Check that data.shape == expected.shape.
Expand Down

0 comments on commit 17ef7a0

Please sign in to comment.