kafka-streams-dotnet: Transformer not working as expected in 1.5.0-RC1

Description

  • Streamiz.Net version: 1.5.0-RC1
  • Apache Kafka version: 3.3.1
  • Operating System: Windows 10 Enterprise 21H2

I have a topology set up to ‘de-duplicate’ records from a topic. I use ‘de-duplicate’ loosely, as they’re not technically duplicates but rather multiple events raised by the data source for the same operation. To resolve this, I’ve got a transformer that checks the final record (key, value, headers) to ensure it hasn’t been seen yet and only proceeds if the record is unique. This worked in 1.4.2, and it works when I build Streamiz.Net locally and reference it as an assembly; I only get this issue when using the version that’s on NuGet.

I get that this is a pre-release version, and I can’t expect it to work flawlessly… however, it confuses me that it works fine when using it as an assembly vs through NuGet.
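
For readers without access to the gist, the de-duplication idea described above can be sketched roughly as follows. This is a minimal, library-free illustration and not the reporter's transformer: the class name SeenRecordWindow and the method TryMarkSeen are hypothetical, and a plain dictionary stands in for the Streamiz window store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of windowed de-duplication. It does not use the
// Streamiz state store API; a dictionary stands in for the window store.
public sealed class SeenRecordWindow
{
    private readonly TimeSpan _windowSize;
    private readonly Dictionary<string, DateTimeOffset> _lastSeen = new();

    public SeenRecordWindow(TimeSpan windowSize) => _windowSize = windowSize;

    // Returns true and records the sighting when the key has no recorded
    // sighting within windowSize before the given timestamp.
    // Records that arrive with an older timestamp than the recorded
    // sighting are treated as duplicates as well.
    public bool TryMarkSeen(string finalKey, DateTimeOffset timestamp)
    {
        if (_lastSeen.TryGetValue(finalKey, out var previous)
            && timestamp - previous <= _windowSize)
        {
            return false; // duplicate inside the window: drop it
        }

        _lastSeen[finalKey] = timestamp;
        return true; // first sighting, or the earlier one expired: forward it
    }
}
```

A transformer built on this idea would forward a record only when TryMarkSeen returns true and drop it otherwise. Unlike a real window store, this sketch never evicts old keys.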

How to reproduce

Client Configuration:

return new()
{
    ApplicationId = _appSettings.KafkaApplicationId,
    BootstrapServers = _appSettings.KafkaBootstrapBaseUrl,
    SchemaRegistryUrl = _appSettings.KafkaSchemaRegistryUrl,
    AutoRegisterSchemas = false,
    SubjectNameStrategy = SubjectNameStrategy.Record,
    AutoOffsetReset = autoOffsetReset,
    DefaultKeySerDes = new StringSerDes(),
    DefaultValueSerDes = new StringSerDes()
};

  1. Set up a DeduplicationTransformer using this class: https://gist.github.com/attributeerror/5c11ce5bc757107d49d04a2da4fffe66
  2. Initialise the store like so:
IStoreBuilder dedupStoreBuilder = Stores.WindowStoreBuilder(
    Stores.InMemoryWindowStore(
        storeName,
        windowSize,
        windowSize
    ),
    new StringSerDes(),
    new DedupStoreChangelogSerDes()
).WithLoggingDisabled();
  3. The topology looks like so:
builder.Stream("input-topic", new StringSerDes(), new StringSerDes())
.Transform(new TransformerBuilder<string, MappedEventValue, string, MappedEventValue>()
    .StateStore(dedupStoreBuilder)
    .Transformer<DeduplicationTransformer>(storeName, windowSize.TotalMilliseconds, _log, _messageTraceIdHandler)
    .Build()
)
.To("output-topic", new StringSerDes(), new SchemaAvroSerDes<BusinessEventWrapper>());

Unfortunately, I can’t provide a runnable project, as much of this stream processor is business logic. However, the MappedEventValue class should contain “FinalKey” (string: the key to check for duplicates) and “Headers” (Dictionary<string, string>: header keys and values) properties, which the de-duplication transformer uses.
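
A minimal sketch of such a class might look like the one below. Only the two property names and their types come from the description above; the default initialisers are an assumption, and the real class presumably carries further business fields that are not shown here.

```csharp
using System.Collections.Generic;

// Sketch of the value type described in the report. Only FinalKey and
// Headers are named there; everything else about the class is assumed.
public class MappedEventValue
{
    // The key the de-duplication transformer checks for earlier occurrences.
    public string FinalKey { get; set; } = string.Empty;

    // Header keys and values that accompany the record.
    public Dictionary<string, string> Headers { get; set; } = new();
}
```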

Logs

info: Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[oracle-events-stream-processor-prtest]  Start creation of the stream application with this configuration:
        Stream property:
                client.id:
                num.stream.threads:     1
                default.key.serdes:     Streamiz.Kafka.Net.SerDes.StringSerDes
                default.value.serdes:   Streamiz.Kafka.Net.SerDes.StringSerDes
                default.timestamp.extractor:    Streamiz.Kafka.Net.Processors.Internal.FailOnInvalidTimestamp
                commit.interval.ms:     30000
                processing.guarantee:   AT_LEAST_ONCE
                transaction.timeout:    00:00:10
                poll.ms:        100
                max.poll.records:       500
                max.poll.restoring.records:     1000
                max.task.idle.ms:       0
                buffered.records.per.partition:         1000
                inner.exception.handler:        System.Func`2[System.Exception,Streamiz.Kafka.Net.ExceptionHandlerResponse]
                production.exception.handler:   System.Func`2[Confluent.Kafka.DeliveryReport`2[System.Byte[],System.Byte[]],Streamiz.Kafka.Net.ExceptionHandlerResponse]
                deserialization.exception.handler:      System.Func`4[Streamiz.Kafka.Net.ProcessorContext,Confluent.Kafka.ConsumeResult`2[System.Byte[],System.Byte[]],System.Exception,Streamiz.Kafka.Net.ExceptionHandlerResponse]
                rocksdb.config.setter:  System.Action`2[System.String,Streamiz.Kafka.Net.State.RocksDb.RocksDbOptions]
                follow.metadata:        False
                state.dir:      C:\Users\3966\AppData\Local\Temp\streamiz-kafka-net
                replication.factor:     1
                windowstore.changelog.additional.retention.ms:  86400000
                offset.checkpoint.manager:
                metrics.interval.ms:    30000
                metrics.recording.level:        INFO
                log.processing.summary:         00:01:00
                metrics.reporter:       System.Action`1[System.Collections.Generic.IEnumerable`1[Streamiz.Kafka.Net.Metrics.Sensor]]
                expose.librdkafka.stats:        False
                start.task.delay.ms:    5000
                parallel.processing:    False
                max.degree.of.parallelism:      8
                application.id:         oracle-events-stream-processor-prtest
                schema.registry.url:    [redacted]
                avro.serializer.auto.register.schemas:  False
                protobuf.serializer.auto.register.schemas:      False
                avro.serializer.subject.name.strategy:  Record
                protobuf.serializer.subject.name.strategy:      Record
        Client property:
                bootstrap.servers:      [redacted]
                debug:  broker,topic,msg,consumer,cgrp,topic,fetch
        Consumer property:
                max.poll.interval.ms:   300000
                enable.auto.commit:     False
                enable.auto.offset.store:       False
                partition.assignment.strategy:  cooperative-sticky
                auto.offset.reset:      earliest
        Producer property:
                partitioner:    murmur2_random
        Admin client property:
                None

info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[oracle-events-stream-processor-prtest-f51c3678-b91d-47e4-9586-b059873acd4a-stream-thread-0] Creating shared producer client
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[oracle-events-stream-processor-prtest-f51c3678-b91d-47e4-9586-b059873acd4a-stream-thread-0] Creating consumer client
info: Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[oracle-events-stream-processor-prtest] State transition from CREATED to REBALANCING
info: Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[oracle-events-stream-processor-prtest] Starting Streams client with this topology : Topologies:
         Sub-topology: 0
          Source: KSTREAM-SOURCE-0000000000 (topics: [Oracle.EBS.Events])
            --> KSTREAM-PEEK-0000000001
          Processor: KSTREAM-PEEK-0000000001 (stores: [])
            --> KSTREAM-PEEK-0000000002
            <-- KSTREAM-SOURCE-0000000000
          Processor: KSTREAM-PEEK-0000000002 (stores: [])
            --> KSTREAM-MAPVALUES-0000000003
            <-- KSTREAM-PEEK-0000000001
          Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
            --> KSTREAM-FILTER-0000000004
            <-- KSTREAM-PEEK-0000000002
          Processor: KSTREAM-FILTER-0000000004 (stores: [])
            --> KSTREAM-TRANSFORM-0000000005
            <-- KSTREAM-MAPVALUES-0000000003
          Processor: KSTREAM-TRANSFORM-0000000005 (stores: [dedup-store])
            --> KSTREAM-TRANSFORM-0000000006
            <-- KSTREAM-FILTER-0000000004
          Processor: KSTREAM-TRANSFORM-0000000006 (stores: [])
            --> KSTREAM-PEEK-0000000007
            <-- KSTREAM-TRANSFORM-0000000005
          Processor: KSTREAM-PEEK-0000000007 (stores: [])
            --> KSTREAM-SINK-0000000008
            <-- KSTREAM-TRANSFORM-0000000006
          Sink: KSTREAM-SINK-0000000008 (extractor class: Streamiz.Kafka.Net.Processors.Internal.WrapperTopicNameExtractor`2[[System.String, System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[Numatic.Apache.Avro.Schemas.Oracle.BusinessEventSystem.BusinessEventWrapper, Numatic.Libraries.Infrastructure.Repositories.Kafka, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null]])
            <-- KSTREAM-PEEK-0000000007

info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[oracle-events-stream-processor-prtest-f51c3678-b91d-47e4-9586-b059873acd4a-stream-thread-0] Starting
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[oracle-events-stream-processor-prtest-f51c3678-b91d-47e4-9586-b059873acd4a-stream-thread-0] State transition from CREATED to STARTING
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[oracle-events-stream-processor-prtest-f51c3678-b91d-47e4-9586-b059873acd4a-stream-thread-0] State transition from STARTING to PARTITIONS_ASSIGNED
info: Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener[0]
      Partition assignment took 00:00:00.0640065 ms.
        Currently assigned active tasks: 0-0
        Revoked assigned active tasks:

info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[oracle-events-stream-processor-prtest-f51c3678-b91d-47e4-9586-b059873acd4a-stream-thread-0] State transition from PARTITIONS_ASSIGNED to RUNNING
info: Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[oracle-events-stream-processor-prtest] State transition from REBALANCING to RUNNING
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[0|0] Task 0-0 state transition from CREATED to RUNNING

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • A code snippet with your topology builder (ex: builder.Stream<string, string>("topic").to("an-another-topic");)
  • Streamiz.Kafka.Net nuget version.
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (in debug mode, with log4net and StreamConfig.Debug set as necessary in the configuration).
  • Critical issue.

About this issue

  • State: closed
  • Created a year ago
  • Comments: 41 (41 by maintainers)

Most upvoted comments

Hi @LGouellec - I think the problem was due to using an event store as I realise I never received one of my debug logs for when the store was initialised. I’ve gone through a different route of storing seen events & the stream processor is now working as expected.

Can’t say I’ve gotten to the bottom of it, but I’ve now got the functionality working how I want it.

I appreciate all of your help!

Will do, I’m done for the weekend now so I’ll follow back up with this Monday morning when I have a moment.

That’s going to be my next step - I appreciate your help in diagnosing the issue. Thank you!

@LGouellec Nope, no logs besides rebalancing. Appreciate the response, I’ll take a look at Confluent’s operator.

OK, weird! I’m closing this ticket. Feel free to open a new one if you have another issue.

I plan to release 1.5 as a final release at the beginning of next week.