futures-rs: Shared can interact badly with futures that don't always poll their subfutures
(I’m opening this issue to continue the discussion from https://github.com/alexcrichton/futures-rs/pull/305.)
The problem is that Shared, as it is currently designed, can interact poorly with certain futures, such as the ModedFuture sketched below. We should either formalize some reason why ModedFuture is an invalid implementation of Future, or we should redesign Shared to accommodate this kind of situation.
extern crate futures;
extern crate tokio_core;

use futures::{Future, Poll};
use futures::sync::oneshot;
use std::rc::Rc;
use std::cell::RefCell;

enum Mode { Left, Right }

struct ModedFutureInner<F> where F: Future {
    mode: Mode,
    left: F,
    right: F,
    task: Option<::futures::task::Task>,
}

struct ModedFuture<F> where F: Future {
    inner: Rc<RefCell<ModedFutureInner<F>>>,
}

struct ModedFutureHandle<F> where F: Future {
    inner: Rc<RefCell<ModedFutureInner<F>>>,
}

impl<F> ModedFuture<F> where F: Future {
    pub fn new(left: F, right: F, mode: Mode) -> (ModedFutureHandle<F>, ModedFuture<F>) {
        let inner = Rc::new(RefCell::new(ModedFutureInner {
            left: left, right: right, mode: mode, task: None,
        }));
        (ModedFutureHandle { inner: inner.clone() }, ModedFuture { inner: inner })
    }
}

impl<F> ModedFutureHandle<F> where F: Future {
    pub fn switch_mode(&mut self, mode: Mode) {
        self.inner.borrow_mut().mode = mode;
        if let Some(t) = self.inner.borrow_mut().task.take() {
            // The other future may have become ready while we were ignoring it.
            t.unpark();
        }
    }
}

impl<F> Future for ModedFuture<F> where F: Future {
    type Item = F::Item;
    type Error = F::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let ModedFutureInner { ref mut mode, ref mut left, ref mut right, ref mut task } =
            *self.inner.borrow_mut();
        *task = Some(::futures::task::park());
        match *mode {
            Mode::Left => left.poll(),
            Mode::Right => right.poll(),
        }
    }
}

pub fn main() {
    let mut core = ::tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let (tx, rx) = oneshot::channel::<u32>();
    let f1 = rx.shared();
    let f2 = f1.clone();

    let (mut mfh, mf) = ModedFuture::new(
        Box::new(f1.map_err(|_| ()).map(|v| *v)) as Box<Future<Item=u32, Error=()>>,
        Box::new(::futures::future::empty()) as Box<Future<Item=u32, Error=()>>,
        Mode::Left);

    let (tx3, rx3) = oneshot::channel::<u32>();
    handle.spawn(f2.map(|x| tx3.complete(*x)).map_err(|_| ()));
    core.turn(Some(::std::time::Duration::from_millis(1)));

    handle.spawn(mf.map(|_| ()));
    core.turn(Some(::std::time::Duration::from_millis(1)));

    mfh.switch_mode(Mode::Right);
    tx.complete(11); // It seems like this ought to cause f2 and then rx3 to get resolved.

    // This hangs forever.
    core.run(rx3).unwrap();
}
About this issue
- State: closed
- Created 7 years ago
- Reactions: 1
- Comments: 27 (27 by maintainers)
Commits related to this issue
- Implement an alternative to future::Shared due to https://github.com/alexcrichton/futures-rs/issues/330 — committed to twitter/pants by stuhood 7 years ago
- A much smaller workaround for https://github.com/alexcrichton/futures-rs/issues/330 — committed to twitter/pants by stuhood 7 years ago
My current inclination is to say that ModedFuture is an invalid Future implementation.
I propose that the following invariant must be upheld by any valid implementation of Future: when Future::poll() runs, it must propagate poll() calls to any objects that might have triggered the unparking of the current task.
I have had a change of heart on this topic since last week, largely because some of my code in capnp-rpc-rust now depends on the above invariant.
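To make the invariant concrete, here is a toy, stdlib-only model of the scenario from the example at the top of this issue. It is only a sketch and does not use the futures-rs API: Source, Shared, TaskId, and simulate are hypothetical names. Source stands in for a oneshot receiver, and the model assumes it remembers only the last task that polled it. The run queue stands in for the reactor.

```rust
use std::collections::BTreeSet;

type TaskId = usize;

/// Toy stand-in for a oneshot receiver (assumption: it remembers only
/// the most recent task that polled it).
struct Source {
    value: Option<u32>,
    last_poller: Option<TaskId>,
}

impl Source {
    fn poll(&mut self, task: TaskId) -> Option<u32> {
        if self.value.is_none() {
            self.last_poller = Some(task);
        }
        self.value
    }

    /// Completes the source and returns the single task it would unpark.
    fn complete(&mut self, value: u32) -> Option<TaskId> {
        self.value = Some(value);
        self.last_poller.take()
    }
}

/// Toy model of a Shared that relies on being polled again in order to
/// notice completion and wake the other waiting tasks.
struct Shared {
    source: Source,
    waiters: BTreeSet<TaskId>,
    result: Option<u32>,
}

impl Shared {
    fn poll(&mut self, task: TaskId, wake: &mut Vec<TaskId>) -> Option<u32> {
        if self.result.is_none() {
            self.result = self.source.poll(task);
            if self.result.is_some() {
                // The first task to observe completion wakes everyone else.
                wake.extend(self.waiters.iter().copied().filter(|&t| t != task));
                self.waiters.clear();
            } else {
                self.waiters.insert(task);
            }
        }
        self.result
    }
}

/// Replays the example. Returns true if the consumer task (the f2 side)
/// ever observes the value. `faithful` controls whether the moded task
/// still polls its Left child after switching to Right.
fn simulate(faithful: bool) -> bool {
    const CONSUMER: TaskId = 0;
    const MODED: TaskId = 1;

    let mut shared = Shared {
        source: Source { value: None, last_poller: None },
        waiters: BTreeSet::new(),
        result: None,
    };
    let mut run_queue: Vec<TaskId> = Vec::new();

    // The consumer polls first, then the moded task polls in Left mode,
    // which overwrites the consumer's registration in the source.
    assert!(shared.poll(CONSUMER, &mut run_queue).is_none());
    assert!(shared.poll(MODED, &mut run_queue).is_none());

    // The mode is switched to Right, then the sender completes.
    run_queue.extend(shared.source.complete(11));

    let mut consumer_saw_value = false;
    while let Some(task) = run_queue.pop() {
        match task {
            CONSUMER => {
                consumer_saw_value = shared.poll(CONSUMER, &mut run_queue).is_some();
            }
            MODED => {
                // Right is an empty future, so the moded task never finishes.
                // Only a faithful implementation still polls the Left child.
                if faithful {
                    shared.poll(MODED, &mut run_queue);
                }
            }
            _ => unreachable!(),
        }
    }
    consumer_saw_value
}
```

In this model, simulate(false) leaves the consumer unwoken, which corresponds to the hang, while simulate(true) lets the consumer see the value because the moded task propagates the poll.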
My current thinking is that it would not work out well to require that all impls of Future satisfy some notion of “faithfulness”, and therefore the current implementation of Shared is buggy because it assumes that all futures will faithfully propagate poll() calls.
It helps to consider the case of streams. If we require futures to be faithful, then we probably also need to require streams to be faithful, because it’s easy to convert back and forth between streams and futures. But what would that mean for stream::Select? Currently stream::Select is not faithful because if the first substream is ready then it does not bother to call poll() on the second substream. You could imagine trying to fix that by always polling both substreams and by adding a field to hold onto an item in the case that both are ready. However, if the first substream is ready and the second is not, then poll() would return Ok(Ready(..)) and would also register the current task to be unparked when the second substream becomes ready. Now if this Select is itself the substream of another stream s0, say an alternating stream that takes turns pulling an element from some number of substreams, then s0.poll() needs to always call poll() on the Select in order to faithfully propagate events that could have been triggered by it. If doing so returns an Ok(Ready(..)), then s0 needs to hold onto that value somewhere. Indeed, it seems that s0 would need to be able to hold onto an unbounded number of values produced by its substreams!
The only plausible way I can see around such problems would be to make heavy use of with_unpark_event().
The alternative is simply to not require faithfulness, and instead fix the bug in Shared. Perhaps the fix will involve with_unpark_event(), but that will be a much more confined use of with_unpark_event().
@dwrensha yeah the futures library is not currently optimized for heavy usage of with_unpark_event. The initial intention was that it’d be used once or twice for a task so we’d get by just fine with a small vector to avoid allocations and such.
Also as a side note I’d love to add single-threaded versions to the futures crate of various sync primitives. E.g. a futures-aware channel/queue, RefCell/Mutex equivalent, etc. I think it’d make a great module in futures itself and we could ensure that the API of sync mirrors it at least (or is a subset I guess).
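For the alternative discussed above (fixing Shared rather than requiring faithfulness), one possible shape is sketched below, again as a toy stdlib-only model with hypothetical names (Task, NotifiedShared, on_source_ready, simulate_notified) rather than real futures-rs code. The assumption is that the completion notification is delivered to Shared itself, which then wakes every task waiting on it, no matter which task polled last. How with_unpark_event() would actually deliver that notification is not modeled here.

```rust
type Task = usize;

const CONSUMER: Task = 0;
const MODED: Task = 1;

/// Toy Shared that is notified directly when its source completes,
/// instead of depending on some task polling it again.
struct NotifiedShared {
    result: Option<u32>,
    waiters: Vec<Task>,
}

impl NotifiedShared {
    fn poll(&mut self, task: Task) -> Option<u32> {
        if self.result.is_none() && !self.waiters.contains(&task) {
            self.waiters.push(task);
        }
        self.result
    }

    /// Hypothetical hook invoked when the underlying source becomes ready.
    /// Returns every waiting task so the caller can schedule all of them.
    fn on_source_ready(&mut self, value: u32) -> Vec<Task> {
        self.result = Some(value);
        std::mem::take(&mut self.waiters)
    }
}

/// Replays the example with an unfaithful moded task. Returns true if
/// the consumer task observes the value.
fn simulate_notified() -> bool {
    let mut shared = NotifiedShared { result: None, waiters: Vec::new() };

    // Both tasks poll while the source is still pending.
    assert!(shared.poll(CONSUMER).is_none());
    assert!(shared.poll(MODED).is_none());

    // The mode is switched to Right, then the sender completes.
    let mut run_queue = shared.on_source_ready(11);

    let mut consumer_saw_value = false;
    while let Some(task) = run_queue.pop() {
        if task == CONSUMER {
            consumer_saw_value = shared.poll(CONSUMER) == Some(11);
        }
        // MODED is in Right mode and polls only the empty future,
        // so it never touches `shared` again.
    }
    consumer_saw_value
}
```

Here the consumer is woken even though the moded task never polls its Left child again, so the fix stays confined to Shared.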