Skip to content

Commit

Permalink
Merge pull request #101 from realratchet/master
Browse files Browse the repository at this point in the history
Fix issue with repeating columns
  • Loading branch information
realratchet authored Oct 26, 2023
2 parents 59e18cf + b8fb472 commit 46b10cb
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 134 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ docs/

# personal notes
*scratch*.py
build_nim.sh

# local confidential data
tests/ndata/*
4 changes: 3 additions & 1 deletion tablite/_nimlite/table.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type TabliteTasks* = object
dialect*: TabliteDialect
tasks*: seq[TabliteTask]
import_fields*: seq[uint]
import_field_names*: seq[string]
page_size*: uint
guess_dtypes*: bool

Expand Down Expand Up @@ -62,13 +63,14 @@ proc newTabliteTask*(pages: seq[string], offset: uint, count: uint): TabliteTask

proc newTabliteTasks*(
path: string, encoding: string, dialect: Dialect,
tasks: seq[TabliteTask], import_fields: seq[uint], page_size: uint, guess_dtypes: bool): TabliteTasks =
tasks: seq[TabliteTask], import_fields: seq[uint], import_field_names: seq[string], page_size: uint, guess_dtypes: bool): TabliteTasks =
TabliteTasks(
path: path,
encoding: encoding,
dialect: dialect.newTabliteDialect,
tasks: tasks,
import_fields: import_fields,
import_field_names: import_field_names,
page_size: page_size,
guess_dtypes: guess_dtypes
)
Expand Down
3 changes: 3 additions & 0 deletions tablite/_nimlite/textreader.nim
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ proc importTextFile*(
for ix, name in enumerate(fields):
if name in imp_columns:
{uint ix: name}

let import_fields = collect: (for k in field_relation.keys: k)
let import_field_names = collect: (for v in field_relation.values: v)

var field_relation_inv = collect(initOrderedTable()):
for (ix, name) in field_relation.pairs:
Expand Down Expand Up @@ -174,6 +176,7 @@ proc importTextFile*(
dialect = dia,
tasks = task_list,
import_fields = import_fields,
import_field_names = import_field_names,
page_size = page_size,
guess_dtypes = guess_dtypes
)
Expand Down
201 changes: 69 additions & 132 deletions tablite/nimlite.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
import sys
import json
import psutil
import platform
import subprocess as sp
from pathlib import Path
from tqdm import tqdm as _tqdm
from tablite.config import Config
from mplite import Task, TaskManager
from tablite.utils import load_numpy
from tablite.utils import generate_random_string
from tablite.base import SimplePage, Column, pytype_from_iterable

IS_WINDOWS = platform.system() == "Windows"
USE_CLI_BACKEND = IS_WINDOWS

CLI_BACKEND_PATH = Path(__file__).parent / f"_nimlite/nimlite{'.exe' if IS_WINDOWS else ''}"

if not USE_CLI_BACKEND:
if True:
paths = sys.argv[:]
if Config.USE_NIMPORTER:
import nimporter
Expand All @@ -37,52 +29,34 @@ def __init__(self, id, path, data) -> None:


def text_reader_task(*, pid, path, encoding, dialect, task, import_fields, guess_dtypes):
if not USE_CLI_BACKEND:
nl.text_reader_task(
path=path,
encoding=encoding,
dia_delimiter=dialect["delimiter"],
dia_quotechar=dialect["quotechar"],
dia_escapechar=dialect["escapechar"],
dia_doublequote=dialect["doublequote"],
dia_quoting=dialect["quoting"],
dia_skipinitialspace=dialect["skipinitialspace"],
dia_skiptrailingspace=dialect["skiptrailingspace"],
dia_lineterminator=dialect["lineterminator"],
dia_strict=dialect["strict"],
tsk_pages=task["pages"],
tsk_offset=task["offset"],
tsk_count=task["count"],
import_fields=import_fields,
guess_dtypes=guess_dtypes
)
else:
args = [
str(CLI_BACKEND_PATH),
f"--encoding={encoding}",
f"--guess_dtypes={'true' if guess_dtypes else 'false'}",
f"--delimiter=\"{dialect['delimiter']}\"",
f"--quotechar=\"{dialect['quotechar']}\"",
f"--lineterminator=\"{dialect['lineterminator']}\"",
f"--skipinitialspace={'true' if dialect['skipinitialspace'] else 'false'}",
f"--skiptrailingspace={'true' if dialect['skiptrailingspace'] else 'false'}",
f"--quoting={dialect['quoting']}",
f"task",
f"{wrap(str(path))}",
f"{task['offset']}",
f"{task['count']}",
f"--pages={','.join(task['pages'])}",
f"--fields={','.join((str(v) for v in import_fields))}"
]

sp.run(" ".join(args), shell=True, check=True)
nl.text_reader_task(
path=path,
encoding=encoding,
dia_delimiter=dialect["delimiter"],
dia_quotechar=dialect["quotechar"],
dia_escapechar=dialect["escapechar"],
dia_doublequote=dialect["doublequote"],
dia_quoting=dialect["quoting"],
dia_skipinitialspace=dialect["skipinitialspace"],
dia_skiptrailingspace=dialect["skiptrailingspace"],
dia_lineterminator=dialect["lineterminator"],
dia_strict=dialect["strict"],
tsk_pages=task["pages"],
tsk_offset=task["offset"],
tsk_count=task["count"],
import_fields=import_fields,
guess_dtypes=guess_dtypes
)

pages = []
for p in (Path(p) for p in task["pages"]):
id = int(p.name.replace(p.suffix, ""))
arr = load_numpy(p)
page = NimPage(id, pid, arr)
pages.append(page)
try:
id = int(p.name.replace(p.suffix, ""))
arr = load_numpy(p)
page = NimPage(id, pid, arr)
pages.append(page)
except:
raise

return pages

Expand All @@ -103,65 +77,19 @@ def text_reader(
assert isinstance(path, Path)
assert isinstance(pid, Path)

if not USE_CLI_BACKEND:
table = nl.text_reader(
pid=str(pid),
path=str(path),
encoding=encoding,
first_row_has_headers=first_row_has_headers, header_row_index=header_row_index,
columns=columns,
start=start, limit=limit,
guess_datatypes=guess_datatypes,
newline=newline, delimiter=delimiter, text_qualifier=text_qualifier,
quoting=quoting,
strip_leading_and_tailing_whitespace=strip_leading_and_tailing_whitespace,
page_size=Config.PAGE_SIZE
)
else:
taskname = generate_random_string(5)

while (pid / "pages" / taskname / ".json").exists():
taskname = generate_random_string(5)

args = [
str(CLI_BACKEND_PATH),
f"--encoding={encoding}",
f"--guess_dtypes={'true' if guess_datatypes else 'false'}",
f"--delimiter={wrap(delimiter)}",
f"--quotechar={wrap(text_qualifier)}",
f"--lineterminator={wrap(newline)}",
f"--skipinitialspace={'true' if strip_leading_and_tailing_whitespace else 'false'}",
f"--skiptrailingspace={'true' if strip_leading_and_tailing_whitespace else 'false'}",
f"--quoting={quoting}",
f"import",
f"{wrap(str(path))}",
f"--pid={wrap(str(pid))}",
f"--first_row_has_headers={'true' if first_row_has_headers else 'false'}",
f"--header_row_index={header_row_index}",
f"--page_size={Config.PAGE_SIZE}",
f"--execute=false",
f"--use_json=true",
f"--name={taskname}"
]

if start is not None:
args.append(f"--start={start}")

if limit is not None:
args.append(f"--limit={limit}")

if columns is not None:
assert isinstance(columns, list)
assert all(isinstance(v, str) for v in columns)

args.append(f"--columns='[{','.join([c for c in columns])}]'")

sp.run(" ".join(args), shell=True, check=True)

table_path = pid / "pages" / (taskname + ".json")

with open(table_path) as f:
table = json.loads(f.read())
table = nl.text_reader(
pid=str(pid),
path=str(path),
encoding=encoding,
first_row_has_headers=first_row_has_headers, header_row_index=header_row_index,
columns=columns,
start=start, limit=limit,
guess_datatypes=guess_datatypes,
newline=newline, delimiter=delimiter, text_qualifier=text_qualifier,
quoting=quoting,
strip_leading_and_tailing_whitespace=strip_leading_and_tailing_whitespace,
page_size=Config.PAGE_SIZE
)

task_info = table["task"]
task_columns = table["columns"]
Expand All @@ -172,6 +100,7 @@ def text_reader(
ti_guess_dtypes = task_info["guess_dtypes"]
ti_tasks = task_info["tasks"]
ti_import_fields = task_info["import_fields"]
ti_import_field_names = task_info["import_field_names"]

is_windows = platform.system() == "Windows"
use_logical = False if is_windows else True
Expand Down Expand Up @@ -200,34 +129,42 @@ def text_reader(
elif Config.MULTIPROCESSING_MODE == Config.FORCE:
is_sp = False

try:
if is_sp:
res = [
task.f(*task.args, **task.kwargs)
for task in tqdm(tasks, "importing file")
]
else:
with TaskManager(cpus) as tm:
res = tm.execute(tasks, tqdm)

if not all(isinstance(r, list) for r in res):
raise Exception("failed")
finally:
if USE_CLI_BACKEND and table_path.exists():
table_path.unlink()
if is_sp:
res = [
task.f(*task.args, **task.kwargs)
for task in tqdm(tasks, "importing file")
]
else:
with TaskManager(cpus) as tm:
res = tm.execute(tasks, tqdm)

if not all(isinstance(r, list) for r in res):
raise Exception("failed")

col_path = pid
column_dict = {
cols["name"]: Column(col_path)
for cols in task_columns
cols: Column(col_path)
for cols in ti_import_field_names
}

columns = list(column_dict.values())
for res_pages in res:
for c, p in zip(columns, res_pages):
c.pages.append(p)
col_map = {
n: res_pages[i]
for i, n in enumerate(ti_import_field_names)
}

for k, c in column_dict.items():
c.pages.append(col_map[k])

if columns is None:
columns = [c["name"] for c in task_columns]

table_dict = {
a["name"]: column_dict[b]
for a, b in zip(task_columns, columns)
}

table = T(columns=column_dict)
table = T(columns=table_dict)

return table

Expand Down
2 changes: 1 addition & 1 deletion tablite/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
major, minor, patch = 2023, 8, 2
major, minor, patch = 2023, 8, 3
__version_info__ = (major, minor, patch)
__version__ = ".".join(str(i) for i in __version_info__)

0 comments on commit 46b10cb

Please sign in to comment.