moquette: Dropping session queues leaks netty ByteBufs, resulting in OOM
Expected behavior
The steps described below should not result in a Moquette OOM.
Actual behavior
After a couple of hours, Moquette is unable to allocate direct memory for incoming PUBLISH messages.
[nioEventLoopGroup-3-6] ERROR NewNettyMQTTHandler exceptionCaught 93 - Unexpected exception while processing MQTT message. Closing Netty channel. CId=pubclient
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 4294967296, max: 4294967296)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:776)
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:645)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:621)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:204)
at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:188)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:138)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:128)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:378)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:150)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Steps to reproduce
The OOM can be reproduced with two clients.
Client A publishes to my/topic at QoS 1 at a high message rate (TPS).
Client B subscribes to my/topic at QoS 1.
Client B is a slow device. This can either be an actual slow device (e.g. a Raspberry Pi) or a simulated one that artificially delays its PUBACKs. Client B repeatedly connects and disconnects, alternating between clean and persistent sessions.
To accelerate reproduction, I also reduced INFLIGHT_WINDOW_SIZE from 10 to 1.
Minimal yet complete reproducer code (or URL to code) or complete log file
Publisher code:
#!/usr/bin/python3
import time
import paho.mqtt.client as mqtt

mqttc = mqtt.Client(client_id="pubclient")
mqttc.connect("localhost", 1883, 60)
mqttc.loop_start()

# Publish bursts of 100 messages of 4 KiB at QoS 1, roughly 1000 msg/s.
while True:
    for i in range(100):
        mqttc.publish("my/topic", "a" * 1024 * 4, 1)
    time.sleep(.1)
Subscriber code:
#!/usr/bin/python3
import time
import paho.mqtt.client as mqtt

mqttc = mqtt.Client()
clean_session = True

while True:
    # Alternate between clean and persistent sessions on every reconnect.
    mqttc.reinitialise(client_id="sub_client", clean_session=clean_session)
    clean_session = not clean_session
    mqttc.connect("localhost", 1883, 60)
    mqttc.loop_start()
    mqttc.subscribe("my/topic", 1)
    time.sleep(5)
    mqttc.disconnect()
    mqttc.loop_stop()
    time.sleep(.5)
I suspect this leak occurs when a client connects using a persistent session, disconnects, and then reconnects using a clean session. When this happens, the SessionRegistry drops the session queue, and dropping the queue leaks every message still in it, unless I’m missing something.
Thoughts?
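If that is indeed what happens, the fix direction would seem to be draining the dropped queue and releasing its buffers rather than just discarding the reference. A minimal Java sketch, assuming each queued message holds a retained Netty ByteBuf payload (class and method names below are illustrative, not Moquette’s actual API):

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;

import java.util.Queue;

// Illustrative shape of a queued message: the payload is a retained ByteBuf.
final class QueuedPublish {
    final String topic;
    final ByteBuf payload;

    QueuedPublish(String topic, ByteBuf payload) {
        this.topic = topic;
        this.payload = payload;
    }
}

final class SessionQueueCleanup {

    // Hypothetical cleanup: instead of just forgetting the queue when the client
    // reconnects with a clean session, drain it and release every retained payload
    // so the direct memory is returned to the pooled allocator.
    static void dropQueue(Queue<QueuedPublish> queue) {
        QueuedPublish msg;
        while ((msg = queue.poll()) != null) {
            ReferenceCountUtil.release(msg.payload);
        }
    }
}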
Moquette MQTT version
0.14
JVM version (e.g. java -version)
openjdk version "1.8.0_282"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_282-b08)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.282-b08, mixed mode)
OS version (e.g. uname -a)
Darwin 19.6.0 Darwin Kernel Version 19.6.0
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 74 (10 by maintainers)
Commits related to this issue
- Provides a manual test for #608 issues related to buffer leaks in concurrency between publish and reconnections — committed to andsel/moquette by andsel 3 years ago
FYI I’ve updated PR #631, cherry-picking from your #637.
I propose to have a thread pool with cardinality equal to the number of cores. Each thread in this pool has a queue, and each session is associated with one of these queues using a modulo operation on the clientID, so that all the commands for a session always go to the same queue/thread. Each of these threads executes the logic for the commands it pulls from its command queue. The queues live for as long as the Moquette process is up; a clean shutdown of Moquette stops accepting new commands on the queues and lets the pulling threads consume the remaining commands. In this way no leakage happens. In case of a crash, the whole JVM hosting Moquette is killed and the memory buffers are freed as a consequence of process death (by the OS).
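For illustration only, a rough sketch of that dispatch scheme (all names here are mine, not taken from the PR): one single-threaded executor per core, with every command for a given client ID always hashed onto the same executor, so commands for one session are serialized on one thread.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Rough sketch of per-core command queues keyed by clientID (illustrative names only).
final class SessionCommandExecutor {
    private final ExecutorService[] loops;

    SessionCommandExecutor() {
        int cores = Runtime.getRuntime().availableProcessors();
        loops = new ExecutorService[cores];
        for (int i = 0; i < cores; i++) {
            // Each executor owns one queue and one thread for the whole broker lifetime.
            loops[i] = Executors.newSingleThreadExecutor();
        }
    }

    // All commands for the same clientId land on the same queue/thread.
    void execute(String clientId, Runnable command) {
        int idx = Math.floorMod(clientId.hashCode(), loops.length);
        loops[idx].execute(command);
    }

    // Clean shutdown: stop accepting new commands, let queued ones drain.
    void shutdown() {
        for (ExecutorService loop : loops) {
            loop.shutdown(); // previously submitted commands still run
        }
    }
}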
Reference counting on the Session is an interesting idea!
Could we solve the handleConnectionLost() issue using reference counting? If we give the Session a reference to the SessionRegistry, then the Session can remove itself from the Registry when it terminates. This would be the cleanest solution, since only terminated Sessions should be removed from the Registry, and terminated Sessions should always be removed from the Registry. The MQTTConnection could just release() the Session. For persistent Sessions, the registry itself can do a retain, so that the Session is not cleaned up when the connection releases it. For clean sessions, the Session would be cleaned up as soon as the connection drops it and no other threads are in the publish process.
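To make the idea concrete, a very rough sketch of how such reference counting could look (the names and the exact retain/release points are assumptions, not actual Moquette classes): the last release() removes the Session from the registry and frees whatever is still queued.

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: a session that removes itself from the registry
// once the last holder releases it.
final class RefCountedSession {
    private final String clientId;
    private final ConcurrentMap<String, RefCountedSession> registry;
    private final AtomicInteger refCount = new AtomicInteger(1); // created retained

    RefCountedSession(String clientId, ConcurrentMap<String, RefCountedSession> registry) {
        this.clientId = clientId;
        this.registry = registry;
    }

    void retain() {
        refCount.incrementAndGet();
    }

    void release() {
        if (refCount.decrementAndGet() == 0) {
            // Last reference gone: remove from the registry and free queued buffers.
            registry.remove(clientId, this);
            releaseQueuedMessages();
        }
    }

    private void releaseQueuedMessages() {
        // Here the session would drain its queue and ReferenceCountUtil.release()
        // every retained payload, as sketched earlier in this thread.
    }
}

Under this sketch, the registry would do an extra retain() for a persistent Session so it outlives the connection’s release(), while a clean Session would be freed as soon as the connection and any in-flight publish operations have released it.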