diff --git a/marctable/__init__.py b/marctable/__init__.py
index 634dcdc..627dec6 100644
--- a/marctable/__init__.py
+++ b/marctable/__init__.py
@@ -37,34 +37,51 @@ def rule_params(f: Callable) -> Callable:
     return f
 
 
+def avram_params(f: Callable) -> Callable:
+    """
+    Decorator for selecting an avram schema.
+    """
+    f = click.option(
+        "--schema",
+        "-s",
+        "avram_file",
+        type=click.File("rb"),
+        help="Specify avram schema file",
+    )(f)
+    return f
+
+
 @cli.command()
 @io_params
 @rule_params
-def csv(infile: BinaryIO, outfile: TextIO, rules: list, batch: int) -> None:
+@avram_params
+def csv(infile: BinaryIO, outfile: TextIO, rules: list, batch: int, avram_file: BinaryIO) -> None:
     """
     Convert MARC to CSV.
     """
-    to_csv(infile, outfile, rules=rules, batch=batch)
+    to_csv(infile, outfile, rules=rules, batch=batch, avram_file=avram_file)
 
 
 @cli.command()
 @io_params
 @rule_params
-def parquet(infile: BinaryIO, outfile: IOBase, rules: list, batch: int) -> None:
+@avram_params
+def parquet(infile: BinaryIO, outfile: IOBase, rules: list, batch: int, avram_file: BinaryIO) -> None:
     """
     Convert MARC to Parquet.
     """
-    to_parquet(infile, outfile, rules=rules, batch=batch)
+    to_parquet(infile, outfile, rules=rules, batch=batch, avram_file=avram_file)
 
 
 @cli.command()
 @io_params
 @rule_params
-def jsonl(infile: BinaryIO, outfile: BinaryIO, rules: list, batch: int) -> None:
+@avram_params
+def jsonl(infile: BinaryIO, outfile: BinaryIO, rules: list, batch: int, avram_file: BinaryIO) -> None:
     """
     Convert MARC to JSON Lines (JSONL)
     """
-    to_jsonl(infile, outfile, rules=rules, batch=batch)
+    to_jsonl(infile, outfile, rules=rules, batch=batch, avram_file=avram_file)
 
 
 @cli.command()
diff --git a/marctable/utils.py b/marctable/utils.py
index a7eaa27..eb10aa6 100644
--- a/marctable/utils.py
+++ b/marctable/utils.py
@@ -1,6 +1,6 @@
 import json
 from io import IOBase
-from typing import BinaryIO, Dict, Generator, List, TextIO, Tuple, Union
+from typing import BinaryIO, Dict, Generator, List, Optional, TextIO, Tuple, Union
 
 import pyarrow
 import pymarc
@@ -24,12 +24,13 @@ def to_csv(
     csv_output: TextIO,
     rules: list = [],
     batch: int = 1000,
+    avram_file: Optional[BinaryIO] = None,
 ) -> None:
     """
     Convert MARC to CSV.
     """
     first_batch = True
-    for df in dataframe_iter(marc_input, rules=rules, batch=batch):
+    for df in dataframe_iter(marc_input, rules=rules, batch=batch, avram_file=avram_file):
         df.to_csv(csv_output, header=first_batch, index=False)
         first_batch = False
 
@@ -39,11 +40,12 @@ def to_jsonl(
     jsonl_output: BinaryIO,
     rules: list = [],
     batch: int = 1000,
+    avram_file: Optional[BinaryIO] = None,
 ) -> None:
     """
     Convert MARC to JSON Lines (JSONL).
     """
-    for records in records_iter(marc_input, rules=rules, batch=batch):
+    for records in records_iter(marc_input, rules=rules, batch=batch, avram_file=avram_file):
         for record in records:
             jsonl_output.write(json.dumps(record).encode("utf8") + b"\n")
 
@@ -53,13 +55,14 @@ def to_parquet(
     parquet_output: IOBase,
     rules: list = [],
     batch: int = 1000,
+    avram_file: Optional[BinaryIO] = None,
 ) -> None:
     """
     Convert MARC to Parquet.
     """
-    schema = _make_parquet_schema(rules)
+    schema = _make_parquet_schema(rules, avram_file)
     writer = ParquetWriter(parquet_output, schema, compression="SNAPPY")
-    for records_batch in records_iter(marc_input, rules=rules, batch=batch):
+    for records_batch in records_iter(marc_input, rules=rules, batch=batch, avram_file=avram_file):
         table = pyarrow.Table.from_pylist(records_batch, schema)
         writer.write_table(table)
 
@@ -67,22 +70,22 @@ def to_parquet(
 
 
 def dataframe_iter(
-    marc_input: BinaryIO, rules: list = [], batch: int = 1000
+    marc_input: BinaryIO, rules: list = [], batch: int = 1000, avram_file: Optional[BinaryIO] = None
 ) -> Generator[DataFrame, None, None]:
-    columns = _columns(_mapping(rules))
-    for records_batch in records_iter(marc_input, rules, batch):
+    columns = _columns(_mapping(rules, avram_file))
+    for records_batch in records_iter(marc_input, rules, batch, avram_file=avram_file):
         yield DataFrame.from_records(records_batch, columns=columns)
 
 
 def records_iter(
-    marc_input: BinaryIO, rules: list = [], batch: int = 1000
+    marc_input: BinaryIO, rules: list = [], batch: int = 1000, avram_file: Optional[BinaryIO] = None,
 ) -> Generator[List[Dict], None, None]:
     """
     Read MARC input and generate a list of dictionaries, where each list element
     represents a MARC record.
     """
-    mapping = _mapping(rules)
-    marc = MARC.from_avram()
+    mapping = _mapping(rules, avram_file)
+    marc = MARC.from_avram(avram_file)
 
     # TODO: MARCXML parsing brings all the records into memory
     if marc_input.name.endswith(".xml"):
@@ -155,14 +158,14 @@ def _stringify_field(field: pymarc.Field) -> str:
     return " ".join([sf.value for sf in field.subfields])
 
 
-def _mapping(rules: list) -> dict:
+def _mapping(rules: list, avram_file: Optional[BinaryIO] = None) -> dict:
     """
     unpack the mapping rules into a dictionary for easy lookup
 
     >>> _mapping(["245", "260ac"])
     {'245': None, '260': ['a', 'c']}
     """
-    marc = MARC.from_avram()
+    marc = MARC.from_avram(avram_file)
     if rules is None or len(rules) == 0:
         rules = [field.tag for field in marc.fields]
 
@@ -196,9 +199,9 @@ def _columns(mapping: dict) -> list:
     return cols
 
 
-def _make_pandas_schema(rules: list) -> Dict[str, str]:
-    marc = MARC.from_avram()
-    mapping = _mapping(rules)
+def _make_pandas_schema(rules: list, avram_file: Optional[BinaryIO] = None) -> Dict[str, str]:
+    marc = MARC.from_avram(avram_file)
+    mapping = _mapping(rules, avram_file)
     schema = {}
     for field_tag, subfields in mapping.items():
         if subfields is None:
@@ -213,9 +216,9 @@
-def _make_parquet_schema(rules: list) -> pyarrow.Schema:
-    marc = MARC.from_avram()
-    mapping = _mapping(rules)
+def _make_parquet_schema(rules: list, avram_file: Optional[BinaryIO] = None) -> pyarrow.Schema:
+    marc = MARC.from_avram(avram_file)
+    mapping = _mapping(rules, avram_file)
     pyarrow_str = pyarrow.string()
     pyarrow_list_of_str = pyarrow.list_(pyarrow.string())