spring-cloud-sleuth: Spring Cloud 2020.0.4 breaks Spring Cloud Stream Kafka AVRO Message Conversion when Sleuth is on the classpath

Describe the bug
When upgrading from Spring Cloud 2020.0.3 to 2020.0.4, AVRO message conversion is ignored in Spring Cloud Stream Kafka (Kafka 2.7), giving the following error:

2021-10-28 15:43:47.868 ERROR [spring-cloud-webflux-avor-source-only,14b2b7aa4a15f2e6,14b2b7aa4a15f2e6] 28075 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [25d7b7f7-1]  500 Server Error for HTTP GET "/dave"

org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: {"type":"record","name":"AvroFxRateEvent","namespace":"uk.co.dave.consumer.fxrate.consumer.avro","fields":[{"name":"from","type":{"type":"string","avro.java.string":"String"}},{"name":"to","type":{"type":"string","avro.java.string":"String"}},{"name":"rate","type":{"type":"bytes","logicalType":"decimal","precision":7,"scale":6}}]} (through reference chain: uk.co.dave.consumer.fxrate.consumer.avro.AvroFxRateEvent["schema"]->org.apache.avro.Schema$RecordSchema["valueType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"AvroFxRateEvent","namespace":"uk.co.dave.consumer.fxrate.consumer.avro","fields":[{"name":"from","type":{"type":"string","avro.java.string":"String"}},{"name":"to","type":{"type":"string","avro.java.string":"String"}},{"name":"rate","type":{"type":"bytes","logicalType":"decimal","precision":7,"scale":6}}]} (through reference chain: uk.co.dave.consumer.fxrate.consumer.avro.AvroFxRateEvent["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])
	at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:273) ~[spring-messaging-5.3.9.jar:5.3.9]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Handler example.DaveController#dave() [DispatcherHandler]
	|_ checkpoint ⇢ org.springframework.cloud.sleuth.instrument.web.TraceWebFilter [DefaultWebFilterChain]
	|_ checkpoint ⇢ HTTP GET "/dave" [ExceptionHandlingWebHandler]
Stack trace:
		at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:273) ~[spring-messaging-5.3.9.jar:5.3.9]
		at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertToInternal(ApplicationJsonMessageMarshallingConverter.java:69) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
		at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201) ~[spring-messaging-5.3.9.jar:5.3.9]
		at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191) ~[spring-messaging-5.3.9.jar:5.3.9]
		at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.toMessage(SmartCompositeMessageConverter.java:96) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputMessageIfNecessary(SimpleFunctionRegistry.java:1245) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputIfNecessary(SimpleFunctionRegistry.java:1056) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:509) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:125) ~[spring-cloud-sleuth-instrumentation-3.0.4.jar:3.0.4]
		at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:41) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$1.doApply(SimpleFunctionRegistry.java:257) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:506) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:214) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
		at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:156) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
		at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:136) ~[spring-cloud-stream-3.1.4.jar:3.1.4]

It looks like the AVRO message converters are being ignored in favour of the Jackson ones.

Any of the following workarounds resolves the error:

  1. downgrading Spring Cloud Stream to 3.1.3
  2. removing Spring Cloud Sleuth as a dependency
  3. downgrading to Spring Cloud 2020.0.3

Sample https://github.com/davidmelia/spring-boot-webflux-avro-source-only

If you compile and run this project (it assumes a local Confluent Schema Registry on http://localhost:8081 and Kafka on localhost:9092) and hit http://localhost:8080/dave, a message is sent to Kafka and you will see the above error.
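For context, the publishing path roughly follows the sketch below. The controller name and method come from the stack trace (example.DaveController#dave()); the binding name "fxRates-out-0" and the field values are assumptions for illustration, not code copied from the repository.

    // Hypothetical sketch of the sample's publishing path; binding name and values are assumed.
    import java.math.BigDecimal;

    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    import reactor.core.publisher.Mono;
    import uk.co.dave.consumer.fxrate.consumer.avro.AvroFxRateEvent;

    @RestController
    public class DaveController {

        private final StreamBridge streamBridge;

        public DaveController(StreamBridge streamBridge) {
            this.streamBridge = streamBridge;
        }

        @GetMapping("/dave")
        public Mono<String> dave() {
            // AvroFxRateEvent is the Avro-generated SpecificRecord for the schema shown in the
            // exception above (assuming decimal logical-type conversion is enabled in the Avro
            // code generator, "rate" is a BigDecimal).
            AvroFxRateEvent event = AvroFxRateEvent.newBuilder()
                    .setFrom("GBP")
                    .setTo("EUR")
                    .setRate(new BigDecimal("1.172067"))
                    .build();
            // With Sleuth 3.0.4 on the classpath, TraceFunctionAroundWrapper intercepts this call
            // and the payload is handed to MappingJackson2MessageConverter instead of the AVRO
            // converter, producing the JsonMappingException shown above.
            streamBridge.send("fxRates-out-0", event);
            return Mono.just("OK");
        }
    }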

Downgrading to Spring Cloud Stream 3.1.3 (uncomment spring-cloud-stream-dependencies in the pom.xml), removing Spring Cloud Sleuth from the pom.xml, or downgrading to Spring Cloud 2020.0.3 fixes this problem.
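For the first workaround, re-enabling the commented-out spring-cloud-stream-dependencies entry amounts to something like the following in dependencyManagement (an illustrative sketch, not the sample's exact pom.xml): importing the Spring Cloud Stream 3.1.3 BOM ahead of the Spring Cloud 2020.0.4 BOM so its managed versions win.

    <!-- Illustrative sketch of the downgrade workaround; adjust to the project's pom.xml layout. -->
    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-stream-dependencies</artifactId>
          <version>3.1.3</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-dependencies</artifactId>
          <version>2020.0.4</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>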

I previously raised this against Spring Cloud Sleuth, but after realising that downgrading Spring Cloud Stream also fixes the problem I am not sure where the issue lies (https://github.com/spring-cloud/spring-cloud-sleuth/issues/2048).

Thanks

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 16 (7 by maintainers)

Most upvoted comments

@davidmelia I’ll talk to @marcingrzejszczak tomorrow. I know there was a lot of work on TraceFunctionAroundWrapper in Sleuth that will be available with the RC tomorrow, but given that all of that is cutting edge, that flag exists specifically for this purpose. It doesn’t mean that there is no issue, it just means we still need to dig in and see what’s going on; for now you have simply disabled that interceptor and helped identify that there is indeed an issue in Sleuth, so thank you!
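The flag referred to here appears to be Sleuth's Spring Cloud Function instrumentation toggle; the property name below is taken from the Sleuth 3.x documentation rather than from this thread, so treat it as an assumption and verify it against the version in use.

    # Assumed workaround: disable Sleuth's Spring Cloud Function instrumentation so
    # TraceFunctionAroundWrapper is not applied to StreamBridge sends.
    spring.sleuth.function.enabled=false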