Skip to content

Commit

Permalink
WIP: run tomo with provenance
Browse files Browse the repository at this point in the history
  • Loading branch information
CJ-Wright committed Mar 10, 2019
1 parent 3c30fa8 commit e58b13e
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 47 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"viz_server",
"analysis_server",
"qoi_server",
"tomo_server",
]

entry_points = {
Expand Down
30 changes: 17 additions & 13 deletions xpdan/pipelines/to_event_model.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
"""Event Model mirror of xpdtools.pipelines.raw_pipeline meant to accept nodes
from the raw pipeline and convert them to Event Model"""
from shed.simple import SimpleToEventStream, AlignEventStreams
from shed.translation import DBFriendly
from xpdtools.tools import overlay_mask, splay_tuple


def to_event_stream_with_ind(raw_stripped, *nodes, publisher, **kwargs):
for node in nodes:
merge = AlignEventStreams(raw_stripped, node,
event_stream_name='primary')
merge = AlignEventStreams(
raw_stripped, node, event_stream_name="primary"
)
merge.starsink(publisher)
return locals()


def to_event_stream_no_ind(*nodes, publisher, **kwargs):
for node in nodes:
node.starsink(publisher)
node.DBFriendly().starsink(publisher)
return locals()


Expand All @@ -41,9 +43,11 @@ def gen_mask(mask, pol_corrected_img, **kwargs):
mask_tes = SimpleToEventStream(mask, ("mask",), analysis_stage="mask")

mask_overlay_tes = SimpleToEventStream(
# This is registered later than the mask creation, thus we can emit on
# This is registered later than the mask creation, thus we can emit on
# the img, since the mask is guaranteed to exist
pol_corrected_img.combine_latest(mask, emit_on=0).starmap(overlay_mask),
pol_corrected_img.combine_latest(mask, emit_on=0).starmap(
overlay_mask
),
("mask_overlay",),
analysis_stage="mask_overlay",
)
Expand All @@ -53,15 +57,15 @@ def gen_mask(mask, pol_corrected_img, **kwargs):
def integration(mean, q, tth, std=None, median=None, **kwargs):
merge_names = ["mean"]
merge_streams = []
data_key_md = {'mean': {'units': 'intensity'}}
data_key_md = {"mean": {"units": "intensity"}}
if std:
merge_names += ["std"]
merge_streams += [std]
data_key_md.update({'std': {'units': 'std intensity'}})
data_key_md.update({"std": {"units": "std intensity"}})
if median:
merge_names += ["median"]
merge_streams += [median]
data_key_md.update({'std': {'units': 'intensity'}})
data_key_md.update({"std": {"units": "intensity"}})
if merge_streams:
merge = mean.zip(*merge_streams)
# need to splay so we have everything at the same level
Expand All @@ -73,15 +77,15 @@ def integration(mean, q, tth, std=None, median=None, **kwargs):
integration_merge = merge.combine_latest(q, tth, emit_on=0)

merge_names += ["q", "tth"]
data_key_md.update({'q': {'units': '1/A'}, 'tth': {'units': 'Degrees'}})
data_key_md.update({"q": {"units": "1/A"}, "tth": {"units": "Degrees"}})

integration_tes = SimpleToEventStream(
integration_merge,
merge_names,
analysis_stage="integration",
# TODO: might push q/tth into the same list
hints=dict(dimensions=[(["q"], "primary"), (["tth"], "primary")]),
data_key_md=data_key_md
data_key_md=data_key_md,
)
return locals()

Expand All @@ -92,22 +96,22 @@ def pdf_gen(fq, sq, pdf, **kwargs):
("q", "fq", "config"),
analysis_stage="fq",
hints=dict(dimensions=[(["q"], "primary")]),
data_key_md={'q': {'units': '1/A'}, 'fq': {'units': 'arb'}}
data_key_md={"q": {"units": "1/A"}, "fq": {"units": "arb"}},
)
sq_tes = SimpleToEventStream(
fq,
("q", "sq", "config"),
analysis_stage="sq",
hints=dict(dimensions=[(["q"], "primary")]),
data_key_md={'q': {'units': '1/A'}, 'sq': {'units': 'arb'}}
data_key_md={"q": {"units": "1/A"}, "sq": {"units": "arb"}},
)

pdf_tes = SimpleToEventStream(
pdf,
("r", "gr", "config"),
analysis_stage="pdf",
hints=dict(dimensions=[(["r"], "primary")]),
data_key_md={'r': {'units': 'A'}, 'gr': {'units': '1/A**2'}}
data_key_md={"r": {"units": "A"}, "gr": {"units": "1/A**2"}},
)
return locals()

Expand Down
36 changes: 16 additions & 20 deletions xpdan/pipelines/tomo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import operator as op

from rapidz import Stream, move_to_first
from shed import SimpleToEventStream, SimpleFromEventStream
from shed import ToEventStream, FromEventStream
from xpdan.vend.callbacks.core import StripDepVar
import numpy as np

Expand All @@ -22,60 +22,56 @@ def pencil_tomo(source: Stream, qoi_name, translation, rotation, **kwargs):
dict :
The namespace
"""
x = SimpleFromEventStream("event", ("data", translation), upstream=source)
th = SimpleFromEventStream("event", ("data", rotation), upstream=source)
x = FromEventStream("event", ("data", translation), upstream=source)
th = FromEventStream("event", ("data", rotation), upstream=source)

# Extract the index for the translation and rotation so we can
# extract the dimensions and extents
# TODO: turn into proper function
translation_position = SimpleFromEventStream(
translation_position = FromEventStream(
"start", ("motors",), upstream=source
).map(lambda x: x.index(translation))
rotation_position = SimpleFromEventStream(
rotation_position = FromEventStream(
"start", ("motors",), upstream=source
).map(lambda x: x.index(rotation))

dims = SimpleFromEventStream("start", ("shape",), upstream=source)
dims = FromEventStream("start", ("shape",), upstream=source)
th_dim = dims.zip(rotation_position).starmap(op.getitem)
x_dim = dims.zip(translation_position).starmap(op.getitem)

extents = SimpleFromEventStream("start", ("extents",), upstream=source)
extents = FromEventStream("start", ("extents",), upstream=source)
th_extents = extents.zip(rotation_position).starmap(op.getitem)
x_extents = extents.zip(translation_position).starmap(op.getitem)

qoi = SimpleFromEventStream(
qoi = FromEventStream(
"event", ("data", qoi_name), upstream=source, principle=True
)
center = SimpleFromEventStream(
"start", ("tomo", "center"), upstream=source
)
center = FromEventStream("start", ("tomo", "center"), upstream=source)
return locals()


def full_field_tomo(source: Stream, qoi_name, rotation, **kwargs):
theta = SimpleFromEventStream(
"event", ("data", rotation), upstream=source
).map(np.deg2rad)
theta = FromEventStream("event", ("data", rotation), upstream=source).map(
np.deg2rad
)

qoi = SimpleFromEventStream(
qoi = FromEventStream(
"event", ("data", qoi_name), upstream=source, principle=True
)
center = SimpleFromEventStream(
"start", ("tomo", "center"), upstream=source
)
center = FromEventStream("start", ("tomo", "center"), upstream=source)
return locals()


def tomo_event_stream(source, rec, sinogram, *, qoi_name, **kwargs):
raw_stripped = move_to_first(source.starmap(StripDepVar()))

rec_tes = SimpleToEventStream(
rec_tes = ToEventStream(
rec, (f"{qoi_name}_tomo",), analysis_stage="{}_tomo".format(qoi_name)
)

# Don't run the sinogram for now, since it can produce issues with the viz
sinogram.map(np.shape).sink(print)
sinogram_tes = SimpleToEventStream(
sinogram_tes = ToEventStream(
sinogram,
(f"{qoi_name}_sinogram",),
analysis_stage="{}_sinogram".format(qoi_name),
Expand Down
43 changes: 43 additions & 0 deletions xpdan/startup/es_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import fire
from bluesky.utils import install_qt_kicker
from xpdan.vend.callbacks.core import RunRouter
from xpdan.vend.callbacks.zmq import RemoteDispatcher
from xpdconf.conf import glbl_dict
from databroker_elasticsearch import callback_from_name


def run_server(
    prefix=None,
    outbound_proxy_address=glbl_dict["outbound_proxy_address"],
    server_name="xpd",
):
    """Start up the Elasticsearch insertion server.

    Listens to the zmq proxy and inserts every dispatched document into
    Elasticsearch via the callback produced by
    ``databroker_elasticsearch.callback_from_name``.

    Parameters
    ----------
    prefix : bytes or list of bytes, optional
        The Publisher channels to listen to. Defaults to
        ``[b"an", b"raw"]``
    outbound_proxy_address : str, optional
        The address and port of the zmq proxy. Defaults to
        ``glbl_dict["outbound_proxy_address"]``
    server_name : str
        The name of the elasticsearch server, this is used with
        ``databroker_elasticsearch.callback_from_name`` to produce the
        elasticsearch callback
    """
    if prefix is None:
        # Match the documented default (and the other xpdan servers):
        # listen to both the analyzed and raw document streams.
        prefix = [b"an", b"raw"]
    # TODO: allow handing in of callbacks (for direct python use)
    d = RemoteDispatcher(outbound_proxy_address, prefix=prefix)
    # Keep Qt-based GUIs responsive while the dispatcher's loop runs.
    install_qt_kicker(loop=d.loop)

    d.subscribe(callback_from_name(server_name))
    print("Starting Elastic Search Server")
    # Blocks until the dispatcher is stopped.
    d.start()


def run_main():
    """CLI entry point: expose ``run_server``'s arguments via Google Fire."""
    fire.Fire(run_server)


# Allow running this module directly as a script (same path as the
# ``run_main`` console entry point).
if __name__ == "__main__":
    run_main()
9 changes: 3 additions & 6 deletions xpdan/startup/portable_db_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import os
from pprint import pprint

import fire
import yaml
Expand Down Expand Up @@ -63,8 +64,6 @@ def run_server(
# TODO: convert to bytestrings if needed
# TODO: maybe separate this into different processes?
# TODO: support multiple locations for folders
if prefix is None:
prefix = [b"an", b"raw"]
d = RemoteDispatcher(outbound_proxy_address, prefix=prefix)
portable_folder = folder
portable_configs = {}
Expand Down Expand Up @@ -97,6 +96,7 @@ def run_server(
),
NpyWriter,
)
zed.sink(pprint)
zed.starsink(an_broker.insert)

raw_broker = Broker.from_config(portable_configs["raw"])
Expand Down Expand Up @@ -124,10 +124,7 @@ def run_server(
]
+ [
lambda x: (lambda *nd: an_source.emit(nd))
if x.get("analysis_stage", None) == "pdf"
else None,
lambda x: (lambda *nd: an_source.emit(nd))
if x.get("analysis_stage", None) == "integration"
if x.get("analysis_stage", None) != "raw"
else None,
]
)
Expand Down
23 changes: 17 additions & 6 deletions xpdan/startup/tomo_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from bluesky.utils import install_qt_kicker
from rapidz import Stream, move_to_first
from rapidz.link import link
from xpdan.pipelines.pipeline_utils import Filler
from xpdan.pipelines.to_event_model import (
to_event_stream_no_ind,
to_event_stream_with_ind,
Expand All @@ -15,7 +16,7 @@
full_field_tomo,
)
from xpdan.vend.callbacks import CallbackBase
from xpdan.vend.callbacks.core import RunRouter
from xpdan.vend.callbacks.core import RunRouter, Retrieve
from xpdan.vend.callbacks.zmq import Publisher, RemoteDispatcher
from xpdconf.conf import glbl_dict
from xpdtools.pipelines.tomo import (
Expand All @@ -34,6 +35,7 @@
full_field_order = [full_field_tomo, tomo_pipeline_theta, tomo_event_stream]


# TODO: pass sources through Retrieve/Filler
class PencilTomoCallback(CallbackBase):
"""This class caches and passes documents into the pencil tomography
pipeline.
Expand Down Expand Up @@ -114,7 +116,7 @@ def stop(self, doc):
# Need to destroy pipeline


class FullFieldTomoCallback(CallbackBase):
class FullFieldTomoCallback(Retrieve):
"""This class caches and passes documents into the full field tomography
pipeline.
Expand All @@ -123,7 +125,9 @@ class FullFieldTomoCallback(CallbackBase):
This class acts as a descriptor router for documents"""

def __init__(self, pipeline_factory, publisher, **kwargs):
def __init__(self, pipeline_factory, publisher, handler_reg,
root_map=None, executor=None, **kwargs):
super().__init__(handler_reg, root_map, executor)
self.pipeline_factory = pipeline_factory
self.publisher = publisher

Expand All @@ -134,6 +138,7 @@ def __init__(self, pipeline_factory, publisher, **kwargs):
self.kwargs = kwargs

def start(self, doc):
super().start(doc)
self.start_doc = doc
self.dim_names = [
d[0][0]
Expand Down Expand Up @@ -161,7 +166,8 @@ def descriptor(self, doc):
self.sources = [Stream(stream_name=str(qoi)) for qoi in qois]
pipelines = [
self.pipeline_factory(
source=s, qoi_name=qoi, rotation=self.rotation, **self.kwargs
source=s,
qoi_name=qoi, rotation=self.rotation, **self.kwargs
)
for s, qoi in zip(self.sources, qois)
]
Expand All @@ -175,6 +181,7 @@ def descriptor(self, doc):
s.emit(("descriptor", doc))

def event(self, doc):
doc = super().event(doc)
for s in self.sources:
s.emit(("event", doc))

Expand All @@ -184,7 +191,7 @@ def stop(self, doc):
# Need to destroy pipeline


def tomo_callback_factory(doc, publisher, **kwargs):
def tomo_callback_factory(doc, publisher, handler_reg, **kwargs):
# TODO: Eventually extract from plan hints?
if doc.get("tomo", {}).get("type", None) == "pencil":
return PencilTomoCallback(
Expand All @@ -196,6 +203,7 @@ def tomo_callback_factory(doc, publisher, **kwargs):
return FullFieldTomoCallback(
lambda **inner_kwargs: link(*full_field_order, **inner_kwargs),
publisher,
handler_reg=handler_reg,
**kwargs,
)

Expand Down Expand Up @@ -228,10 +236,13 @@ def run_server(
"""
print(kwargs)
db = glbl_dict['exp_db']
handler_reg = db.reg.handler_reg
publisher = Publisher(inbound_proxy_address, prefix=inbound_prefix)

rr = RunRouter(
[lambda x: tomo_callback_factory(x, publisher=publisher, **kwargs)]
[lambda x: tomo_callback_factory(x, publisher=publisher,
handler_reg=handler_reg, **kwargs)]
)

d = RemoteDispatcher(outbound_proxy_address, prefix=outbound_prefix)
Expand Down
1 change: 1 addition & 0 deletions xpdan/startup/viz_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def run_server(
cmap="viridis",
norm=SymLogNorm(1),
limit_func=lambda x: (np.nanmin(x), np.nanmax(x)),
aspect='auto',
),
x,
),
Expand Down
Loading

0 comments on commit e58b13e

Please sign in to comment.