Working example of using Async/Await #871

Open
Sarafudinov opened this issue Apr 26, 2024 · 11 comments

Comments

@Sarafudinov

Hello, I’m working with s3fs and haven’t found any examples of how to use it asynchronously. Looking through your changelog, I saw that you’ve already added this functionality. Could you please tell me how to use the asynchronous methods for reading, writing, and listing files?
The same question was asked in 2020, but the functionality was not ready then, and only a link to a short section was provided: https://s3fs.readthedocs.io/en/latest/#async

Thanks in advance for your attention!)

@martindurant
Member

What kind of workflow would you like? Perhaps I can sketch out what you need to do, and you help update the docs?

@Sarafudinov
Author

Thank you. I would like to learn more and see examples of how to use the reading (s3.open), writing, and listing (s3.ls) methods in asynchronous code.

  1. I need to download files from a bucket asynchronously (as far as I understand, this is the open_async method).
  2. I would also like to see an example of writing files to a bucket asynchronously.
  3. Regarding listing files, I don’t quite understand whether there is an equivalent when initializing with s3 = S3FileSystem(asynchronous=True), since to list files (s3.ls(path)) you have to use the synchronous initialization s3 = S3FileSystem().

@Sarafudinov
Author

Sarafudinov commented Apr 30, 2024

All I need is an example of how to read files from the bucket, write files to the bucket, and list the files in the bucket, all asynchronously.

As far as I understand from your changelog, you have already implemented this, but I haven’t found any examples:

2023.4.0
Add streaming async read file (#722)

Or, for now, is the only option to refer to the aiobotocore library? https://aiobotocore.readthedocs.io/en/latest/examples/s3/basic_usage.html

@martindurant
Member

Sorry to be slow, thanks for the ping.
Here is an example:

import asyncio

async def f():
    import fsspec
    mybucket = "mybucket"   # <- change this
    # asynchronous=True lets you await the underscore-prefixed coroutines directly
    fs = fsspec.filesystem("s3", asynchronous=True, anon=False)
    print(await fs._ls(mybucket))                              # list the bucket
    await fs._pipe_file(f"{mybucket}/afile", b"hello world")   # write bytes to a key
    print(await fs._cat_file(f"{mybucket}/afile"))             # read the bytes back

asyncio.run(f())

You should note that the file-like interface (via fs.open(...)) is not async, because the upstream python file-like object (io.IOBase) is not async. This is why open_async exists, but this "streaming" API is incomplete.
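
Since the underscore-prefixed methods in the example above are ordinary coroutines, several reads can be issued concurrently with asyncio.gather. The following is only a sketch under that assumption: cat_many and main are hypothetical helper names, not part of s3fs or fsspec, and filtering on the "type" field of the detailed listing assumes the usual fsspec listing format.

```python
import asyncio


async def cat_many(fs, paths):
    """Fetch several objects concurrently and return {path: bytes}."""
    # fs is assumed to be a filesystem created with asynchronous=True,
    # so that fs._cat_file is awaitable (as in the example above).
    contents = await asyncio.gather(*(fs._cat_file(p) for p in paths))
    return dict(zip(paths, contents))


async def main():
    import fsspec  # imported lazily, as in the example above

    mybucket = "mybucket"  # <- change this
    fs = fsspec.filesystem("s3", asynchronous=True, anon=False)
    entries = await fs._ls(mybucket, detail=True)
    # Keep only plain objects; prefixes ("directories") cannot be read as files.
    paths = [e["name"] for e in entries if e["type"] == "file"]
    print(await cat_many(fs, paths))


# asyncio.run(main())  # uncomment to run against a real bucket
```

Each object is still held fully in memory here, so this suits many small files rather than a few very large ones.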

@Sarafudinov
Author

I don’t quite understand 😅
As far as I understand, asynchronous reading is done through s3fs.open_async().
In your example you show something like this:

async def async_read():
    s3 = S3FileSystem(..., asynchronous=True)
    fs = await s3.open_async(path, "rb")
    result = await fs.read()

This is roughly what asynchronous reading looks like; I figured it out after digging through your repository 😁

Is there an example that uses writing?

async def async_write(some_content):
    s3 = S3FileSystem(..., asynchronous=True)
    fs = await s3.open_async(path, "w")
    await fs.write(some_content)

The only other thing I found on this was issue #853.

@martindurant
Member

No, we do not have asynchronous stream writing, sorry. If you can think of a way to do it, I would be happy.
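
Until streamed writing exists, one possible workaround is to collect the data in memory and upload it in a single call with _pipe_file, the coroutine used in the earlier example. This is a sketch, not a streaming write: write_buffered is a hypothetical helper name, and the whole payload has to fit in memory.

```python
async def write_buffered(fs, path, chunks):
    """Join byte chunks in memory, then upload them as one object."""
    # fs is assumed to be a filesystem created with asynchronous=True,
    # so that fs._pipe_file is awaitable.
    data = b"".join(chunks)
    await fs._pipe_file(path, data)
    return len(data)  # number of bytes uploaded


# Usage inside a coroutine (hypothetical bucket and key):
#     await write_buffered(fs, "mybucket/afile", [b"hello ", b"world"])
```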

@Sarafudinov
Author

Hello,
I tried adding asynchronous write logic;
I hope it helps with development!
#885

@espdev

espdev commented Dec 6, 2024

@martindurant

No, we do not have asynchronous stream writing, sorry. If you can think of a way to do it, I would be happy.

I recently ran into this too and spent a long time figuring out what the problem was. The only error was that the file was already closed. It turns out that the write method is simply not implemented in the S3AsyncStreamedFile class, so the base-class method is called, in which the closed flag turns out to be True.

You could just override the write method to raise NotImplementedError; that would save people time, because the documentation doesn't mention any of this.

@martindurant
Member

I think the following is the right way to do it. The API for async files for writing shouldn't change, but this will ensure an early not-implemented error:

--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -1092,7 +1092,7 @@ class AbstractAsyncStreamedFile(AbstractBufferedFile):
         raise NotImplementedError

     async def _initiate_upload(self):
-        pass
+        raise NotImplementedError
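
The idea behind the patch can be illustrated with a deliberately simplified stand-in. The classes below are hypothetical and are not fsspec's code; the real write path buffers data and is more involved. The sketch only shows why a base-class hook that raises, rather than silently passing, turns a confusing later failure into a clear NotImplementedError.

```python
class BaseStreamedFile:
    """Hypothetical stand-in for an async streamed-file base class."""

    async def _initiate_upload(self):
        # Raising here (instead of `pass`) makes a backend without write
        # support fail with a clear message.
        raise NotImplementedError("async streamed writes are not implemented")

    async def write(self, data):
        # Simplification: call the hook directly before accepting data.
        await self._initiate_upload()
        return len(data)


class ReadOnlyFile(BaseStreamedFile):
    """Hypothetical backend that never overrides _initiate_upload."""


async def try_write():
    """Attempt a write and report the error message, if any."""
    try:
        await ReadOnlyFile().write(b"hello")
    except NotImplementedError as exc:
        return str(exc)
    return "no error"
```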

@wild-endeavor

wild-endeavor commented Dec 12, 2024

What is the right way to use open_async (specifically for "wb")? Will it ever be a context manager?
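
For the read side only, a chunked read through open_async might look like the sketch below. It assumes that the object returned by open_async has awaitable read(length) and close() methods and that read returns empty bytes at end of file; read_in_chunks is a hypothetical helper name. Per the maintainer's comments above, "wb" streaming is not implemented, so nothing equivalent is shown for writing.

```python
async def read_in_chunks(fs, path, chunk_size=1024 * 1024):
    """Async generator yielding an object's bytes in chunks via open_async."""
    # fs is assumed to be created with asynchronous=True.
    f = await fs.open_async(path, "rb")
    try:
        while True:
            chunk = await f.read(chunk_size)
            if not chunk:  # empty bytes is taken to mean end of stream
                break
            yield chunk
    finally:
        # Close explicitly, since context-manager support is the open question here.
        await f.close()


# Usage inside a coroutine (hypothetical bucket and key):
#     async for chunk in read_in_chunks(fs, "mybucket/afile"):
#         process(chunk)  # process is a placeholder for your own handler
```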

@n-splv

n-splv commented Dec 19, 2024

I would highly appreciate it if anyone added a working example for JupyterLab. Right now this fails with RuntimeError: no running event loop:

async def foo():
    fs = s3fs.S3FileSystem(profile=profile, asynchronous=True)

    async with await fs.open(filepath, mode="rb") as f:
        return pd.read_csv(f)


result = await foo()
# This doesn't help either

import asyncio
import nest_asyncio

nest_asyncio.apply()

fs = s3fs.S3FileSystem(profile=profile, asynchronous=True, loop=asyncio.get_event_loop())

All this while simple async calls have been supported in Jupyter for quite a while now. Something in the library's internals seems to be the cause, but I don't have enough time to dig into it.
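
One approach that sidesteps the file-like interface, following the maintainer's earlier example, is to fetch the bytes with _cat_file and wrap them in an in-memory buffer. This is a sketch that has not been verified in JupyterLab and does not diagnose the RuntimeError above; fetch_to_buffer is a hypothetical helper name.

```python
import io


async def fetch_to_buffer(fs, path):
    """Download a whole object and wrap it in an in-memory binary file."""
    # fs is assumed to be created with asynchronous=True,
    # so that fs._cat_file is awaitable.
    data = await fs._cat_file(path)
    return io.BytesIO(data)


# In a notebook cell, where top-level await is available:
#     fs = s3fs.S3FileSystem(profile=profile, asynchronous=True)
#     df = pd.read_csv(await fetch_to_buffer(fs, filepath))
```

The whole file is loaded into memory before parsing, which is usually acceptable for a CSV that pandas would load in full anyway.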
