Skip to content

Commit

Permalink
fix: no weakrefs so that Records survive long enough to be persisted.
Browse files Browse the repository at this point in the history
  • Loading branch information
pekasen committed Dec 2, 2024
1 parent b5ca4f1 commit d5504f3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 22 deletions.
4 changes: 1 addition & 3 deletions dabapush/Reader/NDJSONReader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""NDJSON Reader plug-in for dabapush"""

import weakref

# pylint: disable=R,I1101
from typing import Iterator, List

Expand All @@ -27,7 +25,7 @@ def read_and_split(
if not flatten_records
else flatten(ujson.loads(line))
),
source=weakref.ref(record),
source=record,
)
for line_number, line in enumerate(file)
]
Expand Down
25 changes: 8 additions & 17 deletions dabapush/Record.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

import dataclasses
import weakref
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Self, Union
from uuid import uuid4
Expand Down Expand Up @@ -56,7 +55,7 @@ class Record:
"""

payload: Optional[Any] = None
source: Optional[weakref.ReferenceType] = None
source: Optional[Self] = None
uuid: Optional[str] = uuid4().hex
processed_at: datetime = datetime.now()
children: List[Self] = dataclasses.field(default_factory=list)
Expand Down Expand Up @@ -118,7 +117,7 @@ def _handle_key_split_(self, id_key, key):
**{
"payload": value,
"uuid": value.get(id_key) if id_key else uuid4().hex,
"source": weakref.ref(self),
"source": self,
}
)
for value in self.payload[key]
Expand All @@ -129,19 +128,15 @@ def _handle_key_split_(self, id_key, key):
def to_log(self) -> Dict[str, Union[str, List[Dict[str, Any]]]]:
"""Return a loggable representation of the record."""
log.debug(f"Logging record {self.uuid}.")
if self.source:
source = self.source()
if not source:
log.critical(f"Source of record {self.uuid} is not available")
raise ValueError(f"Source of record {self.uuid} is not available")
else:
source = None

return {
"uuid": str(self.uuid),
"processed_at": self.processed_at.isoformat(),
# We cannot allow the source to be a Record, as it would create a circular reference
# while serializing the dataclass to JSON.
"source": (source if not isinstance(source, Record) else source.uuid),
"source": (
self.source if not isinstance(self.source, Record) else self.source.uuid
),
"children": [child.to_log() for child in self.children],
}

Expand All @@ -167,12 +162,8 @@ def done(self):
self.state = "done"
log.debug(f"Record {self.uuid} is set as done.")
if self.source:
parent: Record = self.source()
if not parent:
log.critical(f"Source of record {self.uuid} is not available")
raise ValueError(f"Source of record {self.uuid} is not available")
parent.signal_done()
log.debug(f"Signaled parent {parent.uuid} of record {self.uuid}.")
self.source.signal_done()
log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
self.__dispatch_event__("on_done")

def signal_done(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_Record.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_splitting_record():
assert _record_.uuid
assert _record_.processed_at
assert _record_.payload == {"key": "value"}
assert _record_.source() == record
assert _record_.source == record
assert _record_ in record.children


Expand All @@ -62,7 +62,7 @@ def test_splitting_record_with_children_ids():
assert _record_.uuid == n
assert _record_.processed_at
assert _record_.payload == {"key": "value", "id": n}
assert _record_.source() is record
assert _record_.source is record
assert _record_ in record.children


Expand Down

0 comments on commit d5504f3

Please sign in to comment.