ReactiveCocoa: Internal signal not being disposed by flatMap operator on outer SignalProducer

I’m not sure whether this is a bug or expected behavior, but it does not match my interpretation of the docs, in particular the comments on the observe method:

Observes the Signal by sending any future events to the given observer. If the Signal has already terminated, the observer will immediately receive an Interrupted event.

Returns a Disposable which can be used to disconnect the observer. Disposing of the Disposable will have no effect on the Signal itself.

The comment says that the disposable returned by this method is used to remove the observer. Given that, along with https://github.com/ReactiveCocoa/ReactiveCocoa/issues/2453 and https://github.com/ReactiveCocoa/ReactiveCocoa/pull/2582, I understand that if I have a producer of signals which is flattened and the producer is then disposed, the internal signal returned from the closure passed to flatMap should be disposed as well.

The following example reproduces the issue described in the title:

let signal = Signal<Int, NoError> { observer in

    func next(count: Int = 0) {
        let delayTime = dispatch_time(DISPATCH_TIME_NOW, Int64(1 * Double(NSEC_PER_SEC)))
        dispatch_after(delayTime, dispatch_get_main_queue()) {
            observer.sendNext(count)
            next(count + 1)
        }
    }

    next()

    return nil
}

let action = Action { (_: AnyObject) -> SignalProducer<(), NSError> in
    return SignalProducer { observer, _ in
        let delayTime = dispatch_time(DISPATCH_TIME_NOW, Int64(2 * Double(NSEC_PER_SEC)))
        dispatch_after(delayTime, dispatch_get_main_queue()) {
            observer.sendNext(())
            observer.sendCompleted()
        }
    }
    .on(
        terminated: { print("action producer terminated") },
        disposed: { print("action producer disposed") },
        interrupted: { print("action producer interrupted") }
    )
}
let result = action.executing.producer
    .filter { $0 }
    .flatMap(.Latest) { executing -> Signal<[Int], NoError> in
        print("action executed")
        return signal
            .scan(Array<Int>()) { acum, current in
                print("Accumulating")
                var newAccum = acum
                newAccum.append(current)
                return newAccum
            }
            .on(
                terminated: { print("scan signal terminated") },
                disposed: { print("scan signal disposed") },
                interrupted: { print("scan signal interrupted") }
            )
    }
    .on(
        terminated: { print("result producer terminated") },
        disposed: { print("result producer disposed") },
        interrupted: { print("result producer interrupted") }
    )

let disposable = result.startWithNext { print($0) }

action.apply("").start()

let delayTime = dispatch_time(DISPATCH_TIME_NOW, Int64(12 * Double(NSEC_PER_SEC)))
dispatch_after(delayTime, dispatch_get_main_queue()) {
    print("disposing")
    disposable.dispose()
}

The output generated is:

action executed
Accumulating
[0]
action producer terminated
action producer disposed
Accumulating
[0, 1]
Accumulating
[0, 1, 2]
Accumulating
[0, 1, 2, 3]
Accumulating
[0, 1, 2, 3, 4]
Accumulating
[0, 1, 2, 3, 4, 5]
Accumulating
[0, 1, 2, 3, 4, 5, 6]
Accumulating
[0, 1, 2, 3, 4, 5, 6, 7]
Accumulating
[0, 1, 2, 3, 4, 5, 6, 7, 8]
Accumulating
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Accumulating
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
disposing
result producer interrupted
result producer terminated
result producer disposed
Accumulating
Accumulating
Accumulating
Accumulating
Accumulating

I would expect the internal signal to be terminated when the disposable is disposed. Instead, the signal returned by the scan method is not disposed and its observer is kept alive, which is why the Accumulating message keeps appearing after the result producer has terminated.

I know that the signal is infinite (a completed event is never sent inside the infinite loop that sends a value every second), but I think the observer should be removed when the outer producer has been disposed.
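
The behavior described above can be illustrated with a toy model written in plain, current Swift. This is only a sketch under assumptions: `ToySignal`, its `scan`, and `runDemo` are hypothetical names and do not reproduce ReactiveCocoa's implementation. It shows a derived signal that subscribes to its upstream eagerly, so removing the downstream observer does not stop the accumulation closure from running.

```swift
// Toy model only: every type here is hypothetical, not ReactiveCocoa API.
final class ToySignal<Value> {
    private var observers: [Int: (Value) -> Void] = [:]
    private var nextToken = 0

    /// Registers an observer. The returned closure only removes that
    /// observer; it has no effect on the signal itself.
    func observe(_ observer: @escaping (Value) -> Void) -> () -> Void {
        let token = nextToken
        nextToken += 1
        observers[token] = observer
        return { [weak self] in self?.observers[token] = nil }
    }

    func send(_ value: Value) {
        for observer in observers.values { observer(value) }
    }

    /// Eager `scan`: subscribes to `self` immediately and never unsubscribes.
    func scan<State>(_ initial: State,
                     _ combine: @escaping (State, Value) -> State) -> ToySignal<State> {
        let derived = ToySignal<State>()
        var state = initial
        // The upstream disposable is discarded, so nothing removes this observer.
        _ = observe { value in
            state = combine(state, value)
            derived.send(state)
        }
        return derived
    }
}

func runDemo() -> (received: [[Int]], combineCalls: Int) {
    let source = ToySignal<Int>()
    var combineCalls = 0
    var received: [[Int]] = []

    let scanned = source.scan([Int]()) { acc, value -> [Int] in
        combineCalls += 1
        return acc + [value]
    }
    let dispose = scanned.observe { received.append($0) }

    source.send(0)
    source.send(1)
    dispose()        // removes only the downstream observer
    source.send(2)   // the scan closure still runs
    return (received, combineCalls)
}

let result = runDemo()
print(result.received)      // [[0], [1]]
print(result.combineCalls)  // 3
```

The downstream observer stops receiving values after `dispose()`, yet the accumulation closure runs a third time, which mirrors the Accumulating lines printed after the result producer was disposed.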

Thanks!

About this issue

  • State: closed
  • Created 8 years ago
  • Comments: 26 (23 by maintainers)

Most upvoted comments

Yeah, I’d be 👍 on something like that for RAC5. (A PR could target that branch today.)

I’d probably take the approach of retaining the Signal in the Disposable, which is a little different than the approach in #2591.
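
One possible reading of "retaining the Signal in the Disposable" can be sketched in a toy model in plain, current Swift. This is an assumption about the idea, not the approach that was actually implemented: `ToyDisposable`, `OwnedSignal`, and `runOwnedDemo` are hypothetical names. The disposable returned by `observe` holds the derived signal strongly, the upstream holds it only weakly, and the derived signal detaches from its upstream when it is deallocated.

```swift
// Toy model only: every type here is hypothetical, not ReactiveCocoa API.
final class ToyDisposable {
    private var action: (() -> Void)?
    init(_ action: @escaping () -> Void) { self.action = action }

    /// Runs the teardown once, then releases whatever the closure retained.
    func dispose() {
        action?()
        action = nil
    }
}

final class OwnedSignal<Value> {
    private var observers: [Int: (Value) -> Void] = [:]
    private var nextToken = 0
    private let onDeinit: () -> Void

    init(onDeinit: @escaping () -> Void = {}) { self.onDeinit = onDeinit }
    deinit { onDeinit() }

    /// The returned disposable captures `self` strongly, so the signal
    /// stays alive for as long as an undisposed disposable exists.
    func observe(_ observer: @escaping (Value) -> Void) -> ToyDisposable {
        let token = nextToken
        nextToken += 1
        observers[token] = observer
        return ToyDisposable { self.observers[token] = nil }
    }

    func send(_ value: Value) {
        for observer in observers.values { observer(value) }
    }

    func scan<State>(_ initial: State,
                     _ combine: @escaping (State, Value) -> State) -> OwnedSignal<State> {
        var state = initial
        var upstream: ToyDisposable?
        // When the derived signal is deallocated, detach from upstream.
        let derived = OwnedSignal<State>(onDeinit: { upstream?.dispose() })
        // Upstream references the derived signal weakly, so only
        // downstream disposables keep it alive.
        upstream = observe { [weak derived] value in
            state = combine(state, value)
            derived?.send(state)
        }
        return derived
    }
}

func runOwnedDemo() -> (received: [[Int]], combineCalls: Int) {
    let source = OwnedSignal<Int>()
    var combineCalls = 0
    var received: [[Int]] = []

    // No separate reference to the scanned signal is kept, on purpose.
    let disposable = source
        .scan([Int]()) { acc, value -> [Int] in
            combineCalls += 1
            return acc + [value]
        }
        .observe { received.append($0) }

    source.send(0)
    source.send(1)
    disposable.dispose()  // releases the scanned signal, which detaches upstream
    source.send(2)        // no accumulation happens any more
    return (received, combineCalls)
}

let ownedResult = runOwnedDemo()
print(ownedResult.received)      // [[0], [1]]
print(ownedResult.combineCalls)  // 2
```

In this sketch the teardown relies on the disposable being the only strong owner of the derived signal; keeping another reference to it would keep the upstream subscription alive.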