rxjs: Pipe operator cannot infer return type as ConnectableObservable

RxJS version: 5.5.0

Code to reproduce:

const obs = Observable.of(5);
const connectableObs = obs.pipe(
    publishReplay(1)
);

Expected behavior: The inferred type of connectableObs is ConnectableObservable<number>.

Actual behavior: The inferred type of connectableObs is Observable<number>.

Additional information:

TypeScript version: 2.5.3

My current workaround is to manually downcast the return value:

const obs = Observable.of(5);
const connectableObs = obs.pipe(
    publishReplay(1)
) as ConnectableObservable<number>;

Happy to send a PR, but not sure how should I fix this properly.

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Reactions: 65
  • Comments: 17 (3 by maintainers)

Commits related to this issue

Most upvoted comments

This issue still exists in rxjs 6.0 final. Reproduce:

const connectableObservable = from([1, 2, 3]).pipe(publish());
connectableObservable.connect()

tsc will fail, saying error TS2339: Property 'connect' does not exist on type 'Observable<any>'.. Casting the connectableObservable to any and calling .connect() works as expected.

Edit: Used latest typescript 2.8.3.

Actually, if the overload signatures for pipe are written like this:

pipe<A, OA extends Observable<A>>(op1: UnaryFunction<Observable<T>, OA>): OA;
pipe<A, B, OB extends Observable<B>>(op1: OperatorFunction<T, A>, op2: UnaryFunction<Observable<A>, OB>): OB;
// etc.

The problem can be solved without specific reference to ConnectableObservable.

This seems to be okay with the version of TypeScript (2.0) that RxJS uses, but I’ve not investigated this thoroughly.

Seems it’s still an issue. Is there any update related to this?

image

The problem can be solved by adding additional overload signatures to pipe, allowing for the last operator to return a ConnectableObservable.

For example:

pipe<A>(op1: UnaryFunction<Observable<T>, ConnectableObservable<A>>): ConnectableObservable<A>;
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;

Using ConnectableObservable in the signtures for a method in Observable seems a little weird, as ConnectableObservable extends Observable, but this is only way I can see this working.

An import of ConnectableObservable would be necessary in Observable.ts, but as it’s only used in the typings, the import is erased during transpilation.

The signatures for publish and publishLast would have to be changed, too, as it they do not specify ConnectableObservable in their return types.

Interestingly, adding such overload signatures to pipe seems to fix another problem with the inference of the publishReplay. Without them, the return type of the following:

const o = of(0).pipe(publishReplay(1));

is inferred as Observable<any> (note the any) and with them it’s inferred as ConnectableObservable<number>.

@ebrehault this looks like (type)safer workaround https://github.com/ReactiveX/rxjs/issues/3595

upd: actually that can be wrapped into observable factory function to provide better type inference and more traditional shape: Observable<T> => ConnectedObservable<T>

// NB! Remember that with such import you probably would like tree-shaking
import { publish } from 'rxjs/operators';

export function _publish<T>(source: Observable<T>): ConnectableObservable<T> {
  return publish<T>()(source);
}

upd:

import { UnaryFunction } from 'rxjs/interfaces';

export function _publish<T, R>(
  source: Observable<T>,
  publishFn: UnaryFunction<Observable<T>, ConnectableObservable<R>>
): ConnectableObservable<R> {
  return publishFn(source);
}

// use:
// NB! Remember that with such import you probably would like tree-shaking
import { publishReplay } from 'rxjs/operators';
declare const foo$: Observable<number>;
_publish(foo$, publishReplay(1)); // inferred to ConnectableObservable<number>

@benlesh @cartant what do you say, is it possible to fix with the whole pipe approach? And another question regarding piping and types: since there’s a limited set of overload signatures for pipe, it means that that type-safe long pipelines are impossible? Can this topic be addressed in v7?

I’m curious if this is considered a bug that’s going to get fixed? I am fine type casting but would prefer not to. I do want to point out though, that pipe() has always only returned Observable<T> in my experience – if I’m wrong there, cool, I just wonder if justifying having an operator transform the output type of the piped observable leads to justifying doing it for all types? What I mean by that is, I created a custom operator to convert an observable to a BehaviorSubject which gives caching. But this method requires the solution provided here – typecasting to BehaviorSubject<T> at the end of the pipe. This is “fine”, but ideally, if an operator is spitting out some subtype of Observable, that subtype, at least logically speaking, should be preserved.

I got myself super confused by this whole issue, so just wanted to add a couple findings that may help others - especially in understanding what is not the problem:

  • First - this problem only applies to publish and not publishReplay
  • publishReplay does NOT return a connected observable so no issue with the pipe there
  • Ultimately the problem is that publish() in a pipe completely loses both the connectable nature of the observable and its type. (Typescript limitation from an earlier version AFAIK).
  • When you use publish typescript strips away both the ConnectableObservable type AND the fundamental type of your source.

In other words the type of of(1,2,3).pipe(publish()) is Observable<any> instead of ConnectableObservable<number> (which is what it really is).

When publish is most useful you’ll often have several ‘child’ streams that take the output of your primary source stream and transform it. So just by using publish in the pipe you lose the ability inside your operators to know what your incoming observable is.

@fljot’s answer is most useful to solve this problem because all it does it to call the publish operator and in isolation typescript can properly figure out all the types. The key usefulness of this method is that you don’t need to explicitly provide (or even know) the type of your source observable, so everything will ripple through nicely by itself.

So you can publish a stream, use it to create other streams and then connect to it.

The only recommendation I have for usage is to call it on a separate line like this:


// I used an underscore to suggest this is a temporary variable
const _stockQuotes$ = of(10, 11, 12, 11, 10);   
const publishedStockQuotes$ = publish(_stockQuotes$);  // function from @fjlot

// derivative streams - (publishedStockQuotes$ is Observable<number>)
const throttledStockQuotes$ = publishedStockQuotes$.pipe(throttleTime(5000));
const splitStockQuotes$ = publishedStockQuotes$.pipe(map(quote => quote / 2));   // 

publishedStockQuotes$.connect();   

@ebrehault your answer is misleading. The connect() method isn’t generic, so the goal is not to have a ‘typed connectable’ observable when you call connect(), but a ‘connectable’ observable when you call connect AND a ‘typed’ observable when you create further streams.

 // connect(): Subscription;
(yourObs as ConnectableObservable<any>).connect();

@fljot In v7 it won’t be an issue, as the operators that will be pipeable won’t return a ConnectableObservable. They will be like the variants that accept a selector. ConnectableObservable instances will only be returned by static creator/factory functions. See https://github.com/ReactiveX/rxjs/issues/3833 for more information. Or look at the source in the experimental branch.

Workaround:

(yourObs as ConnectableObservable<YourType>).connect()

EDIT: that’s wrong, it will just type the connectable (see @simeyla comment below)