Skip to content

Commit

Permalink
Merge pull request #155 from realratchet/master
Browse files Browse the repository at this point in the history
update mplite
  • Loading branch information
realratchet authored Mar 27, 2024
2 parents 5e47849 + eff0c4c commit 3737dc6
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 30 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pyexcel-xlsx==0.6.0
pyexcel-xls==0.7.0
pandas>=2.2.1
pyuca>=1.2
mplite>=1.2.5
mplite>=1.3.0
PyYAML==6.0
openpyxl==3.0.10 # newest version breaks pyexcel
h5py>=3.6.0
Expand Down
12 changes: 2 additions & 10 deletions tablite/joins.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,8 @@ def update(self, n):
tasks.append(task)

if use_mp:
with TaskManager(cpu_count=_vpus(tasks)) as tm:
with TaskManager(cpu_count=_vpus(tasks), error_mode="exception") as tm:
results = tm.execute(tasks, pbar=ProgressBar())
errors = [s for s in results if isinstance(s, str)]

if len(errors) > 0:
raise Exception(errors[0])
else:
results = [t.f(*t.args, **t.kwargs) for t in tasks]

Expand Down Expand Up @@ -551,12 +547,8 @@ def update(self, n):
tasks.append(task)

if use_mp:
with TaskManager(cpu_count=_vpus(tasks)) as tm:
with TaskManager(cpu_count=_vpus(tasks), error_mode="exception") as tm:
results = tm.execute(tasks, pbar=ProgressBar())
errors = [s for s in results if isinstance(s, str)]

if len(errors) > 0:
raise Exception(errors[0])
else:
results = [t.f(*t.args, **t.kwargs) for t in tasks]

Expand Down
4 changes: 1 addition & 3 deletions tablite/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def _mp_lookup(T, other, index):
cpus = 1 # max(psutil.cpu_count(logical=False), 1)
step_size = math.ceil(len(T) / cpus)

with TaskManager(cpu_count=cpus) as tm: # keeps the CPU pool alive during the whole join.
with TaskManager(cpu_count=cpus, error_mode="exception") as tm: # keeps the CPU pool alive during the whole join.
# for table, columns, side in ([T, left_columns, LEFT], [other, right_columns, RIGHT]):

index, index_shm = share_mem(index, np.int64) # <-- this is index
Expand All @@ -149,8 +149,6 @@ def _mp_lookup(T, other, index):
start, end = end, end + step_size
# All CPUS now work on the same column and memory footprint is predetermined.
results = tm.execute(tasks)
if any(i is not None for i in results):
raise Exception("\n".join(filter(lambda x: x is not None, results)))

# As the data and index no longer is needed, then can be closed.
data_shm.close()
Expand Down
12 changes: 2 additions & 10 deletions tablite/nimlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,12 @@ def next_task(task: Task, page_info):

for task in tasks:
page = task.execute()
if isinstance(page, str):
raise Exception(page)

res.append(page)
else:
with TaskManager(cpus) as tm:
with TaskManager(cpus, error_mode="exception") as tm:
res = tm.execute(tasks, pbar=wrapped_pbar)

if not all(isinstance(r, list) for r in res):
raise Exception("failed")

col_path = pid
column_dict = {
cols: Column(col_path)
Expand Down Expand Up @@ -266,12 +261,9 @@ class WrapUpdate:
def update(self, n):
pbar.update(n * step_size)

with TaskManager(min(cpu_count, page_count)) as tm:
with TaskManager(min(cpu_count, page_count), error_mode="exception") as tm:
res = tm.execute(list(tasks), pbar=WrapUpdate())

if any(isinstance(r, str) for r in res):
raise Exception("tasks failed")

converted.extend(res)
else:
for task in tasks:
Expand Down
4 changes: 1 addition & 3 deletions tablite/sortation.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,8 @@ def _mp_reindex(T, index, tqdm=_tqdm, pbar=None):
tasks.append(t)

cpus = min(len(tasks), psutil.cpu_count(logical=False))
with TaskManager(cpu_count=cpus) as tm:
with TaskManager(cpu_count=cpus, error_mode="exception") as tm:
errs = tm.execute(tasks)
if any(errs):
raise Exception("\n".join(filter(lambda x: x is not None, errs)))

shm.close()
shm.unlink()
Expand Down
2 changes: 1 addition & 1 deletion tablite/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
major, minor, patch = 2023, 10, 13
major, minor, patch = 2023, 10, 14
__version_info__ = (major, minor, patch)
__version__ = ".".join(str(i) for i in __version_info__)
2 changes: 1 addition & 1 deletion tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ def test_page_refcount():
assert all(Page.refcounts.get(p.path, 0) == 1 for p in table["A"].pages), "Refcount expected to be 1"
assert all(Page.refcounts.get(p.path, 0) == 1 for p in table["B"].pages), "Refcount expected to be 1"

with TaskManager(1) as tm:
with TaskManager(1, error_mode="exception") as tm:
""" this will cause deep table copy by copying table from main process -> child process -> main process """
tasks = [Task(fn_foo_table, table)]

Expand Down
2 changes: 1 addition & 1 deletion tests/test_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
def test_main():
args = list(range(10)) * 5
start = time.time()
with TaskManager() as tm:
with TaskManager(error_mode="exception") as tm:
tasks = [Task(f, *(arg / 10,), **{"hello": arg}) for arg in args]
results = tm.execute(tasks)
end = time.time()
Expand Down

0 comments on commit 3737dc6

Please sign in to comment.