diff --git a/odo/backends/dask.py b/odo/backends/dask.py index 8220bea3..62258623 100644 --- a/odo/backends/dask.py +++ b/odo/backends/dask.py @@ -1,23 +1,19 @@ from __future__ import absolute_import, division, print_function -from operator import add from collections import Iterator import numpy as np -from toolz import merge, accumulate +from toolz import keyfilter from datashape.dispatch import dispatch from datashape import DataShape -from dask.array.core import rec_concatenate, Array, getem, get, names, from_array +from dask.array.core import rec_concatenate, Array, get, from_array from dask.bag.core import Bag -from dask.core import flatten +import dask.bag as db from dask.compatibility import long -from odo import append, chunks, convert, discover, into, TextFile +from odo import append, chunks, convert, discover, TextFile from ..utils import keywords -############## -# dask.Array # -############## @discover.register(Array) def discover_dask_array(a, **kwargs): @@ -26,8 +22,12 @@ def discover_dask_array(a, **kwargs): arrays = [np.ndarray] + try: import h5py +except ImportError: + pass +else: arrays.append(h5py.Dataset) @dispatch(h5py.Dataset, (int, long)) @@ -39,24 +39,24 @@ def resize(x, size): @dispatch(h5py.Dataset, tuple) def resize(x, shape): return x.resize(shape) -except ImportError: - pass + try: import bcolz +except ImportError: + pass +else: arrays.append(bcolz.carray) @dispatch(bcolz.carray, (int, long)) def resize(x, size): return x.resize(size) -except ImportError: - pass @convert.register(Array, tuple(arrays), cost=1.) -def array_to_dask(x, name=None, blockshape=None, **kwargs): - if blockshape is None: - raise NotImplementedError("blockshape cannot be None") - return from_array(x, blockshape=blockshape, name=name, **kwargs) +def array_to_dask(x, name=None, chunks=None, **kwargs): + if chunks is None: + raise ValueError("chunks cannot be None") + return from_array(x, chunks=chunks, name=name, **kwargs) @convert.register(np.ndarray, Array, cost=10.) @@ -75,11 +75,9 @@ def store_Array_in_ooc_data(out, arr, inplace=False, **kwargs): # Resize output dataset to accept new data assert out.shape[1:] == arr.shape[1:] resize(out, out.shape[0] + arr.shape[0]) # elongate - return arr.store(out) + arr.store(out) + return out -############ -# dask.bag # -############ @convert.register(Iterator, Bag) def bag_to_iterator(x, **kwargs): @@ -88,11 +86,10 @@ def bag_to_iterator(x, **kwargs): @convert.register(Bag, chunks(TextFile)) def bag_to_iterator(x, **kwargs): - return Bag.from_filenames([tf.path for tf in x]) + return db.from_filenames([tf.path for tf in x]) @convert.register(Bag, list) def bag_to_iterator(x, **kwargs): - keys = keywords(Bag.from_sequence) - kwargs2 = dict((k, v) for k, v in kwargs.items() if k in keys) - return Bag.from_sequence(x, **kwargs2) + kwargs = keyfilter(keywords(db.from_sequence).__contains__, kwargs) + return db.from_sequence(x, **kwargs) diff --git a/odo/backends/tests/test_dask_array.py b/odo/backends/tests/test_dask_array.py index 4bdccaec..c16808b7 100644 --- a/odo/backends/tests/test_dask_array.py +++ b/odo/backends/tests/test_dask_array.py @@ -8,7 +8,7 @@ from odo.backends.dask import append from dask.array.core import insert_to_ooc, Array from dask import core -from odo import convert, into +from odo import convert, into, odo from odo.utils import tmpfile import numpy as np @@ -42,8 +42,8 @@ def test_append_to_array(): append(b, a) assert eq(b[:], x) - with tmpfile('hdf5') as fn: - h = into(fn + '::/data', a) + with tmpfile('.hdf5') as fn: + h = odo(a, fn + '::/data') assert eq(h[:], x) h.file.close()