futures-rs: Support infinite streams
This is an issue opened to consider adding support for infinite streams as a first-class abstraction.
Infinite streams are streams which do not terminate, so they yield values of T directly instead of Option<T>. This is useful for things which conceptually never end, like Intervals, channels, or lazy settings updates. The fact that we currently have a panicking select_next_some suggests to me that there might be space for this.
Infinite streams would look something like this:
trait InfiniteStream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Item>;
}
Or, in a perfect world, consider making the existing Stream not resolve with an Option<T>, and make terminating streams a specialization of this, similar to how Result<T, E> was removed from Future. In my mind this would eventually suit streams as a language item better, like:
async fn produces_stream() -> u32 {
    yield 1u32;
    return 42;
}
This could produce an implementation of Stream<Item = YieldOrReturn<u32, u32>>. A Stream<Item = YieldOrReturn<u32, ()>> would then have the same semantics as Stream<Item = Option<u32>>.
An infinite stream could look something like:
async fn produces_stream() {
    loop {
        yield 1u32;
    }
}
This would produce an implementation of Stream<Item = u32>.
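To make the Option equivalence concrete, here is a minimal sketch of what YieldOrReturn might look like. The enum is hypothetical: it is named in this issue but does not exist in futures-rs, and the variant names and the into_option helper are my own assumptions.

```rust
/// Hypothetical item type: each step of a stream either yields a value
/// or returns a final one. Not part of futures-rs.
#[derive(Debug, PartialEq)]
enum YieldOrReturn<Y, R> {
    Yield(Y),
    Return(R),
}

impl<Y> YieldOrReturn<Y, ()> {
    /// With `R = ()` the enum carries exactly the information of `Option<Y>`:
    /// `Yield(y)` corresponds to `Some(y)` and `Return(())` to `None`.
    fn into_option(self) -> Option<Y> {
        match self {
            YieldOrReturn::Yield(y) => Some(y),
            YieldOrReturn::Return(()) => None,
        }
    }
}

impl<Y> From<Option<Y>> for YieldOrReturn<Y, ()> {
    /// The reverse mapping, so the two representations round-trip.
    fn from(opt: Option<Y>) -> Self {
        match opt {
            Some(y) => YieldOrReturn::Yield(y),
            None => YieldOrReturn::Return(()),
        }
    }
}
```

Since the conversion is lossless in both directions, a stream of YieldOrReturn<u32, ()> would tell a consumer nothing that Option<u32> does not.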
One sticking point I’m not sure about is whether combinators could be implemented once, instead of once per stream variant (terminating vs infinite).
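One possible way to share code in one direction is an adapter that wraps every item of an infinite stream in Some, so that anything written against the terminating trait can be reused. The sketch below uses only std. The Stream trait here is a local stand-in with the same shape as the one in futures, and Counter, NeverEnding, and take_ready are hypothetical names for illustration. For simplicity it assumes the inner stream is Unpin.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// The trait proposed in this issue.
trait InfiniteStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item>;
}

/// Local stand-in with the same shape as the terminating `Stream` trait.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical infinite stream: counts upwards and is always ready.
struct Counter(u32);

impl InfiniteStream for Counter {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        self.0 = self.0.wrapping_add(1);
        Poll::Ready(self.0)
    }
}

/// Adapter: views an infinite stream as a terminating one that never ends.
struct NeverEnding<S>(S);

impl<S: InfiniteStream + Unpin> Stream for NeverEnding<S> {
    type Item = S::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        // Every item is wrapped in `Some`; `None` is never produced.
        Pin::new(&mut self.0).poll_next(cx).map(Some)
    }
}

/// Waker that does nothing, so the stream can be polled without an executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Helper written once against `Stream`: collects up to `n` items that are
/// immediately ready, stopping early on `None` or `Pending`.
fn take_ready<S: Stream + Unpin>(stream: &mut S, n: usize) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while out.len() < n {
        match Pin::new(&mut *stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => out.push(item),
            Poll::Ready(None) | Poll::Pending => break,
        }
    }
    out
}
```

With this, take_ready(&mut NeverEnding(Counter(0)), 3) collects 1, 2, 3. Note that the adapter gives up the static guarantee that the stream never ends, so it does not answer the question for combinators that should stay infinite (a map over an infinite stream, say); those would likely still need their own implementation.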
CC: #1765
About this issue
- State: open
- Created 5 years ago
- Reactions: 4
- Comments: 17 (16 by maintainers)
Stream should have two associated items: Yield and Return. That way you can have:
- The current stream behaviour
- Infinite streams
- Infinite streams that have fatal errors
- Finite streams that have both fatal and non-fatal errors and allow you to distinguish them
- etc.
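A rough sketch of how the two-associated-type idea could cover the first two cases above. Everything here is an assumption for illustration: the trait name TwoTypeStream, the Step enum, poll_step, and the example streams are hypothetical and not part of futures-rs. std::convert::Infallible stands in for the never type to mark a stream that cannot return.

```rust
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// One step of a stream: an intermediate value or the final one.
enum Step<Y, R> {
    Yield(Y),
    Return(R),
}

/// Hypothetical trait with separate yield and return types.
trait TwoTypeStream {
    type Yield;
    type Return;
    fn poll_step(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Step<Self::Yield, Self::Return>>;
}

/// Current behaviour: `Return = ()` plays the role of `None`.
struct Countdown {
    left: u32,
}

impl TwoTypeStream for Countdown {
    type Yield = u32;
    type Return = ();
    fn poll_step(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Step<u32, ()>> {
        if self.left == 0 {
            return Poll::Ready(Step::Return(()));
        }
        let value = self.left;
        self.left -= 1;
        Poll::Ready(Step::Yield(value))
    }
}

/// Infinite stream: `Return = Infallible` means it can never return.
struct Ticks(u64);

impl TwoTypeStream for Ticks {
    type Yield = u64;
    type Return = Infallible;
    fn poll_step(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Step<u64, Infallible>> {
        let value = self.0;
        self.0 = self.0.wrapping_add(1);
        Poll::Ready(Step::Yield(value))
    }
}

/// Waker that does nothing, so streams can be polled without an executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls until `limit` values were yielded, the stream returned, or it is
/// pending. Gives back the yielded values and the return value, if any.
fn run<S: TwoTypeStream + Unpin>(s: &mut S, limit: usize) -> (Vec<S::Yield>, Option<S::Return>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut yields = Vec::new();
    while yields.len() < limit {
        match Pin::new(&mut *s).poll_step(&mut cx) {
            Poll::Ready(Step::Yield(y)) => yields.push(y),
            Poll::Ready(Step::Return(r)) => return (yields, Some(r)),
            Poll::Pending => break,
        }
    }
    (yields, None)
}
```

The error cases from the list would presumably follow the same pattern, for example Return = Result<(), E> for fatal errors and Yield = Result<T, E> for non-fatal ones, though that part is not sketched here.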
I’ve had the opposite experience: the majority of my Stream usage has been wrappers around multi-message IO channels, which all gracefully terminate when disconnected. The next major part has been mpsc channels which are in some way connected to an IO channel, so will terminate when the IO terminates. The only infinite streams I can think of using are simple timer ticks, and the Incoming stream from a TcpListener.
They’re mostly per-client Stream<Item = Result<T, E>>. They could have an IO error (e.g. something in the network connection fails) or a parse error (e.g. the other side sends an invalid message), which would normally result in me just abandoning that task and dropping all other connections it’s dealing with (all associated with that one client). If the other side closes the connection, the stream will cleanly finish and I instead close all other connections and wait for them to complete before the task finishes.
I’m happy to leave this open for discussion, but my personal temperature on this is that we don’t need native support for streams with a separate final output type from their normal yielded type. The benefits gained from modeling these kinds of streams differently aren’t worth the costs, IMO. @sfackler previously made an effort to develop these stream types in a separate crate, and I believe came to the same conclusion.