reactor-netty: Reactor hangs once in a while under stress
Greetings! I’ve implemented a simple Spring-based WebFlux application with a controller that returns an OK response with the same 10-byte plain-text body for every GET request it receives, and put it under stress using a multithreaded client (JMeter-based). After a period of correct functioning the server sporadically fails to return a response, making the client’s thread wait forever (until the TCP connection timeout elapses, of course). As I found out, the request subscriber’s onComplete had not been called, so the server could not start writing a response (I think the culprit might be a nulled receiver reference inside a FluxReceive#drainReceiver invocation). I expand on the motivation for my test and the results below.
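For reference, a minimal sketch of the kind of handler under test (hypothetical names; the real application reads the request and writes the response through custom wrappers, see the WebfluxServerHttpRequest/WebfluxSinkServerHttpResponse counters below):

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
class StressController {
	// every GET receives a 200 OK with the same 10-byte plain-text body
	@GetMapping(value = "/**", produces = MediaType.TEXT_PLAIN_VALUE)
	Mono<String> handle() {
		return Mono.just("0123456789"); // exactly 10 ASCII bytes
	}
}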
Motivation for the test
I receive rare bug reports about a JS application that gets stuck: a user gets a white screen. This happens when one of the numerous parallel requests the JS application sends hangs (the browser dev tools easily reveal the cause). I wasn’t able to figure out the reason (the backend code gives no apparent cause), so I checked out reactor-netty, counted the activations of every branch throughout ChannelOperationsHandler and FluxReceive, and printed the results after the quiescence at the end of the test, hoping to find discrepancies between successful and unsuccessful runs (I provide the altered version of FluxReceive below; the only difference is the insertion of multiple Counter.INSTANCE.xxx calls which increment atomic counters).
The results
The test client is the following (a rough Java equivalent is sketched right after this list):
- based on JMeter
- number of threads: 128
- number of requests per thread: 5000
- number of warm-up cycles: 1
- number of test cycles: 5
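The actual client is JMeter; purely for illustration, a rough Java 11 equivalent of the thread/request configuration above could look like this (the target URL is an assumption):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CountDownLatch;

public class StressClient {
	static final int THREADS = 128;
	static final int REQUESTS_PER_THREAD = 5000;

	public static void main(String[] args) throws InterruptedException {
		HttpClient client = HttpClient.newHttpClient();
		HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8080/")).GET().build();
		CountDownLatch done = new CountDownLatch(THREADS);
		for (int t = 0; t < THREADS; t++) {
			new Thread(() -> {
				try {
					for (int i = 0; i < REQUESTS_PER_THREAD; i++) {
						// a hung request shows up here as a send() that never returns
						client.send(request, HttpResponse.BodyHandlers.ofString());
					}
				}
				catch (Exception e) {
					e.printStackTrace();
				}
				finally {
					done.countDown();
				}
			}).start();
		}
		done.await();
	}
}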
The test machine configuration:
- Linux 4.15.0-121-generic x86_64 GNU/Linux
- 16 GB RAM
- 1 processor, 12 cores × 2.2 GHz
- openjdk version “11.0.5-BellSoft” 2019-10-15
As mentioned above, the test hangs only once in a while (interestingly, it hangs mostly on Linux), so by itself it didn’t give me any clue about the reason, and I tried to find discrepancies between a successful and an unsuccessful run. The only discrepancy was in the number of FluxReceive#drainReceiver invocations in which the receiver turned out to be null: in a run which didn’t get stuck this number was always equal to the number of Operators.complete invocations in FluxReceive#subscribe (sounds weird, but it is what I see), whereas in a run which hung, the drainReceiver calls with a nulled receiver outnumbered them. The calculation results are the following:
A typical run which didn’t get stuck:

ChannelOperationsHandler:
  channelRead with HttpRequest invoked: 3840000
  channelRead with LastHttpContent invoked: 3840000 (processed: 3840000)
  exceptionCaught invoked: 0

FluxReceive:
  subscribe invoked: 3840000
    receiver is not null: 0
    Operators.complete invoked: 3822331
    Operators.error invoked: 0
    s.onSubscribe(this) invoked: 17669
  onInboundNext invoked: 0
    fast path (receiver.onNext invoked): 0
    drainReceiver invoked: 0
  onInboundComplete invoked: 7680000
    inboundDone is false: 3840000
    receiver.onComplete invoked: 0
    drainReceiver invoked: 3840000
  onInboundError invoked: 0
    receiver.onError invoked: 0
  drainReceiver invoked
    invoked from onInboundComplete: 3840000
    receiver is null: 3822331
    receiver.onNext invoked: 0
  terminateReceiver invoked:
    a != receiver: 0
    complete: 17669
    error: 0
  unsubscribe invoked: 3840000

Application code:
  WebfluxServerHttpRequest (reads the request via a Subscriber):
    Subscribed: 3840000 (cancelled in onSubscribe: 0)
    Next: 0
    Completed: 3840000
    Failed: 0
  WebfluxSinkServerHttpResponse (writes the response via a FluxSink):
    FluxSink accepted: 3840000
    FluxSink.next called: 3840000
    FluxSink.completed called: 3840000
    FluxSink.error called: 0
A typical run which got stuck:

ChannelOperationsHandler:
  channelRead with HttpRequest invoked: 3195455
  channelRead with LastHttpContent invoked: 3195455 (processed: 3195455)
  exceptionCaught invoked: 0

FluxReceive:
  subscribe invoked: 3195455
    receiver is not null: 0
    Operators.complete invoked: 2143195
    Operators.error invoked: 0
    s.onSubscribe(this) invoked: 1052260
  onInboundNext invoked: 0
    fast path (receiver.onNext invoked): 0
    drainReceiver invoked: 0
  onInboundComplete invoked: 6390909
    inboundDone is false: 3195455
    receiver.onComplete invoked: 0
    drainReceiver invoked: 3195455
  onInboundError invoked: 0
    receiver.onError invoked: 0
  drainReceiver invoked
    invoked from onInboundComplete: 3195455
    receiver is null: 2143196 <<<< Probably it matters
    receiver.onNext invoked: 0
  terminateReceiver invoked:
    a != receiver: 0
    complete: 1052259
    error: 0
  unsubscribe invoked: 3195454
  cancelReceiver invoked: 3195454

Application code:
  WebfluxServerHttpRequest (reads the request via a Subscriber):
    Subscribed: 3195455 (cancelled in onSubscribe: 0)
    Next: 0
    Completed: 3195454 <<<< That's why the server didn't send a response (I think)
    Failed: 0
  WebfluxSinkServerHttpResponse (writes the response via a FluxSink):
    FluxSink accepted: 3195455
    FluxSink.next called: 3195454
    FluxSink.completed called: 3195454
    FluxSink.error called: 0
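Note the arithmetic: in the stuck run, subscribe (3195455) = Operators.complete (2143195) + s.onSubscribe (1052260), yet drainReceiver saw a null receiver 2143196 = 2143195 + 1 times. That single extra null-receiver pass matches the single request that never completed (Completed: 3195454 out of 3195455 subscribed). In the run which didn’t get stuck the two figures are exactly equal (3822331 each).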
The exact reason the client hangs is that the subscriber’s onComplete is never called (I think this may be connected with the nulled receiver observed during FluxReceive#drainReceiver); as a result, the server cannot start writing a response.
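To illustrate the effect (a self-contained sketch, not the actual server code): when the response write is chained after the request body’s completion, a body publisher whose onComplete never arrives stalls the response forever:

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MissingOnCompleteDemo {
	public static void main(String[] args) {
		// Flux.never() stands in for a request body whose onComplete signal is lost
		Mono<String> response = Flux.never()
				.then(Mono.just("OK")); // the response may only be emitted after onComplete
		// the write never starts, so this block(...) times out instead of returning "OK"
		response.block(Duration.ofSeconds(2));
	}
}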
Do you have any ideas why this could happen? Thanks.
Environment:
- Java 11
- Linux
- Reactor Netty 0.9.12.RELEASE (or a snapshot from the develop branch)
An altered FluxReceive:
/*
* Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.channel;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.reactivestreams.Subscription;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.Logger;
import reactor.util.Loggers;
import static reactor.netty.ReactorNetty.format;
/**
* @author Stephane Maldini
*/
final class FluxReceive extends Flux<Object> implements Subscription, Disposable {
static final int QUEUE_LOW_LIMIT = 32;
final Channel channel;
final ChannelOperations<?, ?> parent;
final EventLoop eventLoop;
CoreSubscriber<? super Object> receiver;
boolean receiverFastpath;
long receiverDemand;
Queue<Object> receiverQueue;
boolean needRead = true;
volatile boolean inboundDone;
Throwable inboundError;
volatile Disposable receiverCancel;
volatile int once;
static final AtomicIntegerFieldUpdater<FluxReceive> ONCE =
AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once");
// Please note, in this specific case WIP is non-volatile since all operations that
// involve the work-in-progress pattern happen within the Netty event loop, which
// guarantees serial, thread-safe behaviour.
// However, we need that flag in order to preserve the work-in-progress guard that
// prevents stack overflow in case of onNext -> request -> onNext cycling on the
// same stack.
int wip;
FluxReceive(ChannelOperations<?, ?> parent) {
//reset channel to manual read if re-used
this.parent = parent;
this.channel = parent.channel();
this.eventLoop = channel.eventLoop();
channel.config()
.setAutoRead(false);
CANCEL.lazySet(this, () -> {
if (eventLoop.inEventLoop()) {
unsubscribeReceiver();
}
else {
eventLoop.execute(this::unsubscribeReceiver);
}
});
}
@Override
public void cancel() {
cancelReceiver();
if (eventLoop.inEventLoop()) {
drainReceiver(false);
}
else {
eventLoop.execute(() -> drainReceiver(false));
}
}
final long getPending() {
return receiverQueue != null ? receiverQueue.size() : 0;
}
final boolean isCancelled() {
return receiverCancel == CANCELLED;
}
@Override
public void dispose() {
cancel();
}
@Override
public boolean isDisposed() {
return (inboundDone && (receiverQueue == null || receiverQueue.isEmpty()));
}
@Override
public void request(long n) {
if (Operators.validate(n)) {
if (eventLoop.inEventLoop()) {
this.receiverDemand = Operators.addCap(receiverDemand, n);
drainReceiver(false);
}
else {
eventLoop.execute(() -> {
this.receiverDemand = Operators.addCap(receiverDemand, n);
drainReceiver(false);
});
}
}
}
@Override
public void subscribe(CoreSubscriber<? super Object> s) {
Counter.INSTANCE.frCounter.subscribeCall.increment();
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: subscribing inbound receiver"), this);
}
if (inboundDone && getPending() == 0) {
if (inboundError != null) {
Counter.INSTANCE.frCounter.subscribeError.increment();
Operators.error(s, inboundError);
return;
}
Counter.INSTANCE.frCounter.subscribeComplete.increment();
Operators.complete(s);
return;
}
			if (receiver != null) {
				Counter.INSTANCE.frCounter.subscribeNotNullReceiver.increment();
			}
receiver = s;
Counter.INSTANCE.frCounter.subscribeDone.increment();
s.onSubscribe(this);
}
else {
if (inboundDone && getPending() == 0) {
if (inboundError != null) {
Counter.INSTANCE.frCounter.subscribeError.increment();
Operators.error(s, inboundError);
return;
}
Counter.INSTANCE.frCounter.subscribeComplete.increment();
Operators.complete(s);
}
else {
Counter.INSTANCE.frCounter.subscribeError.increment();
Operators.error(s,
new IllegalStateException(
"Only one connection receive subscriber allowed."));
}
}
}
final boolean cancelReceiver() {
Disposable c = receiverCancel;
if (c != CANCELLED) {
c = CANCEL.getAndSet(this, CANCELLED);
if (c != CANCELLED) {
Counter.INSTANCE.frCounter.cancelReceiver.increment();
c.dispose();
return true;
}
}
return false;
}
final void cleanQueue(@Nullable Queue<Object> q){
if (q != null) {
Object o;
while ((o = q.poll()) != null) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: dropping frame {}"), this, o);
}
ReferenceCountUtil.release(o);
}
}
}
final void drainReceiver(boolean completed) {
		if (completed) {
			Counter.INSTANCE.frCounter.drainReceiverCompletedTrue.increment();
		}
// general protect against stackoverflow onNext -> request -> onNext
if (wip++ != 0) {
return;
}
int missed = 1;
for(;;) {
final Queue<Object> q = receiverQueue;
final CoreSubscriber<? super Object> a = receiver;
boolean d = inboundDone;
if (a == null) {
Counter.INSTANCE.frCounter.drainReceiverNull.increment();
if (isCancelled()) {
cleanQueue(q);
return;
}
if (d && getPending() == 0) {
Throwable ex = inboundError;
if (ex != null) {
parent.listener.onUncaughtException(parent, ex);
}
return;
}
missed = (wip -= missed);
if(missed == 0){
break;
}
continue;
}
long r = receiverDemand;
long e = 0L;
while (e != r) {
if (isCancelled()) {
cleanQueue(q);
terminateReceiver(q, a);
return;
}
d = inboundDone;
Object v = q != null ? q.poll() : null;
boolean empty = v == null;
if (d && empty) {
terminateReceiver(q, a);
return;
}
if (empty) {
break;
}
try {
Counter.INSTANCE.frCounter.drainOnNext.increment();
a.onNext(v);
}
finally {
try {
ReferenceCountUtil.release(v);
}
catch(Throwable t) {
inboundError = t;
cleanQueue(q);
terminateReceiver(q, a);
}
}
e++;
}
if (isCancelled()) {
cleanQueue(q);
terminateReceiver(q, a);
return;
}
if (inboundDone && (q == null || q.isEmpty())) {
terminateReceiver(q, a);
return;
}
if (r == Long.MAX_VALUE) {
receiverFastpath = true;
if (needRead) {
needRead = false;
channel.config()
.setAutoRead(true);
}
missed = (wip -= missed);
if(missed == 0){
break;
}
}
if ((receiverDemand -= e) > 0L || (e > 0L && q.size() < QUEUE_LOW_LIMIT)) {
if (needRead) {
needRead = false;
channel.config()
.setAutoRead(true);
}
}
else if (!needRead) {
needRead = true;
channel.config()
.setAutoRead(false);
}
missed = (wip -= missed);
if(missed == 0){
break;
}
}
}
final void onInboundNext(Object msg) {
Counter.INSTANCE.frCounter.inboundNext.increment();
if (inboundDone || isCancelled()) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: dropping frame {}"), this, msg);
}
ReferenceCountUtil.release(msg);
return;
}
if (receiverFastpath && receiver != null) {
try {
if (log.isDebugEnabled()){
if(msg instanceof ByteBuf) {
((ByteBuf) msg).touch(format(channel, "Unbounded receiver, bypass inbound " +
"buffer queue"));
}
else if (msg instanceof ByteBufHolder){
((ByteBufHolder) msg).touch(format(channel,"Unbounded receiver, bypass inbound " +
"buffer queue"));
}
}
Counter.INSTANCE.frCounter.inboundNextFastPath.increment();
receiver.onNext(msg);
}
finally {
ReferenceCountUtil.release(msg);
}
}
else {
Queue<Object> q = receiverQueue;
if (q == null) {
				// please note, in this case we are using a non-thread-safe, simple
				// ArrayDeque since all modifications to this queue happen within the
				// Netty event loop
q = new ArrayDeque<>();
receiverQueue = q;
}
if (log.isDebugEnabled()){
if(msg instanceof ByteBuf) {
((ByteBuf) msg).touch(format(channel,"Buffered ByteBuf in Inbound Flux Queue"));
}
else if (msg instanceof ByteBufHolder){
((ByteBufHolder) msg).touch(format(channel,"Buffered ByteBufHolder in Inbound Flux" +
" Queue"));
}
}
q.offer(msg);
Counter.INSTANCE.frCounter.inboundNextDrain.increment();
drainReceiver(false);
}
}
final void onInboundComplete() {
Counter.INSTANCE.frCounter.inboundComplete.increment();
if (inboundDone) {
return;
}
inboundDone = true;
Counter.INSTANCE.frCounter.inboundCompleteProcessed.increment();
if (receiverFastpath) {
CoreSubscriber<?> receiver = this.receiver;
if (receiver != null) {
Counter.INSTANCE.frCounter.inboundCompleteProcessedFastPath.increment();
receiver.onComplete();
}
return;
}
Counter.INSTANCE.frCounter.inboundCompleteProcessedDrain.increment();
drainReceiver(true);
}
final void onInboundError(Throwable err) {
Counter.INSTANCE.frCounter.inboundError.increment();
if (isCancelled() || inboundDone) {
if (log.isDebugEnabled()) {
if (AbortedException.isConnectionReset(err)) {
log.debug(format(channel, "Connection reset has been observed post termination"), err);
}
else {
log.warn(format(channel, "An exception has been observed post termination"), err);
}
} else if (log.isWarnEnabled() && !AbortedException.isConnectionReset(err)) {
log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString());
}
return;
}
CoreSubscriber<?> receiver = this.receiver;
this.inboundDone = true;
if(channel.isActive()){
parent.markPersistent(false);
}
if (err instanceof OutOfMemoryError) {
this.inboundError = parent.wrapInboundError(err);
try {
if (receiver != null) {
// propagate java.lang.OutOfMemoryError: Direct buffer memory
Counter.INSTANCE.frCounter.inboundErrorProcessed.increment();
receiver.onError(inboundError);
}
}
finally {
// close the connection
// release the buffers
parent.terminate();
}
}
else if (err instanceof ClosedChannelException) {
this.inboundError = parent.wrapInboundError(err);
}
else {
this.inboundError = err;
}
if (receiverFastpath && receiver != null) {
Counter.INSTANCE.frCounter.inboundErrorProcessed.increment();
receiver.onError(inboundError);
}
else {
drainReceiver(false);
}
}
final void terminateReceiver(@Nullable Queue<?> q, CoreSubscriber<?> a) {
if (q != null) {
q.clear();
}
Throwable ex = inboundError;
		if (a != receiver) {
			Counter.INSTANCE.frCounter.terminateWrongReceiver.increment();
		}
receiver = null;
if (ex != null) {
//parent.listener.onReceiveError(channel, ex);
Counter.INSTANCE.frCounter.terminateReceiverError.increment();
a.onError(ex);
}
else {
Counter.INSTANCE.frCounter.terminateReceiverComplete.increment();
a.onComplete();
}
}
final void unsubscribeReceiver() {
Counter.INSTANCE.frCounter.unsubscribe.increment();
receiverDemand = 0L;
receiver = null;
if(isCancelled()) {
parent.onInboundCancel();
}
}
@Override
public String toString() {
return "FluxReceive{" +
"pending=" + getPending() +
", cancelled=" + isCancelled() +
", inboundDone=" + inboundDone +
", inboundError=" + inboundError +
'}';
}
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<FluxReceive, Disposable> CANCEL =
AtomicReferenceFieldUpdater.newUpdater(FluxReceive.class,
Disposable.class,
"receiverCancel");
static final Disposable CANCELLED = () -> {
};
static final Logger log = Loggers.getLogger(FluxReceive.class);
}
The additional Counter class:
/*
* Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.channel;
import java.util.concurrent.atomic.LongAdder;
import reactor.util.Logger;
import reactor.util.Loggers;
final class Counter
{
public static final Counter INSTANCE = new Counter();
private static final Logger LOG = Loggers.getLogger(Counter.class);
private final Thread printer;
final ChannelOperationsHandlerCounter cohCounter;
final FluxReceiverCounter frCounter;
static class ChannelOperationsHandlerCounter
{
final LongAdder httpRequest = new LongAdder();
final LongAdder lastContent = new LongAdder();
final LongAdder lastContentPassed = new LongAdder();
final LongAdder exceptions = new LongAdder();
}
static class FluxReceiverCounter
{
final LongAdder subscribeCall = new LongAdder();
final LongAdder subscribeNotNullReceiver = new LongAdder();
final LongAdder subscribeDone = new LongAdder();
final LongAdder subscribeError = new LongAdder();
final LongAdder subscribeComplete = new LongAdder();
final LongAdder inboundNext = new LongAdder();
final LongAdder inboundNextFastPath = new LongAdder();
final LongAdder inboundNextDrain = new LongAdder();
final LongAdder inboundComplete = new LongAdder();
final LongAdder inboundCompleteProcessed = new LongAdder();
final LongAdder inboundCompleteProcessedFastPath = new LongAdder();
final LongAdder inboundCompleteProcessedDrain = new LongAdder();
final LongAdder inboundError = new LongAdder();
final LongAdder inboundErrorProcessed = new LongAdder();
final LongAdder drainReceiverCompletedTrue = new LongAdder();
final LongAdder drainReceiverNull = new LongAdder();
final LongAdder drainOnNext = new LongAdder();
final LongAdder terminateReceiverComplete = new LongAdder();
final LongAdder terminateReceiverError = new LongAdder();
final LongAdder terminateWrongReceiver = new LongAdder();
final LongAdder unsubscribe = new LongAdder();
final LongAdder cancelReceiver = new LongAdder();
}
private Counter()
{
cohCounter = new ChannelOperationsHandlerCounter();
frCounter = new FluxReceiverCounter();
printer = new Thread(() -> {
while (!Thread.currentThread().isInterrupted())
{
try
{
Thread.sleep(5000);
}
catch (InterruptedException e)
{
return;
}
LOG.info("\n-------------------------------\n" +
"ChannelOperationsHandler:\n" +
" channelRead with HttpRequest invoked: {}\n" +
" channelRead with LastHttpContent invoked: {} (processed: {})\n" +
" exceptionCaught invoked: {}\n\n" +
"FluxReceive:\n" +
" subscribe invoked: {}\n" +
" receiver is not null: {}\n" +
" Operators.complete invoked: {}\n" +
" Operators.error invoked: {}\n" +
" s.onSubscribe(this) invoked: {}\n" +
" onInboundNext invoked: {}\n" +
" fast path (receiver.onNext invoked): {}\n" +
" drainReceiver invoked: {}\n" +
" onInboundComplete invoked: {}\n" +
" inboundDone is false: {}\n" +
" receiver.onComplete invoked: {}\n" +
" drainReceiver invoked: {}\n" +
" onInboundError invoked: {}\n" +
" receiver.onError invoked: {}\n" +
" drainReceiver invoked\n" +
" invoked from onInboundComplete: {}\n" +
" receiver is null: {}\n" +
" receiver.onNext invoked: {}\n" +
" terminateReceiver invoked:\n" +
" a != receiver: {}\n" +
" complete: {}\n" +
" error: {}\n" +
" unsubscribe invoked: {}\n" +
" cancelReceiver invoked: {}\n"
,
cohCounter.httpRequest.longValue(),
cohCounter.lastContent.longValue(),
cohCounter.lastContentPassed.longValue(),
cohCounter.exceptions.longValue(),
frCounter.subscribeCall.longValue(),
frCounter.subscribeNotNullReceiver.longValue(),
frCounter.subscribeComplete.longValue(),
frCounter.subscribeError.longValue(),
frCounter.subscribeDone.longValue(),
frCounter.inboundNext.longValue(),
frCounter.inboundNextFastPath.longValue(),
frCounter.inboundNextDrain.longValue(),
frCounter.inboundComplete.longValue(),
frCounter.inboundCompleteProcessed.longValue(),
frCounter.inboundCompleteProcessedFastPath.longValue(),
frCounter.inboundCompleteProcessedDrain.longValue(),
frCounter.inboundError.longValue(),
frCounter.inboundErrorProcessed.longValue(),
frCounter.drainReceiverCompletedTrue.longValue(),
frCounter.drainReceiverNull.longValue(),
frCounter.drainOnNext.longValue(),
frCounter.terminateWrongReceiver.longValue(),
frCounter.terminateReceiverComplete.longValue(),
frCounter.terminateReceiverError.longValue(),
frCounter.unsubscribe.longValue(),
frCounter.cancelReceiver.longValue());
}
});
printer.setDaemon(true);
printer.start();
}
}
About this issue
- State: closed
- Created 4 years ago
- Comments: 28 (12 by maintainers)
@evgenyvsmirnov 0.9.14.RELEASE is available
@violetagg seems it helped. Thanks a lot!
@violetagg not 100 percent sure. Need to give it an acid test on the aforementioned 1 processor/12 core server. Will do it on Monday.