futures-rs: Cloned mpsc::Sender never blocks

I believe I have found a pretty serious bug with futures::sync::mpsc::Sender. Consider the program below, which has one thread sending on a channel (waiting for each send to complete before starting the next) and one thread reading from that same channel. The sender is faster than the reader (because of the sleep in the receiver thread). This program runs correctly: it produces 1001 recv i lines followed by recv done, with done sending appearing somewhere near recv 11 (since the channel effectively holds 11 in-flight items).

extern crate futures;

use futures::{Future, Sink, Stream};

fn main() {
    use std::time;
    use std::thread;
    let (mut tx, rx) = futures::sync::mpsc::channel(10);

    let jh = thread::spawn(move || {
        rx.for_each(|x| {
                thread::sleep(time::Duration::from_millis(1));
                println!("recv {}", x);
                if x == 0 {
                    futures::future::err(())
                } else {
                    futures::future::ok(())
                }
            })
            .wait()
            .is_err()
    });

    let n = 1_000;
    for i in 0..n {
        tx = tx.send(n - i).wait().unwrap();
    }
    tx.send(0).wait().unwrap();
    println!("done sending");

    jh.join().unwrap();
    println!("recv done");
}

Now try changing the line

tx = tx.send(n - i).wait().unwrap();

to

tx.clone().send(n - i).wait().unwrap();

Semantically, these should be the same; the only difference is that the latter clones the transmit handle before each send (and drops the clone afterwards), while still blocking on the channel if necessary. However, what actually happens is that the .send().wait() in the second version never blocks. The output looks something like

recv 1000
recv 999
done sending
recv 998
...
recv 0
recv done

That is, the channel suddenly behaves as if it is unbounded!
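
As the discussion further down suggests, each Sender clone carries its own guaranteed slot on top of the shared buffer, so cloning for every send effectively grows the channel by one slot per send. Here is a minimal sketch of just that effect, assuming the futures 0.1 sync::mpsc API (the message count is arbitrary):

extern crate futures;

use futures::{Future, Sink};

fn main() {
    // Buffer of 10, and a receiver that is never polled.
    let (tx, _rx) = futures::sync::mpsc::channel(10);

    // Every fresh clone brings its own guaranteed slot, so each of these
    // sends resolves immediately, even though the shared buffer filled up
    // after the first few iterations.
    for i in 0..100 {
        tx.clone().send(i).wait().unwrap();
    }
    println!("100 sends completed without the receiver making progress");
}

This would also explain why the tx = tx.send(...) pattern does block: the returned handle is the same Sender, so its single guaranteed slot is reused rather than multiplied.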

Interestingly, if the line is instead replaced with

tx = tx.clone().send(n - i).wait().unwrap();

the code reverts to the expected blocking behavior.

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 18 (18 by maintainers)

Most upvoted comments

I’ve written similar lockless queue code in the past, and I’m having a hard time seeing why unpark_one cannot also be used without these extra slots, but I’ll take your word for it.

Taking a step back, this strikes me as fairly unintuitive behavior for a “FIFO queue with back pressure”, since under some conditions it simply does not provide back pressure at all. The docs also mention this behavior in only a single place, and arguably in the place you are least likely to look; I suspect few people read the docs for mpsc::channel, since they assume it works the same way as std::sync::mpsc::sync_channel (at least that was the case for me). Maybe a description under its own heading at futures::sync::mpsc, and another on Sender, would be appropriate? The first line of mpsc::channel should also change to reflect the fact that the channel is not actually bounded, so that the information appears on the front page of mpsc, where users are most likely to end their search for an answer to the question “how do I make a bounded futures channel?”.
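
To make “how do I make a bounded futures channel?” concrete: one workaround under the current semantics is to give each producer a single long-lived clone and to thread the returned Sender through its sends, so the total slack stays at roughly buffer + number of producers instead of growing with every send. A rough sketch, assuming the futures 0.1 API (the producer and message counts here are arbitrary):

extern crate futures;

use futures::{Future, Sink, Stream};
use std::thread;

fn main() {
    let (tx, rx) = futures::sync::mpsc::channel(10);

    // One long-lived clone per producer; each send threads the returned
    // Sender back into the loop, so in-flight messages stay bounded by
    // roughly buffer + number of producers.
    let producers: Vec<_> = (0..4)
        .map(|p| {
            let mut tx = tx.clone();
            thread::spawn(move || {
                for i in 0..1_000 {
                    tx = tx.send(p * 1_000 + i).wait().unwrap();
                }
            })
        })
        .collect();
    drop(tx); // only the producers' clones keep the channel open now

    // Drain on the main thread; the iterator ends once every clone is dropped.
    for item in rx.wait() {
        let _ = item.unwrap();
    }
    for p in producers {
        p.join().unwrap();
    }
}

Each producer then sees real back pressure, and the worst-case number of in-flight items is known up front.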

Personally, I think I would prefer channel to be renamed too, to indicate more clearly that the channel is only partially bounded; something along the lines of mpsc::per_sink_bounded? Though I realize that is a breaking, and more controversial, change. I guess I feel so strongly about this because the behavior came as a complete surprise to me and was extremely hard to debug, so I concede that this may be giving me a somewhat biased view 😃

So, the intended behavior of this code:

extern crate futures;

use futures::{Future, Sink};

fn main() {
    let (tx, rx) = futures::sync::mpsc::channel(10);

    loop {
        if let Err(e) = tx.clone().send(0).wait() {
            println!(":( {:?}", e);
            break;
        }
    }
    tx.send(0).wait().unwrap();
    println!("done sending");
    drop(rx);
}

is for it to run out of memory? Despite the channel being bounded and of size 10?