quarkus: Reactive Messaging (Kafka) - Application exit on dns issues (need lazy consumers/producers)

Description

Hey, i’m running a quarkus application using platform version 2.10.2.Final, my application logic receive messages and then dispatches them to kafka producer. I’m having an issue that although message dispatch is optional in my application, the application fails to start due to some DNS issues we are having from time to time in the cluster.

MRE

This is the configuration

# Kafka general configs
kafka.bootstrap.servers=${SOME_DOMAIN_URI}
kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# SmallRye - Kafka
mp.messaging.outgoing.channel-name.connector=smallrye-kafka
mp.messaging.outgoing.channel-name.topic=topic-name

When i’m starting my application, sometime KafkaProducer creation fails and i’m receiving the following stacktrace

2022-08-09 15:40:21,225 ERROR [io.sma.rea.mes.provider] (main) SRMSG00230: Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:442)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:96)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:57)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSink.<init>(KafkaSink.java:88)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriberBuilder(KafkaConnector.java:237)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder$$superforward1(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:39)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriberBuilder(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConnectorFactories.lambda$wrap$5(ConnectorFactories.java:84)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:189)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:144)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:105)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:200)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_7f54e4b27c1b49e5e062caa58f1e82797fa01393.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:323)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:305)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:130)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:99)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:103)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
        at com.app.AppMain.main(AppMain.kt:11)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at io.quarkus.bootstrap.runner.QuarkusEntryPoint.doRun(QuarkusEntryPoint.java:60)
        at io.quarkus.bootstrap.runner.QuarkusEntryPoint.main(QuarkusEntryPoint.java:31)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
        ... 46 more

2022-08-09 15:40:21,372 ERROR [io.qua.run.Application] (main) Failed to start application (with profile prod): org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:96)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:57)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSink.<init>(KafkaSink.java:88)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriberBuilder(KafkaConnector.java:237)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder$$superforward1(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:39)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriberBuilder(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConnectorFactories.lambda$wrap$5(ConnectorFactories.java:84)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:189)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:144)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:105)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:200)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_7f54e4b27c1b49e5e062caa58f1e82797fa01393.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:323)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:305)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:130)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:99)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:103)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
        at com.app.AppMain.main(AppMain.kt:11)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at io.quarkus.bootstrap.runner.QuarkusEntryPoint.doRun(QuarkusEntryPoint.java:60)
        at io.quarkus.bootstrap.runner.QuarkusEntryPoint.main(QuarkusEntryPoint.java:31)

it happens because we’re having issue to resolve ip from ${SOME_DOMAIN_URI} As kafka is not a mandatory part of our application, I would prefer to see these errors while dispatching the events, I would want the entire application to crash in such cases. I do understand why general KafkaProducer configurations errors should crash the application (for example bad port was supplied), but I’d expect some configuration to make it more fault tolerant regarding DNS resolving.

Implementation ideas

Swallow and log config exceptions at https://github.com/quarkusio/quarkus/blob/main/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java#L41 or at least supply some configuration to do so.

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Reactions: 2
  • Comments: 19 (12 by maintainers)

Most upvoted comments

Closing this one.