diff --git a/bluesky_kafka/__init__.py b/bluesky_kafka/__init__.py
index a75ac11..218d5fd 100644
--- a/bluesky_kafka/__init__.py
+++ b/bluesky_kafka/__init__.py
@@ -603,3 +603,115 @@ def process_document(self, topic, name, doc):
         self.consumer.commit(asynchronous=False)
         return True
+
+
+class RucioConsumer(BlueskyConsumer):
+    """
+    Subclass of BlueskyConsumer that serializes documents to a msgpack file,
+    registers them with Rucio, and adds a replication rule.
+    """
+
+    class SerializerFactory(dict):
+        """
+        Like a defaultdict, but it makes a Serializer based on the
+        key, which in this case is the topic name.
+        """
+
+        def __init__(self, directory):
+            self._directory = directory
+
+        def get_database(self, topic):
+            return topic.replace(".", "-")
+
+        def __missing__(self, topic):
+            # Import here so suitcase-msgpack is only required when a
+            # serializer is actually created.
+            from suitcase import msgpack
+            result = self[topic] = msgpack.Serializer(self._directory)
+            return result
+
+    def __init__(self, directory, rse, pfn, *args, **kwargs):
+        self._serializers = self.SerializerFactory(directory)
+        self._rse = rse
+        self._pfn = pfn
+        super().__init__(*args, **kwargs)
+
+    def _rucio_register(self, beamline, uid, filenames):
+        """
+        Register the files in Rucio for replication to SDCC.
+        """
+        import os
+
+        import rucio.common.exception
+        from rucio.client.didclient import DIDClient
+        from rucio.client.replicaclient import ReplicaClient
+        from rucio.client.ruleclient import RuleClient
+        from rucio.client.scopeclient import ScopeClient
+
+        scope = beamline
+        container = uid
+
+        replica_client = ReplicaClient()
+        didclient = DIDClient()
+        scopeclient = ScopeClient()
+        ruleclient = RuleClient()
+
+        for root, ending, filename in filenames:
+            # size = os.stat(str(filename)).st_size
+            # adler = adler32(str(filename))
+            files = [{'scope': scope,
+                      'name': filename.split('/')[-1],
+                      'bytes': 1000,
+                      # 'adler32': "unknown",
+                      'pfn': self._pfn + filename}]
+
+            dataset = os.path.join(root, ending)
+            dataset = '.'.join(dataset.split('/')[1:-1])
+
+            # Create the scope if it doesn't exist.
+            try:
+                scopeclient.add_scope(account='nsls2data', scope=scope)
+            except rucio.common.exception.Duplicate:
+                pass
+
+            replica_client.add_replicas(rse=self._rse, files=files)
+
+            # Create a new container if it doesn't exist.
+            try:
+                didclient.add_did(scope=scope, name=uid, type='container')
+            except rucio.common.exception.DataIdentifierAlreadyExists:
+                pass
+
+            # Create a replication rule.
+            try:
+                dids = [{'scope': scope, 'name': container}]
+                ruleclient.add_replication_rule(
+                    dids=dids,
+                    copies=1,
+                    rse_expression='SDCC',
+                    lifetime=86400,  # Seconds
+                    account='nsls2data',
+                    source_replica_expression='NSLS2',
+                    purge_replicas=True,
+                    comment='purge_replicas in 24 hours')
+            except rucio.common.exception.DuplicateRule:
+                pass
+
+            # Create a new dataset if it doesn't exist.
+            try:
+                didclient.add_did(scope=scope, name=dataset, type='dataset')
+            except rucio.common.exception.DataIdentifierAlreadyExists:
+                pass
+
+            attachment = {'scope': scope, 'name': uid,
+                          'dids': [{'scope': scope, 'name': dataset}]}
+
+            try:
+                didclient.add_files_to_dataset(scope, dataset, files)
+            except rucio.common.exception.FileAlreadyExists:
+                pass
+
+            try:
+                didclient.add_datasets_to_containers([attachment])
+            except rucio.common.exception.DuplicateContent:
+                pass
+
+    def process_document(self, topic, name, doc):
+        result_name, result_doc = self._serializers[topic](name, doc)
+        # TODO: Need to handle runs that are missing a stop document.
+        # TODO: Need to figure out how to get the filenames out of artifacts;
+        #       file names should be tuples of (root, ending, filename).
+        if name == "stop":
+            self._rucio_register(topic.split('.')[0],
+                                 doc['run_start'],
+                                 self._serializers[topic].artifacts)
+            self.consumer.commit(asynchronous=False)
+
+        return True
diff --git a/bluesky_kafka/rucio_archiver_consumer.py b/bluesky_kafka/rucio_archiver_consumer.py
new file mode 100644
index 0000000..7c2e592
--- /dev/null
+++ b/bluesky_kafka/rucio_archiver_consumer.py
@@ -0,0 +1,39 @@
+from functools import partial
+import os
+
+import msgpack
+import msgpack_numpy as mpn
+
+from bluesky_kafka import RucioConsumer
+
+
+bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS")
+if bootstrap_servers is None:
+    raise AttributeError("Environment variable KAFKA_BOOTSTRAP_SERVERS "
+                         "must be set.")
+
+
+kafka_deserializer = partial(msgpack.loads, object_hook=mpn.decode)
+auto_offset_reset = "latest"
+topics = ["^.*bluesky.documents"]
+group_id = "rucio_archiver"
+
+# These are NSLS2 specific.
+rse = 'NSLS2'
+scope = 'nsls2'
+dataset = 'bluesky-sdcc'
+pfn = 'globus://'
+
+# Staging directory for the msgpack serializer (placeholder; site-specific).
+directory = os.environ.get("RUCIO_ARCHIVER_DIRECTORY", "/tmp/rucio_archiver")
+
+# Create a RucioConsumer that will automatically listen to new beamline topics.
+# The parameter metadata.max.age.ms determines how often the consumer will
+# check for new topics. The default value is 5000 ms.
+rucio_consumer = RucioConsumer(
+    directory=directory,
+    rse=rse,
+    pfn=pfn,
+    topics=topics,
+    bootstrap_servers=bootstrap_servers,
+    group_id=group_id,
+    consumer_config={"auto.offset.reset": auto_offset_reset},
+    polling_duration=1.0,
+    deserializer=kafka_deserializer,
+)
+
+rucio_consumer.start()
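For reviewers: below is a minimal standalone sketch of the naming scheme that
_rucio_register applies to each artifact, runnable outside the consumer. The
topic and the (root, ending, filename) tuple are hypothetical values chosen
only to illustrate the string handling, not a real beamline layout.

    import os

    # Hypothetical inputs: a beamline topic and one (root, ending, filename)
    # tuple of the kind the serializer's artifacts are expected to yield
    # (see the TODOs in process_document).
    topic = "bmm.bluesky.documents"
    root, ending, filename = (
        "/nsls2/data", "bmm/2021", "/nsls2/data/bmm/2021/primary.msgpack"
    )

    scope = topic.split('.')[0]                   # Rucio scope: "bmm"
    dataset = os.path.join(root, ending)          # "/nsls2/data/bmm/2021"
    dataset = '.'.join(dataset.split('/')[1:-1])  # dataset name: "nsls2.data.bmm"
    name = filename.split('/')[-1]                # replica name: "primary.msgpack"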