smallrye-reactive-messaging: ClassCastException in KafkaRecordStream
Hi, I am currently migrating a Kafka consumer service from Quarkus 1.13.7.Final to 2.2.2.Final and may have found a bug in your class KafkaRecordStream.
Quarkus 2.2.2 uses smallrye-reactive-messaging-kafka version 3.9.1 (Quarkus 1 uses some 2.x version).
In KafkaRecordStream line 59, in its private class KafkaRecordStreamSubscription, the value of max.poll.records is cast to an Integer if present, else set to the default of 500.
When I set max.poll.records=50 (or any value) in my application.properties, the value in KafkaConnectorIncomingConfiguration is somehow interpreted as a String, and hence a ClassCastException is thrown:
ERROR [io.qua.mut.run.MutinyInfrastructure] (smallrye-kafka-consumer-thread-0) Mutiny had to drop the following exception: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer (java.lang.String and java.lang.Integer are in module java.base of loader 'bootstrap')
at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStream$KafkaRecordStreamSubscription.<init>(KafkaRecordStream.java:79)
at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStream.subscribe(KafkaRecordStream.java:41)
at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:40)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToMulti$FlatMapPublisherSubscriber.onItem(UniOnItemTransformToMulti.java:127)
at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:36)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:29)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
A screenshot of my debugger.

The temporary workaround is obviously to not set max.poll.records. But can you confirm that this is indeed an issue in smallrye-reactive-messaging, and whether it will be fixed?
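To illustrate the failure mode outside the connector, here is a minimal sketch. It assumes the configuration ends up in a plain Map<String, Object> and that the value arrives as the String "50". The class and method names (MaxPollRecordsDemo, readWithCast, readLenient) are hypothetical; this is not the actual KafkaRecordStream code, only the same cast pattern next to a more tolerant alternative.

```java
import java.util.HashMap;
import java.util.Map;

public class MaxPollRecordsDemo {

    // Same pattern as described above: blind cast, default of 500 when absent.
    // Throws ClassCastException when the stored value is a String.
    static int readWithCast(Map<String, Object> config) {
        Object value = config.get("max.poll.records");
        return value == null ? 500 : (Integer) value;
    }

    // Hypothetical tolerant variant: accepts a Number or a numeric String.
    static int readLenient(Map<String, Object> config) {
        Object value = config.get("max.poll.records");
        if (value == null) {
            return 500;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.parseInt(value.toString().trim());
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("max.poll.records", "50"); // assumption: value arrives as a String

        System.out.println("lenient read: " + readLenient(config));
        try {
            readWithCast(config);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }
    }
}
```

Whether the right fix is a tolerant read like this or making sure the value is converted to an Integer earlier is a design question for the maintainers.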
About this issue
- State: closed
- Created 3 years ago
- Comments: 23 (20 by maintainers)
Sorry, it won’t reproduce it. I may have found the problem, and it’s not a conversion problem (at least not an obvious one). This configuration fails:
It fails with:
This configuration works:
You may wonder why that matters. The kafka.x attributes are extracted by Quarkus, which creates a Map<String, ?>. This map is then used in the connector to configure the Kafka client. To do that, the configuration from SR Config and the map are merged, and I’m pretty sure the problem comes from there…
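As a rough sketch of how such a merge could end up with a String, assume the connector-level values are already typed and the extracted kafka.x map holds raw strings that are applied on top. The merge order and the names below (ConfigMergeDemo, merge) are assumptions made for illustration, not the actual connector code.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigMergeDemo {

    // Hypothetical merge: start from the typed values, then apply the raw map on top.
    // Entries from the raw map replace typed entries that share the same key.
    static Map<String, Object> merge(Map<String, Object> typed, Map<String, ?> raw) {
        Map<String, Object> merged = new HashMap<>(typed);
        merged.putAll(raw);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> typed = new HashMap<>();
        typed.put("max.poll.records", 50); // already converted to an Integer

        Map<String, String> raw = new HashMap<>();
        raw.put("max.poll.records", "50"); // same key, but still a String

        Object value = merge(typed, raw).get("max.poll.records");
        System.out.println(value.getClass().getName());
    }
}
```

With this ordering the merged map holds a String for max.poll.records, so a later cast to Integer would fail even though one of the sources had the right type.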