futures-rs: Shared can interact badly with futures that don't always poll their subfutures

(I’m opening this issue to continue the discussion from https://github.com/alexcrichton/futures-rs/pull/305.)

The problem is that Shared, as it is currently designed, can interact poorly with certain futures, such as the ModedFuture sketched below. We should either formalize some reason why ModedFuture is an invalid implementation of Future, or we should redesign Shared to accommodate this kind of situation.

extern crate futures;
extern crate tokio_core;

use futures::{Future, Poll};
use futures::sync::oneshot;
use std::rc::Rc;
use std::cell::RefCell;

enum Mode { Left, Right }

struct ModedFutureInner<F> where F: Future {
    mode: Mode,
    left: F,
    right: F,
    task: Option<::futures::task::Task>,
}

struct ModedFuture<F> where F: Future {
    inner: Rc<RefCell<ModedFutureInner<F>>>,
}

struct ModedFutureHandle<F> where F: Future {
    inner: Rc<RefCell<ModedFutureInner<F>>>,
}

impl <F> ModedFuture<F> where F: Future {
    pub fn new(left: F, right: F, mode: Mode) -> (ModedFutureHandle<F>, ModedFuture<F>) {
        let inner = Rc::new(RefCell::new(ModedFutureInner {
            left: left, right: right, mode: mode, task: None,
        }));
        (ModedFutureHandle { inner: inner.clone() }, ModedFuture { inner: inner })
    }
}

impl <F> ModedFutureHandle<F> where F: Future {
    pub fn switch_mode(&mut self, mode: Mode) {
        self.inner.borrow_mut().mode = mode;
        if let Some(t) = self.inner.borrow_mut().task.take() {
            // The other future may have become ready while we were ignoring it.
            t.unpark();
        }
    }
}

impl <F> Future for ModedFuture<F> where F: Future {
    type Item = F::Item;
    type Error = F::Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let ModedFutureInner { ref mut mode, ref mut left, ref mut right, ref mut task } =
            *self.inner.borrow_mut();
        *task = Some(::futures::task::park());
        match *mode {
            Mode::Left => left.poll(),
            Mode::Right => right.poll(),
        }
    }
}

pub fn main() {
    let mut core = ::tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let (tx, rx) = oneshot::channel::<u32>();
    let f1 = rx.shared();
    let f2 = f1.clone();

    let (mut mfh, mf) = ModedFuture::new(
        Box::new(f1.map_err(|_| ()).map(|v| *v)) as Box<Future<Item=u32, Error=()>>,
        Box::new(::futures::future::empty()) as Box<Future<Item=u32, Error=()>>,
        Mode::Left);

    let (tx3, rx3) = oneshot::channel::<u32>();
    handle.spawn(f2.map(|x| tx3.complete(*x)).map_err(|_| ()));

    core.turn(Some(::std::time::Duration::from_millis(1)));

    handle.spawn(mf.map(|_| ()));

    core.turn(Some(::std::time::Duration::from_millis(1)));

    mfh.switch_mode(Mode::Right);

    tx.complete(11); // It seems like this ought to cause f2 and then rx3 to get resolved.

    // This hangs forever.
    core.run(rx3).unwrap();
}
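
The hang can be illustrated without futures or tokio_core by a toy model. This is only a sketch under stated assumptions, not the real Shared implementation: ToyShared, its fields, and simulate are hypothetical names, and the model assumes (1) the inner future wakes only the task that most recently polled it and (2) other clones are woken only once some clone actually observes the result. Task 0 plays the role of f2 and task 1 plays the role of the ModedFuture.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, PartialEq)]
enum Mode { Left, Right }

/// Toy model of a shared one-shot value (hypothetical, not the real `Shared`).
struct ToyShared {
    sent: Option<u32>,         // value sent by the producer, not yet observed
    result: Option<u32>,       // value cached once some clone has observed it
    inner_task: Option<usize>, // assumption: only the most recent poller is remembered
    waiters: Vec<usize>,       // tasks to wake once a clone observes the value
}

impl ToyShared {
    fn poll(&mut self, task: usize, woken: &mut VecDeque<usize>) -> Option<u32> {
        if self.result.is_some() {
            return self.result;
        }
        match self.sent {
            Some(v) => {
                // This clone observes the value and wakes every other waiter.
                self.result = Some(v);
                woken.extend(self.waiters.drain(..).filter(|&t| t != task));
                Some(v)
            }
            None => {
                // Overwrites the registration made by any earlier poller.
                self.inner_task = Some(task);
                if !self.waiters.contains(&task) {
                    self.waiters.push(task);
                }
                None
            }
        }
    }

    fn complete(&mut self, v: u32, woken: &mut VecDeque<usize>) {
        self.sent = Some(v);
        if let Some(t) = self.inner_task.take() {
            woken.push_back(t);
        }
    }
}

/// Replays the scenario from main(); returns the tasks that saw the value,
/// in the order they finished.
fn simulate(mode_at_wakeup: Mode) -> Vec<usize> {
    let mut shared = ToyShared { sent: None, result: None, inner_task: None, waiters: Vec::new() };
    let mut woken = VecDeque::new();
    let mut finished = Vec::new();

    shared.poll(0, &mut woken); // task 0 (f2) polls first
    shared.poll(1, &mut woken); // task 1 (ModedFuture, Mode::Left) polls second
    shared.complete(11, &mut woken); // wakes only task 1

    while let Some(task) = woken.pop_front() {
        // Task 1 polls the shared value only while in Mode::Left.
        let polls_shared = task == 0 || mode_at_wakeup == Mode::Left;
        if polls_shared && shared.poll(task, &mut woken).is_some() {
            finished.push(task);
        }
    }
    finished
}
```

In this model, simulate(Mode::Left) lets both tasks finish, while simulate(Mode::Right) returns an empty list: task 1 is woken but never polls the shared value, so nothing ever wakes task 0.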

About this issue

  • State: closed
  • Created 7 years ago
  • Reactions: 1
  • Comments: 27 (27 by maintainers)

Most upvoted comments

My current inclination is to say that ModedFuture is an invalid Future implementation.

I propose that the following invariant must be upheld by any valid implementation of Future: When Future::poll() runs, it must propagate poll() calls to any objects that might have triggered the unparking of the current task.

I have had a change of heart on this topic since last week, largely because some of my code in capnp-rpc-rust now depends on the above invariant.

My current thinking is that it would not work out well to require that all impls of Future satisfy some notion of “faithfulness”, and therefore the current implementation of Shared is buggy because it assumes that all futures will faithfully propagate poll() calls.

It helps to consider the case of streams. If we require futures to be faithful, then we probably also need to require streams to be faithful, because it’s easy to convert back and forth between streams and futures. But what would that mean for stream::Select? Currently stream::Select is not faithful, because if the first substream is ready then it does not bother to call poll() on the second substream. You could imagine trying to fix that by always polling both substreams and by adding a field to hold onto an item in the case that both are ready. However, if the first substream is ready and the second is not, then poll() would return Ok(Ready(..)) and would also register the current task to be unparked when the second substream becomes ready. Now if this Select is itself a substream of another stream s0, say an alternating stream that takes turns pulling an element from some number of substreams, then s0.poll() needs to always call poll() on the Select in order to faithfully propagate events that could have been triggered by it. If doing so returns an Ok(Ready(..)), then s0 needs to hold onto that value somewhere. Indeed, it seems that s0 would need to be able to hold onto an unbounded number of values produced by its substreams!
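
The unbounded-buffering concern can be sketched with a toy model. Everything here is hypothetical and independent of the futures crate: ToyAsync stands in for the Async type, Counter is a substream that is always ready, and FaithfulAlternate is one possible reading of the "faithful" alternating stream s0, which polls every substream on each poll() and stores whatever it cannot hand out yet.

```rust
use std::collections::VecDeque;

/// Toy stand-in for the `Async` type of futures 0.1 (not the real API).
#[derive(Debug, PartialEq)]
enum ToyAsync<T> {
    Ready(T),
    NotReady,
}

/// Toy substream that is always ready and yields 0, 1, 2, ...
struct Counter {
    next: u32,
}

impl Counter {
    fn poll(&mut self) -> ToyAsync<u32> {
        let v = self.next;
        self.next += 1;
        ToyAsync::Ready(v)
    }
}

/// Hypothetical "faithful" alternating stream (the `s0` of the text).
struct FaithfulAlternate {
    subs: Vec<Counter>,
    buffers: Vec<VecDeque<u32>>, // one queue of held items per substream
    turn: usize,                 // index of the substream whose item is due next
}

impl FaithfulAlternate {
    fn new(n: usize) -> Self {
        FaithfulAlternate {
            subs: (0..n).map(|_| Counter { next: 0 }).collect(),
            buffers: (0..n).map(|_| VecDeque::new()).collect(),
            turn: 0,
        }
    }

    fn poll(&mut self) -> ToyAsync<u32> {
        // Faithfulness: poll every substream, since any of them might have
        // been the one that woke the task. Ready items must be kept somewhere.
        for (sub, buf) in self.subs.iter_mut().zip(self.buffers.iter_mut()) {
            if let ToyAsync::Ready(v) = sub.poll() {
                buf.push_back(v);
            }
        }
        // Hand out a single item, from the substream whose turn it is.
        let i = self.turn;
        match self.buffers[i].pop_front() {
            Some(v) => {
                self.turn = (i + 1) % self.subs.len();
                ToyAsync::Ready(v)
            }
            None => ToyAsync::NotReady,
        }
    }

    /// Total number of items currently held back.
    fn buffered(&self) -> usize {
        self.buffers.iter().map(|b| b.len()).sum()
    }
}
```

With two always-ready substreams, each call to poll() takes in two items and hands out one, so the number of held items in this sketch grows by one per call and has no upper bound.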

The only plausible way I can see around such problems would be to make heavy use of with_unpark_event().

The alternative is simply to not require faithfulness, and instead fix the bug in Shared. Perhaps the fix will involve with_unpark_event(), but that will be a much more confined use of with_unpark_event().

@dwrensha yeah the futures library is not currently optimized for heavy usage of with_unpark_event. The initial intention was that it’d be used once or twice for a task so we’d get by just fine with a small vector to avoid allocations and such.

Also, as a side note, I’d love to add single-threaded versions of various sync primitives to the futures crate, e.g. a futures-aware channel/queue, a RefCell/Mutex equivalent, etc. I think it’d make a great module in futures itself, and we could ensure that the API of sync mirrors it at least (or is a subset, I guess).