hibernate-reactive: Connection not closed when parent coroutine timeout

I wanted to give timeout when I used the connection through MutinySessionFactory. This is a reproduction code for reproducing a problem situation. In fact, Netty-based API servers are used with timeout, and Connection leaks occur. In this situation, pool hang forever.

@Test
fun testTimeout() {
        println("execute")
        try {
            runBlocking {
                withTimeout(Duration.ofMillis(150)) {
                    try {
                        val withStatelessSession = sessionFactory.withStatelessSession { session ->
                            val createNativeQuery = session.createNativeQuery<Long>("SELECT COUNT(1) FROM User")
                            createNativeQuery.singleResultOrNull
                        }
                        println("result : ${withStatelessSession.awaitSuspending()}")
                    } finally {
                        println("finally called")
                    }
                }
            }
        } catch (e: Exception) {
            e.printStackTrace()
        }
        CountDownLatch(1).await()
}

expected action is to close connection as shown in the log below.

[] 2023-01-03 15:19:58,209 DEBUG [vert.x-eventloop-thread-0] o.h.r.j.i.LogicalConnectionMan: Initiating JDBC connection release from afterTransaction
[] 2023-01-03 15:19:58,210 TRACE [vert.x-eventloop-thread-0] o.h.r.p.i.SqlClientConnection: Connection closed: io.vertx.mysqlclient.impl.MySQLConnectionImpl@39e6b6f7

However, the actual operation did not close Connection.

execute
[] 2023-01-03 15:15:31,600 DEBUG [Test worker @coroutine#1] o.h.r.m.i.MutinySessionFactory: No existing open Mutiny.StatelessSession was found in the current Vert.x context: opening a new instance
[] 2023-01-03 15:15:31,632 DEBUG [vert.x-eventloop-thread-0] i.n.channel.DefaultChannelId: -Dio.netty.processId: 54046 (auto-detected)
[] 2023-01-03 15:15:31,634 DEBUG [vert.x-eventloop-thread-0] i.n.channel.DefaultChannelId: -Dio.netty.machineId: f8:4d:89:ff:fe:69:71:a4 (auto-detected)
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.numHeapArenas: 20
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.numDirectArenas: 20
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.pageSize: 8192
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.maxOrder: 9
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.chunkSize: 4194304
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.smallCacheSize: 256
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.normalCacheSize: 64
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.maxCachedBufferCapacity: 32768
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.cacheTrimInterval: 8192
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.cacheTrimIntervalMillis: 0
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.useCacheForAllThreads: false
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
[] 2023-01-03 15:15:31,653 DEBUG [vert.x-eventloop-thread-0] io.netty.buffer.ByteBufUtil: -Dio.netty.allocator.type: pooled
[] 2023-01-03 15:15:31,653 DEBUG [vert.x-eventloop-thread-0] io.netty.buffer.ByteBufUtil: -Dio.netty.threadLocalDirectBufferSize: 0
[] 2023-01-03 15:15:31,653 DEBUG [vert.x-eventloop-thread-0] io.netty.buffer.ByteBufUtil: -Dio.netty.maxThreadLocalCharBufferSize: 16384
[] 2023-01-03 15:15:31,696 DEBUG [vert.x-eventloop-thread-0] io.netty.util.Recycler: -Dio.netty.recycler.maxCapacityPerThread: 4096
[] 2023-01-03 15:15:31,696 DEBUG [vert.x-eventloop-thread-0] io.netty.util.Recycler: -Dio.netty.recycler.ratio: 8
[] 2023-01-03 15:15:31,696 DEBUG [vert.x-eventloop-thread-0] io.netty.util.Recycler: -Dio.netty.recycler.chunkSize: 32
[] 2023-01-03 15:15:31,696 DEBUG [vert.x-eventloop-thread-0] io.netty.util.Recycler: -Dio.netty.recycler.blocking: false
[] 2023-01-03 15:15:31,699 DEBUG [vert.x-eventloop-thread-0] i.netty.buffer.AbstractByteBuf: -Dio.netty.buffer.checkAccessible: true
[] 2023-01-03 15:15:31,699 DEBUG [vert.x-eventloop-thread-0] i.netty.buffer.AbstractByteBuf: -Dio.netty.buffer.checkBounds: true
[] 2023-01-03 15:15:31,699 DEBUG [vert.x-eventloop-thread-0] i.n.u.ResourceLeakDetectorFact: Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@78c7da6e
[] 2023-01-03 15:15:31,716 TRACE [vert.x-eventloop-thread-0] o.h.r.p.i.SqlClientConnection: Connection created: io.vertx.mysqlclient.impl.MySQLConnectionImpl@39e6b6f7
[] 2023-01-03 15:15:31,753 DEBUG [vert.x-eventloop-thread-0] o.h.s.i.StatisticsInitiator: Statistics initialized [enabled=false]
finally called
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 150 ms
	(Coroutine boundary)
	at com.TestService$name$1$1.invokeSuspend(TestService.kt:47)
	at com.TestService$name$1.invokeSuspend(TestService.kt:41)
	(Coroutine creation stacktrace)
	at kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)
	at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.TestService.name(TestService.kt:40)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at jdk.proxy1/jdk.proxy1.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 150 ms
	at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:184)
	at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:154)
	at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:508)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)
	at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:108)
	at java.base/java.lang.Thread.run(Thread.java:1589)
[] 2023-01-03 15:15:31,765 DEBUG [vert.x-eventloop-thread-0] o.h.e.j.spi.SqlStatementLogger: SELECT COUNT(1) FROM User
[] 2023-01-03 15:15:31,787 DEBUG [vert.x-eventloop-thread-0] o.j.l.DelegatingBasicLogger: Result set row: 0
[] 2023-01-03 15:15:31,787 DEBUG [vert.x-eventloop-thread-0] org.hibernate.loader.Loader: Result row: 
[] 2023-01-03 15:15:31,791 DEBUG [vert.x-eventloop-thread-0] o.h.r.j.i.LogicalConnectionMan: Initiating JDBC connection release from afterTransaction

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Comments: 32 (19 by maintainers)

Commits related to this issue

Most upvoted comments

Note that I’ll backport the fix to Mutiny 1.x, I’m due a 1.9.0.

But then Clement, naively, that seems to make eventually() useless and misleading, since it can’t be used to do cleanup, which would seem to be its whole purpose. I would predict that almost everyone who calls that method expects it to be “always called”, and so almost every usage of it is actually broken.

Am I missing something here? What is the use case for a “finally” which sometimes doesn’t get called?

(The question about whether eventually should be called on cancellation is something we need to think about)

I think it would be better if you could first create a test case. This way we can see the error and make sure that the fix actually solves it.

You can check one of the many tests we have in the test folder for inspiration. Check the one extending BaseReactiveTest

Makes sense?

Yeah, I don’t think this is a problem, or at least, IIRC, it’s a problem we already know how to solve, and already have code for, isn’t that right @DavideD?

(FTR, you would have to solve the exact same problem, in the exact same way, if you did it via .onCancellation().call(), right?)

@namjug-kim probably not. Mutiny does not have the notion of context (this comes from Vert.x, not mutiny which is just an API).

eventually will be called from the thread that forwards a cancel() signal. Just like eventually is called from the thread that forwards an item or a failure.

Meanwhile you can use .onTermination().invoke(Functions.TriConsumer<T, Throwable, Boolean> consumer) to handle all the cases (item received, failure, or cancelled)

That is to say: it’s supposed to be a finally

in MutinySessionFactoryImpl#withSession theres no Uni#onCancellation, is this intended?

Mmmh… that’s a good point. I haven’t thought of it. I was expecting that using eventually would cover all the basis, but I haven’t tested what happens with onCancellation. We will have to check.