vertx-sql-client: [PostgreSQL] Connections are not returned/closed and max pool size is not honored.

Context

I would call this a follow up to #721 We’re using micronaut as the backend and stream data from the database to the client via REST, using flowables. The database code looks like this:

public Flowable<DomainObject> stream() {
    return pgPool
        .rxBegin()
        .toFlowable()
        .concatMap(
            tx -> {
              return tx.rxPrepare(query.sql())
                  .toFlowable()
                  .concatMap(
                      prepared ->
                          prepared
                              .createStream(10, query.parameters())
                              .toFlowable()
                              .map(
                                  row -> {
                                    return mapRowToDomainObject(row);
                                  }))
                  .doOnError(e -> log.error("got error", e))
                  .doAfterTerminate(
                      () -> {
                        tx.commit();
                      });
            });
  }

If the HTTP request is cancelled, the connection is not returned to the pool and stays open indefinitely. The pool will grow and gradually drain the max connection limit on our postgresql database. The doOnError callback is never called so I don’t see a way to work around this on our side. I tried to manage the connection myself but ran into other troubles and gave up on that.

Do you have a reproducer?

Nothing but the code above.

Steps to reproduce

Create a long running query and stream the results via micronaut. While the HTTP Request is in flight, kill it. Then execute select usename, count(*) from pg_stat_activity; in postgres and see the connection hang.

Extra

I tried playing around with the max pool size, max wait queue etc. The problem persists.

Stacktrace

java.lang.IllegalStateException: No current cursor read
at io.vertx.sqlclient.impl.CursorImpl.hasMore(CursorImpl.java:54)
at io.vertx.sqlclient.impl.RowStreamImpl.checkPending(RowStreamImpl.java:200)
at io.vertx.sqlclient.impl.RowStreamImpl.handle(RowStreamImpl.java:146)
at io.vertx.sqlclient.impl.RowStreamImpl.handle(RowStreamImpl.java:32)
at io.vertx.sqlclient.impl.SqlResultHandler.complete(SqlResultHandler.java:98)
at io.vertx.sqlclient.impl.SqlResultHandler.handle(SqlResultHandler.java:87)
at io.vertx.sqlclient.impl.SqlResultHandler.handle(SqlResultHandler.java:33)
at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:239)
at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:86)
at io.vertx.core.net.impl.NetSocketImpl.lambda$new$2(NetSocketImpl.java:101)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:357)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:78)
at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:138)
at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:226)
at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:86)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)

Version

mvn dependency:tree gives latest:

[INFO] +- io.micronaut.sql:micronaut-vertx-pg-client:jar:3.2.0:compile
[INFO] |  +- io.vertx:vertx-pg-client:jar:3.9.4:compile
[INFO] |  |  +- io.vertx:vertx-core:jar:3.9.4:compile
[INFO] |  |  |  \- io.netty:netty-resolver-dns:jar:4.1.53.Final:compile
[INFO] |  |  |     \- io.netty:netty-codec-dns:jar:4.1.53.Final:compile
[INFO] |  |  \- io.vertx:vertx-sql-client:jar:3.9.4:compile
[INFO] |  +- io.vertx:vertx-rx-java2:jar:3.9.4:compile
[INFO] |  |  \- io.vertx:vertx-rx-gen:jar:3.9.4:compile
[INFO] |  \- io.vertx:vertx-codegen:jar:3.9.4:compile
[INFO] |     +- com.fasterxml.jackson.core:jackson-core:jar:2.11.2:compile
[INFO] |     \- org.mvel:mvel2:jar:2.3.1.Final:compile

Settings

vertx:
    pg:
        client:
            max-size: 4
            idle-timeout: 5
            max-wait-queue-size: 4
            uri: jdbc:postgres....
            prepared-statement-max-cache-size: 1

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Reactions: 1
  • Comments: 21 (13 by maintainers)

Most upvoted comments

After removing several subscribeOn/observeOn pieces we now have this stacktrace, even without killing the http request. This now happens every time, which drains the pool even faster than before and actually produces wrong json output (but that’s a micronaut problem)

WARN  i.n.util.concurrent.DefaultPromise - An exception was thrown by io.micronaut.http.netty.reactive.HandlerSubscriber$$Lambda$959/0x000000010083c840.operationComplete()
java.lang.IllegalStateException: null
at io.vertx.sqlclient.impl.SocketConnectionBase.schedule(SocketConnectionBase.java:135)
at io.vertx.sqlclient.impl.ConnectionPool$PooledConnection.schedule(ConnectionPool.java:117)
at io.vertx.sqlclient.impl.PreparedStatementImpl.lambda$closeCursor$5(PreparedStatementImpl.java:201)
at io.vertx.core.impl.SucceededFuture.onComplete(SucceededFuture.java:41)
at io.vertx.sqlclient.impl.PreparedStatementImpl.closeCursor(PreparedStatementImpl.java:197)
at io.vertx.sqlclient.impl.CursorImpl.close(CursorImpl.java:103)
at io.vertx.sqlclient.Cursor.close(Cursor.java:52)
at io.vertx.sqlclient.impl.RowStreamImpl.checkPending(RowStreamImpl.java:205)
at io.vertx.sqlclient.impl.RowStreamImpl.fetch(RowStreamImpl.java:110)
at io.vertx.sqlclient.impl.RowStreamImpl.fetch(RowStreamImpl.java:32)
at io.vertx.reactivex.impl.FlowableReadStream$1.request(FlowableReadStream.java:61)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
at io.reactivex.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:237)
at io.reactivex.internal.operators.flowable.FlowableDoFinally$DoFinallySubscriber.request(FlowableDoFinally.java:107)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
at io.reactivex.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:237)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.operators.flowable.FlowableDoFinally$DoFinallySubscriber.request(FlowableDoFinally.java:107)
at io.reactivex.internal.subscriptions.SubscriptionHelper.deferredRequest(SubscriptionHelper.java:219)
at io.reactivex.internal.subscribers.StrictSubscriber.request(StrictSubscriber.java:70)
at io.reactivex.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
at io.reactivex.internal.subscriptions.SubscriptionHelper.deferredRequest(SubscriptionHelper.java:219)
at io.reactivex.internal.subscribers.StrictSubscriber.request(StrictSubscriber.java:70)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscriptions.SubscriptionHelper.deferredRequest(SubscriptionHelper.java:219)
at io.reactivex.internal.subscribers.StrictSubscriber.request(StrictSubscriber.java:70)
at io.micronaut.http.netty.reactive.HandlerSubscriber.maybeRequestMore(HandlerSubscriber.java:215)
at io.micronaut.http.netty.reactive.HandlerSubscriber.lambda$onNext$0(HandlerSubscriber.java:177)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.util.concurrent.PromiseCombiner.tryPromise(PromiseCombiner.java:170)
at io.netty.util.concurrent.PromiseCombiner.access$600(PromiseCombiner.java:35)
at io.netty.util.concurrent.PromiseCombiner$1.operationComplete0(PromiseCombiner.java:62)
at io.netty.util.concurrent.PromiseCombiner$1.operationComplete(PromiseCombiner.java:44)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:431)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:898)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531)
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)

I created this test in Sql client test suite that reproduces it

  @Test
  public void testReproducer(TestContext ctx) {
    Async async = ctx.async();
    connector.accept(ctx.asyncAssertSuccess(tx -> {
      List<Future> futures = new ArrayList<>();
      for (int i = 0;i < 2;i++) {
        Promise promise = Promise.promise();
        futures.add(promise.future());
        tx.query("SELECT whatever from DOES_NOT_EXIST").execute(promise);
      }
      CompositeFuture.all(futures).onComplete(ar -> {
        if (ar.succeeded()) {
          tx.commit(ctx.asyncAssertSuccess(v -> {
            async.complete();
          }));
        } else {
          tx.rollback(ctx.asyncAssertSuccess(v -> {
            async.complete();
          }));
        }
      });
    }));
  }

indeed, there should be a check based on state that a connection is not closed twice on a rollback.

In Vert.x 4 there is a new construct withTransaction to simplify this and let the user not care of the actual transaction boundaries. That would not work with streams though, however we could have something similar as I explained above.

@chris-brace that is possible.

I’ll look at your reproducer and see if there is an actual bug (I think there might be).

I think that we should provide a method returning a stream that would automatically manage the transaction for you.

thanks we will investigate, does the code above reproduces consistently ?