cyclejs: Solve memory leak with circular dependencies of streams

This is an issue I keep getting stuck on in different contexts. When the parent needs to listen to events of the child, how do you break or avoid a cycle?

e.g.


function List(sources) {
  const actions = intent({ ...sources, click$: ??? })
  const state = model(actions).shareReplay(1)

  const items$ = state
    .flatMapLatest(({ items, selected }) => 
      items.map(item => Item({ 
        ...sources, 
        // compare against the destructured `selected` value, not the string 'selected'
        props$: Observable.of({ id: item.id, selected: item.id === selected })
      })))

  const click$ = items$.map(items => Observable.merge(...items.map(item => item.click$)))

  return {
    ...
  }
}

Basically the following flow:

click => List => selected => Item => click

Which gives me a cycle. How do I get around this? I could use a proxy as I’ve seen in some examples:

  const clickProxy = new Subject()
  const actions = intent({ ...sources, click$: clickProxy })

  const subscription = click$.subscribe(clickProxy)

  // subscription leaks...

However, that gives me a memory leak…
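
To make the leak concrete, here is a minimal plain-JavaScript sketch. It does not use RxJS: `TinySubject` is a hypothetical stand-in that only tracks observers, and the names `link`, `consumer` and `seen` are made up for illustration. It assumes the leak in question is simply that nothing downstream owns the `click$ -> clickProxy` subscription.

```javascript
// Hypothetical stand-in for an Rx Subject: just enough to count observers.
class TinySubject {
  constructor() {
    this.observers = new Set()
  }
  // Returns a disposable, mirroring the RxJS 4 `dispose()` naming used above.
  subscribe(observer) {
    const fn = typeof observer === 'function' ? observer : x => observer.onNext(x)
    this.observers.add(fn)
    return { dispose: () => this.observers.delete(fn) }
  }
  onNext(value) {
    this.observers.forEach(fn => fn(value))
  }
}

const click$ = new TinySubject()      // stands in for the merged child clicks
const clickProxy = new TinySubject()  // the proxy handed to intent()

// The link that closes the cycle. Nothing downstream owns this disposable.
const link = click$.subscribe(clickProxy)

// A consumer of the component subscribes to the proxy and later goes away.
const seen = []
const consumer = clickProxy.subscribe(x => seen.push(x))
click$.onNext('a')
consumer.dispose()

// The consumer is gone, but click$ still holds the proxy as an observer.
const leakedObservers = click$.observers.size   // 1
click$.onNext('b')                               // still delivered to the proxy

// Only disposing the link itself releases the reference.
link.dispose()
const afterDispose = click$.observers.size       // 0
```

In this model the only way to release the reference is to call `link.dispose()` explicitly, which is the part that has no obvious owner inside the component function.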

About this issue

  • State: closed
  • Created 8 years ago
  • Comments: 34 (17 by maintainers)

Most upvoted comments

I believe the leak with a proxy subject can be avoided by using finally, which catches any termination of the stream (including disposal of the downstream subscription):

var proxySubject = new Rx.Subject()

let source2 = proxySubject  
  .finally(() =>  proxySubscription.dispose())
  .map(x => x*2)

var source = Rx.Observable.timer(0, 1000)
.do(x => console.log('timer', x))

let proxySubscription = source.subscribe(proxySubject)

let sub = source.subscribe(x => {
    console.log('source', x)
})
let sub2 = source2.subscribe(x => {
    console.log(' source2', x)
})
setTimeout(() => {
  console.log('DISPOSE')
  sub.dispose()
  sub2.dispose()
}, 2500)

https://jsfiddle.net/jm4ke9h0/

A babel plugin (https://github.com/cyclejs/core/issues/170) will definitely be needed to get rid of this dirt.

The using construction is quite clumsy, especially if multiple proxies are needed in one function.
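
The idea behind the finally approach can be sketched without RxJS. This is only a model under stated assumptions: `createSource` and `withFinally` are hypothetical helpers, and only disposal is modeled (not completion or errors). It shows the proxy link being torn down as a side effect of disposing the downstream subscription.

```javascript
// Hypothetical stand-in for a hot source that tracks its observers.
function createSource() {
  const observers = new Set()
  return {
    observers,
    emit: value => observers.forEach(fn => fn(value)),
    subscribe(fn) {
      observers.add(fn)
      return { dispose: () => observers.delete(fn) }
    },
  }
}

// Mimics the role of `.finally(cleanup)` above: run `cleanup` when the
// downstream subscription is disposed. Completion and errors are not modeled.
function withFinally(stream, cleanup) {
  return {
    subscribe(fn) {
      const inner = stream.subscribe(fn)
      return {
        dispose() {
          inner.dispose()
          cleanup()
        },
      }
    },
  }
}

const source = createSource()
const proxy = createSource()

// Link source -> proxy; this is the subscription that would otherwise leak.
const proxySubscription = source.subscribe(proxy.emit)

const received = []
const guarded = withFinally(proxy, () => proxySubscription.dispose())
const sub = guarded.subscribe(x => received.push(x * 2))

source.emit(1)
source.emit(2)
sub.dispose()     // also tears down the source -> proxy link
source.emit(3)    // nobody is listening any more

console.log(received, source.observers.size)
```

Note that in this sketch every downstream subscriber would trigger the cleanup on disposal, so with several subscribers the link goes away as soon as the first one leaves.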

@staltz: This doesn’t feel quite solved.

Take the following more advanced example:

function Grid(sources) {
  const click$ = new Subject()

  const actions = intent({ ...sources, click$ })
  const state$ = model(actions).shareReplay(1)

  const items$ = state$
    .pluck('items')
    .flatMapLatest(items => items.map(id => Asset({
      ...sources,
      props$: state$
        .pluck('selected')
        .map(selected => selected.includes(id))
        .distinctUntilChanged()
        .map(selected => ({ id, selected })),
    })))
    .shareReplay(1)

  // Leaks
  items$
    .map(items => items.map(item => item.click$))
    .flatMapLatest(items => Observable.merge(...items))
    .subscribe(click$)

  return {
    DOM: view({
      state$,
      items$: items$
        .map(items => items.map(item => item.DOM))
        .flatMapLatest(items => items.length
          ? Observable.combineLatest(...items, (...items) => items)
          : Observable.of([])
        ),
    }),
    HTTP:
      items$
        .map(items => items.map(item => item.HTTP))
        .flatMapLatest(items => Observable.merge(...items))
  }
}

Which stream do you hook up with Observable.using? With my suggestion it’s kind of undefined, and I am confused about how to apply your suggestion here, since your example only returns a single sink.

i.e. the fundamental conceptual question is: who owns the subscription? Is it the DOM?

const vtree$ = Observable.using(
  () => items$
    .map(items => items.map(item => item.click$))
    .flatMapLatest(items => Observable.merge(...items))
    .subscribe(click$),
  () => view({
      state$,
      items$: items$
        .map(items => items.map(item => item.DOM))
        .flatMapLatest(items => items.length
          ? Observable.combineLatest(...items, (...items) => items)
          : Observable.of([])
        ),
    })
  )

return {
    DOM: vtree$,
    HTTP: items$
      .map(items => items.map(item => item.HTTP))
      .flatMapLatest(items => Observable.merge(...items))
}

In that case we can create a more generic helper:

function using(sinks, ...disposables) {
  return {
    ...sinks,
    DOM: Observable.using(() => new CompositeDisposable(disposables), () => sinks.DOM),
  }
}

Which would give us:

const clickSubscription = items$
    .map(items => items.map(item => item.click$))
    .flatMapLatest(items => Observable.merge(...items))
    .subscribe(click$)

return using({
    DOM: view({
      state$,
      items$: items$
        .map(items => items.map(item => item.DOM))
        .flatMapLatest(items => items.length
          ? Observable.combineLatest(...items, (...items) => items)
          : Observable.of([])
        ),
    }),
    HTTP: 
      items$
        .map(items => items.map(item => item.HTTP))
        .flatMapLatest(items => Observable.merge(...items))
  }, clickSubscription)
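
As a way to reason about the ownership question, here is a small plain-JavaScript model of the `Observable.using` shape. It is not RxJS: `makeStream` and `usingLike` are hypothetical stand-ins. Unlike the helper above, which receives already-created subscriptions, this sketch creates the click link inside the resource factory, so the link only exists while the DOM sink is subscribed.

```javascript
// Hypothetical hot stream stand-in that exposes its observer count.
function makeStream() {
  const observers = new Set()
  return {
    count: () => observers.size,
    emit: value => observers.forEach(fn => fn(value)),
    subscribe(fn) {
      observers.add(fn)
      return { dispose: () => observers.delete(fn) }
    },
  }
}

// Mimics the shape of Observable.using(resourceFactory, observableFactory):
// the resource is created on subscribe and disposed with the subscription.
function usingLike(resourceFactory, streamFactory) {
  return {
    subscribe(fn) {
      const resource = resourceFactory()
      const inner = streamFactory(resource).subscribe(fn)
      return {
        dispose() {
          inner.dispose()
          resource.dispose()
        },
      }
    },
  }
}

const childClick$ = makeStream()  // merged click$ of the children
const click$ = makeStream()       // proxy passed to intent()
const dom$ = makeStream()         // stands in for the view() output

// The DOM sink owns the click link: it exists only while DOM is subscribed.
const DOM = usingLike(
  () => childClick$.subscribe(click$.emit),
  () => dom$
)

const before = childClick$.count()   // 0, nothing linked yet
const domSub = DOM.subscribe(() => {})
const during = childClick$.count()   // 1, link is live
domSub.dispose()
const after = childClick$.count()    // 0, link released with the sink
```

Under this model the answer to "who owns the subscription" is whichever sink wraps it, which is why picking DOM is a convention rather than something the streams enforce.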

I wonder whether it is enough to just dispose click$ or whether one has to dispose the actual subscription.

I’m labeling this issue as a discussion, because xstream has a built-in solution for this. But I’m not “closing” the comments; I still think this is a good discussion and resource to read/re-read.

Just curious: would it work to create a circularSubscribe method (with a better name, though), so that you can replace let proxySubscription = source.subscribe(proxySubject) with source.circularSubscribe(proxySubject), defined as:

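Rx.Observable.prototype.circularSubscribe = function (proxySubject) {
  var source = this;
  var noop = function () {}; // placeholder for the unused onNext/onError handlers
  // Forward everything from the source into the proxy.
  var disposable = source.subscribe(proxySubject);
  var onCompletedHandler = function () {
    disposable.dispose();
  };
  // Dispose the forwarding subscription once the proxy completes.
  proxySubject.subscribe(noop, noop, onCompletedHandler);
  return disposable;
}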

This way, when proxySubject completes, the subscription is disposed.

Updated comment above with helper suggestion.

For reference, here is how I would have done a List component using previously suggested helpers: https://gist.github.com/ronag/9f9fdfb7c6169e6850c4