smallrye-reactive-messaging: ClassCastException in KafkaRecordStream

Hi, I am currently migrating a kafka consumer service from Quarkus 1.13.7.Final to 2.2.2.Final and have possibly found a bug in your class KafkaRecordStream.

Quarkus 2.2.2 uses smallrye-reactive-messaging-kafka version 3.9.1 (quarkus 1 uses some 2.x version).

In KafkaRecordStream line 59 in its private class KafkaRecordStreamSubscription the value of max.poll.record is cast to an Integer if present, else set to default 500.

When setting max.poll.records=50(or any value) in my application.properties, the value in KafkaConnectorIncomingConfiguration is somehow interpreted as a String and hence the ClassCastException is thrown:

ERROR [io.qua.mut.run.MutinyInfrastructure] (smallrye-kafka-consumer-thread-0) Mutiny had to drop the following exception: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer (java.lang.String and java.lang.Integer are in module java.base of loader 'bootstrap')
        at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStream$KafkaRecordStreamSubscription.<init>(KafkaRecordStream.java:79)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStream.subscribe(KafkaRecordStream.java:41)
        at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:40)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToMulti$FlatMapPublisherSubscriber.onItem(UniOnItemTransformToMulti.java:127)
        at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:36)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:29)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

A screenshot of my debugger. Bildschirmfoto 2021-09-13 um 15 53 58

Temporary workaround is obviously not setting max.poll.records. But can you confirm that this is indeed an issue in smallrye-reactive-messaging and whether it will be fixed?

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 23 (20 by maintainers)

Commits related to this issue

Most upvoted comments

Sorry,m it won’t reproduce it. I may be found the problem, and it’s not a conversation problem (at least it’s not obvious). This configuration fails:

kafka.max.poll.records=50

It fails with:

java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer (java.lang.String and java.lang.Integer are in module java.base of loader 'bootstrap')
	at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStream$KafkaRecordStreamSubscription.<init>(KafkaRecordStream.java:79)
	at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStream.subscribe(KafkaRecordStream.java:41)
	at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:40)

This configuration works:

mp.messaging.incoming.foo.max.poll.records=50

You may wonder how does that matter? The kafka.x attributes are extracted by Quarkus which creates a Map<String, ?>. This map is then used in the connector to configure the Kafka client. To do that, the configuration from SR Config and the maps are merged, and I’m pretty sure the problem from from there…