quarkus: Reactive Messaging (Kafka) - Application exit on dns issues (need lazy consumers/producers)
Description
Hey, i’m running a quarkus application using platform version 2.10.2.Final
,
my application logic receive messages and then dispatches them to kafka producer.
I’m having an issue that although message dispatch is optional in my application, the application fails to start due to some DNS issues we are having from time to time in the cluster.
MRE
This is the configuration
# Kafka general configs
kafka.bootstrap.servers=${SOME_DOMAIN_URI}
kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# SmallRye - Kafka
mp.messaging.outgoing.channel-name.connector=smallrye-kafka
mp.messaging.outgoing.channel-name.topic=topic-name
When i’m starting my application, sometime KafkaProducer creation fails and i’m receiving the following stacktrace
2022-08-09 15:40:21,225 ERROR [io.sma.rea.mes.provider] (main) SRMSG00230: Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:442)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:96)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:57)
at io.smallrye.reactive.messaging.kafka.impl.KafkaSink.<init>(KafkaSink.java:88)
at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriberBuilder(KafkaConnector.java:237)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder$$superforward1(Unknown Source)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:39)
at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder(Unknown Source)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriberBuilder(Unknown Source)
at io.smallrye.reactive.messaging.providers.impl.ConnectorFactories.lambda$wrap$5(ConnectorFactories.java:84)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:189)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:144)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:105)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:200)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_7f54e4b27c1b49e5e062caa58f1e82797fa01393.notify(Unknown Source)
at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:323)
at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:305)
at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:130)
at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:99)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:103)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
at com.app.AppMain.main(AppMain.kt:11)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at io.quarkus.bootstrap.runner.QuarkusEntryPoint.doRun(QuarkusEntryPoint.java:60)
at io.quarkus.bootstrap.runner.QuarkusEntryPoint.main(QuarkusEntryPoint.java:31)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
... 46 more
2022-08-09 15:40:21,372 ERROR [io.qua.run.Application] (main) Failed to start application (with profile prod): org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:96)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:57)
at io.smallrye.reactive.messaging.kafka.impl.KafkaSink.<init>(KafkaSink.java:88)
at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriberBuilder(KafkaConnector.java:237)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder$$superforward1(Unknown Source)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:39)
at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder(Unknown Source)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriberBuilder(Unknown Source)
at io.smallrye.reactive.messaging.providers.impl.ConnectorFactories.lambda$wrap$5(ConnectorFactories.java:84)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:189)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:144)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:105)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:200)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_7f54e4b27c1b49e5e062caa58f1e82797fa01393.notify(Unknown Source)
at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:323)
at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:305)
at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:130)
at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:99)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:103)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
at com.app.AppMain.main(AppMain.kt:11)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at io.quarkus.bootstrap.runner.QuarkusEntryPoint.doRun(QuarkusEntryPoint.java:60)
at io.quarkus.bootstrap.runner.QuarkusEntryPoint.main(QuarkusEntryPoint.java:31)
it happens because we’re having issue to resolve ip from ${SOME_DOMAIN_URI}
As kafka is not a mandatory part of our application, I would prefer to see these errors while dispatching the events, I would want the entire application to crash in such cases. I do understand why general KafkaProducer configurations errors should crash the application (for example bad port was supplied), but I’d expect some configuration to make it more fault tolerant regarding DNS resolving.
Implementation ideas
Swallow and log config exceptions at https://github.com/quarkusio/quarkus/blob/main/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java#L41 or at least supply some configuration to do so.
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Reactions: 2
- Comments: 19 (12 by maintainers)
Closing this one.