From 17ef7a0a3e02e9154e446dc3020dce8273a8113f Mon Sep 17 00:00:00 2001 From: Juan Marulanda Date: Thu, 14 Dec 2023 15:08:37 -0500 Subject: [PATCH] Added optional parameter to adapter that defines a validate shape method --- databroker/mongo_normalized.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/databroker/mongo_normalized.py b/databroker/mongo_normalized.py index 06af7462a..814172db3 100644 --- a/databroker/mongo_normalized.py +++ b/databroker/mongo_normalized.py @@ -851,7 +851,7 @@ def populate_columns(keys, min_seq_num, max_seq_num): if expected_shape and (not is_external): validated_column = list( map( - lambda item: _validate_shape( + lambda item: default_validate_shape( key, numpy.asarray(item), expected_shape ), result[key], @@ -936,7 +936,7 @@ def populate_columns(keys, min_seq_num, max_seq_num): last_datum_id=None, ) filled_data = filled_mock_event["data"][key] - validated_filled_data = _validate_shape( + validated_filled_data = default_validate_shape( key, filled_data, expected_shape ) filled_column.append(validated_filled_data) @@ -1047,6 +1047,7 @@ def from_uri( access_policy=None, cache_ttl_complete=60, # seconds cache_ttl_partial=2, # seconds + validate_shape=None ): """ Create a MongoAdapter from MongoDB with the "normalized" (original) layout. @@ -1094,6 +1095,9 @@ def from_uri( cache_ttl_complete : float Time (in seconds) to cache a *complete* BlueskyRun before checking the database for updates. Default 60. + validate_shape: func + function that will be used to validate that the shape of the data matches + the shape in the descriptor document """ metadatastore_db = _get_database(uri) if asset_registry_uri is None: @@ -1122,6 +1126,7 @@ def from_uri( cache_of_partial_bluesky_runs=cache_of_partial_bluesky_runs, metadata=metadata, access_policy=access_policy, + validate_shape=validate_shape, ) @classmethod @@ -1135,6 +1140,7 @@ def from_mongomock( access_policy=None, cache_ttl_complete=60, # seconds cache_ttl_partial=2, # seconds + validate_shape=None ): """ Create a transient MongoAdapter from backed by "mongomock". @@ -1178,6 +1184,9 @@ def from_mongomock( cache_ttl_complete : float Time (in seconds) to cache a *complete* BlueskyRun before checking the database for updates. Default 60. + validate_shape: func + function that will be used to validate that the shape of the data matches + the shape in the descriptor document """ import mongomock @@ -1205,6 +1214,7 @@ def from_mongomock( cache_of_partial_bluesky_runs=cache_of_partial_bluesky_runs, metadata=metadata, access_policy=access_policy, + validate_shape=validate_shape, ) def __init__( @@ -1220,6 +1230,7 @@ def __init__( queries=None, sorting=None, access_policy=None, + validate_shape=None, ): "This is not user-facing. Use MongoAdapter.from_uri." self._run_start_collection = metadatastore_db.get_collection("run_start") @@ -1249,6 +1260,11 @@ def __init__( self._sorting = sorting self.access_policy = access_policy self._serializer = None + if validate_shape is None: + validate_shape = default_validate_shape + elif isinstance(validate_shape, str): + validate_shape = import_object(validate_shape) + self.validate_shape=validate_shape super().__init__() @property @@ -2095,7 +2111,7 @@ class BadShapeMetadata(Exception): pass -def _validate_shape(key, data, expected_shape): +def default_validate_shape(key, data, expected_shape): """ Check that data.shape == expected.shape.