rucio consumer #22

Open · wants to merge 3 commits into main
112 changes: 112 additions & 0 deletions bluesky_kafka/__init__.py
@@ -603,3 +603,115 @@ def process_document(self, topic, name, doc):
            self.consumer.commit(asynchronous=False)

        return True


class RucioConsumer(BlueskyConsumer):
    """
    Subclass of BlueskyConsumer that serializes documents to a msgpack file,
    registers them with Rucio, and adds a replication rule.
    """

    class SerializerFactory(dict):
        """
        Like a defaultdict, but it makes a Serializer based on the
        key, which in this case is the topic name.
        """

        def __init__(self, directory):
            self._directory = directory

        def get_database(self, topic):
            return topic.replace(".", "-")

        def __missing__(self, topic):
            # Imported here, where it is used, so that suitcase-msgpack is
            # only required when a serializer is actually created.
            from suitcase import msgpack

            result = self[topic] = msgpack.Serializer(self._directory)
            return result

    def __init__(self, directory, *args, **kwargs):
        self._serializers = self.SerializerFactory(directory)
        super().__init__(*args, **kwargs)

    def _rucio_register(self, beamline, uid, filenames):
        """
        Register the files with Rucio for replication to SDCC.
        """
        # Imported here so that the rucio client is only required when a
        # RucioConsumer is actually used.
        import os

        import rucio.common.exception
        from rucio.client.didclient import DIDClient
        from rucio.client.replicaclient import ReplicaClient
        from rucio.client.ruleclient import RuleClient
        from rucio.client.scopeclient import ScopeClient

        # NOTE: ``rse`` and ``pfn`` are expected to be defined at module
        # scope; see the NSLS2-specific values in rucio_archiver_consumer.py.
        scope = beamline
        container = uid

        replica_client = ReplicaClient()
        didclient = DIDClient()
        scopeclient = ScopeClient()
        ruleclient = RuleClient()

        for root, ending, filename in filenames:
            # TODO: Report the real size and checksum instead of the
            # placeholder below.
            # size = os.stat(str(filename)).st_size
            # adler = adler32(str(filename))
            files = [{'scope': scope,
                      'name': filename.split('/')[-1],
                      'bytes': 1000,
                      # 'adler32': "unknown",
                      'pfn': pfn + filename}]

            dataset = os.path.join(root, ending)
            dataset = '.'.join(dataset.split('/')[1:-1])

            # Create the scope if it doesn't exist.
            try:
                scopeclient.add_scope(account='nsls2data', scope=scope)
            except rucio.common.exception.Duplicate:
                pass

            replica_client.add_replicas(rse=rse, files=files)

            # Create a new container if it doesn't exist.
            try:
                didclient.add_did(scope=scope, name=uid, type='container')
            except rucio.common.exception.DataIdentifierAlreadyExists:
                pass

            # Create a replication rule.
            try:
                dids = [{'scope': scope, 'name': container}]
                ruleclient.add_replication_rule(
                    dids=dids,
                    copies=1,
                    rse_expression='SDCC',
                    lifetime=86400,  # seconds
                    account='nsls2data',
                    source_replica_expression='NSLS2',
                    purge_replicas=True,
                    comment='purge_replicas in 24 hours')
            except rucio.common.exception.DuplicateRule:
                pass

            # Create a new dataset if it doesn't exist.
            try:
                didclient.add_did(scope=scope, name=dataset, type='dataset')
            except rucio.common.exception.DataIdentifierAlreadyExists:
                pass

            # Attach the files to the dataset, and the dataset to the
            # run container.
            attachment = {'scope': scope, 'name': uid,
                          'dids': [{'scope': scope, 'name': dataset}]}

            try:
                didclient.add_files_to_dataset(scope, dataset, files)
            except rucio.common.exception.FileAlreadyExists:
                pass

            try:
                didclient.add_datasets_to_containers([attachment])
            except rucio.common.exception.DuplicateContent:
                pass

    def process_document(self, topic, name, doc):
        result_name, result_doc = self._serializers[topic](name, doc)
        # TODO: Need to handle runs that are missing a stop doc.
        # TODO: Need to figure out how to get the filenames out of the
        #       artifacts; they should be (root, ending, filename) tuples.
        if name == "stop":
            self._rucio_register(topic.split('.')[0],
                                 doc['run_start'],
                                 self._serializers[topic].artifacts)
            self.consumer.commit(asynchronous=False)

        return True
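
For reference, a hypothetical call to _rucio_register is sketched below. The
(root, ending, filename) tuples are an assumption, since the TODOs in
process_document note that extracting them from the serializer artifacts is
still unresolved; the beamline, uid, and paths are illustrative only.

# consumer is a configured RucioConsumer instance (hypothetical values).
consumer._rucio_register(
    beamline="csx",   # scope, taken from the topic prefix "csx.bluesky.documents"
    uid="0d061b56",   # run_start uid carried by the stop document
    filenames=[
        ("/nsls2",                              # root
         "csx/2021/documents.msgpack",          # ending
         "/nsls2/csx/2021/documents.msgpack"),  # full path
    ],
)
# With these inputs, the file "documents.msgpack" is registered under scope
# "csx" and attached to dataset "nsls2.csx.2021", which is in turn attached
# to container "0d061b56".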
39 changes: 39 additions & 0 deletions bluesky_kafka/rucio_archiver_consumer.py
@@ -0,0 +1,39 @@
from functools import partial
import os

import msgpack
import msgpack_numpy as mpn

from bluesky_kafka import RucioConsumer


bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS")
if bootstrap_servers is None:
    raise AttributeError(
        "Environment variable KAFKA_BOOTSTRAP_SERVERS must be set."
    )


kafka_deserializer = partial(msgpack.loads, object_hook=mpn.decode)
auto_offset_reset = "latest"
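# A leading "^" makes the Kafka client treat the subscription as a regular
# expression, so the entry below matches every topic ending in
# "bluesky.documents".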
topics = ["^.*bluesky.documents"]
group_id = "rucio_archiver"

# These are NSLS2 specific.
rse = 'NSLS2'
scope = 'nsls2'
dataset = 'bluesky-sdcc'
pfn = 'globus://'

# Create a RucioConsumer that will automatically listen to new beamline topics.
# The consumer_config parameter metadata.max.age.ms determines how often the
# consumer refreshes its metadata and checks for new topics (librdkafka's
# default is 900000 ms).
rucio_consumer = RucioConsumer(
    directory="/tmp/bluesky-documents",  # hypothetical msgpack output directory
    topics=topics,
    bootstrap_servers=bootstrap_servers,
    group_id=group_id,
    consumer_config={"auto.offset.reset": auto_offset_reset},
    polling_duration=1.0,
    deserializer=kafka_deserializer,
)
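
# To pick up new beamline topics sooner, the metadata refresh interval can be
# lowered through consumer_config (the 5000 ms below is a hypothetical choice):
#
#     consumer_config={"auto.offset.reset": auto_offset_reset,
#                      "metadata.max.age.ms": 5000}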

rucio_consumer.start()
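
# start() blocks and polls indefinitely: documents from each matching topic
# are serialized to msgpack, and a run is registered with Rucio when its
# stop document arrives.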