Non selected channel in select! lose sent value #50

Open
negrel opened this issue Sep 21, 2024 · 2 comments

negrel commented Sep 21, 2024

Hi!
First, thank you very much for this crate (IMHO it has the best API).

kanal bounded_async channels seem to behave differently from tokio and Go channels in select!.
Surprisingly, when one branch of a select! is executed, the values sent on the other branches' channels are lost.

Here is a code snippet that shows different behavior between tokio (1.39.3) and kanal (0.1.0-pre08).

use tokio::select;

pub fn main() -> anyhow::Result<()> {
    println!("tokio mpsc");
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            let (tx1, mut rx1) = tokio::sync::mpsc::channel::<usize>(1);
            let (tx2, mut rx2) = tokio::sync::mpsc::channel::<usize>(1);

            tokio::spawn(async move {
                tx1.send(1).await.unwrap();
            });
            tokio::spawn(async move {
                tx2.send(2).await.unwrap();
            });

            let mut received = 0;
            while received < 2 {
                select! {
                    res = rx1.recv() => if let Some(v) = res {
                        println!("rx1 {v:?}");
                        received+=1;
                    },
                    res = rx2.recv() => if let Some(v) = res {
                        println!("rx2 {v:?}");
                        received+=1;
                    },
                }
            }
        });

    println!("kanal bounded async");
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            let (tx1, rx1) = kanal::bounded_async::<usize>(1);
            let (tx2, rx2) = kanal::bounded_async::<usize>(1);

            let tx1clone = tx1.clone();
            tokio::spawn(async move {
                tx1clone.send(1).await.unwrap();
            });
            let tx2clone = tx2.clone();
            tokio::spawn(async move {
                tx2clone.send(2).await.unwrap();
            });

            let mut received = 0;
            while received < 2 {
                select! {
                    res = rx1.recv() => { println!("rx1 {:?}", res.unwrap()); received+=1; },
                    res = rx2.recv() => { println!("rx2 {:?}", res.unwrap()); received+=1; },
                }
            }
        });

    Ok(())
}

Output:

tokio mpsc
rx2 2
rx1 1
kanal bounded async
rx1 1
... (blocks forever)

Is this the expected behavior? If not, I can start working on a fix with your help.


negrel commented Oct 13, 2024

I just learned about cancel safety; maybe you should just document that it is not cancel safe.
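
To make "not cancel safe" concrete, here is a minimal std-only sketch. It is a toy model, not kanal's actual implementation: `ToyRecv`, `Slot`, `poll_once` and the two helper functions are hypothetical names invented for illustration. The toy future moves the value out of the channel on its first poll and only hands it to the caller on the second poll, so dropping it in between (which is what `select!` does to the branches that lose) discards the value.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical one-slot "channel" shared with the receive future.
type Slot = Rc<RefCell<Option<usize>>>;

/// Toy receive future (an assumption, not kanal's design): the first poll
/// moves the value from the channel into the future, the second poll
/// returns it to the caller.
struct ToyRecv {
    chan: Slot,
    taken: Option<usize>,
}

impl Future for ToyRecv {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        let this = self.get_mut();
        if let Some(v) = this.taken.take() {
            return Poll::Ready(v);
        }
        // The value leaves the channel here, before the caller has seen it.
        this.taken = this.chan.borrow_mut().take();
        Poll::Pending
    }
}

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// Poll once, then drop the future, as `select!` does with a losing branch.
/// Returns whatever is still left in the channel afterwards.
fn recv_then_cancel() -> Option<usize> {
    let chan: Slot = Rc::new(RefCell::new(Some(2)));
    let mut fut = ToyRecv { chan: chan.clone(), taken: None };
    assert!(poll_once(&mut fut).is_pending());
    drop(fut); // cancellation: the value held inside the future goes with it
    let left = *chan.borrow();
    left
}

/// Keep the same future alive and poll it again instead of dropping it.
fn recv_kept_alive() -> Option<usize> {
    let chan: Slot = Rc::new(RefCell::new(Some(2)));
    let mut fut = ToyRecv { chan, taken: None };
    assert!(poll_once(&mut fut).is_pending());
    match poll_once(&mut fut) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}

fn main() {
    println!("after cancel: {:?}", recv_then_cancel());
    println!("kept alive:   {:?}", recv_kept_alive());
}
```

In this toy model the value only survives if the same future is polled to completion. A cancel-safe receive would either leave the value in the channel until the poll that returns it, or put it back when the future is dropped. Whether this is what actually happens inside kanal is not established in this thread.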

@AaronKutch

It seems to only have issues with multiple recv calls; it seems to be fine when just a single one is cancelled, which is why I haven't encountered it in my usage yet. This is still quite concerning, though. I'm guessing that this is due to the typical lost-wakeup bug, where you have to call some enable function and reset the future, e.g. https://docs.rs/tokio/1.40.0/tokio/sync/futures/struct.Notified.html.
