Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: run tomo with provenance #259

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"viz_server",
"analysis_server",
"qoi_server",
"tomo_server",
]

entry_points = {
Expand Down
5 changes: 4 additions & 1 deletion xpdan/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from xpdan.io import pdf_saver, dump_yml
from xpdan.vend.callbacks.core import Retrieve
from xpdtools.dev_utils import _timestampstr
import mayavi.mlab as mlab
try:
import mayavi.mlab as mlab
except:
pass


class StartStopCallback(CallbackBase):
Expand Down
30 changes: 17 additions & 13 deletions xpdan/pipelines/to_event_model.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
"""Event Model mirror of xpdtools.pipelines.raw_pipeline meant to accept nodes
from the raw pipeline and convert them to Event Model"""
from shed.simple import SimpleToEventStream, AlignEventStreams
from shed.translation import DBFriendly
from xpdtools.tools import overlay_mask, splay_tuple


def to_event_stream_with_ind(raw_stripped, *nodes, publisher, **kwargs):
for node in nodes:
merge = AlignEventStreams(raw_stripped, node,
event_stream_name='primary')
merge = AlignEventStreams(
raw_stripped, node, event_stream_name="primary"
)
merge.starsink(publisher)
return locals()


def to_event_stream_no_ind(*nodes, publisher, **kwargs):
for node in nodes:
node.starsink(publisher)
node.DBFriendly().starsink(publisher)
return locals()


Expand All @@ -41,9 +43,11 @@ def gen_mask(mask, pol_corrected_img, **kwargs):
mask_tes = SimpleToEventStream(mask, ("mask",), analysis_stage="mask")

mask_overlay_tes = SimpleToEventStream(
# This is registered later than the mask creation, thus we can emit on
# This is registered later than the mask creation, thus we can emit on
# the img, since the mask is guaranteed to exist
pol_corrected_img.combine_latest(mask, emit_on=0).starmap(overlay_mask),
pol_corrected_img.combine_latest(mask, emit_on=0).starmap(
overlay_mask
),
("mask_overlay",),
analysis_stage="mask_overlay",
)
Expand All @@ -53,15 +57,15 @@ def gen_mask(mask, pol_corrected_img, **kwargs):
def integration(mean, q, tth, std=None, median=None, **kwargs):
merge_names = ["mean"]
merge_streams = []
data_key_md = {'mean': {'units': 'intensity'}}
data_key_md = {"mean": {"units": "intensity"}}
if std:
merge_names += ["std"]
merge_streams += [std]
data_key_md.update({'std': {'units': 'std intensity'}})
data_key_md.update({"std": {"units": "std intensity"}})
if median:
merge_names += ["median"]
merge_streams += [median]
data_key_md.update({'std': {'units': 'intensity'}})
data_key_md.update({"std": {"units": "intensity"}})
if merge_streams:
merge = mean.zip(*merge_streams)
# need to splay so we have everything at the same level
Expand All @@ -73,15 +77,15 @@ def integration(mean, q, tth, std=None, median=None, **kwargs):
integration_merge = merge.combine_latest(q, tth, emit_on=0)

merge_names += ["q", "tth"]
data_key_md.update({'q': {'units': '1/A'}, 'tth': {'units': 'Degrees'}})
data_key_md.update({"q": {"units": "1/A"}, "tth": {"units": "Degrees"}})

integration_tes = SimpleToEventStream(
integration_merge,
merge_names,
analysis_stage="integration",
# TODO: might push q/tth into the same list
hints=dict(dimensions=[(["q"], "primary"), (["tth"], "primary")]),
data_key_md=data_key_md
data_key_md=data_key_md,
)
return locals()

Expand All @@ -92,22 +96,22 @@ def pdf_gen(fq, sq, pdf, **kwargs):
("q", "fq", "config"),
analysis_stage="fq",
hints=dict(dimensions=[(["q"], "primary")]),
data_key_md={'q': {'units': '1/A'}, 'fq': {'units': 'arb'}}
data_key_md={"q": {"units": "1/A"}, "fq": {"units": "arb"}},
)
sq_tes = SimpleToEventStream(
fq,
("q", "sq", "config"),
analysis_stage="sq",
hints=dict(dimensions=[(["q"], "primary")]),
data_key_md={'q': {'units': '1/A'}, 'sq': {'units': 'arb'}}
data_key_md={"q": {"units": "1/A"}, "sq": {"units": "arb"}},
)

pdf_tes = SimpleToEventStream(
pdf,
("r", "gr", "config"),
analysis_stage="pdf",
hints=dict(dimensions=[(["r"], "primary")]),
data_key_md={'r': {'units': 'A'}, 'gr': {'units': '1/A**2'}}
data_key_md={"r": {"units": "A"}, "gr": {"units": "1/A**2"}},
)
return locals()

Expand Down
44 changes: 21 additions & 23 deletions xpdan/pipelines/tomo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import operator as op

from rapidz import Stream, move_to_first
from shed import SimpleToEventStream, SimpleFromEventStream
from shed import ToEventStream, FromEventStream
from xpdan.vend.callbacks.core import StripDepVar
import numpy as np

Expand All @@ -22,63 +22,61 @@ def pencil_tomo(source: Stream, qoi_name, translation, rotation, **kwargs):
dict :
The namespace
"""
x = SimpleFromEventStream("event", ("data", translation), upstream=source)
th = SimpleFromEventStream("event", ("data", rotation), upstream=source)
x = FromEventStream("event", ("data", translation), upstream=source)
th = FromEventStream("event", ("data", rotation), upstream=source)

# Extract the index for the translation and rotation so we can
# extract the dimensions and extents
# TODO: turn into proper function
translation_position = SimpleFromEventStream(
translation_position = FromEventStream(
"start", ("motors",), upstream=source
).map(lambda x: x.index(translation))
rotation_position = SimpleFromEventStream(
rotation_position = FromEventStream(
"start", ("motors",), upstream=source
).map(lambda x: x.index(rotation))

dims = SimpleFromEventStream("start", ("shape",), upstream=source)
dims = FromEventStream("start", ("shape",), upstream=source)
th_dim = dims.zip(rotation_position).starmap(op.getitem)
x_dim = dims.zip(translation_position).starmap(op.getitem)

extents = SimpleFromEventStream("start", ("extents",), upstream=source)
extents = FromEventStream("start", ("extents",), upstream=source)
th_extents = extents.zip(rotation_position).starmap(op.getitem)
x_extents = extents.zip(translation_position).starmap(op.getitem)

qoi = SimpleFromEventStream(
qoi = FromEventStream(
"event", ("data", qoi_name), upstream=source, principle=True
)
center = SimpleFromEventStream(
"start", ("tomo", "center"), upstream=source
)
center = FromEventStream("start", ("tomo", "center"), upstream=source)
return locals()


def full_field_tomo(source: Stream, qoi_name, rotation, **kwargs):
theta = SimpleFromEventStream(
"event", ("data", rotation), upstream=source
).map(np.deg2rad)
theta = FromEventStream("event", ("data", rotation), upstream=source).map(
np.deg2rad
)

center = FromEventStream("start", ("tomo", "center"), upstream=source)

qoi = SimpleFromEventStream(
# QOI needs to be last so that we have all the translation nodes primed
# with start uids when we issue start documents
qoi = FromEventStream(
"event", ("data", qoi_name), upstream=source, principle=True
)
center = SimpleFromEventStream(
"start", ("tomo", "center"), upstream=source
)
return locals()


def tomo_event_stream(source, rec, sinogram, *, qoi_name, **kwargs):
raw_stripped = move_to_first(source.starmap(StripDepVar()))

rec_tes = SimpleToEventStream(
rec_tes = ToEventStream(
rec, (f"{qoi_name}_tomo",), analysis_stage="{}_tomo".format(qoi_name)
)
).LastCache()

# Don't run the sinogram for now, since it can produce issues with the viz
sinogram.map(np.shape).sink(print)
sinogram_tes = SimpleToEventStream(
sinogram_tes = ToEventStream(
sinogram,
(f"{qoi_name}_sinogram",),
analysis_stage="{}_sinogram".format(qoi_name),
)
).LastCache()

return locals()
42 changes: 42 additions & 0 deletions xpdan/startup/es_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import fire
from bluesky.utils import install_qt_kicker
from xpdan.vend.callbacks.core import RunRouter
from xpdan.vend.callbacks.zmq import RemoteDispatcher
from xpdconf.conf import glbl_dict
from databroker_elasticsearch import callback_from_name


def run_server(
    prefix=None,
    outbound_proxy_address=glbl_dict["outbound_proxy_address"],
    server_name="xpd",
):
    """Start up the Elasticsearch insertion server.

    Listens to documents published on the zmq proxy and forwards them to
    an Elasticsearch callback built from ``server_name``.

    Parameters
    ----------
    prefix : bytes or list of bytes, optional
        The Publisher channels to listen to. Defaults to
        ``[b"an", b"raw"]``
    outbound_proxy_address : str, optional
        The address and port of the zmq proxy. Defaults to
        ``glbl_dict["outbound_proxy_address"]``
    server_name : str
        The name of the elasticsearch server, this is used with
        ``databroker_elasticsearch.callback_from_name`` to produce the
        elasticsearch callback
    """
    # TODO: allow handing in of callbacks (for direct python use)
    # Apply the documented default here; previously ``None`` was handed
    # straight to RemoteDispatcher, so the documented ``[b"an", b"raw"]``
    # default was never actually applied.
    if prefix is None:
        prefix = [b"an", b"raw"]
    d = RemoteDispatcher(outbound_proxy_address, prefix=prefix)

    d.subscribe(callback_from_name(server_name))
    print("Starting Elastic Search Server")
    d.start()  # blocks: runs the dispatcher event loop until interrupted


def run_main():
    """Command-line entry point: expose ``run_server`` via python-fire.

    Fire maps the CLI flags onto ``run_server``'s keyword arguments.
    """
    fire.Fire(run_server)


if __name__ == "__main__":
    run_main()
9 changes: 3 additions & 6 deletions xpdan/startup/portable_db_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import os
from pprint import pprint

import fire
import yaml
Expand Down Expand Up @@ -63,8 +64,6 @@ def run_server(
# TODO: convert to bytestrings if needed
# TODO: maybe separate this into different processes?
# TODO: support multiple locations for folders
if prefix is None:
prefix = [b"an", b"raw"]
d = RemoteDispatcher(outbound_proxy_address, prefix=prefix)
portable_folder = folder
portable_configs = {}
Expand Down Expand Up @@ -97,6 +96,7 @@ def run_server(
),
NpyWriter,
)
zed.sink(pprint)
zed.starsink(an_broker.insert)

raw_broker = Broker.from_config(portable_configs["raw"])
Expand Down Expand Up @@ -124,10 +124,7 @@ def run_server(
]
+ [
lambda x: (lambda *nd: an_source.emit(nd))
if x.get("analysis_stage", None) == "pdf"
else None,
lambda x: (lambda *nd: an_source.emit(nd))
if x.get("analysis_stage", None) == "integration"
if x.get("analysis_stage", None) != "raw"
else None,
]
)
Expand Down
Loading