reactor-netty: Reactor hangs once in a while under stress

Greetings! I’ve implemented a simple spring-based webflux application with a controller which returns an OK-response with the same 10-byte plain-text body whatever GET request it receives and put it under stress using a multithreaded client (JMeter-based). After a period of correct functioning the server could sporadically fail to return a response making the client’s thread to wait forever (until the TCP connection timeout elapses of course). As I found out a request’s subscription.onComplete hadn’t been called thus the server couldn’t start to write a response (think the culprit might be a nulled receiver reference inside FluxReceive#drainReceiver invocation). I expand on motivation for my test and the results below.

Motivation for the test

I receive rare bug reports about a JS application which gets stuck, namely a user gets a white screen. This happens when one of numerous parallel requests which a JS application sends hangs (a browser dev-tools easily reveal the cause). I wasn’t able to figure out the reason (the backend code doesn’t have any apparent reason) so I checked out reactor-netty and calculated activations of every branch throughout ChannelOperatonsHandler and FluxReceive and printed the results after the start of the quiescence at the end of the test hoping to find discrepancies between successful and unsuccessful runs (I provide the altered version of FluxReceive below: the only difference is the insertion of multiple Counter.INSTANCE.xxx calls which increment atomic counters).

The results

The test client is the following:

  • based on JMeter
  • number of threads: 128
  • number of requests per thread: 5000
  • number of warm-up cycles: 1
  • number of test cycles: 5

The test machine configuration:

  • Linux 4.15.0-121-generic x86_64 GNU/Linux
  • 16Gb RAM
  • 1 processor, 12 cores x 2.2GHz
  • openjdk version “11.0.5-BellSoft” 2019-10-15

As aforesaid the test hangs just once in a while (an interesting thing is that it hangs mostly on Linux) notwithstanding it didn’t give me any clue about the reason so I tried to find discrepancies between a successful and unsuccessful run. The only discrepancy was in the number of Flux.drainReceiver invocations when a receiver inside appeared to be null: in a run which didn’t get stuck this number was always the same as the number of Operators.complete invocation in FluxReceive.subscribe (sounds weird, but it what I see), in a run which hung drainReceiver calls with the nulled receiver outnumbered. Calculation results are the following:

A typical run which didn’t get stuck:

ChannelOperationsHandler:
    channelRead with HttpRequest invoked: 3840000
    channelRead with LastHttpContent invoked: 3840000 (processed: 3840000)
    exceptionCaught invoked: 0

FluxReceive:
  subscribe invoked: 3840000
    receiver is not null: 0
    Operators.complete invoked: 3822331
    Operators.error invoked: 0
    s.onSubscribe(this) invoked: 17669
  onInboundNext invoked: 0
    fast path (receiver.onNext invoked): 0
    drainReceiver invoked: 0
  onInboundComplete invoked: 7680000
    inboundDone is false: 3840000
    receiver.onComplete invoked: 0
    drainReceiver invoked: 3840000
  onInboundError invoked: 0
    receiver.onError invoked: 0
  drainReceiver invoked
    invoked from onInboundComplete: 3840000
    receiver is null: 3822331
    receiver.onNext invoked: 0
  terminateReceiver invoked:
    a != receiver: 0
    complete: 17669
    error: 0
 unsubscribe invoked: 3840000

Application code:
WebfluxServerHttpRequest (reads request via a Subscriber):
  Subscribed: 3840000 (cancelled in onSubscribe: 0)
  Next: 0
  Completed: 3840000
  Failed: 0
WebfluxSinkServerHttpResponse (writes response via FluxSink):
  FluxSink accepted: 3840000
  FluxSink.next called: 3840000:
  FluxSink.completed called: 3840000:
  FluxSink.error called: 0:

A typical Run which got stuck:

ChannelOperationsHandler:
    channelRead with HttpRequest invoked: 3195455
    channelRead with LastHttpContent invoked: 3195455 (processed: 3195455)
    exceptionCaught invoked: 0

FluxReceive:
  subscribe invoked: 3195455
    receiver is not null: 0
    Operators.complete invoked: 2143195
    Operators.error invoked: 0
    s.onSubscribe(this) invoked: 1052260
  onInboundNext invoked: 0
    fast path (receiver.onNext invoked): 0
    drainReceiver invoked: 0
  onInboundComplete invoked: 6390909
    inboundDone is false: 3195455
    receiver.onComplete invoked: 0
    drainReceiver invoked: 3195455
  onInboundError invoked: 0
    receiver.onError invoked: 0
  drainReceiver invoked
    invoked from onInboundComplete: 3195455
    receiver is null: 2143196 <<<< Probably it matters
    receiver.onNext invoked: 0
  terminateReceiver invoked:
    a != receiver: 0
    complete: 1052259
    error: 0
 unsubscribe invoked: 3195454
 cancelReceiver invoked: 3195454

Application code:
WebfluxServerHttpRequest (reads request via a Subscriber):
  Subscribed: 3195455 (cancelled in onSubscribe: 0)
  Next: 0
  Completed: 3195454 <<<< That's why the server didn't send a response (I think)
  Failed: 0
WebfluxSinkServerHttpResponse (writes response via FluxSink):
  FluxSink accepted: 3195455:
  FluxSink.next called: 3195454:
  FluxSink.completed called: 3195454:
  FluxSink.error called: 0:

The exact reason why a client hangs is because a subscriber.onComplete is not called (think might be interconnected with a nulled receiver during Flux.drainReceiver invocation) as a result the server couldn’t start to write a response.

Do you have any ideas why this could happen? Thanks.

Environment:

  • Java 11,
  • Linux,
  • Reactor 0.9.12.RELEASE (or snapshot from develop).

An altered FluxReceive:

/*
 * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.netty.channel;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

import org.reactivestreams.Subscription;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.Logger;
import reactor.util.Loggers;

import static reactor.netty.ReactorNetty.format;

/**
 * @author Stephane Maldini
 */
final class FluxReceive extends Flux<Object> implements Subscription, Disposable {

	static final int QUEUE_LOW_LIMIT = 32;

	final Channel           channel;
	final ChannelOperations<?, ?> parent;
	final EventLoop         eventLoop;

	CoreSubscriber<? super Object> receiver;
	boolean                        receiverFastpath;
	long                           receiverDemand;
	Queue<Object>                  receiverQueue;

	boolean needRead = true;

	volatile boolean   inboundDone;
	Throwable inboundError;

	volatile Disposable receiverCancel;

	volatile int once;
	static final AtomicIntegerFieldUpdater<FluxReceive> ONCE =
		AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once");

	// Please note, in this specific case WIP is non-volatile since all operation that
	// involves work-in-progress pattern is within Netty Event-Loops which guarantees
	// serial, thread-safe behaviour.
	// However, we need that flag in order to preserve work-in-progress guarding that
	// prevents stack overflow in case of onNext -> request -> onNext cycling on the
	// same stack
	int wip;


	FluxReceive(ChannelOperations<?, ?> parent) {

		//reset channel to manual read if re-used

		this.parent = parent;
		this.channel = parent.channel();
		this.eventLoop = channel.eventLoop();
		channel.config()
		       .setAutoRead(false);
		CANCEL.lazySet(this, () -> {
			if (eventLoop.inEventLoop()) {
				unsubscribeReceiver();
			}
			else {
				eventLoop.execute(this::unsubscribeReceiver);
			}
		});
	}

	@Override
	public void cancel() {
		cancelReceiver();
		if (eventLoop.inEventLoop()) {
			drainReceiver(false);
		}
		else {
			eventLoop.execute(() -> drainReceiver(false));
		}
	}

	final long getPending() {
		return receiverQueue != null ? receiverQueue.size() : 0;
	}

	final boolean isCancelled() {
		return receiverCancel == CANCELLED;
	}

	@Override
	public void dispose() {
		cancel();
	}

	@Override
	public boolean isDisposed() {
		return (inboundDone && (receiverQueue == null || receiverQueue.isEmpty()));
	}

	@Override
	public void request(long n) {
		if (Operators.validate(n)) {
			if (eventLoop.inEventLoop()) {
				this.receiverDemand = Operators.addCap(receiverDemand, n);
				drainReceiver(false);
			}
			else {
				eventLoop.execute(() -> {
					this.receiverDemand = Operators.addCap(receiverDemand, n);
					drainReceiver(false);
				});
			}
		}
	}

	@Override
	public void subscribe(CoreSubscriber<? super Object> s) {
		Counter.INSTANCE.frCounter.subscribeCall.increment();
		if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
			if (log.isDebugEnabled()) {
				log.debug(format(channel, "{}: subscribing inbound receiver"), this);
			}
			if (inboundDone && getPending() == 0) {
				if (inboundError != null) {
					Counter.INSTANCE.frCounter.subscribeError.increment();
					Operators.error(s, inboundError);
					return;
				}

				Counter.INSTANCE.frCounter.subscribeComplete.increment();
				Operators.complete(s);
				return;
			}

			if (receiver != null)
				Counter.INSTANCE.frCounter.subscribeNotNullReceiver.increment();

			receiver = s;

			Counter.INSTANCE.frCounter.subscribeDone.increment();
			s.onSubscribe(this);
		}
		else {
			if (inboundDone && getPending() == 0) {
				if (inboundError != null) {
					Counter.INSTANCE.frCounter.subscribeError.increment();
					Operators.error(s, inboundError);
					return;
				}

				Counter.INSTANCE.frCounter.subscribeComplete.increment();
				Operators.complete(s);
			}
			else {
				Counter.INSTANCE.frCounter.subscribeError.increment();
				Operators.error(s,
						new IllegalStateException(
								"Only one connection receive subscriber allowed."));
			}
		}
	}

	final boolean cancelReceiver() {
		Disposable c = receiverCancel;
		if (c != CANCELLED) {
			c = CANCEL.getAndSet(this, CANCELLED);
			if (c != CANCELLED) {
				Counter.INSTANCE.frCounter.cancelReceiver.increment();
				c.dispose();
				return true;
			}
		}
		return false;
	}

	final void cleanQueue(@Nullable Queue<Object> q){
		if (q != null) {
			Object o;
			while ((o = q.poll()) != null) {
				if (log.isDebugEnabled()) {
					log.debug(format(channel, "{}: dropping frame {}"), this, o);
				}
				ReferenceCountUtil.release(o);
			}
		}
	}

	final void drainReceiver(boolean completed) {
		if (completed)
			Counter.INSTANCE.frCounter.drainReceiverCompletedTrue.increment();

		// general protect against stackoverflow onNext -> request -> onNext
		if (wip++ != 0) {
			return;
		}
		int missed = 1;
		for(;;) {
			final Queue<Object> q = receiverQueue;
			final CoreSubscriber<? super Object> a = receiver;
			boolean d = inboundDone;

			if (a == null) {
				Counter.INSTANCE.frCounter.drainReceiverNull.increment();
				if (isCancelled()) {
					cleanQueue(q);
					return;
				}
				if (d && getPending() == 0) {
					Throwable ex = inboundError;
					if (ex != null) {
						parent.listener.onUncaughtException(parent, ex);
					}
					return;
				}
				missed = (wip -= missed);
				if(missed == 0){
					break;
				}
				continue;
			}

			long r = receiverDemand;
			long e = 0L;

			while (e != r) {
				if (isCancelled()) {
					cleanQueue(q);
					terminateReceiver(q, a);
					return;
				}

				d = inboundDone;
				Object v = q != null ? q.poll() : null;
				boolean empty = v == null;

				if (d && empty) {
					terminateReceiver(q, a);
					return;
				}

				if (empty) {
					break;
				}

				try {
					Counter.INSTANCE.frCounter.drainOnNext.increment();
					a.onNext(v);
				}
				finally {
					try {
						ReferenceCountUtil.release(v);
					}
					catch(Throwable t) {
						inboundError = t;
						cleanQueue(q);
						terminateReceiver(q, a);
					}
				}

				e++;
			}

			if (isCancelled()) {
				cleanQueue(q);
				terminateReceiver(q, a);
				return;
			}

			if (inboundDone && (q == null || q.isEmpty())) {
				terminateReceiver(q, a);
				return;
			}

			if (r == Long.MAX_VALUE) {
				receiverFastpath = true;
				if (needRead) {
					needRead = false;
					channel.config()
					       .setAutoRead(true);
				}
				missed = (wip -= missed);
				if(missed == 0){
					break;
				}
			}

			if ((receiverDemand -= e) > 0L || (e > 0L && q.size() < QUEUE_LOW_LIMIT)) {
				if (needRead) {
					needRead = false;
					channel.config()
					       .setAutoRead(true);
				}
			}
			else if (!needRead) {
				needRead = true;
				channel.config()
				       .setAutoRead(false);
			}

			missed = (wip -= missed);
			if(missed == 0){
				break;
			}
		}
	}

	final void onInboundNext(Object msg) {
		Counter.INSTANCE.frCounter.inboundNext.increment();
		if (inboundDone || isCancelled()) {
			if (log.isDebugEnabled()) {
				log.debug(format(channel, "{}: dropping frame {}"), this, msg);
			}
			ReferenceCountUtil.release(msg);
			return;
		}

		if (receiverFastpath && receiver != null) {
			try {
				if (log.isDebugEnabled()){
					if(msg instanceof ByteBuf) {
						((ByteBuf) msg).touch(format(channel, "Unbounded receiver, bypass inbound " +
								"buffer queue"));
					}
					else if (msg instanceof ByteBufHolder){
						((ByteBufHolder) msg).touch(format(channel,"Unbounded receiver, bypass inbound " +
								"buffer queue"));
					}
				}
				Counter.INSTANCE.frCounter.inboundNextFastPath.increment();
				receiver.onNext(msg);
			}
			finally {
				ReferenceCountUtil.release(msg);
			}
		}
		else {
			Queue<Object> q = receiverQueue;
			if (q == null) {
				// please note, in that case we are using non-thread safe, simple
				// ArrayDeque since all modifications on this queue happens withing
				// Netty Event Loop
				q = new ArrayDeque<>();
				receiverQueue = q;
			}
			if (log.isDebugEnabled()){
				if(msg instanceof ByteBuf) {
					((ByteBuf) msg).touch(format(channel,"Buffered ByteBuf in Inbound Flux Queue"));
				}
				else if (msg instanceof ByteBufHolder){
					((ByteBufHolder) msg).touch(format(channel,"Buffered ByteBufHolder in Inbound Flux" +
							" Queue"));
				}
			}
			q.offer(msg);
			Counter.INSTANCE.frCounter.inboundNextDrain.increment();
			drainReceiver(false);
		}
	}

	final void onInboundComplete() {
		Counter.INSTANCE.frCounter.inboundComplete.increment();
		if (inboundDone) {
			return;
		}
		inboundDone = true;
		Counter.INSTANCE.frCounter.inboundCompleteProcessed.increment();
		if (receiverFastpath) {
			CoreSubscriber<?> receiver = this.receiver;
			if (receiver != null) {
				Counter.INSTANCE.frCounter.inboundCompleteProcessedFastPath.increment();
				receiver.onComplete();
			}
			return;
		}
		Counter.INSTANCE.frCounter.inboundCompleteProcessedDrain.increment();
		drainReceiver(true);
	}

	final void onInboundError(Throwable err) {
		Counter.INSTANCE.frCounter.inboundError.increment();
		if (isCancelled() || inboundDone) {
			if (log.isDebugEnabled()) {
				if (AbortedException.isConnectionReset(err)) {
					log.debug(format(channel, "Connection reset has been observed post termination"), err);
				}
				else {
					log.warn(format(channel, "An exception has been observed post termination"), err);
				}
			} else if (log.isWarnEnabled() && !AbortedException.isConnectionReset(err)) {
				log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString());
			}
			return;
		}

		CoreSubscriber<?> receiver = this.receiver;
		this.inboundDone = true;
		if(channel.isActive()){
			parent.markPersistent(false);
		}

		if (err instanceof OutOfMemoryError) {
			this.inboundError = parent.wrapInboundError(err);
			try {
				if (receiver != null) {
					// propagate java.lang.OutOfMemoryError: Direct buffer memory
					Counter.INSTANCE.frCounter.inboundErrorProcessed.increment();
					receiver.onError(inboundError);
				}
			}
			finally {
				// close the connection
				// release the buffers
				parent.terminate();
			}
		}
		else if (err instanceof ClosedChannelException) {
			this.inboundError = parent.wrapInboundError(err);
		}
		else {
			this.inboundError = err;
		}

		if (receiverFastpath && receiver != null) {
			Counter.INSTANCE.frCounter.inboundErrorProcessed.increment();
			receiver.onError(inboundError);
		}
		else {
			drainReceiver(false);
		}
	}

	final void terminateReceiver(@Nullable Queue<?> q, CoreSubscriber<?> a) {
		if (q != null) {
			q.clear();
		}
		Throwable ex = inboundError;
		if (a != receiver)
			Counter.INSTANCE.frCounter.terminateWrongReceiver.increment();

		receiver = null;
		if (ex != null) {
			//parent.listener.onReceiveError(channel, ex);
			Counter.INSTANCE.frCounter.terminateReceiverError.increment();
			a.onError(ex);
		}
		else {
			Counter.INSTANCE.frCounter.terminateReceiverComplete.increment();
			a.onComplete();
		}
	}

	final void unsubscribeReceiver() {
		Counter.INSTANCE.frCounter.unsubscribe.increment();
		receiverDemand = 0L;
		receiver = null;
		if(isCancelled()) {
			parent.onInboundCancel();
		}
	}

	@Override
	public String toString() {
		return "FluxReceive{" +
				"pending=" + getPending() +
				", cancelled=" + isCancelled() +
				", inboundDone=" + inboundDone +
				", inboundError=" + inboundError +
				'}';
	}

	@SuppressWarnings("rawtypes")
	static final AtomicReferenceFieldUpdater<FluxReceive, Disposable> CANCEL =
			AtomicReferenceFieldUpdater.newUpdater(FluxReceive.class,
					Disposable.class,
					"receiverCancel");

	static final Disposable CANCELLED = () -> {
	};

	static final Logger log = Loggers.getLogger(FluxReceive.class);
}

An additional class Counter:

/*
 * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.netty.channel;

import java.util.concurrent.atomic.LongAdder;

import reactor.util.Logger;
import reactor.util.Loggers;

final class Counter
{
    public static final Counter INSTANCE = new Counter();

    private static final Logger LOG = Loggers.getLogger(Counter.class);

    private final Thread printer;
    final ChannelOperationsHandlerCounter cohCounter;
    final FluxReceiverCounter frCounter;

    static class ChannelOperationsHandlerCounter
    {
        final LongAdder httpRequest = new LongAdder();
        final LongAdder lastContent = new LongAdder();
        final LongAdder lastContentPassed = new LongAdder();
        final LongAdder exceptions = new LongAdder();
    }

    static class FluxReceiverCounter
    {
        final LongAdder subscribeCall = new LongAdder();
        final LongAdder subscribeNotNullReceiver = new LongAdder();
        final LongAdder subscribeDone = new LongAdder();
        final LongAdder subscribeError = new LongAdder();
        final LongAdder subscribeComplete = new LongAdder();

        final LongAdder inboundNext = new LongAdder();
        final LongAdder inboundNextFastPath = new LongAdder();
        final LongAdder inboundNextDrain = new LongAdder();

        final LongAdder inboundComplete = new LongAdder();
        final LongAdder inboundCompleteProcessed = new LongAdder();
        final LongAdder inboundCompleteProcessedFastPath = new LongAdder();
        final LongAdder inboundCompleteProcessedDrain = new LongAdder();

        final LongAdder inboundError = new LongAdder();
        final LongAdder inboundErrorProcessed = new LongAdder();

        final LongAdder drainReceiverCompletedTrue = new LongAdder();
        final LongAdder drainReceiverNull = new LongAdder();
        final LongAdder drainOnNext = new LongAdder();

        final LongAdder terminateReceiverComplete = new LongAdder();
        final LongAdder terminateReceiverError = new LongAdder();
        final LongAdder terminateWrongReceiver = new LongAdder();
        final LongAdder unsubscribe = new LongAdder();
        final LongAdder cancelReceiver = new LongAdder();
    }

    private Counter()
    {
        cohCounter = new ChannelOperationsHandlerCounter();
        frCounter = new FluxReceiverCounter();

        printer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted())
            {
                try
                {
                    Thread.sleep(5000);
                }
                catch (InterruptedException e)
                {
                    return;
                }

                LOG.info("\n-------------------------------\n" +
                        "ChannelOperationsHandler:\n" +
                        "    channelRead with HttpRequest invoked: {}\n" +
                        "    channelRead with LastHttpContent invoked: {} (processed: {})\n" +
                        "    exceptionCaught invoked: {}\n\n" +
                        "FluxReceive:\n" +
                        "  subscribe invoked: {}\n" +
                        "    receiver is not null: {}\n" +
                        "    Operators.complete invoked: {}\n" +
                        "    Operators.error invoked: {}\n" +
                        "    s.onSubscribe(this) invoked: {}\n" +
                        "  onInboundNext invoked: {}\n" +
                        "    fast path (receiver.onNext invoked): {}\n" +
                        "    drainReceiver invoked: {}\n" +
                        "  onInboundComplete invoked: {}\n" +
                        "    inboundDone is false: {}\n" +
                        "    receiver.onComplete invoked: {}\n" +
                        "    drainReceiver invoked: {}\n" +
                        "  onInboundError invoked: {}\n" +
                        "    receiver.onError invoked: {}\n" +
                        "  drainReceiver invoked\n" +
                        "    invoked from onInboundComplete: {}\n" +
                        "    receiver is null: {}\n" +
                        "    receiver.onNext invoked: {}\n" +
                        "  terminateReceiver invoked:\n" +
                        "    a != receiver: {}\n" +
                        "    complete: {}\n" +
                        "    error: {}\n" +
                        " unsubscribe invoked: {}\n" +
                        " cancelReceiver invoked: {}\n"
                        ,
                        cohCounter.httpRequest.longValue(),
                        cohCounter.lastContent.longValue(),
                        cohCounter.lastContentPassed.longValue(),
                        cohCounter.exceptions.longValue(),
                        frCounter.subscribeCall.longValue(),
                        frCounter.subscribeNotNullReceiver.longValue(),
                        frCounter.subscribeComplete.longValue(),
                        frCounter.subscribeError.longValue(),
                        frCounter.subscribeDone.longValue(),
                        frCounter.inboundNext.longValue(),
                        frCounter.inboundNextFastPath.longValue(),
                        frCounter.inboundNextDrain.longValue(),
                        frCounter.inboundComplete.longValue(),
                        frCounter.inboundCompleteProcessed.longValue(),
                        frCounter.inboundCompleteProcessedFastPath.longValue(),
                        frCounter.inboundCompleteProcessedDrain.longValue(),
                        frCounter.inboundError.longValue(),
                        frCounter.inboundErrorProcessed.longValue(),
                        frCounter.drainReceiverCompletedTrue.longValue(),
                        frCounter.drainReceiverNull.longValue(),
                        frCounter.drainOnNext.longValue(),
                        frCounter.terminateWrongReceiver.longValue(),
                        frCounter.terminateReceiverComplete.longValue(),
                        frCounter.terminateReceiverError.longValue(),
                        frCounter.unsubscribe.longValue(),
                        frCounter.cancelReceiver.longValue());
            }
        });
        printer.setDaemon(true);
        printer.start();
    }
}

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 28 (12 by maintainers)

Commits related to this issue

Most upvoted comments

@evgenyvsmirnov 0.9.14.RELEASE is available

@violetagg seems it helped. Thanks a lot!

@violetagg not 100 percent sure. Need to give it an acid test on the aforementioned 1 processor/12 core server. Will do it on Monday.