goka: ctx.Value() and View#Get(ctx.Key()) constantly do not match
From my understanding, when you run `ctx.SetValue(x)`, `x` is written to the table and then emitted to Kafka.
That means that when a subsequent message comes in to a `goka.Input` `ProcessCallback`, `ctx.Value()` should already return the latest value emitted. Which also means that if I initialize a view like this

```go
view, err := goka.NewView([]string{"localhost:9092"}, goka.GroupTable("some-group"), new(SomeCodec))
```

and, in the `ProcessCallback`, I call

```go
view.Get(ctx.Key())
```

its result should be equal to the return value of `ctx.Value()`.
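To make that expectation concrete, here is a minimal sketch (the broker address, group name, and the pass-through `SomeCodec` are placeholders, not from the original report):

```go
package main

import (
	"log"
	"reflect"

	"github.com/lovoo/goka"
)

// SomeCodec is a stand-in codec that passes raw bytes through.
type SomeCodec struct{}

func (*SomeCodec) Encode(v interface{}) ([]byte, error) { return v.([]byte), nil }
func (*SomeCodec) Decode(d []byte) (interface{}, error) { return d, nil }

func main() {
	// View over the processor's group table.
	view, err := goka.NewView([]string{"localhost:9092"},
		goka.GroupTable("some-group"), new(SomeCodec))
	if err != nil {
		log.Fatal(err)
	}

	// The expectation: for the same key, both reads should agree.
	process := func(ctx goka.Context, msg interface{}) {
		fromCtx := ctx.Value()               // processor's local table
		fromView, err := view.Get(ctx.Key()) // same key, read via the view
		if err != nil {
			ctx.Fail(err)
		}
		if !reflect.DeepEqual(fromCtx, fromView) {
			log.Printf("mismatch for key %q: ctx=%v view=%v",
				ctx.Key(), fromCtx, fromView)
		}
		ctx.SetValue(msg)
	}
	_ = process // wire this callback into the group graph's input edges
}
```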
I have a case where that does not hold. I have a group with 5 inputs and one persist, and it is producing all kinds of stale reads. For example:
- Message 1 from topic 1 comes in and gets persisted.
- Message 2 from topic 2 comes in, `ctx.Value()` returns `nil`, so I fall back to `view.Get(ctx.Key())`, which returns a value from the table that I can work with. It gets persisted.
- Message 3 comes in, `ctx.Value()` returns the result of the persistence of message 1 but not message 2. `view.Get(ctx.Key())` returns the result of the persistence of messages 1 and 2.
Any ideas why this inconsistency is present? Is it a bug? Or am I missing something here?
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 20 (9 by maintainers)
Commits related to this issue
- Systemtest to demonstrate multi input streams consistency (issue #332) — committed to lovoo/goka by frairon 3 years ago
Fabian probably isn’t the right guy to ask here 😄
Anyway, Kafka itself doesn’t determine the partition; it’s the producer that defines the partition. If you are in doubt whether the partition is correct, you can always use the loopback, e.g. like this:
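The snippet originally posted here was not preserved; a minimal sketch of the loopback idea, assuming a single input topic and the placeholder `SomeCodec`:

```go
// The input callback only re-keys the message through the loop topic; the
// loop callback does the actual table write. Because loopback messages are
// emitted by goka's own producer (and hasher), the partition the loop
// callback runs on matches the partition ctx.SetValue() writes to.
g := goka.DefineGroup("some-group",
	goka.Input("input-topic", new(SomeCodec), func(ctx goka.Context, msg interface{}) {
		ctx.Loopback(ctx.Key(), msg) // re-emit so goka picks the partition
	}),
	goka.Loop(new(SomeCodec), func(ctx goka.Context, msg interface{}) {
		ctx.SetValue(msg) // incoming and table partitions now match
	}),
	goka.Persist(new(SomeCodec)),
)
```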
That however is cumbersome if you have multiple input topics with different codecs, because they are all channeled into the same loopback topic, where you have to unmangle the different types again (also, you can only have one codec there). If you do have multiple types and input topics, you can also create something like a proxy processor, where each input topic is forwarded with `ctx.Emit(..)` to a “correct-partition” output topic that the other processor then consumes from, as sketched below. Hope that answers your question.
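A rough sketch of such a proxy processor (the topic names, the `Wrapper` type, and the codecs are hypothetical, assuming the input codecs decode to raw bytes):

```go
// Wrapper is a hypothetical envelope recording the original topic so the
// downstream processor can tell the message types apart again.
type Wrapper struct {
	Topic string
	Data  []byte
}

// Each input is re-emitted under its unchanged key to one output topic;
// goka's producer then assigns the partition its consumers expect.
proxy := goka.DefineGroup("proxy-group",
	goka.Input("topic-a", new(CodecA), func(ctx goka.Context, msg interface{}) {
		ctx.Emit("correct-partition-topic", ctx.Key(),
			&Wrapper{Topic: string(ctx.Topic()), Data: msg.([]byte)})
	}),
	goka.Input("topic-b", new(CodecB), func(ctx goka.Context, msg interface{}) {
		ctx.Emit("correct-partition-topic", ctx.Key(),
			&Wrapper{Topic: string(ctx.Topic()), Data: msg.([]byte)})
	}),
	goka.Output("correct-partition-topic", new(WrapperCodec)),
)
```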
No worries, take your time. I’m just thinking it’s either a design error or something about the partitioning. It should be easy and reliable, with no need for workarounds, because there really isn’t any magic going on here 😃
Using Kafka’s default partitioner is probably actually the root of the problem though! In goka, we use this hasher to emit, and the same hasher here to figure out the partition to read from in a view. Kafka however uses murmur2, as stated here. Now let’s consider an incoming key `X`, sent to `partition-0` by murmur2-hashing. In the callback, `ctx.Value()` will read what was last written for that key to `partition-0`. However, if we call `ctx.SetValue()`, the value will be stored in, let’s assume, `partition-1`, as determined by goka’s hasher. Then on the next message with key `X` (which will again be sent to `partition-0`), the next call to `ctx.Value()` will not return the expected value. The value read from a view will also be different, because the view reads from the partition where goka assumes the value is stored.

Actually that makes me think we should have a warning (or even fail the processor?) if the consume callback tries to do a `ctx.SetValue()` with a message where the outgoing partition wouldn’t match the incoming partition. Then this problem would be obvious right away.
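Such a check is not part of goka’s API; a sketch of what it could look like, assuming goka’s default FNV-1a hasher and sarama-style modulo partition assignment:

```go
import (
	"hash/fnv"
	"log"

	"github.com/lovoo/goka"
)

// gokaPartition mirrors how goka's default hasher (fnv.New32a) plus a
// sarama-style hash partitioner maps a key to a partition.
func gokaPartition(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

// checkedSetValue logs a warning when the write would land on a different
// partition than the message arrived on. numPartitions must match the
// table topic's partition count.
func checkedSetValue(ctx goka.Context, msg interface{}, numPartitions int32) {
	if want := gokaPartition(ctx.Key(), numPartitions); want != ctx.Partition() {
		log.Printf("key %q: incoming partition %d, goka would write to %d",
			ctx.Key(), ctx.Partition(), want)
	}
	ctx.SetValue(msg)
}
```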
Ok anyway, here’s a simple solution using loopback. This only works if both the callbacks and the codecs are the same for all input topics. If not, you have to pack the message into a new type for the loopback so the callback can later figure out which topic the message was originally sent to.
Suppose this is the current group graph.
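The graph itself didn’t survive in this export; presumably it looked roughly like this, with several inputs sharing one callback plus one persist (names are placeholders):

```go
// process is the existing ProcessCallback shared by all inputs.
g := goka.DefineGroup("some-group",
	goka.Input("topic-1", new(SomeCodec), process),
	goka.Input("topic-2", new(SomeCodec), process),
	// ... remaining input topics ...
	goka.Persist(new(SomeCodec)),
)
```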
If we now add a loop edge, it will “fix” the partition by emitting the value once again to Kafka:
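The original snippet is also missing here; a sketch using the same placeholder names:

```go
// forwardToLoop only re-keys the message through the loop topic.
forwardToLoop := func(ctx goka.Context, msg interface{}) {
	ctx.Loopback(ctx.Key(), msg)
}

g := goka.DefineGroup("some-group",
	goka.Input("topic-1", new(SomeCodec), forwardToLoop),
	goka.Input("topic-2", new(SomeCodec), forwardToLoop),
	// ... remaining input topics ...
	// the original callback moves to the loop edge, where the incoming
	// partition matches the one goka writes the value to
	goka.Loop(new(SomeCodec), process),
	goka.Persist(new(SomeCodec)),
)
```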
(haven’t compiled the code, hope it works).
Sorry for the long text. Good luck 😃
Sorry for the long delay @EvgeniaMartynova-thebeat. Your assumptions are right though.