Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Records discard their payload once it has been accessed. #67

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions dabapush/Record.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
This module contains the Record dataclass, which is used to store the data and additional
"""

import dataclasses
# pylint: disable=R0917, R0913

from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Self, Union
from uuid import uuid4
Expand All @@ -14,7 +15,6 @@
RecordState = Literal["done", "error", "start", "rejected"]


@dataclasses.dataclass
class Record:
"""This dataclass represents a single record in a data set.
It is used to store the data and additional information about the record and helps to keep
Expand Down Expand Up @@ -54,15 +54,39 @@ class Record:

"""

payload: Optional[Any] = None
source: Optional[Self] = None
uuid: Optional[str] = uuid4().hex
processed_at: datetime = datetime.now()
children: List[Self] = dataclasses.field(default_factory=list)
event_handlers: Dict[EventType, List[EventHandler]] = dataclasses.field(
default_factory=dict
)
state: RecordState = "start"
def __init__(
    self,
    payload: Optional[Any] = None,
    source: Optional[Self] = None,
    uuid: Optional[str] = None,
    processed_at: Optional[datetime] = None,
    children: Optional[List[Self]] = None,
    event_handlers: Optional[Dict[str, List[EventHandler]]] = None,
):
    """Initialise a record.

    Args:
        payload: Data carried by the record; kept in ``_payload_``.
        source: The record this one originates from, if any.
        uuid: Identifier of the record. When omitted (or empty) a fresh
            ``uuid4().hex`` is generated per instance.
        processed_at: Timestamp of the record. Defaults to
            ``datetime.now()`` evaluated at construction time.
        children: Child records. A new empty list is created when omitted.
        event_handlers: Event handlers keyed by event name. A new empty
            dict is created when omitted.
    """
    self._payload_: Optional[Any] = payload
    self.source: Optional[Self] = source
    self.uuid: Optional[str] = uuid or uuid4().hex
    self.processed_at: datetime = processed_at or datetime.now()
    # Compare against None instead of relying on truthiness: an empty
    # container handed in by the caller must be kept (not replaced by a
    # fresh one), so that later mutations stay visible to the caller.
    self.children: List[Self] = [] if children is None else children
    self.event_handlers: Dict[str, List[EventHandler]] = (
        {} if event_handlers is None else event_handlers
    )
    self._state_: RecordState = "start"

@property
def payload(self):
    """Hand out the stored payload and drop the record's reference to it.

    The first access returns whatever is stored in ``_payload_``; the
    attribute is reset to ``None`` in the same step, so every subsequent
    access returns ``None``.
    """
    log.trace(f"""Getting payload from record {self.uuid}.""")

    # Right-hand side is evaluated first, so ``released`` receives the old
    # value before ``_payload_`` is cleared.
    released, self._payload_ = self._payload_, None
    return released

@property
def state(self):
    """Return the value currently held in ``_state_``."""
    current = self._state_
    return current

def split(
self,
Expand Down Expand Up @@ -108,9 +132,11 @@ def split(
return self._handle_key_split_(id_key, key)

def _handle_key_split_(self, id_key, key):
if key not in self.payload:
payload = self.payload # Get the payload, the original payload
# will be set to None to free memory.
if key not in payload:
return []
if not isinstance(self.payload[key], list):
if not isinstance(payload[key], list):
return []
split_payload = [
Record(
Expand All @@ -120,7 +146,7 @@ def _handle_key_split_(self, id_key, key):
"source": self,
}
)
for value in self.payload[key]
for value in payload[key]
]
self.children.extend(split_payload)
return split_payload
Expand Down Expand Up @@ -159,7 +185,7 @@ def walk_tree(self, only_leafs=True) -> List[Self]:
def done(self):
"""Call the on_done event handler."""
# Signal parent that this record is done
self.state = "done"
self._state_ = "done"
log.debug(f"Record {self.uuid} is set as done.")
if self.source:
self.source.signal_done()
Expand Down
Loading