Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with repeating columns #100

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ docs/

# personal notes
*scratch*.py
build_nim.sh

# local confidential data
tests/ndata/*
4 changes: 3 additions & 1 deletion tablite/_nimlite/table.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type TabliteTasks* = object
dialect*: TabliteDialect
tasks*: seq[TabliteTask]
import_fields*: seq[uint]
import_field_names*: seq[string]
page_size*: uint
guess_dtypes*: bool

Expand Down Expand Up @@ -62,13 +63,14 @@ proc newTabliteTask*(pages: seq[string], offset: uint, count: uint): TabliteTask

proc newTabliteTasks*(
path: string, encoding: string, dialect: Dialect,
tasks: seq[TabliteTask], import_fields: seq[uint], page_size: uint, guess_dtypes: bool): TabliteTasks =
tasks: seq[TabliteTask], import_fields: seq[uint], import_field_names: seq[string], page_size: uint, guess_dtypes: bool): TabliteTasks =
TabliteTasks(
path: path,
encoding: encoding,
dialect: dialect.newTabliteDialect,
tasks: tasks,
import_fields: import_fields,
import_field_names: import_field_names,
page_size: page_size,
guess_dtypes: guess_dtypes
)
Expand Down
3 changes: 3 additions & 0 deletions tablite/_nimlite/textreader.nim
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ proc importTextFile*(
for ix, name in enumerate(fields):
if name in imp_columns:
{uint ix: name}

let import_fields = collect: (for k in field_relation.keys: k)
let import_field_names = collect: (for v in field_relation.values: v)

var field_relation_inv = collect(initOrderedTable()):
for (ix, name) in field_relation.pairs:
Expand Down Expand Up @@ -174,6 +176,7 @@ proc importTextFile*(
dialect = dia,
tasks = task_list,
import_fields = import_fields,
import_field_names = import_field_names,
page_size = page_size,
guess_dtypes = guess_dtypes
)
Expand Down
201 changes: 69 additions & 132 deletions tablite/nimlite.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
import sys
import json
import psutil
import platform
import subprocess as sp
from pathlib import Path
from tqdm import tqdm as _tqdm
from tablite.config import Config
from mplite import Task, TaskManager
from tablite.utils import load_numpy
from tablite.utils import generate_random_string
from tablite.base import SimplePage, Column, pytype_from_iterable

IS_WINDOWS = platform.system() == "Windows"
USE_CLI_BACKEND = IS_WINDOWS

CLI_BACKEND_PATH = Path(__file__).parent / f"_nimlite/nimlite{'.exe' if IS_WINDOWS else ''}"

if not USE_CLI_BACKEND:
if True:
paths = sys.argv[:]
if Config.USE_NIMPORTER:
import nimporter
Expand All @@ -37,52 +29,34 @@ def __init__(self, id, path, data) -> None:


def text_reader_task(*, pid, path, encoding, dialect, task, import_fields, guess_dtypes):
if not USE_CLI_BACKEND:
nl.text_reader_task(
path=path,
encoding=encoding,
dia_delimiter=dialect["delimiter"],
dia_quotechar=dialect["quotechar"],
dia_escapechar=dialect["escapechar"],
dia_doublequote=dialect["doublequote"],
dia_quoting=dialect["quoting"],
dia_skipinitialspace=dialect["skipinitialspace"],
dia_skiptrailingspace=dialect["skiptrailingspace"],
dia_lineterminator=dialect["lineterminator"],
dia_strict=dialect["strict"],
tsk_pages=task["pages"],
tsk_offset=task["offset"],
tsk_count=task["count"],
import_fields=import_fields,
guess_dtypes=guess_dtypes
)
else:
args = [
str(CLI_BACKEND_PATH),
f"--encoding={encoding}",
f"--guess_dtypes={'true' if guess_dtypes else 'false'}",
f"--delimiter=\"{dialect['delimiter']}\"",
f"--quotechar=\"{dialect['quotechar']}\"",
f"--lineterminator=\"{dialect['lineterminator']}\"",
f"--skipinitialspace={'true' if dialect['skipinitialspace'] else 'false'}",
f"--skiptrailingspace={'true' if dialect['skiptrailingspace'] else 'false'}",
f"--quoting={dialect['quoting']}",
f"task",
f"{wrap(str(path))}",
f"{task['offset']}",
f"{task['count']}",
f"--pages={','.join(task['pages'])}",
f"--fields={','.join((str(v) for v in import_fields))}"
]

sp.run(" ".join(args), shell=True, check=True)
nl.text_reader_task(
path=path,
encoding=encoding,
dia_delimiter=dialect["delimiter"],
dia_quotechar=dialect["quotechar"],
dia_escapechar=dialect["escapechar"],
dia_doublequote=dialect["doublequote"],
dia_quoting=dialect["quoting"],
dia_skipinitialspace=dialect["skipinitialspace"],
dia_skiptrailingspace=dialect["skiptrailingspace"],
dia_lineterminator=dialect["lineterminator"],
dia_strict=dialect["strict"],
tsk_pages=task["pages"],
tsk_offset=task["offset"],
tsk_count=task["count"],
import_fields=import_fields,
guess_dtypes=guess_dtypes
)

pages = []
for p in (Path(p) for p in task["pages"]):
id = int(p.name.replace(p.suffix, ""))
arr = load_numpy(p)
page = NimPage(id, pid, arr)
pages.append(page)
try:
id = int(p.name.replace(p.suffix, ""))
arr = load_numpy(p)
page = NimPage(id, pid, arr)
pages.append(page)
except:
raise

return pages

Expand All @@ -103,65 +77,19 @@ def text_reader(
assert isinstance(path, Path)
assert isinstance(pid, Path)

if not USE_CLI_BACKEND:
table = nl.text_reader(
pid=str(pid),
path=str(path),
encoding=encoding,
first_row_has_headers=first_row_has_headers, header_row_index=header_row_index,
columns=columns,
start=start, limit=limit,
guess_datatypes=guess_datatypes,
newline=newline, delimiter=delimiter, text_qualifier=text_qualifier,
quoting=quoting,
strip_leading_and_tailing_whitespace=strip_leading_and_tailing_whitespace,
page_size=Config.PAGE_SIZE
)
else:
taskname = generate_random_string(5)

while (pid / "pages" / taskname / ".json").exists():
taskname = generate_random_string(5)

args = [
str(CLI_BACKEND_PATH),
f"--encoding={encoding}",
f"--guess_dtypes={'true' if guess_datatypes else 'false'}",
f"--delimiter={wrap(delimiter)}",
f"--quotechar={wrap(text_qualifier)}",
f"--lineterminator={wrap(newline)}",
f"--skipinitialspace={'true' if strip_leading_and_tailing_whitespace else 'false'}",
f"--skiptrailingspace={'true' if strip_leading_and_tailing_whitespace else 'false'}",
f"--quoting={quoting}",
f"import",
f"{wrap(str(path))}",
f"--pid={wrap(str(pid))}",
f"--first_row_has_headers={'true' if first_row_has_headers else 'false'}",
f"--header_row_index={header_row_index}",
f"--page_size={Config.PAGE_SIZE}",
f"--execute=false",
f"--use_json=true",
f"--name={taskname}"
]

if start is not None:
args.append(f"--start={start}")

if limit is not None:
args.append(f"--limit={limit}")

if columns is not None:
assert isinstance(columns, list)
assert all(isinstance(v, str) for v in columns)

args.append(f"--columns='[{','.join([c for c in columns])}]'")

sp.run(" ".join(args), shell=True, check=True)

table_path = pid / "pages" / (taskname + ".json")

with open(table_path) as f:
table = json.loads(f.read())
table = nl.text_reader(
pid=str(pid),
path=str(path),
encoding=encoding,
first_row_has_headers=first_row_has_headers, header_row_index=header_row_index,
columns=columns,
start=start, limit=limit,
guess_datatypes=guess_datatypes,
newline=newline, delimiter=delimiter, text_qualifier=text_qualifier,
quoting=quoting,
strip_leading_and_tailing_whitespace=strip_leading_and_tailing_whitespace,
page_size=Config.PAGE_SIZE
)

task_info = table["task"]
task_columns = table["columns"]
Expand All @@ -172,6 +100,7 @@ def text_reader(
ti_guess_dtypes = task_info["guess_dtypes"]
ti_tasks = task_info["tasks"]
ti_import_fields = task_info["import_fields"]
ti_import_field_names = task_info["import_field_names"]

is_windows = platform.system() == "Windows"
use_logical = False if is_windows else True
Expand Down Expand Up @@ -200,34 +129,42 @@ def text_reader(
elif Config.MULTIPROCESSING_MODE == Config.FORCE:
is_sp = False

try:
if is_sp:
res = [
task.f(*task.args, **task.kwargs)
for task in tqdm(tasks, "importing file")
]
else:
with TaskManager(cpus) as tm:
res = tm.execute(tasks, tqdm)

if not all(isinstance(r, list) for r in res):
raise Exception("failed")
finally:
if USE_CLI_BACKEND and table_path.exists():
table_path.unlink()
if is_sp:
res = [
task.f(*task.args, **task.kwargs)
for task in tqdm(tasks, "importing file")
]
else:
with TaskManager(cpus) as tm:
res = tm.execute(tasks, tqdm)

if not all(isinstance(r, list) for r in res):
raise Exception("failed")

col_path = pid
column_dict = {
cols["name"]: Column(col_path)
for cols in task_columns
cols: Column(col_path)
for cols in ti_import_field_names
}

columns = list(column_dict.values())
for res_pages in res:
for c, p in zip(columns, res_pages):
c.pages.append(p)
col_map = {
n: res_pages[i]
for i, n in zip(ti_import_fields, ti_import_field_names)
}

for k, c in column_dict.items():
c.pages.append(col_map[k])

if columns is None:
columns = [c["name"] for c in task_columns]

table_dict = {
a["name"]: column_dict[b]
for a, b in zip(task_columns, columns)
}

table = T(columns=column_dict)
table = T(columns=table_dict)

return table

Expand Down
2 changes: 1 addition & 1 deletion tablite/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
major, minor, patch = 2023, 8, 2
major, minor, patch = 2023, 8, 3
__version_info__ = (major, minor, patch)
__version__ = ".".join(str(i) for i in __version_info__)