kafka-streams-dotnet: NullReferenceException after restoring state
Description
A topology with some stateful logic inside a Transformer (kafka-streams-dotnet version 1.4.0-RC3).
The topology starts up fine and populates the state (entries appear on the underlying Kafka state topic).
When I kill the pod running the KafkaStreams topology, it restarts and KafkaStreams starts up fine, but then a NullReferenceException is suddenly thrown when the transformer tries to manipulate the state store (an InMemoryKeyValueStore):
ERROR Streamiz.Kafka.Net.Processors.Internal.TaskManager - Failed to process stream task 0-1 due to the following error:
System.NullReferenceException: Object reference not set to an instance of an object.
at Streamiz.Kafka.Net.State.Logging.ChangeLoggingKeyValueBytesStore.Publish(Bytes key, Byte[] value)
at Streamiz.Kafka.Net.State.Logging.ChangeLoggingKeyValueBytesStore.Delete(Bytes key)
at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.<>c__DisplayClass27_0.<Delete>b__0()
at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency[T](Func`1 actionToMeasure, Sensor sensor)
at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.Delete(K key)
My test topology consists of the following components:
- 1 source topic (4 partitions)
- simple filter
- transformer: just issues PUT and DELETE actions on the mounted state-store
- no output topic
When deploying multiple instances of the topology, it eventually reaches the running state and starts processing correctly (after a few crashes with the above exception). So it looks like the producer underlying the state stores does not get initialized in time when switching to normal processing.
The error also only happens when there actually is state to restore. When the topology starts clean (no data in the state stores yet), everything works fine.
How to reproduce
I was not able to reproduce the error on a local Kafka setup (on my PC). The exception does show up on our Kubernetes cluster, where Kafka runs with SSL and strict ACLs for the clients, which might indicate a timing issue.
About this issue
- State: closed
- Created 2 years ago
- Comments: 19 (10 by maintainers)
Commits related to this issue
- 🔥 NullReferenceException after restoring state #200 — committed to LGouellec/kafka-streams-dotnet by LGouellec 2 years ago
- 🎨 NullReferenceException after restoring state #200 — committed to LGouellec/kafka-streams-dotnet by LGouellec 2 years ago
- ✨ NullReferenceException after restoring state #200 — committed to LGouellec/kafka-streams-dotnet by LGouellec 2 years ago
Hi @LGouellec, thanks for the effort 👍
I quickly ran some tests:
After that, I can see task 2 processing data, but nothing on the other two tasks. They get initialized, but I don't see them restoring state or starting to process data…
This was a very quick test. I'll do some more thorough testing later.
@LGouellec “Instance 1 : topic1:0,topic1:1 (nothing change), topic1:2 switch from SUSPEND to RESUME and here I’m not sure that the task restore the changelog before processing new messages.”
That is exactly what I see happening. Instead of restoring state, the task keeps consuming messages from those partitions and adding them to the internal queue (up to 1000 entries, and then everything halts). The processors for those partitions are never triggered; it looks like the system is waiting for a restoration of those partitions to complete that never started…
@duke-bartholomew ,
No, this TODO just means that Streamiz does not support subscribing to restoration events from KafkaStreams the way the Java library does.
But maybe the issue comes from this workflow:
Suppose a topic topic1 with 3 partitions:
- One instance. Instance 1: topic1:0, topic1:1, topic1:2 restore and process. Everything OK.
- Scale up to 2 instances. Instance 1: topic1:0, topic1:1 (nothing changes), topic1:2 switches to the SUSPEND state. Instance 2: topic1:2 restores and processes. Everything OK.
- Scale back down to 1 instance. Instance 1: topic1:0, topic1:1 (nothing changes), topic1:2 switches from SUSPEND to RESUME, and here I'm not sure that the task restores the changelog before processing new messages.
I will try to reproduce this issue early next week.
Hope it's clear.
@duke-bartholomew ,
Yes, exactly! I will fix it ASAP.
@duke-bartholomew ,
I reproduced the issue locally. It's not a state restoration problem. The null object is context.RecordContext. The problem is the transformer instance: it's the same instance for all partitions, so I think I have a concurrency problem. I will try to release a new RC with a fix this afternoon.
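To make the diagnosis concrete, here is a minimal, self-contained Java sketch of how sharing one transformer instance across tasks can leave a task dereferencing another task's context, whose record context is still null. It is not the fix committed for #200 and uses no Streamiz or Kafka Streams types: TaskContext, StatefulTransformer and run are hypothetical stand-ins. The sketch assumes that the transformer caches the context it receives in init() and that a task's record context is only set while that task processes a record. Handing each task its own instance through a Supplier avoids the problem; the Java Kafka Streams API documents the same rule for its suppliers, which are expected to return a new instance on every get() call.

```java
import java.util.Objects;
import java.util.function.Supplier;

public class SharedTransformerDemo {

    // Hypothetical stand-in for a per-task processor context (not a Streamiz or Kafka type).
    static final class TaskContext {
        final String taskId;
        String recordContext; // set by the owning task just before it hands a record over

        TaskContext(String taskId) {
            this.taskId = taskId;
        }
    }

    // Hypothetical transformer that caches the context it receives in init().
    static final class StatefulTransformer {
        private TaskContext context;

        void init(TaskContext ctx) {
            this.context = ctx;
        }

        String transform(String key) {
            // Throws if the cached context belongs to a task that is not processing a record,
            // analogous to the NullReferenceException on context.RecordContext.
            String rc = Objects.requireNonNull(context.recordContext, "RecordContext is null");
            return context.taskId + " " + rc + " " + key;
        }
    }

    // Two tasks (0-1 and 0-2) are initialised one after the other, then task 0-1 processes a record.
    static String run(Supplier<StatefulTransformer> supplier) {
        TaskContext ctx1 = new TaskContext("0-1");
        TaskContext ctx2 = new TaskContext("0-2");
        StatefulTransformer t1 = supplier.get();
        StatefulTransformer t2 = supplier.get();
        t1.init(ctx1);
        t2.init(ctx2); // with a shared instance this overwrites the context cached for task 0-1
        ctx1.recordContext = "topic1-1@42";
        try {
            return t1.transform("k1");
        } catch (NullPointerException e) {
            return "NPE: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        StatefulTransformer shared = new StatefulTransformer();
        // One instance for every task: task 0-1 ends up reading task 0-2's empty context.
        System.out.println("shared instance:   " + run(() -> shared));
        // A new instance per task: task 0-1 reads its own context.
        System.out.println("instance per task: " + run(StatefulTransformer::new));
    }
}
```

With the shared instance, run returns "NPE: RecordContext is null"; with a fresh instance per task, it returns "0-1 topic1-1@42 k1".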