spring-cloud-sleuth: sleuth trace not working with Kstream

I have a stream processor written using spring-cloud-stream-binder-kafka-streams

@Slf4j
@EnableBinding(KafkaStreamsProcessor.class)
@RequiredArgsConstructor
@Component
public class SomeWorker extends BaseStream<SomePojo> {

    @StreamListener(Sink.INPUT)
    @SendTo(Source.OUTPUT)
    public KStream<?, SomePojo> process(KStream<?, SomePojo> pojoObj) {
        // stream processing
        return pojoObj;
    }
}

pom.xml <spring-cloud.version>Greenwich.SR1</spring-cloud.version>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

properties

#kafka stream properties
spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.bindings.input.contentType=application/json
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=dedup-metric-worker
spring.cloud.stream.bindings.input.destination=split-metrics3
spring.cloud.stream.bindings.output.destination=cleaned-metrics9
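
For reference, trace IDs only show up in log lines if the log pattern actually reads them from the MDC. Spring Boot's Sleuth integration normally configures this automatically; the fragment below is a sketch of that Sleuth-style default (assuming Logback), included to make the dependency explicit:

```properties
# Sketch: surface the Sleuth MDC keys in log output (Logback-style pattern).
# The %X{...} lookups stay empty until something actually populates the MDC,
# which is exactly what is missing in the KStream case described below.
logging.pattern.level=%5p [${spring.application.name:},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]
```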

Sleuth trace keys in the MDC (X-B3-TraceId, X-B3-SpanId) are missing when logging. Is KStream (the main feature of Kafka Streams) supported by Sleuth?

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 24 (11 by maintainers)

Most upvoted comments

@t0il3ts0ap not sure how to interpret the 😕 reaction on my above comment. You asked about log correlation with Kafka Streams; at present there are two implementations for Spring + Kafka:

  1. spring-cloud-stream-binder-kafka-streams, which you’re using in your examples
  2. spring-kafka, which uses @EnableKafkaStreams to bootstrap & support the StreamsBuilder DSL

As you’ve found, approach 1 above currently does not output your span correlation IDs on any logged messages. Gary Russell above has suggested some hooks to work around that, and that might be a valuable route to pursue.

Above you mentioned possibly migrating to spring-kafka (approach 2 above). That also currently lacks log correlation out of the box. However, Jorge Otoya has recently done some work to enable trace forwarding and log correlation with an explicit KafkaStreamsTracing wrapper in brave-instrumentation-kafka-streams.

For my part, I’m taking the wrapper created by Jorge and adding auto-configuration for it in Sleuth: https://github.com/spring-cloud/spring-cloud-sleuth/pull/1365. Add that configuration class to your classpath and invoke the wrapper with something like:

kstream.transform(kafkaStreamsTracing.peek("my-named-span",
    (k, v) -> LOGGER.info("This message will have log correlation IDs; key: {}, value: {}", k, v)));

That will give your logged message Sleuth Trace/Span correlation IDs in the log output!

There’s more operations you can wrap, and an ongoing discussion to follow on whether this should be part of Sleuth, but this should at least get you going. Hope it helps!
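
To illustrate "more operations you can wrap": a minimal sketch, assuming the brave-instrumentation-kafka-streams API, where the map/foreach factory methods and their transform/process pairings are taken from that library and may differ by version (the class and names here are hypothetical, for illustration only):

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import brave.kafka.streams.KafkaStreamsTracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: each wrapped operation runs inside a named span, so log
// statements inside it see the Sleuth trace/span IDs in the MDC.
class TracedTopology {
    private static final Logger log = LoggerFactory.getLogger(TracedTopology.class);
    private final KafkaStreamsTracing kafkaStreamsTracing;

    TracedTopology(KafkaStreamsTracing kafkaStreamsTracing) {
        this.kafkaStreamsTracing = kafkaStreamsTracing;
    }

    KStream<String, String> wire(KStream<String, String> input) {
        // map(...) yields a TransformerSupplier, applied via transform(...)
        KStream<String, String> mapped = input.transform(
            kafkaStreamsTracing.map("normalize-key",
                (key, value) -> KeyValue.pair(key.trim(), value)));

        // foreach(...) yields a ProcessorSupplier, applied via process(...)
        mapped.process(kafkaStreamsTracing.foreach("log-result",
            (key, value) -> log.info("traced; key: {}, value: {}", key, value)));

        return mapped;
    }
}
```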

I opened an issue against the binder; link above.

Yes, I think it would need hooks in the spring-cloud-stream Kafka Streams binder.

The binder builds the “start” and “end” of the KStream, with the app dev providing the “middle” of the stream topology.

I think the binder would need to provide a hook to allow the insertion of the processor/transformer at the appropriate points.

/cc: @sobychacko @sabbyanandan

@timtebeek Thanks for the feedback. I guess for the time being we will leave the binder with the explicit options mentioned above for instrumentation. I know it is not ideal, but the more implicit strategy requires a bit more effort and I am afraid it may not fit with our current timeline for the Spring Cloud Stream Horsham release. Therefore, I am leaving the issue in the binder open and have moved it to the backlog. Contributions there are welcome, though :-)

I closed the issue, as stream instrumentation is possible using spring-kafka, though not fully automated yet. Please reopen if necessary.

Concluding summary of the issue: A) If you are using Kafka Streams via spring-cloud-stream, instrumentation is not possible until the binder supports hooks that allow insertion of a processor/transformer.

B) If you are using Kafka Streams via spring-kafka, polling instrumentation will start happening automatically (visible on the Zipkin dashboard) once milestone 2.2.0.M1 is released. Yet the logs won’t have trace IDs unless transform/peek or other function combinations (as described above) are used manually.

@t0il3ts0ap for the spring-kafka instrumentation of Kafka Streams there’s a start in https://github.com/spring-cloud/spring-cloud-sleuth/pull/1365. At present that also lacks the log correlation, so migrating to it would not help your project much yet, I suppose. But perhaps support can be expanded either way, as neither option right now seems to have log correlation.