azure-sdk-for-java: [BUG] NullPointerException when creating Spring-Cloud-Stream project with multiple Service-Bus binders and using namespace
Describe the bug
When creating a Spring-Cloud-Stream project with multiple Service-Bus binders (following the example in this repo), when using namespace instead of connection-string to the additional binder, a NullPointerException is thrown.
Exception or Stack Trace
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'queueClientFactory' defined in com.microsoft.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusQueueAutoConfiguration: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory]: Factory method 'queueClientFactory' threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:655)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:635)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1336)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1176)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:556)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:516)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:324)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:226)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:322)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:897)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:140)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:320)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:209)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:140)
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:379)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:268)
at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:175)
at org.springframework.cloud.stream.function.FunctionConfiguration$1.lambda$afterPropertiesSet$0(FunctionConfiguration.java:193)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:97)
at org.springframework.integration.router.AbstractMessageProcessingRouter.getChannelKeys(AbstractMessageProcessingRouter.java:84)
at org.springframework.integration.router.AbstractMappingMessageRouter.determineTargetChannels(AbstractMappingMessageRouter.java:203)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:177)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:102)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer.lambda$doStart$1(ReactiveStreamsConsumer.java:177)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426)
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$2(FluxMessageChannel.java:83)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:439)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:526)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory]: Factory method 'queueClientFactory' threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:650)
... 57 common frames omitted
Caused by: java.lang.NullPointerException: null
at com.microsoft.azure.spring.cloud.autoconfigure.servicebus.ServiceBusUtils.getNamespace(ServiceBusUtils.java:9)
at com.microsoft.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusQueueAutoConfiguration.queueClientFactory(AzureServiceBusQueueAutoConfiguration.java:57)
at com.microsoft.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusQueueAutoConfiguration$$EnhancerBySpringCGLIB$$b0f64529.CGLIB$queueClientFactory$0(<generated>)
at com.microsoft.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusQueueAutoConfiguration$$EnhancerBySpringCGLIB$$b0f64529$$FastClassBySpringCGLIB$$6617ee6d.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331)
at com.microsoft.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusQueueAutoConfiguration$$EnhancerBySpringCGLIB$$b0f64529.queueClientFactory(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 58 common frames omitted
To Reproduce
Create a project with multiple binders that use namespace and not a connection string. Example application.yaml:
spring:
cloud:
azure:
msi-enabled: true
resource-group: my-resource-group
region: us-west
subscription-id: 11111111-1111-1111-1111-111111111111
service-bus:
namespace: my-namespace-1
stream:
function:
definition: produce
binders:
queue-1-binder:
type: servicebus-queue
defaultCandidate: false
environment:
spring:
cloud:
azure:
msi-enabled: true
resource-group: my-resource-group
region: us-west
subscription-id: 11111111-1111-1111-1111-111111111111
service-bus:
namespace: my-namespace-2
bindings:
produce-out-0:
destination: queue-a-0
my-producer-output-1:
destination: queue-a-1
binder: queue-1-binder
Code Snippet
It seems that when creating the additional binder, in AzureServiceBusQueueAutoConfiguration.java, resourceManagerProvider is null, and therefore the else clause is triggered and the connection-string is checked:
if (resourceManagerProvider != null) {
clientFactory.setResourceManagerProvider(resourceManagerProvider);
clientFactory.setNamespace(serviceBusProperties.getNamespace());
} else {
TelemetryCollector.getInstance().addProperty(SERVICE_BUS_QUEUE, NAMESPACE,
ServiceBusUtils.getNamespace(connectionString));
}
Then in ServiceBusUtils there’s an assumption that the connection-string cannot be null:
public static String getNamespace(String connectionString) {
String prefix = "Endpoint=sb://";
int start = connectionString.indexOf(prefix) + prefix.length();
return connectionString.substring(start, connectionString.indexOf('.', start));
}
Expected behavior The binder should connect just like when using a connection-string.
Setup (please complete the following information):
- OS: Tested on AKS
- Version of the Library used: 1.2.7, 1.2.8
Information Checklist Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report
- Bug Description Added
- Repro Steps Added
- Setup information Added
About this issue
- Original URL
- State: closed
- Created 4 years ago
- Comments: 23 (15 by maintainers)
The reason why the application cannot function in AKS is that ConfigMap is used. The way Spring Cloud Stream handles multiple binders is to create additional context, and the properties set to the binder will be passed to the context as CommandLine arguments, which by default inherits environment from ApplicationContext. The default value for inheritEnvironment of BinderProperties is
true. So When ConfigMap starter adds ConfigMapPropertySource as the first propertySource in the list, it will shadow the command line arguments passed to the additional context.So if we configure the application like this:
when creating the context for
additional-binder-1, it will pick up the first eventhub connection string from the configmap, which has the highest priority, instead of reading the second eventhub connection string from command line arguments.To avoid such case, we could configure the environment all in binder properties.
But due to our implementation’s limitation, this still couldn’t work. So the PR https://github.com/Azure/azure-sdk-for-java/pull/17066 will fix the throwing exception issue.
@avpines
@avpines sorry for the late response, it was Chinese National Day holiday. We’ve just figured out what was wrong with our nightly built pipeline and it shall be fixed soon. I’ll update you once it’s done.
@avpines We are looking into issue and will provide you update soon. Appreciate you providing us details here.