Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pulsar updates... #22

Merged
merged 16 commits into from
Dec 15, 2024
Merged

pulsar updates... #22

merged 16 commits into from
Dec 15, 2024

Conversation

daveads
Copy link

@daveads daveads commented Sep 25, 2024

Refactor PulsarBackend for multi-channel support and improved error handling

  • Implement separate producers and consumers for each channel
  • Add channel-specific subscribe and unsubscribe functionality
  • Improve logging with a dedicated logger
  • Enhance error handling and timeout management in next_published method
  • Optimize connection and disconnection processes

…andling

- Implement separate producers and consumers for each channel
- Add channel-specific subscribe and unsubscribe functionality
- Improve logging with a dedicated logger
- Enhance error handling and timeout management in next_published method
- Optimize connection and disconnection processes
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
@gemanor
Copy link

gemanor commented Sep 30, 2024

@daveads please address @danyi1212's comments

@daveads
Copy link
Author

daveads commented Sep 30, 2024

@daveads please address @danyi1212's comments

Sure

@gemanor
Copy link

gemanor commented Oct 8, 2024

Hey @daveads any news on this?

@daveads
Copy link
Author

daveads commented Oct 8, 2024

Hey @daveads any news on this?

@gemanor, I’ve been busy with OpenFGA integration and other tasks. I’ll send a commit tomorrow with the required changes.

@daveads
Copy link
Author

daveads commented Oct 11, 2024

@gemanor @danyi1212 Done

@gemanor gemanor requested a review from danyi1212 October 11, 2024 06:14
@gemanor
Copy link

gemanor commented Oct 11, 2024

@danyi1212 it's LGTM, please run a last sanity review and merge.

broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
tests/test_broadcast.py Outdated Show resolved Hide resolved
@daveads
Copy link
Author

daveads commented Nov 3, 2024

will work on the review this week...

@daveads
Copy link
Author

daveads commented Nov 27, 2024

we were suppose to drop test for 3.8 and add 3.12..

will get back to this... need to attend to something via opal...

@danyi1212

Done

@daveads
Copy link
Author

daveads commented Nov 27, 2024

Replaced anyio.to_thread.run_sync() with asyncio.to_thread()

Copy link

@danyi1212 danyi1212 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks very good, left few small comments

broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Outdated Show resolved Hide resolved
broadcaster/_backends/pulsar.py Show resolved Hide resolved
tests/test_broadcast.py Outdated Show resolved Hide resolved
@danyi1212
Copy link

Notice the difference between

await asyncio.gather(*coros, return_exceptions=True)  

Compared to

try:
    await asyncio.gather(*coros)
except Exception as e:
    logger.exception("Failed ...")

In the first option it will supress any exception raised inside the coros and include them in the list returned by the gather, retaining the order of the received coros.

In the second example, when a single coro inside the gather fails, it will immediately cancel all running coros, and drop the returned values of completed coros.

If you want all coros to complete regardless of when some of them fail, you should use return_exceptions=True. Through you should remember to actually use the returned values, and if exception where raised, handle them (in our case, at least log them)

@daveads daveads requested a review from danyi1212 December 12, 2024 10:43
@danyi1212 danyi1212 merged commit 07b5c53 into permitio:master Dec 15, 2024
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants