Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review StreamResource and StreamDatum Schemas #301

Merged
merged 17 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions docs/user/explanations/data-model.rst
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,9 @@ Typical example:
# 'Stream Resource' document
{'data_key': 'detector_1',
'spec': 'AD_HDF5',
'root': '/GPFS/DATA/Andor/',
'resource_path': '2020/01/03/8ff08ff9-a2bf-48c3-8ff3-dcac0f309d7d.h5',
'resource_kwargs': {'frame_per_point': 1},
'path_semantics': 'posix',
'mimetype': 'application/x-hdf5',
'uri': 'file://localhost/GPFS/DATA/Andor/01/03/8ff08ff9-a2bf-48c3-8ff3-dcac0f309d7d.h5',
'parameters': {'frame_per_point': 1},
'uid': '3b300e6f-b431-4750-a635-5630d15c81a8',
'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd'}
Expand Down
24 changes: 24 additions & 0 deletions docs/user/explanations/external.rst
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,30 @@ The ``spec`` gives us a hint about the format of this asset, whether it be a
file, multiple files, or something more specialized. The ``resource_kwargs``
provide any additional parameters for reading it.

.. code:: python
# 'Stream Resource' document
{'uid': 'aa10035d-1d2b-41d9-97e6-03e3fe62fa6c',
'mimetype': 'application/x-hdf5',
'uri': 'file://localhost/{path}/GPFS/DATA/Andor/2020/01/03/8ff08ff9.h5',
'parameters': {'frame_per_point': 10},
'uid': '3b300e6f-b431-4750-a635-5630d15c81a8',
'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd'}
The ``uri`` specifies the location of the data. It may be a path on the local
filesystem, `file://localhost/{path}`, a path on a shared filesystem
`file://{host}/{path}`, to be remapped at read time via local mount config,
or a non-file-based resource like `s3://...`. The `{path}` part of the `uri`
is typically a relative path, all of which is semantic and should usually not
change during the lifecycle of this asset.

The ``mimetype`` is a recognized standard way to specify the I/O procedures to
read the asset. It gives us a hint about the format of this asset, whether it
be a file, multiple files, or something more specialized. We support standard
mimetypes, such as `image/tiff`, as well as custom ones, e.g.
`application/x-hdf5-smwr-slice`. The ``parameters`` provide any additional
parameters for reading the asset.

Handlers
========

Expand Down
32 changes: 12 additions & 20 deletions event_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2047,12 +2047,10 @@ class ComposeStreamResource:

def __call__(
self,
spec: str,
root: str,
resource_path: str,
mimetype: str,
uri: str,
data_key: str,
resource_kwargs: Dict[str, Any],
path_semantics: Literal["posix", "windows"] = default_path_semantics,
parameters: Dict[str, Any],
uid: Optional[str] = None,
validate: bool = True,
) -> ComposeStreamResourceBundle:
Expand All @@ -2062,11 +2060,9 @@ def __call__(
doc = StreamResource(
uid=uid,
data_key=data_key,
spec=spec,
root=root,
resource_path=resource_path,
resource_kwargs=resource_kwargs,
path_semantics=path_semantics,
mimetype=mimetype,
uri=uri,
parameters=parameters,
)

if self.start:
Expand All @@ -2086,12 +2082,10 @@ def __call__(

def compose_stream_resource(
*,
spec: str,
root: str,
resource_path: str,
mimetype: str,
uri: str,
data_key: str,
resource_kwargs: Dict[str, Any],
path_semantics: Literal["posix", "windows"] = default_path_semantics,
parameters: Dict[str, Any],
start: Optional[RunStart] = None,
uid: Optional[str] = None,
validate: bool = True,
Expand All @@ -2100,12 +2094,10 @@ def compose_stream_resource(
Here for backwards compatibility, the Compose class is prefered.
"""
return ComposeStreamResource(start=start)(
spec,
root,
resource_path,
mimetype,
uri,
data_key,
resource_kwargs,
path_semantics=path_semantics,
parameters,
uid=uid,
validate=validate,
)
Expand Down
24 changes: 5 additions & 19 deletions event_model/documents/stream_resource.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any, Dict

from typing_extensions import Annotated, Literal, NotRequired, TypedDict
from typing_extensions import Annotated, NotRequired, TypedDict

from .generate.type_wrapper import Field, add_extra_schema

Expand All @@ -14,35 +14,21 @@ class StreamResource(TypedDict):
externally-stored data streams
"""

path_semantics: NotRequired[
Annotated[
Literal["posix", "windows"],
Field(description="Rules for joining paths"),
]
]
data_key: Annotated[
str,
Field(
description="A string to show which data_key of the "
"Descriptor are being streamed"
),
]
resource_kwargs: Annotated[
parameters: Annotated[
Dict[str, Any],
Field(
description="Additional argument to pass to the Handler to read a "
description="Additional keyword arguments to pass to the Handler to read a "
"Stream Resource",
),
]
resource_path: Annotated[
str, Field(description="Filepath or URI for locating this resource")
]
root: Annotated[
str,
Field(
description="Subset of resource_path that is a local detail, not semantic."
),
]
uri: Annotated[str, Field(description="URI for locating this resource")]
run_start: NotRequired[
Annotated[
str,
Expand All @@ -52,7 +38,7 @@ class StreamResource(TypedDict):
),
]
]
spec: Annotated[
mimetype: Annotated[
str,
Field(
description="String identifying the format/type of this Stream Resource, "
Expand Down
47 changes: 16 additions & 31 deletions event_model/schemas/stream_resource.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,38 @@
"description": "A string to show which data_key of the Descriptor are being streamed",
"type": "string"
},
"path_semantics": {
"title": "Path Semantics",
"description": "Rules for joining paths",
"type": "string",
"enum": [
"posix",
"windows"
]
},
"resource_kwargs": {
"title": "Resource Kwargs",
"description": "Additional argument to pass to the Handler to read a Stream Resource",
"type": "object"
},
"resource_path": {
"title": "Resource Path",
"description": "Filepath or URI for locating this resource",
"mimetype": {
"title": "Mimetype",
"description": "String identifying the format/type of this Stream Resource, used to identify a compatible Handler",
"type": "string"
},
"root": {
"title": "Root",
"description": "Subset of resource_path that is a local detail, not semantic.",
"type": "string"
"parameters": {
"title": "Parameters",
"description": "Additional keyword arguments to pass to the Handler to read a Stream Resource",
"type": "object"
},
"run_start": {
"title": "Run Start",
"description": "Globally unique ID to the run_start document this Stream Resource is associated with.",
"type": "string"
},
"spec": {
"title": "Spec",
"description": "String identifying the format/type of this Stream Resource, used to identify a compatible Handler",
"type": "string"
},
"uid": {
"title": "Uid",
"description": "Globally unique identifier for this Stream Resource",
"type": "string"
},
"uri": {
"title": "Uri",
"description": "URI for locating this resource",
"type": "string"
}
},
"required": [
"data_key",
"resource_kwargs",
"resource_path",
"root",
"spec",
"uid"
"mimetype",
"parameters",
"uid",
"uri"
],
"additionalProperties": false
}
14 changes: 6 additions & 8 deletions event_model/tests/test_em.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,10 @@ def test_compose_stream_resource(tmp_path):
compose_stream_resource = bundle.compose_stream_resource
assert bundle.compose_stream_resource is compose_stream_resource
bundle = compose_stream_resource(
spec="TIFF_STREAM",
root=str(tmp_path),
mimetype="image/tiff",
uri="file://localhost" + str(tmp_path) + "/test_streams",
data_key="det1",
resource_path="test_streams",
resource_kwargs={},
parameters={},
)
resource_doc, compose_stream_datum = bundle
assert bundle.stream_resource_doc is resource_doc
Expand Down Expand Up @@ -388,11 +387,10 @@ def test_document_router_streams_smoke_test(tmp_path):
start = run_bundle.start_doc
dr("start", start)
stream_resource_doc, compose_stream_datum = compose_stream_resource(
spec="TIFF_STREAM",
mimetype="image/tiff",
data_key="det1",
root=str(tmp_path),
resource_path="test_streams",
resource_kwargs={},
uri="file://localhost" + str(tmp_path) + "/test_streams",
parameters={},
)
dr("stream_resource", stream_resource_doc)
datum_doc = compose_stream_datum(
Expand Down
7 changes: 3 additions & 4 deletions event_model/tests/test_run_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,10 @@ def test_run_router_streams(tmp_path):
)
docs.append(("start", start_doc))
stream_resource_doc, compose_stream_datum = compose_stream_resource(
spec="TIFF_STREAM",
mimetype="image/tiff",
data_key="det1",
root=str(tmp_path),
resource_path="test_streams",
resource_kwargs={},
uri="file://localhost" + str(tmp_path) + "/test_streams",
parameters={},
)
docs.append(("stream_resource", stream_resource_doc))
datum_doc = compose_stream_datum(
Expand Down
Loading