kafka-streams-dotnet: NullReferenceException after restoring state

Description

Topology with some statefull logic inside a Transformer (kafka-streams-dotnet version 1.4.0-RC3) Topology starts up fine and populates the state. (entries appear on the underlying Kafka state topic) When I would now kill the pod running the KafkaStreams topology, it restarts (all fine), KafkaStreams starts up all fine, but then all of a sudden a NullReferenceException happens when trying to manipulate the state-store. (state-store = InMemoryKeyValueStore)

ERROR Streamiz.Kafka.Net.Processors.Internal.TaskManager - Failed to process stream task 0-1 due to the following error:
System.NullReferenceException: Object reference not set to an instance of an object.
   at Streamiz.Kafka.Net.State.Logging.ChangeLoggingKeyValueBytesStore.Publish(Bytes key, Byte[] value)
   at Streamiz.Kafka.Net.State.Logging.ChangeLoggingKeyValueBytesStore.Delete(Bytes key)
   at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.<>c__DisplayClass27_0.<Delete>b__0()
   at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency[T](Func`1 actionToMeasure, Sensor sensor)
   at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.Delete(K key)

My test topology consists out of following components:

  • 1 source topic (4 partitions)
  • simple filter
  • transformer: just issues PUT and DELETE actions on the mounted state-store
  • no output topic

When deploying multiple instances of the topology, eventually it makes it to running state and starts processing correctly (after a few crashes with above exception), so it looks like the producer lying underneath the state-stores does not get initialized in time when switching to “normal processing”.

This error also only happens when there is indeed state to restore. When starting the topology clean (no data in the state-stores yet), then everything works fine.

How to reproduce

I was not able to directly reproduce the above error on a local Kafka setup (on my PC). The above exception shows up on our Kubernetes cluster with Kafka running in SSL setup (with strict ACLs towards the clients) which might indicate a timing issue?

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 19 (10 by maintainers)

Commits related to this issue

Most upvoted comments

Hi @LGouellec Thanks for the effort 👍

I quickly ran some tests:

  • starting the topology now works as expected: ✔️
  • rebalancing (scale up from 1 instance to 3 instances): ✔️
  • rebalancing (scale down from 3 instances to 1 instance) : 💥 Here I still see some issues. At first glance it looks like the state does not get restored on the reassigned partitions.
2022-11-18 16:45:51,699 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|1] Task 0-1 state transition from SUSPENDED to CREATED
2022-11-18 16:45:51,699 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|0] Task 0-0 state transition from SUSPENDED to CREATED
2022-11-18 16:45:51,699 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] State transition from RUNNING to PARTITIONS_ASSIGNED
2022-11-18 16:45:51,699 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.KafkaStream - stream-application[dummy.parkingprocessor-dotnet] State transition from RUNNING to REBALANCING
2022-11-18 16:45:51,699 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener - Partition assignment took 00:00:00.0059467 ms.
	Currently assigned active tasks: 0-0,0-1,0-2
	Revoked assigned active tasks: 

2022-11-18 16:45:51,808 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  ParkingSessions.NET.ObservationTimeExtractor - extract timestamp - tpo: dummy.parkingspot.events:[1]@3796508, record-time: 1668789905488, partition-time: -1, observation-time: 1668789905000
2022-11-18 16:45:51,808 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  ParkingSessions.NET.ObservationTimeExtractor - extract timestamp - tpo: dummy.parkingspot.events:[0]@3798701, record-time: 1668789903226, partition-time: -1, observation-time: 1668789903000
2022-11-18 16:45:51,809 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-11-18 16:45:51,809 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.KafkaStream - stream-application[dummy.parkingprocessor-dotnet] State transition from REBALANCING to RUNNING
2022-11-18 16:45:51,809 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager - Initializing to the starting offset for changelog dummy.parkingprocessor-dotnet-my-state-changelog [[0]] of in-memory state store my-state
2022-11-18 16:45:51,809 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|0] Task 0-0 state transition from CREATED to RESTORING
2022-11-18 16:45:51,809 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|0] Restoration will start soon.
2022-11-18 16:45:51,809 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager - Initializing to the starting offset for changelog dummy.parkingprocessor-dotnet-my-state-changelog [[1]] of in-memory state store my-state
2022-11-18 16:45:51,809 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|1] Task 0-1 state transition from CREATED to RESTORING
2022-11-18 16:45:51,809 [dummy.parkingprocessor-dotnet-680afeeb-8e4e-43b8-bcfa-e1cac785c9c1-stream-thread-0] INFO  Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|1] Restoration will start soon.

After that I can see Task 2 processing data, but nothing on the other 2 tasks. They get initialized, but I don’t see them restoring state and starting to process data …

This was a very quick test … I’ll do some more thorough testing later …

@LGouellecInstance 1 : topic1:0,topic1:1 (nothing change), topic1:2 switch from SUSPEND to RESUME and here I’m not sure that the task restore the changelog before processing new messages.

That is exactly what I see happening. Instead of restoring state, messages from those partitions are still consumed, and added to the internal queue (up till 1000 entries and then it all halts). The processors for those partitions are not triggered, but it looks like the system is waiting for the restore of those partitions to complete while it has never started …

@duke-bartholomew ,

No, this TODO is just Streamiz does not support to subscribe restoration event from KafkaStream like JAVA do it.

But maybe the issue is based on this workflow :

Suppose the topic topic1 with 3 partitions

Instance 1 : topic1:0,topic1:1,topic1:2 restore and process. Everything ok

Scale up to 2, so Instance 1 : topic1:0,topic1:1 (nothing change), topic1:2 switch to SUSPEND state Instance 2 : topic1:2 restore and process Everything ok

Scale down, so Instance 1 : topic1:0,topic1:1 (nothing change), topic1:2 switch from SUSPEND to RESUME and here I’m not sure that the task restore the changelog before processing new messages.

I will try to reproduce this issue beginning next week.

Hope it’s clear

@duke-bartholomew ,

Yes exactly ! I will fix asap

@duke-bartholomew ,

I reproduced the issue locally. It’s not a state restoration problem. The null object is context.RecordContext The problem is the instance of the transformer = It’s the same instance for all partitions, so I think I have a concurrency problem.

I will try to release a new RC this afternoon with a fix.