orleans: Talk about the stream (Feedback on the stream and QueueCacheMissException)

Continuing the theme of https://github.com/dotnet/orleans/issues/844, and at the request of @gabikliot, here is my feedback. To start, I should say that I could be wrong in my statements, because I may not know some nuances of the scenario and the implementation principles, or I may not fully understand the intended usage.

Scenario example: let's say you subscribed at offset X, then produced an event with sequence X. X was delivered to the consumer, so it may now be deleted from the cache. If you then subscribe to X again from a different consumer, you will rightfully get a cache miss exception for the second subscription.

Overall I completely agree with this scenario, but there are a number of issues. During testing I noticed that when I receive a QueueCacheMissException, I lose the data flow (queue). Based on the described scenario that is logical, because there is a faulty consumer that does not process messages, and all messages go nowhere. But is that right? How can I recover this data? Why not leave the messages in the stream and in the queue until a consumer is able to handle them? How can a message be considered processed if it has not actually been processed?

About the subscription: of course, a consumer needs to subscribe only once. But I faced a problem: if multiple consumers have subscribed to the stream, how can the current consumer tell whether it already has a subscription? For now I always work with only one consumer, because FIFO ordering is important to me. So I want to show my subscription approach with one consumer, as part of solving the problem I described in https://github.com/dotnet/orleans/issues/844. I would welcome comments on the code, and an answer to the question: how do you track the subscription of the current consumer if subscriptionHandles.Count > 1?

Consumer

        public async override Task OnActivateAsync()
        {
            var streamProvider = GetStreamProvider(PROVIDER_NAME);
            _stream = streamProvider.GetStream<string>(Guid.Parse(GUID_STREAM), this.GetPrimaryKeyString());
            var subscriptionHandles = await _stream.GetAllSubscriptionHandles();
            // Resume each existing handle. Awaiting in a plain foreach avoids
            // ForEach(async ...), whose async lambda is fire-and-forget and
            // swallows exceptions.
            foreach (var handle in subscriptionHandles)
                await handle.ResumeAsync(OnNextAsync);
        }

        public async Task SubscribeAsync()
        {
            var subscriptionHandles = await _stream.GetAllSubscriptionHandles();
            // GetAllSubscriptionHandles returns an empty list rather than null,
            // so a Count check is enough to avoid a duplicate subscription.
            if (subscriptionHandles.Count == 0)
                await _stream.SubscribeAsync(OnNextAsync);
        }
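On the open question of identifying the current consumer's handle when subscriptionHandles.Count > 1, one possible approach (a sketch, not from this thread; the MySubscriptionId grain-state field is hypothetical) is to persist the HandleId returned by SubscribeAsync and match it on activation:

        // Hypothetical sketch: persist this grain's own subscription id so it
        // can be told apart from any other handles on the same stream.
        public async Task SubscribeAsync()
        {
            var handle = await _stream.SubscribeAsync(OnNextAsync);
            State.MySubscriptionId = handle.HandleId; // assumed grain-state field
            await WriteStateAsync();
        }

        public async override Task OnActivateAsync()
        {
            // ... provider and stream setup as above ...
            var handles = await _stream.GetAllSubscriptionHandles();
            var mine = handles.FirstOrDefault(h => h.HandleId == State.MySubscriptionId);
            if (mine != null)
                await mine.ResumeAsync(OnNextAsync);
        }

Handles not matching the stored id would be left untouched, so other subscriptions on the same stream are unaffected.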

Producer

        private async Task<IAsyncStream<string>> GetStream(string consumerName)
        {
            var streamConsumerGrain = GrainFactory.GetGrain<IConsumerGrain>(consumerName);
            await streamConsumerGrain.SubscribeAsync();
            var streamProvider = GetStreamProvider(PROVIDER_NAME);
            return streamProvider.GetStream<string>(Guid.Parse(GUID_STREAM), consumerName);
        }

About this issue

  • Original URL
  • State: closed
  • Created 9 years ago
  • Reactions: 1
  • Comments: 55 (29 by maintainers)

Most upvoted comments

About the second scenario: in the situation where two consumer grains process the same message, why are you using a stream rather than, for example, ObserverSubscriptionManager<IGrainObserver>?

As I touched on in my previous post: if there is some sort of state associated with a subscription (aggregates, routes, …), you'll need to persist that state along with the subscription in order to re-associate it when rehydrating the grain and resuming the subscription. Serializable implementations of IAsyncObservable would be the method I'd suggest, but there are other options.

When storing stream state, I’d avoid serializing the IAsyncStream or the subscription handle, even though they are both serializable and intended to be used that way. The reason I discourage this is that these objects are still in flux and we don’t have backwards compatibility built in yet. Instead, I’d suggest storing the stream guid and namespace, then matching those with the related data in the handle to associate the correct handle during grain activation.
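As a sketch of the advice above (the ConsumerState class and its property names are made up for illustration), grain state would hold only the stream guid and namespace, which are then used to rebuild the stream and re-associate handles on activation:

        [Serializable]
        public class ConsumerState
        {
            public Guid StreamGuid { get; set; }
            public string StreamNamespace { get; set; }
        }

        // On activation, rebuild the stream from the persisted identity; the
        // handles returned for it are, by construction, for that same stream,
        // so no handle or IAsyncStream object ever needs to be serialized.
        public async override Task OnActivateAsync()
        {
            var provider = GetStreamProvider(PROVIDER_NAME);
            var stream = provider.GetStream<string>(State.StreamGuid, State.StreamNamespace);
            foreach (var handle in await stream.GetAllSubscriptionHandles())
                await handle.ResumeAsync(OnNextAsync);
        }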

When it comes to passing a sequence token to a resume, at grain activation or on a cache miss, I would also avoid this if you can. The issue here is that there are two types of persistent stream providers.

  • The first is non-rewindable. These use persistent queues that have no ordering or reliability guarantees. An example is AzureQueueStreamProvider, which you seem to be modeling yours on. For these stream providers, the tokens are volatile and only carry meaning during delivery. That is, if you restart the silo and try to use a previously delivered token to subscribe at a particular place in the stream, the operation will likely fail or not act as you intended, because that token may no longer be valid. These are mainly best-effort stream providers that use a persistent queue to help reduce data loss. When subscribing or resuming streams from this type of provider, passing null tokens is the correct action most of the time.
  • The second type of persistent stream provider is rewindable. These use queues that support ordering and reliability guarantees, like Kafka or Event Hub. With these kinds of stream providers, one can persist the sequence token with the grain state and use it to resume the subscription, because these queues can redeliver data (to a degree) from reliable positions that can be encoded into the tokens. Unfortunately, we do not yet have stream providers of this type in Orleans; all implementations of them so far have been customer specific.
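A small sketch of how the token choice differs between the two provider types (the rewindable half is hypothetical, since no such provider ships with Orleans yet, and State.LastSequenceToken is an assumed state field):

        // Non-rewindable (e.g. AzureQueueStreamProvider): subscribe without a
        // token; a stale token may be invalid after a silo restart.
        var handle = await stream.SubscribeAsync(OnNextAsync);

        // Rewindable (hypothetical provider): a token persisted in grain state
        // could be passed when resuming, to replay from a reliable position:
        // await handle.ResumeAsync(OnNextAsync, State.LastSequenceToken);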

From the above description of persistent stream providers, what you've been building more resembles a non-rewindable stream provider. These stream providers allow producers to place data in the queue and processors to process it, but they do not have a strong recoverability story. Applications using this type of stream provider tend to focus on fast, best-effort stream processing: identifying when data loss occurs and successfully processing the data that is not lost.

Some things that would help us when advising: How important is the data you're processing? Is it mission critical, or is some data loss acceptable? How important is it that the data be processed in real time? On failure, can a backend recovery batch process be used for eventual consistency, or is the data of no value if it is not served in real time?