Helpers for bulk methods, such as async_bulk, sleep in a blocking manner, preventing graceful shutdown #2484

Open
artem-shelkovnikov opened this issue Mar 22, 2024 · 5 comments · May be fixed by #2717
Labels: Area: Helpers, Category: Enhancement, good first issue

Comments

@artem-shelkovnikov
Member

artem-shelkovnikov commented Mar 22, 2024

We tried using the async_bulk and async_streaming_bulk helpers to ingest data into Elasticsearch. They work great, but we found that they prevent our code from shutting down gracefully when CTRL+C is pressed.

Example code that sleeps:

await asyncio.sleep(
    min(max_backoff, initial_backoff * 2 ** (attempt - 1))
)
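To see why these sleeps matter for shutdown, the backoff formula above can be evaluated directly. This is a minimal sketch: `backoff_schedule` is a hypothetical name, attempts are counted from 1 as in the formula, and the parameter values (12 retries, 2 s initial backoff, 600 s cap) are illustrative rather than taken from this issue.

```python
def backoff_schedule(max_retries: int, initial_backoff: float, max_backoff: float) -> list:
    """Delay before each retry, using min(max_backoff, initial_backoff * 2 ** (attempt - 1))."""
    return [
        min(max_backoff, initial_backoff * 2 ** (attempt - 1))
        for attempt in range(1, max_retries + 1)
    ]


# Illustrative values only: 12 retries, 2 s initial backoff, 600 s cap.
delays = backoff_schedule(12, 2, 600)
print(delays)  # [2, 4, 8, 16, 32, 64, 128, 256, 512, 600, 600, 600]
print(sum(delays), "seconds spent sleeping in total")  # 2822 seconds spent sleeping in total
```

With these values a single chunk could spend roughly 47 minutes in backoff sleeps, which is time during which the caller has no hook to stop waiting.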

It would be great to have a way to:

  • Either define how the sleep happens by passing a sleep function into the client
  • Or make the Elasticsearch client internally cancel all sleeps when the client is closed
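The second option could be approximated on the caller's side with a shared event that a `close()` call sets, waking every pending sleep at once. This is a minimal sketch under assumptions: `InterruptibleSleeper` and `SleeperClosed` are hypothetical names, not part of elasticsearch-py, and raising an exception on close (rather than returning silently) is a design choice made here so the interruption stays visible to the caller.

```python
import asyncio


class SleeperClosed(Exception):
    """Raised by InterruptibleSleeper.sleep() when close() interrupts it."""


class InterruptibleSleeper:
    """Hypothetical helper (not part of elasticsearch-py): sleeps that close() can cut short."""

    def __init__(self) -> None:
        # Create the sleeper inside a running event loop (e.g. in an async function).
        self._closed = asyncio.Event()

    async def sleep(self, seconds: float) -> None:
        try:
            # Wait for close() to be called, but give up after `seconds`.
            await asyncio.wait_for(self._closed.wait(), timeout=seconds)
        except asyncio.TimeoutError:
            return  # the full duration elapsed normally
        raise SleeperClosed()  # woken early (or immediately) because close() was called

    def close(self) -> None:
        # Setting the event wakes every pending sleep() at once.
        self._closed.set()
```

An instance's `sleep` method could be used anywhere a custom sleep function is accepted; calling `close()` during shutdown then turns every in-progress backoff into an immediate `SleeperClosed`.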
@artem-shelkovnikov
Member Author

Example code that our product calls when using the client: https://github.com/elastic/connectors/blob/main/connectors/es/sink.py (see the comment at the top of the file for how we collect and ingest data).

In short, we have a SyncOrchestrator class that internally creates two classes, an Extractor and a Sink, which exchange data through a MemQueue:

  • Extractor. This class is responsible for providing a generator that yields documents from a third-party system and puts them into the MemQueue.
  • Sink. This class is responsible for picking up data from the MemQueue and sending it to Elasticsearch in batches. Right now it just sends each batch with a regular bulk request: https://github.com/elastic/connectors/blob/main/connectors/es/sink.py#L149, but ideally we'd love to switch to a helper from the Python client.
  • MemQueue provides backpressure by limiting both the number of items in the queue AND the total size of those items; this way we can, to some extent, control the memory usage of the framework.
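The backpressure described above can be sketched with an asyncio.Condition that bounds both the item count and the total size. This is a hypothetical illustration, not the connectors MemQueue: `SizeBoundedQueue`, `maxsize`, and `maxmemsize` are names chosen here, and an item's size is approximated by the length of its JSON encoding.

```python
import asyncio
import json
from collections import deque
from typing import Any, Deque, Tuple


class SizeBoundedQueue:
    """Hypothetical MemQueue-like queue (not the connectors implementation).

    put() waits while the queue holds `maxsize` items, or while adding the item
    would push the total estimated size past `maxmemsize` bytes.
    """

    def __init__(self, maxsize: int, maxmemsize: int) -> None:
        self.maxsize = maxsize
        self.maxmemsize = maxmemsize
        self._items: Deque[Tuple[Any, int]] = deque()  # (item, estimated size) pairs
        self._memsize = 0
        # Create the queue inside a running event loop (e.g. in an async function).
        self._cond = asyncio.Condition()

    def _has_room(self, size: int) -> bool:
        # An empty queue always accepts one item, so a single oversized
        # document cannot block the producer forever.
        if not self._items:
            return True
        return (
            len(self._items) < self.maxsize
            and self._memsize + size <= self.maxmemsize
        )

    async def put(self, item: Any) -> None:
        # Approximate the item's payload size by its UTF-8 JSON encoding.
        size = len(json.dumps(item).encode("utf-8"))
        async with self._cond:
            await self._cond.wait_for(lambda: self._has_room(size))
            self._items.append((item, size))
            self._memsize += size
            self._cond.notify_all()  # wake consumers waiting for data

    async def get(self) -> Any:
        async with self._cond:
            await self._cond.wait_for(lambda: len(self._items) > 0)
            item, size = self._items.popleft()
            self._memsize -= size
            self._cond.notify_all()  # wake producers waiting for room
            return item
```

In this arrangement an Extractor would await `put()` for each document while a Sink awaits `get()` to build its batches, so a slow Sink automatically pauses the Extractor.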

@pquentin
Member

pquentin commented May 17, 2024

Sorry for the delay, Artem. I would be happy to implement the first option, allowing the sleep function to be user-defined. Silently cancelling all sleeps/bulks isn't something we'd want in the general case.

pquentin added the Category: Enhancement, good first issue, and Area: Helpers labels Jul 2, 2024
@LorenzoFasolino

Hi, @girolamo-giordano and I are working on this. Just to understand better:

Do you want a function, passed as input when the client is instantiated, that handles CTRL+C being pressed and, more generally, how the client terminates?

@pquentin
Member

pquentin commented Dec 5, 2024

Hey @LorenzoFasolino and @girolamo-giordano, please tell me if this snippet clarifies things:

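import asyncio

# Stand-in for async_bulk: the sleep function is injectable and defaults to asyncio.sleep
async def fake_bulk(sleep=asyncio.sleep):
    print("start bulk, sleep 5")
    await sleep(5)

# Custom sleep that swallows cancellation instead of propagating it
async def silently_cancelled_sleep(seconds):
    try:
        await asyncio.sleep(seconds)
    except asyncio.CancelledError:
        print("silently ignoring cancellation")

async def main():
    # On Python 3.11+, asyncio.run() handles the first Ctrl-C by cancelling the
    # main task, which raises CancelledError inside the pending sleep.
    print("Calling with silently_cancelled_sleep, Ctrl-C will not raise")
    await fake_bulk(silently_cancelled_sleep)
    print("Calling with defaults, Ctrl-C will raise")
    await fake_bulk()
    print("Done")

asyncio.run(main())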

The idea is to add this new sleep parameter to async_bulk to allow using a custom sleep function. Users could then define a function like silently_cancelled_sleep that does not raise when cancelled. That said, if cancellation happens outside of the sleep function, async_bulk will still raise, so I'm not sure how useful it is.
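A slightly more concrete sketch of the proposed parameter: a hypothetical retry loop (`retrying_send` is not an elasticsearch-py function, and its signature is an assumption) that applies the same backoff formula but delegates waiting to an injected `sleep`. Injecting a recording sleep also makes the backoff observable without actually waiting.

```python
import asyncio
from typing import Awaitable, Callable, List


async def retrying_send(
    send: Callable[[], Awaitable[bool]],
    *,
    max_retries: int = 3,
    initial_backoff: float = 2,
    max_backoff: float = 600,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Hypothetical retry loop; `send` returns True once a chunk is accepted."""
    for attempt in range(max_retries + 1):
        if attempt:
            # Exponential backoff, delegated to whatever sleep function was injected.
            await sleep(min(max_backoff, initial_backoff * 2 ** (attempt - 1)))
        if await send():
            return True
    return False


async def demo() -> List[float]:
    delays: List[float] = []

    async def recording_sleep(seconds: float) -> None:
        delays.append(seconds)  # record the delay instead of actually waiting

    attempts = 0

    async def flaky_send() -> bool:
        nonlocal attempts
        attempts += 1
        return attempts >= 3  # fail twice, then succeed

    assert await retrying_send(flaky_send, sleep=recording_sleep)
    return delays


print(asyncio.run(demo()))  # [2, 4]
```

Passing `silently_cancelled_sleep` instead of `recording_sleep` would give the behavior described above, with the same caveat: a cancellation delivered while `send()` is running would still propagate.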

@artem-shelkovnikov Would this actually solve your problem? Or do you need something else?

@artem-shelkovnikov
Member Author

It should solve our problem, yes :)
