futures-rs: Pre-RFC: Built-in timeout/deadline functionality for Futures
So I just finished converting rusoto to futures/tokio, and we’ll be cutting a new release soon! 🎉
With that out the door, I wanted to see if I could feed back some ideas that I had while working on the conversion…
One of the major challenges in that process was implementing timeouts. The design goal was to keep the core of Rusoto independent of any event loop functionality (no direct tokio dependency), and to base it only on futures. This made implementing timeouts tricky, since current approaches are either coupled to tokio (Timeout in tokio-core) or need to spawn an additional thread (tokio-timer). This required us to push the complexity of timeout handling into the pluggable parts that do depend on tokio/hyper, but IMO it would have been preferable to separate these concerns.
It would be neat if there was some built-in timeout functionality in futures itself, perhaps as part of the thread park/wait mechanism. This would enable things like a pure-futures Timeout combinator, or perhaps even implementing some idle timeout tracking for AsyncRead. If there is general interest in supporting something like it, I’d be willing to write up an RFC to facilitate more discussion around how to approach it.
Familiarising myself with the futures 0.2 architecture, I came up with the following straw-man proposal (I’m confident there are much better ways to achieve this). I thought I’d just float this and see if something sticks 😃
Let me know what you think! (and please also don’t hesitate to tell me if I’m way off track here)
Straw-man Proposal
Changes to futures-core
The idea here is to keep track of a deadline/timeout on the task context.
struct Waker {
    ...,
    timeout_ms: AtomicUsize // initialised to usize::MAX
}

impl Waker {
    ...
    fn set_timeout_ms(&self, ms: usize) {
        // atomically set `self.timeout_ms` to the min of `ms` and `self.timeout_ms`
    }

    fn reset_timeout_ms(&self) -> usize {
        // atomically reset `self.timeout_ms` to usize::MAX and return the last timeout set
    }
}

impl<'a> Context<'a> {
    pub fn set_timeout_ms(&self, ms: usize) {
        // forward to the Waker method defined above
        self.waker.set_timeout_ms(ms);
    }
}
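The two commented-out method bodies above could be filled in roughly as follows. This is a minimal sketch only: `TimeoutSlot` is a hypothetical stand-in for the proposed `timeout_ms` field on `Waker` (it is not a futures type), and the memory orderings are an assumption rather than something the proposal specifies. It relies on `AtomicUsize::fetch_min` and `AtomicUsize::swap` from the standard library.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the `timeout_ms` field proposed for `Waker`.
pub struct TimeoutSlot {
    timeout_ms: AtomicUsize,
}

impl TimeoutSlot {
    /// Starts with "no timeout requested", encoded as `usize::MAX`.
    pub fn new() -> Self {
        TimeoutSlot {
            timeout_ms: AtomicUsize::new(usize::MAX),
        }
    }

    /// Atomically keeps the smaller of the stored timeout and `ms`,
    /// so the most urgent request wins when several futures set one.
    pub fn set_timeout_ms(&self, ms: usize) {
        // Ordering is an assumption of this sketch.
        self.timeout_ms.fetch_min(ms, Ordering::AcqRel);
    }

    /// Atomically resets the slot to `usize::MAX` and returns the
    /// smallest timeout requested since the previous reset.
    pub fn reset_timeout_ms(&self) -> usize {
        self.timeout_ms.swap(usize::MAX, Ordering::AcqRel)
    }
}
```

Because both operations are single atomic read-modify-write instructions, several futures polled under the same task can each request a timeout without any locking.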
Changes to futures-executor
Executors will now need to be aware of the timeout setting on the Waker, and incorporate that into their parking logic.
Example for local-pool.rs
fn run_executor<T, F: FnMut(&Waker) -> Async<T>>(mut f: F) -> T {
    ...
    ThreadNotify::with_current(|thread| {
        let waker = &Waker::from(thread.clone());
        loop {
            if let Async::Ready(t) = f(waker) {
                return t;
            }
            let timeout = waker.reset_timeout_ms();
            thread.park_timeout(Duration::from_millis(timeout as u64));
        }
    })
}
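To make the parking logic concrete, here is a self-contained approximation that uses only the standard library. It is a sketch under stated assumptions, not the futures-executor code: `Async`, `TimeoutSlot`, `run_executor` and `sleep_via_timeout` are simplified, hypothetical stand-ins, and no real waker is involved, so a poll that returns `NotReady` without requesting a timeout would park indefinitely.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Simplified stand-in for the futures 0.2 `Async` type.
pub enum Async<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical stand-in for the timeout state proposed for `Waker`.
pub struct TimeoutSlot {
    timeout_ms: AtomicUsize,
}

impl TimeoutSlot {
    pub fn new() -> Self {
        TimeoutSlot { timeout_ms: AtomicUsize::new(usize::MAX) }
    }
    pub fn set_timeout_ms(&self, ms: usize) {
        self.timeout_ms.fetch_min(ms, Ordering::AcqRel);
    }
    pub fn reset_timeout_ms(&self) -> usize {
        self.timeout_ms.swap(usize::MAX, Ordering::AcqRel)
    }
}

/// Polls `f` until it is ready, parking the thread between polls for
/// at most the timeout requested during the last poll.
pub fn run_executor<T, F: FnMut(&TimeoutSlot) -> Async<T>>(mut f: F) -> T {
    let slot = TimeoutSlot::new();
    loop {
        if let Async::Ready(t) = f(&slot) {
            return t;
        }
        let ms = slot.reset_timeout_ms();
        if ms == usize::MAX {
            // No timeout requested: wait for an explicit unpark.
            thread::park();
        } else {
            thread::park_timeout(Duration::from_millis(ms as u64));
        }
    }
}

/// A "pure" sleep: it never touches a timer thread or event loop, it
/// only tells the executor how long it may park. Returns the elapsed time.
pub fn sleep_via_timeout(delay: Duration) -> Duration {
    let start = Instant::now();
    let deadline = start + delay;
    run_executor(|slot| {
        let now = Instant::now();
        if now >= deadline {
            Async::Ready(now - start)
        } else {
            // Round up by 1 ms so the next wake-up lands after the deadline.
            let remaining = deadline - now;
            slot.set_timeout_ms(remaining.as_millis() as usize + 1);
            Async::NotReady
        }
    })
}
```

Calling `sleep_via_timeout(Duration::from_millis(30))` returns once at least 30 ms have passed. Since `park_timeout` may wake early, the loop simply polls again and the closure requests a fresh timeout.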
Alternatives
An alternative could be to extend the Async<T> enum to be either Async::Ready(T) or Async::NotReady(Option<Duration>). That would avoid having to store the timeout as an atomic in Waker, but on the downside it complicates the futures contract for the 90% of futures that don’t need timeouts.
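A rough sketch of what this alternative might look like, again with simplified, hypothetical stand-ins (`Async`, `run_executor`, `earliest`) rather than the real futures types. The `earliest` helper illustrates the extra burden mentioned above: any combinator that polls more than one child would have to merge their timeout hints.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical variant of `Async` that carries an optional timeout hint.
pub enum Async<T> {
    Ready(T),
    NotReady(Option<Duration>),
}

/// Polls `f` until it is ready, parking for the hinted duration if any.
/// Without a real waker, `NotReady(None)` parks until an explicit unpark.
pub fn run_executor<T, F: FnMut() -> Async<T>>(mut f: F) -> T {
    loop {
        match f() {
            Async::Ready(t) => return t,
            Async::NotReady(Some(timeout)) => thread::park_timeout(timeout),
            Async::NotReady(None) => thread::park(),
        }
    }
}

/// Merges two timeout hints by keeping the earlier one, as a combinator
/// polling two children would need to do before returning `NotReady`.
pub fn earliest(a: Option<Duration>, b: Option<Duration>) -> Option<Duration> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.min(y)),
        (x, None) => x,
        (None, y) => y,
    }
}
```

Compared with the atomic field on `Waker`, the timeout here travels through return values, so no shared state is needed, but every future and combinator has to construct and forward the `Option<Duration>` even when it never uses timeouts.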
About this issue
- State: closed
- Created 6 years ago
- Comments: 20 (10 by maintainers)
I wrote this up at one point: https://github.com/tokio-rs/tokio-rfcs/pull/2#issuecomment-331931531