Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API parameter to cap thread usage #46

Merged
merged 9 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/zarrv2-compressed-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ int main() {
.s3_settings = &s3,
.compression_settings = &compression,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, c, y, x)
Expand Down
3 changes: 2 additions & 1 deletion examples/zarrv2-raw-filesystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ int main() {
.s3_settings = NULL,
.compression_settings = NULL,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-compressed-filesystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ main()
.compression_settings = &compression,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-compressed-multiscale-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ main()
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.multiscale = true,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, z, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-compressed-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ main()
.compression_settings = &compression,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-raw-filesystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ main()
.compression_settings = NULL,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-raw-multiscale-filesystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ main()
.multiscale = true,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up 5D array (t, c, z, y, x)
Expand Down
3 changes: 2 additions & 1 deletion examples/zarrv3-raw-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ int main() {
.s3_settings = &s3,
.compression_settings = NULL, // No compression
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, z, y, x)
Expand Down
1 change: 1 addition & 0 deletions include/acquire.zarr.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C"
bool multiscale; /**< Whether to stream to multiple levels of detail. */
ZarrDataType data_type; /**< The pixel data type of the dataset. */
ZarrVersion version; /**< The version of the Zarr format to use. 2 or 3. */
unsigned int max_threads; /**< The maximum number of threads to use in the stream. Set to 0 to use the supported number of concurrent threads. */
} ZarrStreamSettings;

typedef struct ZarrStream_s ZarrStream;
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ build-backend = "setuptools.build_meta"

[project]
name = "acquire-zarr"
version = "0.0.5"
description = "Python bindings for acquire-zarr"
version = "0.1.0"
description = "Performant streaming to Zarr storage, on filesystem or cloud"
authors = [
{name = "Alan Liddell", email = "[email protected]"}
]
Expand Down
14 changes: 13 additions & 1 deletion python/acquire-zarr-py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ class PyZarrStreamSettings
ZarrVersion version() const { return version_; }
void set_version(ZarrVersion version) { version_ = version; }

unsigned int max_threads() const { return max_threads_; }
void set_max_threads(unsigned int max_threads)
{
max_threads_ = max_threads;
}

private:
std::string store_path_;
std::optional<std::string> custom_metadata_{ std::nullopt };
Expand All @@ -284,6 +290,7 @@ class PyZarrStreamSettings
bool multiscale_ = false;
ZarrDataType data_type_{ ZarrDataType_uint8 };
ZarrVersion version_{ ZarrVersion_2 };
unsigned int max_threads_{ std::thread::hardware_concurrency() };
};

class PyZarrStream
Expand All @@ -304,6 +311,7 @@ class PyZarrStream
.multiscale = settings.multiscale(),
.data_type = settings.data_type(),
.version = settings.version(),
.max_threads = settings.max_threads(),
};

store_path_ = settings.store_path();
Expand Down Expand Up @@ -632,6 +640,7 @@ PYBIND11_MODULE(acquire_zarr, m)
std::string(data_type_to_str(self.data_type())) +
", version=ZarrVersion." +
std::string(self.version() == ZarrVersion_2 ? "V2" : "V3") +
", max_threads=" + std::to_string(self.max_threads()) +
")";
return repr;
})
Expand Down Expand Up @@ -692,7 +701,10 @@ PYBIND11_MODULE(acquire_zarr, m)
&PyZarrStreamSettings::set_data_type)
.def_property("version",
&PyZarrStreamSettings::version,
&PyZarrStreamSettings::set_version);
&PyZarrStreamSettings::set_version)
.def_property("max_threads",
&PyZarrStreamSettings::max_threads,
&PyZarrStreamSettings::set_max_threads);

py::class_<PyZarrStream>(m, "ZarrStream")
.def(py::init<PyZarrStreamSettings>())
Expand Down
1 change: 1 addition & 0 deletions python/acquire_zarr.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ class StreamSettings:
s3: Optional[S3Settings]
store_path: str
version: ZarrVersion
max_threads: int

def __init__(self, **kwargs) -> None:
...
Expand Down
7 changes: 7 additions & 0 deletions python/tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,10 @@ def test_set_version(settings):
settings.version = acquire_zarr.ZarrVersion.V3

assert settings.version == acquire_zarr.ZarrVersion.V3


def test_set_max_threads(settings):
assert settings.max_threads > 0 # depends on your system, but will be nonzero

settings.max_threads = 4
assert settings.max_threads == 4
16 changes: 14 additions & 2 deletions src/streaming/zarr.stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,21 @@ ZarrStream::ZarrStream_s(struct ZarrStreamSettings_s* settings)
commit_settings_(settings);

// spin up thread pool
unsigned int max_threads = settings->max_threads;
const auto hardware_concurrency = std::thread::hardware_concurrency();

if (max_threads == 0) {
if (hardware_concurrency > 0) {
LOG_DEBUG("Using ", hardware_concurrency, " threads");
max_threads = hardware_concurrency;
} else {
LOG_WARNING(
"Unable to determine hardware concurrency, using 1 thread");
max_threads = 1;
}
}
thread_pool_ = std::make_shared<zarr::ThreadPool>(
std::thread::hardware_concurrency(),
[this](const std::string& err) { this->set_error_(err); });
max_threads, [this](const std::string& err) { this->set_error_(err); });

// allocate a frame buffer
frame_buffer_.resize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ setup()
.s3_settings = nullptr,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

ZarrCompressionSettings compression_settings = {
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v2-compressed-to-s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ setup()
.store_path = TEST,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

ZarrS3Settings s3_settings{
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v2-raw-to-filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ setup()
.compression_settings = nullptr,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5));
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v2-raw-to-s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ setup()
.compression_settings = nullptr,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

ZarrS3Settings s3_settings{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ setup()
.s3_settings = nullptr,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

ZarrCompressionSettings compression_settings = {
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v3-compressed-to-s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ setup()
.store_path = TEST,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

ZarrS3Settings s3_settings{
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v3-raw-to-filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ setup()
.compression_settings = nullptr,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5));
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v3-raw-to-s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ setup()
.compression_settings = nullptr,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

ZarrS3Settings s3_settings{
Expand Down
1 change: 1 addition & 0 deletions tests/unit-tests/create-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ main()
ZarrStreamSettings settings;
memset(&settings, 0, sizeof(settings));
settings.version = ZarrVersion_2;
settings.max_threads = std::thread::hardware_concurrency();

try {
// try to create a stream with no store path
Expand Down
Loading