Skip to content

Commit

Permalink
Merge pull request #67 from Leibniz-HBI/records-one-time-payloads
Browse files Browse the repository at this point in the history
feat: Records discard their payload once it was accessed.
  • Loading branch information
pekasen authored Dec 5, 2024
2 parents c19506e + 729c36b commit c4d3051
Showing 1 changed file with 41 additions and 15 deletions.
56 changes: 41 additions & 15 deletions dabapush/Record.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
This module contains the Record dataclass, which is used to store the data and additional
"""

import dataclasses
# pylint: disable=R0917, R0913

from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Self, Union
from uuid import uuid4
Expand All @@ -14,7 +15,6 @@
RecordState = Literal["done", "error", "start", "rejected"]


@dataclasses.dataclass
class Record:
"""This dataclass represents a single record in a data set.
It is used to store the data and additional information about the record and helps to keep
Expand Down Expand Up @@ -54,15 +54,39 @@ class Record:
"""

payload: Optional[Any] = None
source: Optional[Self] = None
uuid: Optional[str] = uuid4().hex
processed_at: datetime = datetime.now()
children: List[Self] = dataclasses.field(default_factory=list)
event_handlers: Dict[EventType, List[EventHandler]] = dataclasses.field(
default_factory=dict
)
state: RecordState = "start"
def __init__(
self,
payload: Optional[Any] = None,
source: Optional[Self] = None,
uuid: Optional[str] = None,
processed_at: Optional[datetime] = None,
children: Optional[List[Self]] = None,
event_handlers: Dict[str, List[EventHandler]] = None,
):
self._payload_: Optional[Any] = payload
self.source: Optional[Self] = source
self.uuid: Optional[str] = uuid or uuid4().hex
self.processed_at: datetime = processed_at or datetime.now()
self.children: List[Self] = children or []
self.event_handlers: Dict[str, List[EventHandler]] = event_handlers or {}
self._state_: RecordState = "start"

@property
def payload(self):
"""Get the payload of the record. This will return the payload and delete it from the
record.
"""
log.trace(f"""Getting payload from record {self.uuid}.""")

payload = self._payload_
self._payload_ = None
return payload

@property
def state(self):
"""Get the state of the record."""
return self._state_

def split(
self,
Expand Down Expand Up @@ -108,9 +132,11 @@ def split(
return self._handle_key_split_(id_key, key)

def _handle_key_split_(self, id_key, key):
if key not in self.payload:
payload = self.payload # Get the payload, the original payload
# will be set to None to free memory.
if key not in payload:
return []
if not isinstance(self.payload[key], list):
if not isinstance(payload[key], list):
return []
split_payload = [
Record(
Expand All @@ -120,7 +146,7 @@ def _handle_key_split_(self, id_key, key):
"source": self,
}
)
for value in self.payload[key]
for value in payload[key]
]
self.children.extend(split_payload)
return split_payload
Expand Down Expand Up @@ -159,7 +185,7 @@ def walk_tree(self, only_leafs=True) -> List[Self]:
def done(self):
"""Call the on_done event handler."""
# Signal parent that this record is done
self.state = "done"
self._state_ = "done"
log.debug(f"Record {self.uuid} is set as done.")
if self.source:
self.source.signal_done()
Expand Down

0 comments on commit c4d3051

Please sign in to comment.