diff --git a/odo/backends/dask.py b/odo/backends/dask.py index 8220bea3..62258623 100644 --- a/odo/backends/dask.py +++ b/odo/backends/dask.py @@ -1,23 +1,19 @@ from __future__ import absolute_import, division, print_function -from operator import add from collections import Iterator import numpy as np -from toolz import merge, accumulate +from toolz import keyfilter from datashape.dispatch import dispatch from datashape import DataShape -from dask.array.core import rec_concatenate, Array, getem, get, names, from_array +from dask.array.core import rec_concatenate, Array, get, from_array from dask.bag.core import Bag -from dask.core import flatten +import dask.bag as db from dask.compatibility import long -from odo import append, chunks, convert, discover, into, TextFile +from odo import append, chunks, convert, discover, TextFile from ..utils import keywords -############## -# dask.Array # -############## @discover.register(Array) def discover_dask_array(a, **kwargs): @@ -26,8 +22,12 @@ def discover_dask_array(a, **kwargs): arrays = [np.ndarray] + try: import h5py +except ImportError: + pass +else: arrays.append(h5py.Dataset) @dispatch(h5py.Dataset, (int, long)) @@ -39,24 +39,24 @@ def resize(x, size): @dispatch(h5py.Dataset, tuple) def resize(x, shape): return x.resize(shape) -except ImportError: - pass + try: import bcolz +except ImportError: + pass +else: arrays.append(bcolz.carray) @dispatch(bcolz.carray, (int, long)) def resize(x, size): return x.resize(size) -except ImportError: - pass @convert.register(Array, tuple(arrays), cost=1.) -def array_to_dask(x, name=None, blockshape=None, **kwargs): - if blockshape is None: - raise NotImplementedError("blockshape cannot be None") - return from_array(x, blockshape=blockshape, name=name, **kwargs) +def array_to_dask(x, name=None, chunks=None, **kwargs): + if chunks is None: + raise ValueError("chunks cannot be None") + return from_array(x, chunks=chunks, name=name, **kwargs) @convert.register(np.ndarray, Array, cost=10.) @@ -75,11 +75,9 @@ def store_Array_in_ooc_data(out, arr, inplace=False, **kwargs): # Resize output dataset to accept new data assert out.shape[1:] == arr.shape[1:] resize(out, out.shape[0] + arr.shape[0]) # elongate - return arr.store(out) + arr.store(out) + return out -############ -# dask.bag # -############ @convert.register(Iterator, Bag) def bag_to_iterator(x, **kwargs): @@ -88,11 +86,10 @@ def bag_to_iterator(x, **kwargs): @convert.register(Bag, chunks(TextFile)) def bag_to_iterator(x, **kwargs): - return Bag.from_filenames([tf.path for tf in x]) + return db.from_filenames([tf.path for tf in x]) @convert.register(Bag, list) def bag_to_iterator(x, **kwargs): - keys = keywords(Bag.from_sequence) - kwargs2 = dict((k, v) for k, v in kwargs.items() if k in keys) - return Bag.from_sequence(x, **kwargs2) + kwargs = keyfilter(keywords(db.from_sequence).__contains__, kwargs) + return db.from_sequence(x, **kwargs) diff --git a/odo/backends/tests/test_dask.py b/odo/backends/tests/test_dask.py deleted file mode 100644 index 0d959f53..00000000 --- a/odo/backends/tests/test_dask.py +++ /dev/null @@ -1,113 +0,0 @@ -from __future__ import absolute_import, division, print_function - -import pytest -pytest.importorskip('dask') -pytest.importorskip('dask.bag') - -from odo.backends.dask import append, merge -from dask.array.core import insert_to_ooc, Array -from dask.bag.core import Bag -from dask import core -from odo import chunks, convert, into, TextFile -from odo.utils import tmpfile, filetexts -import numpy as np - -#################### -# dask.array tests # -#################### - -def eq(a, b): - c = a == b - if isinstance(c, np.ndarray): - c = c.all() - return c - - -def test_convert(): - x = np.arange(600).reshape((20, 30)) - d = convert(Array, x, blockshape=(4, 5)) - - assert isinstance(d, Array) - - -def test_convert_to_numpy_array(): - x = np.arange(600).reshape((20, 30)) - d = convert(Array, x, blockshape=(4, 5)) - x2 = convert(np.ndarray, d) - - assert eq(x, x2) - - -def test_append_to_array(): - bcolz = pytest.importorskip('bcolz') - x = np.arange(600).reshape((20, 30)) - a = into(Array, x, blockshape=(4, 5)) - b = bcolz.zeros(shape=(0, 30), dtype=x.dtype) - - append(b, a) - assert eq(b[:], x) - - with tmpfile('hdf5') as fn: - h = into(fn+'::/data', a) - assert eq(h[:], x) - h.file.close() - - -def test_into_inplace(): - bcolz = pytest.importorskip('bcolz') - x = np.arange(600).reshape((20, 30)) - a = into(Array, x, blockshape=(4, 5)) - b = bcolz.zeros(shape=(20, 30), dtype=x.dtype) - - append(b, a, inplace=True) - assert eq(b[:], x) - - -def test_insert_to_ooc(): - x = np.arange(600).reshape((20, 30)) - y = np.empty(shape=x.shape, dtype=x.dtype) - a = convert(Array, x, blockshape=(4, 5)) - - dsk = insert_to_ooc(y, a) - core.get(merge(dsk, a.dask), list(dsk.keys())) - - assert eq(y, x) - - -def test__array__(): - x = np.arange(600).reshape((20, 30)) - d = convert(Array, x, blockshape=(4, 5)) - - assert eq(x, np.array(d)) - -################## -# dask.bag tests # -################## - - -def inc(x): - return x + 1 - -dsk = {('x', 0): (range, 5), - ('x', 1): (range, 5), - ('x', 2): (range, 5)} - -L = list(range(5)) * 3 - -b = Bag(dsk, 'x', 3) - -def test_convert_bag_to_list(): - assert convert(list, b) == L - -def test_convert_logfiles_to_bag(): - with filetexts({'a1.log': 'Hello\nWorld', 'a2.log': 'Hola\nMundo'}) as fns: - logs = chunks(TextFile)(list(map(TextFile, fns))) - b = convert(Bag, logs) - assert isinstance(b, Bag) - assert 'a1.log' in str(b.dask.values()) - assert convert(list, b) == convert(list, logs) - - -def test_sequence(): - b = into(Bag, [1, 2, 3]) - assert set(b.map(inc)) == set([2, 3, 4]) diff --git a/odo/backends/tests/test_dask_array.py b/odo/backends/tests/test_dask_array.py new file mode 100644 index 00000000..c16808b7 --- /dev/null +++ b/odo/backends/tests/test_dask_array.py @@ -0,0 +1,76 @@ +from __future__ import absolute_import, division, print_function + +import pytest + +pytest.importorskip('dask') + +from toolz import merge +from odo.backends.dask import append +from dask.array.core import insert_to_ooc, Array +from dask import core +from odo import convert, into, odo +from odo.utils import tmpfile +import numpy as np + + +def eq(a, b): + c = a == b + if isinstance(c, np.ndarray): + c = c.all() + return c + + +def test_convert(): + x = np.arange(600).reshape((20, 30)) + d = convert(Array, x, chunks=(4, 5)) + assert isinstance(d, Array) + + +def test_convert_to_numpy_array(): + x = np.arange(600).reshape((20, 30)) + d = convert(Array, x, chunks=(4, 5)) + x2 = convert(np.ndarray, d) + assert eq(x, x2) + + +def test_append_to_array(): + bcolz = pytest.importorskip('bcolz') + x = np.arange(600).reshape((20, 30)) + a = into(Array, x, chunks=(4, 5)) + b = bcolz.zeros(shape=(0, 30), dtype=x.dtype) + + append(b, a) + assert eq(b[:], x) + + with tmpfile('.hdf5') as fn: + h = odo(a, fn + '::/data') + assert eq(h[:], x) + h.file.close() + + +def test_into_inplace(): + bcolz = pytest.importorskip('bcolz') + x = np.arange(600).reshape((20, 30)) + a = into(Array, x, chunks=(4, 5)) + b = bcolz.zeros(shape=(20, 30), dtype=x.dtype) + + append(b, a, inplace=True) + assert eq(b[:], x) + + +def test_insert_to_ooc(): + x = np.arange(600).reshape((20, 30)) + y = np.empty(shape=x.shape, dtype=x.dtype) + a = convert(Array, x, chunks=(4, 5)) + + dsk = insert_to_ooc(y, a) + core.get(merge(dsk, a.dask), list(dsk.keys())) + + assert eq(y, x) + + +def test_array_interface(): + x = np.arange(600).reshape((20, 30)) + d = convert(Array, x, chunks=(4, 5)) + + assert eq(x, np.array(d)) diff --git a/odo/backends/tests/test_dask_bag.py b/odo/backends/tests/test_dask_bag.py new file mode 100644 index 00000000..4a4e39dd --- /dev/null +++ b/odo/backends/tests/test_dask_bag.py @@ -0,0 +1,38 @@ +import pytest + +pytest.importorskip('dask.bag') + +from odo import chunks, TextFile, odo +from dask.bag import Bag +from odo.utils import filetexts + + +def inc(x): + return x + 1 + + +dsk = {('x', 0): (range, 5), + ('x', 1): (range, 5), + ('x', 2): (range, 5)} + +L = list(range(5)) * 3 + +b = Bag(dsk, 'x', 3) + + +def test_convert_bag_to_list(): + assert odo(b, list) == L + + +def test_convert_logfiles_to_bag(): + with filetexts({'a1.log': 'Hello\nWorld', 'a2.log': 'Hola\nMundo'}) as fns: + logs = chunks(TextFile)(list(map(TextFile, fns))) + b = odo(logs, Bag) + assert isinstance(b, Bag) + assert 'a1.log' in str(b.dask.values()) + assert odo(b, list) == odo(logs, list) + + +def test_sequence(): + b = odo([1, 2, 3], Bag) + assert set(b.map(inc)) == set([2, 3, 4])