camel-kafka-connector: High CPU Usage with RabbitMQ Connector (polling interface implemented incorrectly?)

Hi,

I’m using the Camel RabbitMQ Connector as a source connector for my Kafka cluster. Kafka Connect is running in a Docker container built with the following Dockerfile:

FROM openjdk:8-jre-alpine

ARG VERSION_APACHE_KAFKA=2.2.1
ARG VERSION_SCALA=2.12
ARG VERSION_APACHE_CAMEL=0.4.0

RUN mkdir -p /kafka/connect/plugins && \
    mkdir -p /kafka/connect/app && \
    mkdir -p /kafka/connect/config && \
    wget https://archive.apache.org/dist/kafka/${VERSION_APACHE_KAFKA}/kafka_${VERSION_SCALA}-${VERSION_APACHE_KAFKA}.tgz -q -O kafka.tgz && \
    tar -xzf kafka.tgz -C /kafka/connect/app --strip 1 && \
    rm -f kafka.tgz && \
    wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-rabbitmq-kafka-connector/${VERSION_APACHE_CAMEL}/camel-rabbitmq-kafka-connector-${VERSION_APACHE_CAMEL}-package.zip -q -O rabbitmq-connector.zip && \
    unzip -qq rabbitmq-connector.zip -d /kafka/connect/plugins && \
    rm -f rabbitmq-connector.zip && \
    apk add -q --no-cache bash

ENTRYPOINT [ "/kafka/connect/entrypoint.sh" ]

When running this setup and adding a RabbitMQ connector with the following configuration:

{
  "name": "{{ config.connector_name }}",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector",
    "topics": "{{ config.topic }}",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "tasks.max": 1,
    "camel.component.rabbitmq.hostname": "{{ kafka_connect_rabbitmq_host }}",
    "camel.component.rabbitmq.portNumber": {{ kafka_connect_rabbitmq_port }},
    "camel.component.rabbitmq.username": "{{ kafka_connect_rabbitmq_user }}",
    "camel.component.rabbitmq.password": "{{ kafka_connect_rabbitmq_password }}",
    "camel.source.path.exchangeName": "{{ config.exchange }}",
    "camel.source.endpoint.exchangeType": "topic",
    "camel.source.endpoint.autoDelete": false,
    "camel.source.endpoint.queue": "{{ config.queue }}",
    "camel.source.endpoint.routingKey": "{{ config.routing_key }}"
  }
}

the connector task created by this configuration consumes all resources of its assigned CPU core.

I’ve looked at thread dumps, activated TRACE logging, and came to the following conclusion:
The CamelSourceTask does not seem to properly implement the poll method of the abstract SourceTask class.

The JavaDoc for the poll method is as follows (source):

Poll this source task for new records. If no data is currently available, this method should block but return control to the caller regularly (by returning {@code null}) in order for the task to transition to the {@code PAUSED} state if requested to do so.

The task will be {@link #stop() stopped} on a separate thread, and when that happens this method is expected to unblock, quickly finish up any remaining processing, and return.

Looking at the implementation, I cannot see how the contract w.r.t. “should block, but return control to the caller regularly” is fulfilled. Since consumer.receiveNoWait() is called, exchanges are not read in a blocking manner, and the subsequent code is not blocking either.
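To make the pattern concrete, here is a minimal, hypothetical sketch of the behavior described above — this is illustrative pseudo-structure, not the actual CamelSourceTask code; the names `receiveNoWait` and `poll` only mirror the discussion:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not the actual connector code): receiveNoWait()
// never blocks, so when no message is available poll() falls straight
// through the loop and returns an empty result immediately.
public class BusyPollSketch {

    // Stand-in for consumer.receiveNoWait(): null means "no message yet".
    static Object receiveNoWait() {
        return null;
    }

    static List<Object> poll() {
        List<Object> records = new ArrayList<>();
        while (true) {
            Object exchange = receiveNoWait(); // non-blocking read
            if (exchange == null) {
                break; // nothing available: return without ever blocking
            }
            records.add(exchange);
        }
        return records; // empty, immediately; the caller loops right back in
    }

    public static void main(String[] args) {
        // With no messages available, poll() returns at once with no records,
        // so a caller that invokes it in a loop spins at full speed.
        System.out.println(poll().isEmpty());
    }
}
```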

This causes the execute method of WorkerSourceTask to continuously call the poll method, which immediately returns if no exchange is available (due to the break statement in the while loop), effectively creating a busy loop that consumes all CPU resources.
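For contrast, a contract-respecting poll() would block for a bounded interval when no data is available and then return null, handing control back to the framework. The sketch below assumes a blocking receive with a timeout (Camel’s PollingConsumer does offer `receive(long timeout)`, though whether that is the right fix here is for the maintainers to judge); all other names are illustrative:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a poll() honoring the SourceTask contract:
// block for a bounded interval, then return null so the framework can
// transition the task to PAUSED or STOPPED. Not the connector's actual code.
public class BoundedPollSketch {

    // Stand-in for a blocking receive with timeout,
    // e.g. Camel's PollingConsumer.receive(timeoutMs).
    // Here it simulates waiting out the timeout with no message arriving.
    static Object receive(long timeoutMs) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(timeoutMs);
        return null;
    }

    static List<Object> poll() throws InterruptedException {
        Object exchange = receive(500); // block up to 500 ms for a message
        if (exchange == null) {
            return null; // regular return of control, per the poll() JavaDoc
        }
        return Collections.singletonList(exchange);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        List<Object> records = poll();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // null is returned only after actually waiting, so the caller's
        // loop runs at most ~2 iterations per second instead of spinning.
        System.out.println(records == null && elapsedMs >= 500);
    }
}
```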

I would be grateful if someone could take a look at my analysis. Maybe there is some configuration option that solves my issue, but I was not able to find it. Looking forward to hearing from anyone.

Best regards,

Tobias

P.S.: when TRACE logging is activated, the logs immediately show that there is some kind of resource-intensive loop:

2020-09-02T10:57:16.381+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
2020-09-02T10:57:16.381+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka

The corresponding log statements can be found in the execute method of WorkerSourceTask.

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 33 (25 by maintainers)


Most upvoted comments

As soon as I add the connector - from running the setup script - there’s a big spike in both the GC activity as well as the CPU usage. I’m checking if the same thing happens under different settings / connectors / etc.

Screenshot from 2020-09-03 13-33-31

Really well done @orpiske . Can you open a PR with your updates?

Thanks @oscerd! That was a fun one to play with.

Yes I can. I will clean up the patches a little bit and I’ll send one for review on Monday … Tuesday at worst. I also put some code in place to make it easier for us to debug problems like this in the future.

And thanks a lot!

I would focus on understanding whether this is specific to the rabbitmq connector/component