diff --git a/.gitignore b/.gitignore index feb37a13..6b65f4fc 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,7 @@ docs/ # personal notes *scratch*.py +build_nim.sh # local confidential data tests/ndata/* \ No newline at end of file diff --git a/tablite/_nimlite/table.nim b/tablite/_nimlite/table.nim index 20b3627e..f5763a56 100644 --- a/tablite/_nimlite/table.nim +++ b/tablite/_nimlite/table.nim @@ -27,6 +27,7 @@ type TabliteTasks* = object dialect*: TabliteDialect tasks*: seq[TabliteTask] import_fields*: seq[uint] + import_field_names*: seq[string] page_size*: uint guess_dtypes*: bool @@ -62,13 +63,14 @@ proc newTabliteTask*(pages: seq[string], offset: uint, count: uint): TabliteTask proc newTabliteTasks*( path: string, encoding: string, dialect: Dialect, - tasks: seq[TabliteTask], import_fields: seq[uint], page_size: uint, guess_dtypes: bool): TabliteTasks = + tasks: seq[TabliteTask], import_fields: seq[uint], import_field_names: seq[string], page_size: uint, guess_dtypes: bool): TabliteTasks = TabliteTasks( path: path, encoding: encoding, dialect: dialect.newTabliteDialect, tasks: tasks, import_fields: import_fields, + import_field_names: import_field_names, page_size: page_size, guess_dtypes: guess_dtypes ) diff --git a/tablite/_nimlite/textreader.nim b/tablite/_nimlite/textreader.nim index 68086620..058a1674 100644 --- a/tablite/_nimlite/textreader.nim +++ b/tablite/_nimlite/textreader.nim @@ -116,7 +116,9 @@ proc importTextFile*( for ix, name in enumerate(fields): if name in imp_columns: {uint ix: name} + let import_fields = collect: (for k in field_relation.keys: k) + let import_field_names = collect: (for v in field_relation.values: v) var field_relation_inv = collect(initOrderedTable()): for (ix, name) in field_relation.pairs: @@ -174,6 +176,7 @@ proc importTextFile*( dialect = dia, tasks = task_list, import_fields = import_fields, + import_field_names = import_field_names, page_size = page_size, guess_dtypes = guess_dtypes ) diff --git a/tablite/nimlite.py b/tablite/nimlite.py index 1d68c410..11f4f4c4 100644 --- a/tablite/nimlite.py +++ b/tablite/nimlite.py @@ -1,22 +1,14 @@ import sys -import json import psutil import platform -import subprocess as sp from pathlib import Path from tqdm import tqdm as _tqdm from tablite.config import Config from mplite import Task, TaskManager from tablite.utils import load_numpy -from tablite.utils import generate_random_string from tablite.base import SimplePage, Column, pytype_from_iterable -IS_WINDOWS = platform.system() == "Windows" -USE_CLI_BACKEND = IS_WINDOWS - -CLI_BACKEND_PATH = Path(__file__).parent / f"_nimlite/nimlite{'.exe' if IS_WINDOWS else ''}" - -if not USE_CLI_BACKEND: +if True: paths = sys.argv[:] if Config.USE_NIMPORTER: import nimporter @@ -37,52 +29,34 @@ def __init__(self, id, path, data) -> None: def text_reader_task(*, pid, path, encoding, dialect, task, import_fields, guess_dtypes): - if not USE_CLI_BACKEND: - nl.text_reader_task( - path=path, - encoding=encoding, - dia_delimiter=dialect["delimiter"], - dia_quotechar=dialect["quotechar"], - dia_escapechar=dialect["escapechar"], - dia_doublequote=dialect["doublequote"], - dia_quoting=dialect["quoting"], - dia_skipinitialspace=dialect["skipinitialspace"], - dia_skiptrailingspace=dialect["skiptrailingspace"], - dia_lineterminator=dialect["lineterminator"], - dia_strict=dialect["strict"], - tsk_pages=task["pages"], - tsk_offset=task["offset"], - tsk_count=task["count"], - import_fields=import_fields, - guess_dtypes=guess_dtypes - ) - else: - args = [ - str(CLI_BACKEND_PATH), - f"--encoding={encoding}", - f"--guess_dtypes={'true' if guess_dtypes else 'false'}", - f"--delimiter=\"{dialect['delimiter']}\"", - f"--quotechar=\"{dialect['quotechar']}\"", - f"--lineterminator=\"{dialect['lineterminator']}\"", - f"--skipinitialspace={'true' if dialect['skipinitialspace'] else 'false'}", - f"--skiptrailingspace={'true' if dialect['skiptrailingspace'] else 'false'}", - f"--quoting={dialect['quoting']}", - f"task", - f"{wrap(str(path))}", - f"{task['offset']}", - f"{task['count']}", - f"--pages={','.join(task['pages'])}", - f"--fields={','.join((str(v) for v in import_fields))}" - ] - - sp.run(" ".join(args), shell=True, check=True) + nl.text_reader_task( + path=path, + encoding=encoding, + dia_delimiter=dialect["delimiter"], + dia_quotechar=dialect["quotechar"], + dia_escapechar=dialect["escapechar"], + dia_doublequote=dialect["doublequote"], + dia_quoting=dialect["quoting"], + dia_skipinitialspace=dialect["skipinitialspace"], + dia_skiptrailingspace=dialect["skiptrailingspace"], + dia_lineterminator=dialect["lineterminator"], + dia_strict=dialect["strict"], + tsk_pages=task["pages"], + tsk_offset=task["offset"], + tsk_count=task["count"], + import_fields=import_fields, + guess_dtypes=guess_dtypes + ) pages = [] for p in (Path(p) for p in task["pages"]): - id = int(p.name.replace(p.suffix, "")) - arr = load_numpy(p) - page = NimPage(id, pid, arr) - pages.append(page) + try: + id = int(p.name.replace(p.suffix, "")) + arr = load_numpy(p) + page = NimPage(id, pid, arr) + pages.append(page) + except: + raise return pages @@ -103,65 +77,19 @@ def text_reader( assert isinstance(path, Path) assert isinstance(pid, Path) - if not USE_CLI_BACKEND: - table = nl.text_reader( - pid=str(pid), - path=str(path), - encoding=encoding, - first_row_has_headers=first_row_has_headers, header_row_index=header_row_index, - columns=columns, - start=start, limit=limit, - guess_datatypes=guess_datatypes, - newline=newline, delimiter=delimiter, text_qualifier=text_qualifier, - quoting=quoting, - strip_leading_and_tailing_whitespace=strip_leading_and_tailing_whitespace, - page_size=Config.PAGE_SIZE - ) - else: - taskname = generate_random_string(5) - - while (pid / "pages" / taskname / ".json").exists(): - taskname = generate_random_string(5) - - args = [ - str(CLI_BACKEND_PATH), - f"--encoding={encoding}", - f"--guess_dtypes={'true' if guess_datatypes else 'false'}", - f"--delimiter={wrap(delimiter)}", - f"--quotechar={wrap(text_qualifier)}", - f"--lineterminator={wrap(newline)}", - f"--skipinitialspace={'true' if strip_leading_and_tailing_whitespace else 'false'}", - f"--skiptrailingspace={'true' if strip_leading_and_tailing_whitespace else 'false'}", - f"--quoting={quoting}", - f"import", - f"{wrap(str(path))}", - f"--pid={wrap(str(pid))}", - f"--first_row_has_headers={'true' if first_row_has_headers else 'false'}", - f"--header_row_index={header_row_index}", - f"--page_size={Config.PAGE_SIZE}", - f"--execute=false", - f"--use_json=true", - f"--name={taskname}" - ] - - if start is not None: - args.append(f"--start={start}") - - if limit is not None: - args.append(f"--limit={limit}") - - if columns is not None: - assert isinstance(columns, list) - assert all(isinstance(v, str) for v in columns) - - args.append(f"--columns='[{','.join([c for c in columns])}]'") - - sp.run(" ".join(args), shell=True, check=True) - - table_path = pid / "pages" / (taskname + ".json") - - with open(table_path) as f: - table = json.loads(f.read()) + table = nl.text_reader( + pid=str(pid), + path=str(path), + encoding=encoding, + first_row_has_headers=first_row_has_headers, header_row_index=header_row_index, + columns=columns, + start=start, limit=limit, + guess_datatypes=guess_datatypes, + newline=newline, delimiter=delimiter, text_qualifier=text_qualifier, + quoting=quoting, + strip_leading_and_tailing_whitespace=strip_leading_and_tailing_whitespace, + page_size=Config.PAGE_SIZE + ) task_info = table["task"] task_columns = table["columns"] @@ -172,6 +100,7 @@ def text_reader( ti_guess_dtypes = task_info["guess_dtypes"] ti_tasks = task_info["tasks"] ti_import_fields = task_info["import_fields"] + ti_import_field_names = task_info["import_field_names"] is_windows = platform.system() == "Windows" use_logical = False if is_windows else True @@ -200,34 +129,42 @@ def text_reader( elif Config.MULTIPROCESSING_MODE == Config.FORCE: is_sp = False - try: - if is_sp: - res = [ - task.f(*task.args, **task.kwargs) - for task in tqdm(tasks, "importing file") - ] - else: - with TaskManager(cpus) as tm: - res = tm.execute(tasks, tqdm) - - if not all(isinstance(r, list) for r in res): - raise Exception("failed") - finally: - if USE_CLI_BACKEND and table_path.exists(): - table_path.unlink() + if is_sp: + res = [ + task.f(*task.args, **task.kwargs) + for task in tqdm(tasks, "importing file") + ] + else: + with TaskManager(cpus) as tm: + res = tm.execute(tasks, tqdm) + + if not all(isinstance(r, list) for r in res): + raise Exception("failed") col_path = pid column_dict = { - cols["name"]: Column(col_path) - for cols in task_columns + cols: Column(col_path) + for cols in ti_import_field_names } - columns = list(column_dict.values()) for res_pages in res: - for c, p in zip(columns, res_pages): - c.pages.append(p) + col_map = { + n: res_pages[i] + for i, n in enumerate(ti_import_field_names) + } + + for k, c in column_dict.items(): + c.pages.append(col_map[k]) + + if columns is None: + columns = [c["name"] for c in task_columns] + + table_dict = { + a["name"]: column_dict[b] + for a, b in zip(task_columns, columns) + } - table = T(columns=column_dict) + table = T(columns=table_dict) return table diff --git a/tablite/version.py b/tablite/version.py index ad6ec90b..c15e1ea5 100644 --- a/tablite/version.py +++ b/tablite/version.py @@ -1,3 +1,3 @@ -major, minor, patch = 2023, 8, 2 +major, minor, patch = 2023, 8, 3 __version_info__ = (major, minor, patch) __version__ = ".".join(str(i) for i in __version_info__)