From 729c36b06389664fcc5f2382abf6ade7d00bed77 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Thu, 5 Dec 2024 10:47:20 +0100
Subject: [PATCH] feat: Records discard their payload once it was accessed.
---
dabapush/Record.py | 56 +++++++++++++++++++++++++++++++++-------------
1 file changed, 41 insertions(+), 15 deletions(-)
diff --git a/dabapush/Record.py b/dabapush/Record.py
index e20daa8..d668205 100644
--- a/dabapush/Record.py
+++ b/dabapush/Record.py
@@ -2,7 +2,8 @@
This module contains the Record dataclass, which is used to store the data and additional
"""
-import dataclasses
+# pylint: disable=R0917, R0913
+
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Self, Union
from uuid import uuid4
@@ -14,7 +15,6 @@
RecordState = Literal["done", "error", "start", "rejected"]
-@dataclasses.dataclass
class Record:
"""This dataclass represents a single record in a data set.
It is used to store the data and additional information about the record and helps to keep
@@ -54,15 +54,39 @@ class Record:
"""
- payload: Optional[Any] = None
- source: Optional[Self] = None
- uuid: Optional[str] = uuid4().hex
- processed_at: datetime = datetime.now()
- children: List[Self] = dataclasses.field(default_factory=list)
- event_handlers: Dict[EventType, List[EventHandler]] = dataclasses.field(
- default_factory=dict
- )
- state: RecordState = "start"
+ def __init__(
+ self,
+ payload: Optional[Any] = None,
+ source: Optional[Self] = None,
+ uuid: Optional[str] = None,
+ processed_at: Optional[datetime] = None,
+ children: Optional[List[Self]] = None,
+ event_handlers: Dict[str, List[EventHandler]] = None,
+ ):
+ self._payload_: Optional[Any] = payload
+ self.source: Optional[Self] = source
+ self.uuid: Optional[str] = uuid or uuid4().hex
+ self.processed_at: datetime = processed_at or datetime.now()
+ self.children: List[Self] = children or []
+ self.event_handlers: Dict[str, List[EventHandler]] = event_handlers or {}
+ self._state_: RecordState = "start"
+
+ @property
+ def payload(self):
+ """Get the payload of the record. This will return the payload and delete it from the
+ record.
+
+ """
+ log.trace(f"""Getting payload from record {self.uuid}.""")
+
+ payload = self._payload_
+ self._payload_ = None
+ return payload
+
+ @property
+ def state(self):
+ """Get the state of the record."""
+ return self._state_
def split(
self,
@@ -108,9 +132,11 @@ def split(
return self._handle_key_split_(id_key, key)
def _handle_key_split_(self, id_key, key):
- if key not in self.payload:
+ payload = self.payload # Get the payload, the original payload
+ # will be set to None to free memory.
+ if key not in payload:
return []
- if not isinstance(self.payload[key], list):
+ if not isinstance(payload[key], list):
return []
split_payload = [
Record(
@@ -120,7 +146,7 @@ def _handle_key_split_(self, id_key, key):
"source": self,
}
)
- for value in self.payload[key]
+ for value in payload[key]
]
self.children.extend(split_payload)
return split_payload
@@ -159,7 +185,7 @@ def walk_tree(self, only_leafs=True) -> List[Self]:
def done(self):
"""Call the on_done event handler."""
# Signal parent that this record is done
- self.state = "done"
+ self._state_ = "done"
log.debug(f"Record {self.uuid} is set as done.")
if self.source:
self.source.signal_done()