Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make schema selectable from the CLI #12

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions marctable/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,51 @@ def rule_params(f: Callable) -> Callable:
return f


def avram_params(f: Callable) -> Callable:
    """
    Decorator for selecting an avram schema.

    Adds a --schema/-s option to the wrapped click command; the chosen
    file is opened in binary mode and passed to the command as the
    ``avram_file`` argument.
    """
    schema_option = click.option(
        "--schema",
        "-s",
        "avram_file",
        type=click.File("rb"),
        help="Specify avram schema file",
    )
    return schema_option(f)


@cli.command()
@io_params
@rule_params
@avram_params
def csv(infile: BinaryIO, outfile: TextIO, rules: list, batch: int, avram_file: BinaryIO) -> None:
    """
    Convert MARC to CSV.
    """
    # avram_file comes from the --schema/-s option added by @avram_params;
    # it is forwarded so the converter can use a custom avram schema.
    to_csv(infile, outfile, rules=rules, batch=batch, avram_file=avram_file)


@cli.command()
@io_params
@rule_params
@avram_params
def parquet(infile: BinaryIO, outfile: IOBase, rules: list, batch: int, avram_file: BinaryIO) -> None:
    """
    Convert MARC to Parquet.
    """
    # avram_file comes from the --schema/-s option added by @avram_params;
    # it is forwarded so the converter can use a custom avram schema.
    to_parquet(infile, outfile, rules=rules, batch=batch, avram_file=avram_file)


@cli.command()
@io_params
@rule_params
@avram_params
def jsonl(infile: BinaryIO, outfile: BinaryIO, rules: list, batch: int, avram_file: BinaryIO) -> None:
    """
    Convert MARC to JSON Lines (JSONL)
    """
    # avram_file comes from the --schema/-s option added by @avram_params;
    # it is forwarded so the converter can use a custom avram schema.
    to_jsonl(infile, outfile, rules=rules, batch=batch, avram_file=avram_file)


@cli.command()
Expand Down
41 changes: 22 additions & 19 deletions marctable/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from io import IOBase
from typing import BinaryIO, Dict, Generator, List, TextIO, Tuple, Union
from typing import BinaryIO, Dict, Generator, List, Optional, TextIO, Tuple, Union

import pyarrow
import pymarc
Expand All @@ -24,12 +24,13 @@ def to_csv(
csv_output: TextIO,
rules: list = [],
batch: int = 1000,
avram_file: Optional[BinaryIO] = None,
) -> None:
"""
Convert MARC to CSV.
"""
first_batch = True
for df in dataframe_iter(marc_input, rules=rules, batch=batch):
for df in dataframe_iter(marc_input, rules=rules, batch=batch, avram_file=avram_file):
df.to_csv(csv_output, header=first_batch, index=False)
first_batch = False

Expand All @@ -39,11 +40,12 @@ def to_jsonl(
jsonl_output: BinaryIO,
rules: list = [],
batch: int = 1000,
avram_file: Optional[BinaryIO] = None,
) -> None:
"""
Convert MARC to JSON Lines (JSONL).
"""
for records in records_iter(marc_input, rules=rules, batch=batch):
for records in records_iter(marc_input, rules=rules, batch=batch, avram_file=avram_file):
for record in records:
jsonl_output.write(json.dumps(record).encode("utf8") + b"\n")

Expand All @@ -53,36 +55,37 @@ def to_parquet(
parquet_output: IOBase,
rules: list = [],
batch: int = 1000,
avram_file: Optional[BinaryIO] = None,
) -> None:
"""
Convert MARC to Parquet.
"""
schema = _make_parquet_schema(rules)
schema = _make_parquet_schema(rules, avram_file)
writer = ParquetWriter(parquet_output, schema, compression="SNAPPY")
for records_batch in records_iter(marc_input, rules=rules, batch=batch):
for records_batch in records_iter(marc_input, rules=rules, batch=batch, avram_file=avram_file):
table = pyarrow.Table.from_pylist(records_batch, schema)
writer.write_table(table)

writer.close()


def dataframe_iter(
    marc_input: BinaryIO,
    rules: list = [],
    batch: int = 1000,
    avram_file: Optional[BinaryIO] = None,
) -> Generator[DataFrame, None, None]:
    """
    Read MARC input and yield pandas DataFrames, one per batch of ``batch``
    records. ``rules`` selects which fields/subfields become columns and
    ``avram_file`` optionally supplies a custom avram schema.
    """
    # note: the shared default list is safe here — this function never
    # mutates `rules`, and `[]` matches the convention used file-wide
    columns = _columns(_mapping(rules, avram_file))
    for records_batch in records_iter(marc_input, rules, batch, avram_file=avram_file):
        yield DataFrame.from_records(records_batch, columns=columns)


def records_iter(
marc_input: BinaryIO, rules: list = [], batch: int = 1000
marc_input: BinaryIO, rules: list = [], batch: int = 1000, avram_file: Optional[BinaryIO] = None,
) -> Generator[List[Dict], None, None]:
"""
Read MARC input and generate a list of dictionaries, where each list element
represents a MARC record.
"""
mapping = _mapping(rules)
marc = MARC.from_avram()
mapping = _mapping(rules, avram_file)
marc = MARC.from_avram(avram_file)

# TODO: MARCXML parsing brings all the records into memory
if marc_input.name.endswith(".xml"):
Expand Down Expand Up @@ -155,14 +158,14 @@ def _stringify_field(field: pymarc.Field) -> str:
return " ".join([sf.value for sf in field.subfields])


def _mapping(rules: list) -> dict:
def _mapping(rules: list, avram_file: Optional[BinaryIO] = None) -> dict:
"""
unpack the mapping rules into a dictionary for easy lookup

>>> _mapping(["245", "260ac"])
{'245': None, '260': ['a', 'c']}
"""
marc = MARC.from_avram()
marc = MARC.from_avram(avram_file)
if rules is None or len(rules) == 0:
rules = [field.tag for field in marc.fields]

Expand Down Expand Up @@ -196,9 +199,9 @@ def _columns(mapping: dict) -> list:
return cols


def _make_pandas_schema(rules: list) -> Dict[str, str]:
marc = MARC.from_avram()
mapping = _mapping(rules)
def _make_pandas_schema(rules: list, avram_file: Optional[BinaryIO] = None) -> Dict[str, str]:
marc = MARC.from_avram(avram_file)
mapping = _mapping(rules, avram_file)
schema = {}
for field_tag, subfields in mapping.items():
if subfields is None:
Expand All @@ -213,9 +216,9 @@ def _make_pandas_schema(rules: list) -> Dict[str, str]:
return schema


def _make_parquet_schema(rules: list) -> pyarrow.Schema:
marc = MARC.from_avram()
mapping = _mapping(rules)
def _make_parquet_schema(rules: list, avram_file: Optional[BinaryIO] = None) -> pyarrow.Schema:
marc = MARC.from_avram(avram_file)
mapping = _mapping(rules, avram_file)

pyarrow_str = pyarrow.string()
pyarrow_list_of_str = pyarrow.list_(pyarrow.string())
Expand Down