moquette: Dropping session queues leaks netty ByteBufs, resulting in OOM

Expected behavior

Steps described below should not result in Moquette OOM

Actual behavior

After a couple hours, Moquette is unable to allocate memory for incoming PUBLISH messages.

[nioEventLoopGroup-3-6] ERROR NewNettyMQTTHandler exceptionCaught 93  - Unexpected exception while processing MQTT message. Closing Netty channel. CId=pubclient
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 4294967296, max: 4294967296)
	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:776)
	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:645)
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:621)
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:204)
	at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:188)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:138)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:128)
	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:378)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
	at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
	at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:150)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Steps to reproduce

OOM can be reproduced using two clients.

Client A publishes to topic at QoS 1 at a high TPS

Client B subscribes to topic at QoS 1.

Client B is a slow device. This can either be a slow physical device (e.g. Raspberry Pi) or simulate a slow device by artificially delaying PUBACKs. Client B will connect and disconnect, alternating between clean and persistent sessions.

To accelerate reproduction, I also reduced INFLIGHT_WINDOW_SIZE from 10 to 1.

Minimal yet complete reproducer code (or URL to code) or complete log file

Publisher code:

#!/usr/bin/python3

import time
import paho.mqtt.client as mqtt

mqttc = mqtt.Client(client_id="pubclient")
mqttc.connect("localhost", 1883, 60)
mqttc.loop_start()

while True:
    for i in range(100):
        mqttc.publish("my/topic", "a" * 1024 * 4, 1)
    time.sleep(.1)

Subscriber code:

#!/usr/bin/python3

import time
import paho.mqtt.client as mqtt

mqttc = mqtt.Client()
clean_session=True
while True:
    mqttc.reinitialise(client_id="sub_client", clean_session=clean_session)
    clean_session = not clean_session

    mqttc.connect("localhost", 1883, 60)
    mqttc.loop_start()
    mqttc.subscribe("my/topic", 1)

    time.sleep(5)
    mqttc.disconnect()
    mqttc.loop_stop()
    time.sleep(.5)

I suspect this leak occurs when a client connects using a persistent session, disconnect, and reconnects using a clean session. When this happens, the SessionRegistry drops the session queue. Dropping the session queue then leaks all messages in the queue, unless I’m missing something.

Thoughts?

Moquette MQTT version

0.14

JVM version (e.g. java -version)

openjdk version “1.8.0_282” OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_282-b08) OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.282-b08, mixed mode)

OS version (e.g. uname -a)

Darwin 19.6.0 Darwin Kernel Version 19.6.0

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 74 (10 by maintainers)

Commits related to this issue

Most upvoted comments

FYI I’ve update PR #631 cherry-picking from your #637:

  • 2387afe0597993e5e83173e52e77627086b93edc
  • 01637bcc7a6d71543cfc42a41213a451bbd4132f
  • 7559f2c8b13b6dd89ccce02cd8f003e3195ab067
  • ac5d10b6c6e9f4f81b88a5ebc0573f188956fed2

Ah, one queue for multiple Sessions, interesting idea. How would that work? What would the lifecycle of these queues be?

I propose to have thread pool, with cardinality equals to the number of cores. Each thread in this pool has a queue. Each session is associated to one of these queues, using a modulo operation on the clientID so that all the commands for a session goes always to the same queue/thread. Each of these threads execute the logic related to command they pull from it’s commands queue.
The queues lives all the time that the Moquette process is up, the clean shutdown of Moquette would avoid to accept new commands on these queues and the let the pulling threads to consume the remaining commands. In this way no leakage happens, however in case of crash the full JVM hosting Moquette is killed and the memory buffers are freed as consequence of process death (by the OS).

If these queues are not permanent What do you mean for “permanent”?

Seeing a performance comparison between the two approaches would also be interesting. My plan is to create a PR with this new processing model and:

  • prove it solves the problems of leakage due to concurrency
  • have comparable performance to the actual implementation

Reference counting on the Session is an interesting idea!

Could we solve the handleConnectionLost() issue using reference counting? If we give the Session a reference to the SessionRegistry, then the Session can remove itself from the Registry when it terminates. This would be the cleanest solution, since only terminated Sessions should be removed from the Registry, and terminated sessions should always be removed from the Registry. The MQTTConnection could just release() the Session. For persistent Sessions, the registry itself can do a retain, so that the Session is not cleaned up when the connection releases the Session. For clean session, the session would be cleaned up as soon as the connection drops the session, and no other threads are in the publish process.