Feat: improves delete_dir for s3fs-backed FsspecStore #2661
Conversation
- Override the `Store.delete_dir` default method, which deletes keys one by one, to support bulk deletion for fsspec implementations whose `fs._rm` method accepts a list of paths.
- This can greatly reduce the number of requests to S3, which reduces the likelihood of throttling errors and improves delete performance.
- Currently, only s3fs is supported.
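The request-count difference this PR targets can be illustrated with a toy backend. All names below are hypothetical stand-ins, not zarr or s3fs APIs; the point is one request per key versus one bulk request:

```python
import asyncio


class ToyBackend:
    """Hypothetical object store that counts requests, as S3 would see them."""

    def __init__(self, keys):
        self.keys = set(keys)
        self.requests = 0

    async def delete(self, key):
        # One request per key, as in the default Store.delete_dir loop.
        self.requests += 1
        self.keys.discard(key)

    async def delete_many(self, keys):
        # One bulk request for the whole list of paths.
        self.requests += 1
        self.keys -= set(keys)


keys = [f"group/c/{i}" for i in range(400)]
one_by_one = ToyBackend(keys)
bulk = ToyBackend(keys)


async def run():
    for k in keys:
        await one_by_one.delete(k)
    await bulk.delete_many(keys)


asyncio.run(run())
print(one_by_one.requests, bulk.requests)  # 400 1
```

Fewer requests also means fewer chances to hit S3's request-rate throttling.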
I have reservations about this code. It seems to me that calling `._rm()` should be all that's required, and we should let fsspec handle everything else.
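A minimal sketch of that suggestion, using a stand-in filesystem (hypothetical, not the real fsspec class): `delete_dir` just forwards to the backend's recursive `_rm` and lets fsspec decide how to list and batch deletions.

```python
import asyncio


class FakeAsyncFS:
    """Stand-in for an fsspec AsyncFileSystem (hypothetical)."""

    def __init__(self):
        self.calls = []

    async def _rm(self, path, recursive=False, **kwargs):
        # A real backend would list and delete keys here.
        self.calls.append((path, recursive))


async def delete_dir(fs, path):
    # Delegate everything to the backend's recursive delete.
    await fs._rm(path, recursive=True)


fs = FakeAsyncFS()
asyncio.run(delete_dir(fs, "bucket/group"))
print(fs.calls)  # [('bucket/group', True)]
```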
src/zarr/storage/_fsspec.py (Outdated)

```python
        return

        try:
            import s3fs
```
This seems like a bad idea. We
- shouldn't import something the user may not need
- shouldn't special-case one backend
- shouldn't make this functionality available in only one case, even though it's implemented for several.
I agree on all points. The reason I didn't call `._rm(path, recursive=True)` without checking the specific filesystem instance was that I wasn't sure whether the signature is the same for all fsspec backends. For example, it seems that ossfs does not support passing a top-level path `str` with `recursive=True`:
https://github.com/fsspec/ossfs/blob/3bac1f556fa06fdd414f6181aa153d1c93e3a460/src/ossfs/core.py#L332

Is there a cleaner way to make this functionality available for all supported backends? One thing that came to mind was subclassing `FsspecStore`, e.g., `S3fsStore`, but I figured that would be unpopular 😃
If `_rm` is a coroutine, call it (it is defined as `async def _rm(self, path, recursive=False, batch_size=None, **kwargs)`). If not, call `rm()`. How this interacts with the "make it async" process (ossfs is sync), I am not sure.
Issues and PRs should be made to increase consistency between fsspec backend implementations.
Perhaps something like this?
```python
# requires: import asyncio, functools, inspect
async def delete_dir(self, prefix: str) -> None:
    """
    Remove all keys and prefixes in the store that begin with a given prefix.
    """
    if not self.supports_deletes:
        raise NotImplementedError(
            "This method is only available for stores that support deletes."
        )
    if not self.supports_listing:
        raise NotImplementedError(
            "This method is only available for stores that support directory listing."
        )
    self._check_writable()

    # Safely construct the path/prefix to delete.
    # This helps avoid duplicated slashes and handles edge cases with an empty
    # prefix (e.g., "path/" + "/prefix" => "path/prefix").
    def _path_join(*parts):
        return "/".join(s.strip("/") for s in parts if s)

    path_to_delete = _path_join(self.path, prefix)

    if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm):
        try:
            await self.fs._rm(path_to_delete, recursive=True)
        except self.allowed_exceptions:
            pass
    else:
        try:
            loop = asyncio.get_running_loop()
            # Run the sync operation in a thread to avoid blocking the event loop.
            await loop.run_in_executor(
                None,
                functools.partial(self.fs.rm, path=path_to_delete, recursive=True),
            )
        except self.allowed_exceptions:
            pass
```
Although, it seems that `FsspecStore` only supports classes inheriting from `AsyncFileSystem`, so maybe handling the synchronous case isn't necessary? See zarr-python/src/zarr/storage/_fsspec.py, lines 89 to 90 in 12f6012:

```python
if not self.fs.async_impl:
    raise TypeError("Filesystem needs to support async operations.")
```
> it seems that FsspecStore only supports classes inheriting from AsyncFileSystem

This is right, but you should explicitly check what happens if that async filesystem is actually a sync one wrapped with https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/asyn_wrapper.py#L32
Wrapping a local filesystem appears to work as expected, though I'm not sure about others:

```python
import time

import numpy as np
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
from fsspec.implementations.local import LocalFileSystem

import zarr
from zarr.core.sync import sync
from zarr.storage._fsspec import FsspecStore


def main():
    path = r"C:\Users\cameron.arshadi\Desktop\fsspec_test.zarr"
    sync_fs = LocalFileSystem(auto_mkdir=True)
    async_fs = AsyncFileSystemWrapper(sync_fs)
    store = FsspecStore(async_fs, read_only=False, path=path)

    start = time.time()
    z = zarr.create_group(store=store, overwrite=True)
    end = time.time()
    print(f"Time to open store: {end - start:.2f} seconds")

    # Create an array and time it
    start = time.time()
    a = z.create_array("foo", shape=(2000, 2000), chunks=(100, 100), dtype=np.uint16)
    a[:] = np.random.randint(0, 1000, size=(2000, 2000), dtype=np.uint16)
    end = time.time()
    print(f"Time to create array: {end - start:.2f} seconds")

    # Delete the array and time it
    start = time.time()
    sync(z.store.delete_dir(""))
    end = time.time()
    print(f"Time to delete array: {end - start:.2f} seconds")


if __name__ == "__main__":
    main()
```
Using the following in `FsspecStore`:

```python
async def delete_dir(self, prefix: str) -> None:
    """
    Remove all keys and prefixes in the store that begin with a given prefix.
    """
    if not self.supports_deletes:
        raise NotImplementedError(
            "This method is only available for stores that support deletes."
        )
    if not self.supports_listing:
        raise NotImplementedError(
            "This method is only available for stores that support directory listing."
        )
    self._check_writable()

    # Safely construct the path/prefix to delete.
    # This helps avoid duplicated slashes and handles edge cases with an empty
    # prefix (e.g., "path/" + "/prefix" => "path/prefix").
    def _path_join(*parts):
        return "/".join(s.strip("/") for s in parts if s)

    path_to_delete = _path_join(self.path, prefix)

    if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm):
        try:
            await self.fs._rm(path_to_delete, recursive=True)
        except self.allowed_exceptions:
            pass
    else:
        raise NotImplementedError("The filesystem does not support async deletes")
```
Looking at https://github.com/fsspec/filesystem_spec/blob/1d34249f0b043907f86064c274a33454ec670ebe/fsspec/implementations/asyn_wrapper.py#L56-L71, it seems that any wrapped filesystem would have its synchronous `rm()` method converted to an async `_rm()`, so at least in theory the `if` statement should always evaluate to `True`. Let me know what you think.
local is the one we really care about. I wonder if there are cases where `_rm()` does something necessary in the sync filesystem that needs wrapping; in which case, this will need a fix in asyn_wrapper, but shouldn't need any further work here.
Got it, changes made in bfb4397
Thanks @carshadi for continued efforts here. I'd like to see some fsspec specific tests here if possible.
```python
# this is probably the same condition as `if self.fs.async_impl`
if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm):
    with suppress(*self.allowed_exceptions):
        await self.fs._rm(path_to_delete, recursive=True)
else:
    raise NotImplementedError("The store does not support async deletes")
```
What do we think about further simplifying this to:
```diff
-# this is probably the same condition as `if self.fs.async_impl`
-if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm):
-    with suppress(*self.allowed_exceptions):
-        await self.fs._rm(path_to_delete, recursive=True)
-else:
-    raise NotImplementedError("The store does not support async deletes")
+with suppress(*self.allowed_exceptions):
+    await self.fs._rm(path_to_delete, recursive=True)
```
I'll let @martindurant comment, but I would think fsspec would be responsible for raising a `NotImplementedError` if this isn't supported by a backend.
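For reference, `contextlib.suppress` swallows only the listed exception types; anything else still propagates. A self-contained illustration (not zarr code):

```python
from contextlib import suppress

allowed_exceptions = (FileNotFoundError,)

# An allowed exception is silently ignored and execution continues.
with suppress(*allowed_exceptions):
    raise FileNotFoundError("already gone")

# Any other exception still propagates out of the block.
caught = None
try:
    with suppress(*allowed_exceptions):
        raise PermissionError("not allowed")
except PermissionError as e:
    caught = e

print(type(caught).__name__)  # PermissionError
```

So an unhandled `NotImplementedError` raised by a backend would surface to the caller unchanged.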
Improves performance of `FsspecStore.delete_dir` when the underlying `fs` is `s3fs`, by passing a list of filepaths to be removed in bulk instead of one-by-one. Resolves #2659
TODO: