Make'm tests pass.
pekasen committed Nov 25, 2024
1 parent 3130681 commit ed6cb0e
Showing 16 changed files with 163 additions and 108 deletions.
20 changes: 12 additions & 8 deletions dabapush/Configuration/PlugInConfiguration.py
@@ -1,9 +1,16 @@
"""PlugInConfiguration module.
"""

import abc
from uuid import uuid4

import yaml

# pylint: disable=W0622


class PlugInConfiguration(yaml.YAMLObject):
""" """
"""Abstract Base class for all PlugInConfigurations."""

yaml_tag = "!dabapush:PluginConfiguration"

@@ -14,9 +21,6 @@ def __init__(self, name: str, id: str or None) -> None:
self.id = id if id is not None else str(uuid4())

@classmethod
@abc.abstractclassmethod
def get_instance(self) -> object or None:
"""Get a configured instance of the appropriate reader or writer plugin.
"""

@abc.abstractmethod
def get_instance(cls) -> object or None:
"""Get a configured instance of the appropriate reader or writer plugin."""
9 changes: 7 additions & 2 deletions dabapush/Configuration/ReaderConfiguration.py
@@ -1,14 +1,14 @@
"""Reader Interface.
"""

from abc import ABC
import abc

from .PlugInConfiguration import PlugInConfiguration

# pylint: disable=W0622


class ReaderConfiguration(PlugInConfiguration, ABC):
class ReaderConfiguration(PlugInConfiguration):
"""Abstract Base class for all ReaderConfigurations."""

yaml_tag = "!dabapush:ReaderConfiguration"
@@ -17,3 +17,8 @@ def __init__(self, name, id, read_path: str or None, pattern: str or None) -> None:
super().__init__(name, id=id)
self.read_path = read_path if read_path is not None else "."
self.pattern = pattern if pattern is not None else "*.json"

@classmethod
@abc.abstractmethod
def get_instance(cls) -> object or None:
"""Get a configured instance of the appropriate reader or writer plugin."""
5 changes: 3 additions & 2 deletions dabapush/Configuration/Registry.py
@@ -1,4 +1,5 @@
"""fetching plug-ins from entrypoints and helper methods"""

# pylint: disable=W0622
from importlib.metadata import entry_points
from typing import Any, List, Optional
@@ -10,8 +11,8 @@
class Registry:
"""receive plug-ins from entry point"""

readers = entry_points()["dabapush_readers"]
writers = entry_points()["dabapush_writers"]
readers = entry_points(group="dabapush_readers")
writers = entry_points(group="dabapush_writers")

# --- static methods --- #

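This swaps the dict-style lookup, which importlib.metadata deprecated in Python 3.10 and removed in 3.12, for the group= selection API. A sketch of how such a group is consumed, assuming plugins are registered under the dabapush_readers entry point:

    from importlib.metadata import entry_points

    readers = entry_points(group="dabapush_readers")  # EntryPoints selection, Python >= 3.10
    for ep in readers:
        reader_cls = ep.load()  # imports the module and returns the registered object
        print(ep.name, reader_cls)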
2 changes: 1 addition & 1 deletion dabapush/Dabapush.py
@@ -23,7 +23,7 @@ def __init__(
self,
working_dir: Path = Path(),
):
self.working_dir = working_dir
self.working_dir = working_dir.resolve()
self.config = None
self.global_config = Registry()
if not self.project_read():
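Calling resolve() pins working_dir to an absolute path at construction time, so a later change of the process's working directory can no longer silently retarget the project. For instance:

    from pathlib import Path

    p = Path()         # ".", whose meaning depends on the current working directory
    q = p.resolve()    # e.g. /home/user/project: absolute and stable from here on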
11 changes: 5 additions & 6 deletions dabapush/Reader/NDJSONReader.py
@@ -8,18 +8,18 @@
from ..Configuration.ReaderConfiguration import ReaderConfiguration
from ..Record import Record
from ..utils import flatten
from .Reader import Reader
from .Reader import FileReader


def read_and_split(
record: Record,
flatten_records: bool = False,
) -> List[Record]:
"""Reads a file and splits it into records by line."""
with record.source.open("rt", encoding="utf8") as file:
with record.payload.open("rt", encoding="utf8") as file:
return [
Record(
uuid=f"{str(record.source)}:{str(line_number)}",
uuid=f"{str(record.uuid)}:{str(line_number)}",
payload=(
ujson.loads(line)
if not flatten_records
@@ -31,7 +31,7 @@ def read_and_split(
]


class NDJSONReader(Reader):
class NDJSONReader(FileReader):
"""Reader to read ready to read NDJSON data.
It matches files in the path-tree against the pattern and reads all
files and all lines in these files as JSON.
@@ -48,8 +48,7 @@ def __init__(self, config: "NDJSONReaderConfiguration") -> None:

def read(self) -> Iterator[Record]:
"""reads multiple NDJSON files and emits them line by line"""
for file_path in self.files:
file_record = Record(uuid=str(file_path), source=file_path)
for file_record in self.records:
yield from file_record.split(
func=read_and_split, flatten_records=self.config.flatten_dicts
)
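Two behavioral changes here: each line-record's uuid is now derived from the parent record's uuid rather than from the raw file path, and the file handle comes from record.payload. A minimal sketch of the resulting shape, reusing the Record constructor exactly as the diff shows it (the file name and contents are illustrative):

    # data.ndjson contains two lines: {"a": 1} and {"a": 2}
    parent = Record(uuid="data.ndjson", payload=Path("data.ndjson"))
    for rec in read_and_split(parent):
        print(rec.uuid, rec.payload)  # e.g. data.ndjson:0 {'a': 1}, then data.ndjson:1 {'a': 2}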
70 changes: 48 additions & 22 deletions dabapush/Reader/Reader.py
@@ -1,6 +1,7 @@
"""This module contains the abstract base class for all reader plugins."""

import abc
from itertools import tee
from pathlib import Path
from typing import Iterator

@@ -10,6 +11,8 @@
from ..Configuration.ReaderConfiguration import ReaderConfiguration
from ..Record import Record

# pylint: disable=I1101


class Reader(abc.ABC):
"""Abstract base class for all reader plugins.
@@ -30,11 +33,12 @@ def __init__(self, config: ReaderConfiguration):
be a subclass of ReaderConfiguration.
"""
self.config = config
self.back_log = []
# initialize file log
if not Path(".dabapush/").exists():
Path(".dabapush/").mkdir()

self.log_path = Path(".dabapush/log.jsonl")
self.log_path = Path(f".dabapush/{config.name}.jsonl")

@abc.abstractmethod
def read(self) -> Iterator[Record]:
@@ -46,31 +50,53 @@ def read(self) -> Iterator[Record]:
type: Iterator[Record]
Generator which _should_ be one item per element.
"""
return

@abc.abstractmethod
@property
def files(self) -> Iterator[Path]:
def records(self) -> Iterator[Record]:
"""Subclasses **must** implement this abstract method and implement
their reading logic here.
Returns
-------
type: Iterator[Record]
Generator which _should_ be one item per element.
"""


class FileReader(Reader):
"""Reader to read files from a path.
It matches files in the path-tree against the pattern.
"""

@abc.abstractmethod
def read(self) -> Iterator[Record]:
"""Reads all files matching the pattern in the read_path."""

@property
def records(self) -> Iterator[Record]:
"""Generator for all files matching the pattern in the read_path."""
fresh = Path(self.config.read_path).rglob(self.config.pattern)
old_stock_dir = Path("./.dabapush")
old_stock = []

if old_stock_dir.exists() and (old_stock_dir / "log.jsonl").exists():
with (old_stock_dir / "log.jsonl").open("r") as ff:
old_stock = [
ujson.loads(_) for _ in ff.readlines() # pylint: disable=I1101
] # pylint: disable=I1101

return (
self._log(a)
for a in (_ for _ in fresh if str(_) not in [f["file"] for f in old_stock])
if self.log_path.exists():
with self.log_path.open("rt", encoding="utf8") as f:
self.back_log = (ujson.loads(_) for _ in f.readlines())
else:
self.log_path.touch()

fresh = (
Record(uuid=str(a), payload=a)
for a in Path(self.config.read_path).rglob(self.config.pattern)
)
fresh, back = tee(fresh)
yield from fresh

for record in back:
for sub_record in record.walk_tree(only_leafs=True):
self._log_(sub_record)

def _log(self, file: Path) -> Path:
def _log_(self, record: Record) -> Record:
with self.log_path.open("a", encoding="utf8") as f:
ujson.dump( # pylint: disable=I1101
{"file": str(file), "status": "read"}, f
)
ujson.dump(record.to_log(), f)
f.write("\n")
log.debug(f"Done with {str(file)}")
return file
log.debug(f"Done with {record.uuid}")
return record
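The rewritten records property leans on itertools.tee: the generator of fresh Records is split into two branches, one yielded to the consumer and one replayed afterwards so every leaf record can be appended to the per-reader log file. The general shape of that pattern as a self-contained sketch; note that tee buffers items internally, so the logging pass runs only after the consumer has exhausted the first branch:

    from itertools import tee

    def emit_then_log(items, log_fn):
        out, back = tee(items)
        yield from out           # the consumer sees every item first
        for item in back:        # replayed from tee's internal buffer
            log_fn(item)

    seen = []
    list(emit_then_log(iter([1, 2, 3]), seen.append))
    assert seen == [1, 2, 3]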
51 changes: 27 additions & 24 deletions dabapush/Reader/TwacapicReader.py
Original file line number Diff line number Diff line change
@@ -10,10 +10,10 @@
from ..Configuration.ReaderConfiguration import ReaderConfiguration
from ..Record import Record
from ..utils import flatten, safe_access, safe_write, unpack
from .Reader import Reader
from .Reader import FileReader


class TwacapicReader(Reader):
class TwacapicReader(FileReader):
"""Reader to read ready to read Twitter json data.
It matches files in the path-tree against the pattern and reads all files as JSON.
@@ -31,6 +31,7 @@ def __init__(self, config: "TwacapicReaderConfiguration"):
Configuration with all the values TwacapicReader needs for its thang.
"""
super().__init__(config)
self.config = config

@staticmethod
def unpack_tweet( # pylint: disable=W0102
@@ -68,16 +69,18 @@ def unpack_tweet( # pylint: disable=W0102
},
}

def handle_item(job_item, job):
tweet_id_field: Optional[str] = job["tweet_id_field"]
id = job_item[tweet_id_field] if tweet_id_field is not None else job_item
def handle_item(_job_item, _job):
tweet_id_field: Optional[str] = _job["tweet_id_field"]
id = _job_item[tweet_id_field] if tweet_id_field is not None else _job_item
if id is None:
raise f"id cannot be None in {job} and {job_item}"
includes_key = safe_access(job, ["includes_field"])
raise ValueError(f"ID cannot be None in {_job} and {_job_item}")
includes_key = safe_access(_job, ["includes_field"])
if includes_key not in includes:
log.warning(f"key not present in additional information dict in: {job}")
log.warning(
f"key not present in additional information dict in: {_job}"
)
return
return unpack(id, includes[includes_key], job["includes_id_field"])
return unpack(id, includes[includes_key], _job["includes_id_field"])

for key in keys:
if key in possible_keys:
@@ -116,27 +119,27 @@ def read(self) -> Iterator[Record]:
type: Iterator[Record]
"""

config: TwacapicReaderConfiguration = self.config

for file_path in self.files:
with file_path.open() as file:
if config.lines is True:
for record in self.records:
with record.payload.open() as file:
if self.config.lines is True:
results = (loads(line) for line in file)
else:
results = [load(file)]
for res in results:
data: List[Dict] = safe_access(res, ["data"])
includes: Optional[Dict] = safe_access(res, ["includes"])
if data is not None:
if config.emit_references:
# If we emit references we need to join the data
data.extend(includes.get("tweets", []))
for post in data:
post = TwacapicReader.unpack_tweet(post, includes)
if config.flatten is True:
post = flatten(post)

yield Record(payload=post, source=file_path)
if data is None:
log.warning(f"No data in {res}")
continue
if self.config.emit_references:
# If we emit references we need to join the data
data.extend(includes.get("tweets", []))
for post in data:
post = TwacapicReader.unpack_tweet(post, includes)
if self.config.flatten is True:
post = flatten(post)

yield Record(payload=post, source=record)


class TwacapicReaderConfiguration(ReaderConfiguration):
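The read loop also gains a guard clause: instead of nesting the whole body under "if data is not None:", a missing payload now logs a warning and continues, which keeps the happy path at a single indentation level. A self-contained sketch of that shape (res.get stands in for the repo's safe_access helper):

    import logging

    log = logging.getLogger(__name__)

    def emit_posts(results):
        for res in results:
            data = res.get("data")   # stand-in for safe_access(res, ["data"])
            if data is None:         # guard clause: warn and skip early
                log.warning("No data in %s", res)
                continue
            yield from data          # happy path stays un-nested

    list(emit_posts([{"data": [1, 2]}, {"other": 3}]))  # [1, 2], plus one warning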
(Diffs for the remaining nine changed files are not shown.)
