azure-sdk-for-net: Event Hub Producer Client should expire transport producers after a period of inactivity

Summary

In order to publish events to a specific partition, the EventHubProducerClient will need to manage a pool of transport producers which are bound to a specific partition. These transport producers are created on-demand when a request is made for a specific partition. Once created, their lifespan is tied to that of the EventHubProducerClient that created them.

In general, this is not a concern, as the scenario in which a producer targets a specific partition is not a mainline scenario. As an efficiency, however, it would be desirable to expire those transport producers that have not been accessed in a given period of time (configurable) such that unused transport producers are disposed when possible, independent of the EventHubProducerClient.

Scope of Work

  • Implement a means of tracking when a transport producer for a given partition was last used by the EventHubProducerClient that owns it.

  • Implement a periodic check for the transport producers owned by an EventHubProducerClient instance and dispose of those which have not been accessed for a given (configurable) period of time.

  • Ensure that removing the existing transport producer is an atomic operation and does not cause corruption or unexpected results should there be a request to publish to that partition during the check/removal.

  • Ensure that removing the existing transport producer does not block creation of a new instance in the event of concurrent calls, nor cause the publishing of events for other partitions to be blocked. The producer client should be able to handle concurrent calls.

  • Tests have been added or adjusted to cover the new functionality.
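
As a rough illustration of the tracking and atomic-removal items above, here is a minimal sketch. It is not the Event Hubs implementation: IdleTrackingPool, Touch, and ExpireIdle are hypothetical names, and a generic resource stands in for a transport producer. The assumption is that entries are immutable and replaced on every use, so the expiration pass can remove an entry only if nothing has touched it since it was inspected.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical, simplified pool; none of these names exist in the Event Hubs client library.
public sealed class IdleTrackingPool<TResource> where TResource : class
{
    // Entries are immutable. Each use swaps in a new instance, so the identity of
    // an entry acts as a version stamp for the compare-and-remove below.
    private sealed class Entry
    {
        public Entry(TResource resource, DateTimeOffset lastUsed)
        {
            Resource = resource;
            LastUsed = lastUsed;
        }

        public TResource Resource { get; }
        public DateTimeOffset LastUsed { get; }
    }

    private readonly ConcurrentDictionary<string, Entry> _entries = new ConcurrentDictionary<string, Entry>();
    private readonly Func<string, TResource> _factory;
    private readonly TimeSpan _idleTimeout;

    public IdleTrackingPool(Func<string, TResource> factory, TimeSpan idleTimeout)
    {
        _factory = factory;
        _idleTimeout = idleTimeout;
    }

    public int Count => _entries.Count;

    // Returns the resource for the partition, creating it on demand, and records the use.
    // Note: under contention the add factory may run more than once; a real pool
    // would have to dispose of any extra instance that loses the race.
    public TResource Touch(string partitionId, DateTimeOffset now) =>
        _entries.AddOrUpdate(
            partitionId,
            id => new Entry(_factory(id), now),
            (id, existing) => new Entry(existing.Resource, now)).Resource;

    // Removes entries that have been idle for at least the timeout and returns their
    // resources so that the caller can close them outside of the pool.
    public IReadOnlyList<TResource> ExpireIdle(DateTimeOffset now)
    {
        var removed = new List<TResource>();

        foreach (KeyValuePair<string, Entry> pair in _entries)
        {
            if (pair.Value.LastUsed + _idleTimeout > now)
            {
                continue;
            }

            // This overload removes the pair only if the key still maps to this exact
            // entry; a concurrent Touch replaces the entry and makes the removal fail.
            if (((ICollection<KeyValuePair<string, Entry>>)_entries).Remove(pair))
            {
                removed.Add(pair.Value.Resource);
            }
        }

        return removed;
    }
}
```

Publishing to other partitions is never blocked because no lock is held, and a request that arrives after a removal simply creates a fresh resource. The sketch does not address a caller that is still using a resource when it expires.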

Success Criteria

  • The refactoring and enhancements detailed by the scope have been completed.

  • The tests necessary for its validation have been created or adjusted and pass reliably.

  • The existing test suite continues to produce deterministic results and pass reliably.

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 25 (25 by maintainers)

Most upvoted comments

Hope you won’t mind me fiddling you a little bit on this point by including some more details to our picture:

Of course not. We can’t refine things if we don’t bounce them around. 😄

The lock on every call concerns me. Though I don’t expect a partition-bound producer to be the norm, having it that way removes any benefit from the concurrent dictionary. When I had mentioned something two-phase, I was thinking along the lines of something like:

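using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

internal class TransportProducerPool
{
    private readonly object RemoveSync = new object();
    private ConcurrentDictionary<string, TrackingStuffs> PartitionProducers { get; } = new ConcurrentDictionary<string, TrackingStuffs>();

    public TransportProducer GetPartitionProducer(string partitionId, EventHubConnection connection, EventHubsRetryPolicy retryPolicy)
    {
        var trackedProducer = PartitionProducers.GetOrAdd(partitionId, id => new TrackingStuffs(connection.CreateTransportProducer(id, retryPolicy)));

        // If expiration is not within a buffer, say the next 5 minutes, and ToBeRemoved is not set,
        // extend the expiration without taking the lock.

        if ((!trackedProducer.ToBeRemoved) && (trackedProducer.RemoveAfter > DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(5))))
        {
            trackedProducer.RemoveAfter = DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(10));
            return trackedProducer.Producer;
        }

        // If within the buffer or it is flagged to be removed, synchronize with the cleaner.

        lock (RemoveSync)
        {
            trackedProducer = PartitionProducers.GetOrAdd(partitionId, id => new TrackingStuffs(connection.CreateTransportProducer(id, retryPolicy)));
            trackedProducer.RemoveAfter = DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(15));
            trackedProducer.ToBeRemoved = false;
            PartitionProducers[partitionId] = trackedProducer;
        }

        return trackedProducer.Producer;
    }

    // Matches the TimerCallback signature so that it can be passed to a Timer.
    public void ProducersCleanerHandler(object state)
    {
        var now = DateTimeOffset.UtcNow;
        var removed = new List<TransportProducer>();

        // Get producers where expire time is now or earlier and are marked for removal.

        var expired = PartitionProducers.Where(pair => pair.Value.ToBeRemoved && pair.Value.RemoveAfter <= now).ToList();

        lock (RemoveSync)
        {
            // Remove from the set, then track.  The flag is checked again because a
            // publisher may have cleared it since the query above.

            foreach (var pair in expired)
            {
                if ((pair.Value.ToBeRemoved) && (PartitionProducers.TryRemove(pair.Key, out var tracked)))
                {
                    removed.Add(tracked.Producer);
                }
            }
        }

        // Close those that we removed from the set.

        foreach (var producer in removed)
        {
            producer.CloseAsync(CancellationToken.None).GetAwaiter().GetResult();
        }

        // Get producers where expire time is now or earlier and not marked for removal.  Loop and mark them for removal.

        foreach (var pair in PartitionProducers.Where(pair => !pair.Value.ToBeRemoved && pair.Value.RemoveAfter <= now))
        {
            pair.Value.ToBeRemoved = true;
        }
    }

    private class TrackingStuffs
    {
        public readonly TransportProducer Producer;
        public bool ToBeRemoved;
        public DateTimeOffset RemoveAfter = DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(10));

        public TrackingStuffs(TransportProducer producer) => Producer = producer;
    }
}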

Obviously, that is just a rough sketch off the top of my head. It certainly needs more refinement, but the idea is that we avoid taking the lock path unless we think there’s a pending removal. Since this isn’t critical resource management, we don’t have to be super-eager about cleaning; we can wait an extra tick or two.

I’d still rank this as a fallback to the cache if that doesn’t work out.

Your turn for riffing on thoughts. 😄

Would you say this test could be conclusive?

Oh yeah, I’d definitely call that conclusive; you went right down to the metal and grabbed the transport producer directly. That’s about as low-level as we functionally can get. If we had a problem with opening multiple links to that same resource, there would be a hard error rejecting or force-closing one.

I’d like to spin some further thoughts that I think could simplify things and help to unify the “what happens with a MemoryCache” and “what happens with a ConcurrentDictionary” scenarios.

Here’s the gist:

  • When the TransportProducerPool returns a producer, the caller provides a key that is logged as an active user of the item, and the “last used” time would be updated.
  • When the pool item expires, the handler (be it cache remove callback or timer callback) takes that producer out of the cache. If it has any active users, it does not close it, just removes it.
  • The pool item is still disposable. When disposed, it will check to see if the item exists in the pool and, if so, that its key is still registered as an active user.
    • If the producer is still in the pool, it will update the “last used” time and remove the associated identifier from the active users.
    • If the producer is not in the pool, it will remove the associated identifier from the active users. If there are no active users, it will close the transport producer.

That does come with a potential trade-off. In an exception situation, it is possible that a producer would be removed from the pool, but the last active user would not properly close it. This would leave the producer in limbo. However, the resources used are bound to an idle timeout. So, in the worst case, the producer would live until the idle timeout closed the underlying AMQP link. Since we own the code that would be responsible for managing the pooled producer and closing things, I think that’s a small exception case that I’m comfortable with.

I would absolutely not trust this code. It’s just to illustrate, and definitely requires thinking about more deeply than I’ve done yet.

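using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

internal class TransportProducerPool : IDisposable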
{
	private ConcurrentDictionary<string, PoolItem> Pool { get; } = new ConcurrentDictionary<string, PoolItem>();
	private Timer ExpirationTimer { get; }
	
	public TransportProducerPool()
	{
		ExpirationTimer = new Timer(PerformExpiration, null, TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(10));
	}
	
	public PooledProducer GetPartitionProducer(string partitionId, EventHubConnection connection, EventHubsRetryPolicy retryPolicy)
	{
		// Get or add the producer.
		
		var identifier = Guid.NewGuid().ToString();
		var item = Pool.GetOrAdd(partitionId, id => new PoolItem(connection.CreateTransportProducer(id, retryPolicy)));
		
		// These introduce a slight race, but it's benign.  The timing for removal has a large enough
		// window that it doesn't need to be exact.
		
		item.RemoveAfter = DateTimeOffset.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
		item.ActiveInstances.TryAdd(identifier, 0);

		return new PooledProducer(item.Producer, producer =>
		{
			// This caller is done with the producer, whether or not the item is still pooled.
			
			item.ActiveInstances.TryRemove(identifier, out _);
			
			// If the item is still in the pool, extend it.
			
			if ((Pool.TryGetValue(partitionId, out var cachedItem)) && (ReferenceEquals(cachedItem, item)))
			{
				cachedItem.RemoveAfter = DateTimeOffset.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
			}
			
			// If the item is not in the pool and there are no active instances, then
			// close the producer.  
			//
			// I'm probably being overly defensive here, but the second
			// TryGetValue runs after the extension would have been seen, so it 
			// is intended to be sure that the item wasn't removed in the meantime.
			// 
			// We're not in a performance-critical area, so the additional cost for 
			// synchronization isn't a concern.
			
			if (((!Pool.TryGetValue(partitionId, out var current)) || (!ReferenceEquals(current, item))) && (item.ActiveInstances.IsEmpty))
			{
				return producer.CloseAsync(CancellationToken.None);
			}
			
			return Task.CompletedTask;
		});
	}

	private void PerformExpiration(object state)
	{
		// Capture the timestamp to use a consistent value.
		
		var now = DateTimeOffset.UtcNow;
		
		foreach (var key in Pool.Keys)
		{
			if (Pool.TryGetValue(key, out var poolItem))
			{
				// Check to see if this should be removed.
				
				if (poolItem.RemoveAfter <= now)
				{
					// Remove it, if it should be removed, and see if there are any instances
					// using it.  If not, close the producer.
					
					if ((Pool.TryRemove(key, out _)) && (poolItem.ActiveInstances.IsEmpty))
					{
						poolItem.Producer.CloseAsync(CancellationToken.None).GetAwaiter().GetResult();
					}
				}
			}
		}
	}

	public void Dispose()
	{
		ExpirationTimer.Dispose();
	}

	internal class PooledProducer : IAsyncDisposable
	{
		private Func<TransportProducer, Task> CleanUp { get; }
		public TransportProducer Item { get; }		
		
		public PooledProducer(TransportProducer producer, Func<TransportProducer, Task> cleanUp)
		{
			Item = producer;
			CleanUp = cleanUp;
		}
		
		public ValueTask DisposeAsync() => new ValueTask(CleanUp(Item));
	}

	private class PoolItem
	{
		public static readonly TimeSpan DefaultRemoveAfterDuration = TimeSpan.FromMinutes(10);
		
		public TransportProducer Producer { get; }
		public ConcurrentDictionary<string, byte> ActiveInstances { get; } = new ConcurrentDictionary<string, byte>();
		public DateTimeOffset RemoveAfter { get; set; }
		
		public PoolItem(TransportProducer producer, DateTimeOffset? removeAfter = default)
		{
			Producer = producer;
			RemoveAfter = removeAfter ?? DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(1));
		}
	}
}