futures-rs: Support infinite streams

This is an issue opened to consider adding support for infinite streams as a first-class abstraction.

Infinite streams are streams that do not terminate, so they yield values of T directly instead of Option<T>. This is useful for things that conceptually never end, like Intervals, channels, or lazy settings updates. The fact that we currently have a panicking select_next_some suggests to me that there might be space for this.

Infinite streams would look something like this:

trait InfiniteStream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Item>;
}
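To make the shape of the proposed trait concrete, here is a minimal sketch of an implementation, assuming only the standard library. The `Counter` type, the `NoopWaker` helper, and `take_from_counter` are hypothetical names introduced for illustration; they are not part of futures-rs.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// The trait proposed above, repeated so the sketch compiles on its own.
trait InfiniteStream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item>;
}

// Hypothetical example type: counts upward and never terminates.
struct Counter {
    next: u32,
}

impl InfiniteStream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        let value = self.next;
        self.next = self.next.wrapping_add(1);
        // No Option wrapper: there is no "end of stream" case to report.
        Poll::Ready(value)
    }
}

// A waker that does nothing, used only to drive the poll loop by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a fresh counter `n` times and collects the values it produced.
fn take_from_counter(n: usize) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut counter = Counter { next: 0 };
    let mut out = Vec::with_capacity(n);
    for _ in 0..n {
        if let Poll::Ready(value) = Pin::new(&mut counter).poll_next(&mut cx) {
            out.push(value);
        }
    }
    out
}
```

The caller never has to handle a `None` case, which is the ergonomic gain the proposal is after.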

Or, in a perfect world, consider making the existing Stream not resolve with an Option<T>, and make terminating streams a specialization of it, similar to how Result<T, E> was removed from Future. In my mind this would eventually suit streams as a language item better, like:

async fn produces_stream() -> u32 {
    yield 1u32;
    return 42;
}

This could produce an implementation of Stream<Item = YieldOrReturn<u32, u32>>. For Stream<Item = YieldOrReturn<u32, ()>>, this would have the same semantics as Stream<Item = Option<u32>>.
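The equivalence with Option<u32> can be sketched as follows. The issue only names `YieldOrReturn`; the variant names and the `into_option` helper below are assumptions made for illustration.

```rust
// Hypothetical shape of the YieldOrReturn type named above.
#[derive(Debug, PartialEq)]
enum YieldOrReturn<Y, R> {
    Yield(Y),
    Return(R),
}

// With R = (), the enum carries exactly the same information as Option<Y>:
// a yielded value maps to Some, and the unit return maps to None.
impl<Y> YieldOrReturn<Y, ()> {
    fn into_option(self) -> Option<Y> {
        match self {
            YieldOrReturn::Yield(value) => Some(value),
            YieldOrReturn::Return(()) => None,
        }
    }
}
```

With any other return type the final value carries extra information, so no lossless conversion to Option exists in general.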

An infinite stream could look something like:

async fn produces_stream() {
    loop {
        yield 1u32;
    }
}

This would produce an implementation of Stream<Item = u32>.

One sticking point I’m not sure about is whether combinators could be implemented once instead of once per stream variant (terminating vs infinite).
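One possible angle on this, sketched under assumptions: an adapter that wraps every item of an infinite stream in `Some` would let combinators written against the terminating trait be reused. `FiniteStream` is a simplified stand-in for the existing Stream trait, and `AsFinite`, `Counter`, `NoopWaker`, and `first_n` are hypothetical names. The sketch covers only one direction and does not settle the question.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

trait InfiniteStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item>;
}

// Simplified stand-in for the existing terminating Stream trait.
trait FiniteStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical adapter: presents an infinite stream as one that never yields None.
struct AsFinite<S> {
    inner: S,
}

impl<S: InfiniteStream + Unpin> FiniteStream for AsFinite<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        InfiniteStream::poll_next(Pin::new(&mut self.inner), cx).map(Some)
    }
}

// Hypothetical infinite stream that counts upward.
struct Counter {
    next: u32,
}

impl InfiniteStream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        let value = self.next;
        self.next = self.next.wrapping_add(1);
        Poll::Ready(value)
    }
}

// A waker that does nothing, used only to poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// A "combinator" written once, against the terminating trait only.
fn first_n<S>(mut stream: S, n: usize) -> Vec<S::Item>
where
    S: FiniteStream + Unpin,
{
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while out.len() < n {
        match FiniteStream::poll_next(Pin::new(&mut stream), &mut cx) {
            Poll::Ready(Some(item)) => out.push(item),
            // Stop at end of stream, or on Pending since nothing will wake us here.
            Poll::Ready(None) | Poll::Pending => break,
        }
    }
    out
}
```

The cost of this approach is that the type-level guarantee of non-termination is lost after the adapter, so downstream code is back to handling a `None` that can never occur.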

CC: #1765

About this issue

  • State: open
  • Created 5 years ago
  • Reactions: 4
  • Comments: 17 (16 by maintainers)

Most upvoted comments

Stream should have two associated items: Yield and Return. That way you can have:

  • The current stream behaviour

      Stream<Yield = T, Return = ()>
    
  • Infinite streams

      Stream<Yield = T, Return = !>
    
  • Infinite streams that have fatal errors

      Stream<Yield = T, Return = E>
    
  • Finite streams that have both fatal and non-fatal errors and allow you to distinguish them

      Stream<Yield = Result<T, NonFatalE>, Return = Result<(), FatalE>>
    
  • etc.
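The variants listed above can be sketched with a single trait. This is an illustration under assumptions, not an existing futures-rs API: `TwoTypeStream`, `Step`, `Countdown`, `Ticks`, and `collect_up_to` are hypothetical names, and `std::convert::Infallible` stands in for `!`, which is not usable as an associated type on stable Rust.

```rust
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical item type: either a yielded value or the final return value.
enum Step<Y, R> {
    Yield(Y),
    Return(R),
}

// Hypothetical trait with the two associated types suggested above.
trait TwoTypeStream {
    type Yield;
    type Return;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Step<Self::Yield, Self::Return>>;
}

// Finite stream (Return = ()): yields n, n-1, ..., 1 and then returns.
struct Countdown {
    remaining: u32,
}

impl TwoTypeStream for Countdown {
    type Yield = u32;
    type Return = ();

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Step<u32, ()>> {
        if self.remaining == 0 {
            return Poll::Ready(Step::Return(()));
        }
        let value = self.remaining;
        self.remaining -= 1;
        Poll::Ready(Step::Yield(value))
    }
}

// Infinite stream: Infallible has no values, so Step::Return can never be built.
struct Ticks {
    count: u64,
}

impl TwoTypeStream for Ticks {
    type Yield = u64;
    type Return = Infallible;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Step<u64, Infallible>> {
        self.count += 1;
        Poll::Ready(Step::Yield(self.count))
    }
}

// A waker that does nothing, used only to poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls at most `limit` times; stops early if the stream returns or is pending.
fn collect_up_to<S>(mut stream: S, limit: usize) -> (Vec<S::Yield>, Option<S::Return>)
where
    S: TwoTypeStream + Unpin,
{
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut yielded = Vec::new();
    for _ in 0..limit {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Step::Yield(value)) => yielded.push(value),
            Poll::Ready(Step::Return(ret)) => return (yielded, Some(ret)),
            Poll::Pending => break,
        }
    }
    (yielded, None)
}
```

Here `collect_up_to` is written once and works for both the finite and the infinite stream; the difference between them lives entirely in the `Return` type.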

I can’t actually recall off the top of my head a finite stream implementation that I’ve used during the last couple of months. Note, though, that there are plenty of fallible streams that wrap a Result and terminate for other reasons.

I’ve had the opposite experience: the majority of my Stream usage has been wrappers around multi-message IO channels, which all gracefully terminate when disconnected. The next major part has been mpsc channels which are in some way connected to an IO channel, so they terminate when the IO terminates. The only infinite streams I can think of using are simple timer ticks, and the Incoming stream from a TcpListener.

They’re mostly per-client Stream<Item = Result<T, E>>. They could hit an IO error (e.g. something in the network connection fails) or a parse error (e.g. the other side sends an invalid message), which would normally result in me just abandoning that task and dropping all other connections it’s dealing with (all associated with that one client). If the other side closes the connection, the stream finishes cleanly, and I instead close all other connections and wait for them to complete before the task finishes.

I’m happy to leave this open for discussion, but my personal temperature on this is that we don’t need native support for streams with a separate final output type from their normal yielded type. The benefits gained from modeling these kinds of streams differently aren’t worth the costs, IMO. @sfackler previously made an effort to develop these stream types in a separate crate, and I believe came to the same conclusion.