orleans: Talk about the stream (Feedback on the stream and QueueCacheMissException)
Continuing the topic of https://github.com/dotnet/orleans/issues/844, at the request of @gabikliot I am providing feedback. To start, I should say that I may be wrong in some of my statements, since I may not know some of the nuances of the scenario and the implementation principles, or I may not fully understand the intended usage.
Scenario example: let's say you subscribed at offset X, then produced an event with sequence X, and X was delivered to the consumer; it may now be deleted from the cache. If you then subscribe to X again from a different consumer, you will rightfully get a cache miss exception for the second subscription.
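A hedged sketch of how that scenario can surface in code (the field names, the error handler, and the decision to re-subscribe without a token are my assumptions, not something the thread prescribes): when resuming from a saved StreamSequenceToken whose offset has already been evicted from the cache, the QueueCacheMissException is delivered to the subscription's error handler, where the consumer has to decide how to recover.

```csharp
// Illustrative sketch only: _stream and _savedToken are assumed fields.
// Resume from a persisted token; if that offset was evicted from the
// pulling agent's cache, a QueueCacheMissException reaches OnErrorAsync.
public Task ResumeAsync()
{
    return _stream.SubscribeAsync(OnNextAsync, OnErrorAsync, _savedToken);
}

private async Task OnErrorAsync(Exception ex)
{
    if (ex is QueueCacheMissException)
    {
        // The requested offset is gone from the cache. Those events are not
        // re-delivered, so either re-read them from the backing queue/storage
        // yourself or accept the gap and continue from the current position.
        await _stream.SubscribeAsync(OnNextAsync, OnErrorAsync);
    }
}

private Task OnNextAsync(string item, StreamSequenceToken token = null)
{
    // Persist the token here so a later resume can start from the right offset.
    return Task.CompletedTask;
}
```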
Overall I completely agree with this scenario, but there are a number of issues.
During testing I noticed that when I receive a QueueCacheMissException, I lose the data in the stream (queue). Based on the described scenario this is logical: there is effectively a dead consumer that does not process messages, so all messages go nowhere.
But is that right? How can I recover this data? Why not leave the messages in the stream and in the queue until a consumer is able to handle them? How can a message be considered processed if it has not actually been processed?
About the subscription: of course a consumer needs to subscribe only once. But I ran into a problem: what if multiple consumers are subscribed to the stream? More precisely, how do I tell which handles belong to the current consumer, so that it does not subscribe again? For now I always work with only one consumer, because FIFO ordering is important to me. So I want to show my single-consumer subscription approach, as part of solving the problem I described in https://github.com/dotnet/orleans/issues/844. I would welcome comments on the code, and an answer to this question: how do you track the subscription of the current consumer when subscriptionHandles.Count > 1?
Consumer
public override async Task OnActivateAsync()
{
    var streamProvider = GetStreamProvider(PROVIDER_NAME);
    _stream = streamProvider.GetStream<string>(Guid.Parse(GUID_STREAM), this.GetPrimaryKeyString());

    // Resume any existing subscriptions instead of creating new ones,
    // so grain reactivation does not produce duplicate subscriptions.
    var subscriptionHandles = await _stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        // Note: an async lambda inside ForEach is fire-and-forget; awaiting
        // each handle in a plain loop keeps the resumes inside the activation.
        foreach (var handle in subscriptionHandles)
            await handle.ResumeAsync(OnNextAsync);
    }
}
public async Task SubscribeAsync()
{
    // Subscribe only if this consumer has no subscription yet.
    var subscriptionHandles = await _stream.GetAllSubscriptionHandles();
    if (subscriptionHandles.IsNullOrEmpty())
        await _stream.SubscribeAsync(OnNextAsync);
}
Producer
private async Task<IAsyncStream<string>> GetStream(string consumerName)
{
    // Ensure the consumer grain is subscribed before producing to the stream.
    var streamConsumerGrain = GrainFactory.GetGrain<IConsumerGrain>(consumerName);
    await streamConsumerGrain.SubscribeAsync();

    var streamProvider = GetStreamProvider(PROVIDER_NAME);
    return streamProvider.GetStream<string>(Guid.Parse(GUID_STREAM), consumerName);
}
About this issue
- Original URL
- State: closed
- Created 9 years ago
- Reactions: 1
- Comments: 55 (29 by maintainers)
About the two scenarios: in the situation where two consumer grains process the same message, why are you using a stream rather than, for example, ObserverSubscriptionManager<IGrainObserver>?
As I touched on in my previous post, if there is some sort of state associated with a subscription (aggregates, routes, …?), you'll need to persist that state with the subscription so it can be re-associated when rehydrating the grain and resuming the subscription. Serializable implementations of IAsyncObservable would be the method I'd suggest, but there are other options.
When storing stream state, I’d avoid serializing the IAsyncStream or the subscription handle, even though they are both serializable and intended to be used that way. The reason I discourage this is that these objects are still in flux and we don’t have backwards compatibility built in yet. Instead, I’d suggest storing the stream guid and namespace, then matching those with the related data in the handle to associate the correct handle during grain activation.
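A minimal sketch of that suggestion (type and member names such as StreamPointerState and ConsumerGrain are illustrative, not from the thread): persist only the stream's guid and namespace in grain state, then use them to rebuild the stream reference and re-associate the live handles during activation, rather than serializing the IAsyncStream or the handle itself.

```csharp
// Hypothetical names: persist only the stream identity, not the handle.
[Serializable]
public class StreamPointerState
{
    public Guid StreamGuid { get; set; }
    public string StreamNamespace { get; set; }
}

public class ConsumerGrain : Grain<StreamPointerState>, IConsumerGrain
{
    public override async Task OnActivateAsync()
    {
        // Rebuild the stream reference from the persisted identity.
        var provider = GetStreamProvider("PROVIDER_NAME"); // assumed provider name
        var stream = provider.GetStream<string>(State.StreamGuid, State.StreamNamespace);

        // Re-associate the existing handles for this stream on activation
        // instead of deserializing a stored handle.
        foreach (var handle in await stream.GetAllSubscriptionHandles())
            await handle.ResumeAsync(OnNextAsync);
    }

    private Task OnNextAsync(string item, StreamSequenceToken token = null)
    {
        return Task.CompletedTask;
    }
}
```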
When it comes to passing a sequence token to a resume, at grain activation or on cache miss, I would also avoid this if you can. The issue here is that there are two types of PersistentStreamProviders.
From the above description of persistent stream providers, what you've been building more resembles a non-rewindable stream provider. These stream providers allow producers to place data in the queue and processors to process it, but do not have a strong recoverability story. Applications using this type of stream provider tend to focus on fast, best-effort stream processing: identifying when data loss occurs and successfully processing the data that is not lost.
Some things that would be helpful for us to understand when advising: How important is the data you're processing? Mission critical, or is some data loss acceptable? How important is it that the data be processed in real time? On failure, can a backend recovery batch process be used for eventual consistency, or is the data of no value if it's not served in real time?