I can’t find it ATM, but I had a class for this that was non-blocking. I don’t think it covered all the edge cases either. I’ll try to find it tomorrow. Anyway, this is the code that came before that:
/// <summary>
/// Subscription callback: converts an incoming event into a <c>StreamEvent</c>
/// and places it on <c>_queue</c> for later processing.
/// </summary>
/// <param name="_">The subscription that raised the callback; unused, the
/// <c>_subscription</c> field is used instead.</param>
/// <param name="resolvedEvent">The event delivered by the subscription.</param>
/// <remarks>
/// Events whose type begins with <c>$</c> are ignored (presumably Event Store
/// system events - confirm). When the queue already holds <c>MaxQueueSize</c>
/// items the subscription is stopped and the current event is NOT enqueued.
/// </remarks>
private void EventAppeared(EventStoreCatchUpSubscription _, ResolvedEvent resolvedEvent)
{
    var recorded = resolvedEvent.Event;

    // Ordinal comparison: the event type is an identifier, so the prefix check
    // must not depend on the current culture's collation rules.
    if(recorded.EventType.StartsWith("$", System.StringComparison.Ordinal))
    {
        return;
    }

    if(_queue.Count >= MaxQueueSize)
    {
        // NOTE(review): the event that triggered this branch is dropped here;
        // whoever restarts the subscription has to resume from a position that
        // re-delivers it.
        _subscription.Stop(); // handle backpressure here
        return;
    }

    // NOTE(review): the position is taken from the subscription's
    // LastProcessedPosition rather than from resolvedEvent itself - confirm this
    // is the intended checkpoint value for the event being enqueued.
    var streamEvent = new StreamEvent(
        recorded.EventStreamId,
        recorded.EventId,
        recorded.EventNumber,
        _subscription.LastProcessedPosition.PreparePosition,
        recorded.Created,
        recorded.EventType,
        Encoding.UTF8.GetString(recorded.Data),      // payload decoded as UTF-8 text
        Encoding.UTF8.GetString(recorded.Metadata)); // metadata decoded as UTF-8 text

    _queue.Enqueue(streamEvent);
}
/// <summary>
/// Runs one step of the queue pump and schedules the next step as a task
/// continuation.
/// </summary>
/// <returns>
/// An already-completed task when <c>_isDisposed</c> has been cancelled;
/// otherwise the continuation task that will invoke <c>ProcessQueue</c> again.
/// </returns>
/// <remarks>
/// NOTE(review): the continuation returned here is not unwrapped, so the task
/// handed back covers only the scheduling of the next step, not the steps that
/// follow it. If the handler task faults, the <c>NotOnFaulted</c> continuation
/// does not run and the pump stops - confirm that callers expect this.
/// </remarks>
private Task ProcessQueue()
{
    // Disposal requested: end the chain with a completed task.
    if(_isDisposed.IsCancellationRequested)
    {
        return Task.FromResult(0);
    }

    StreamEvent pending;
    if(!_queue.TryDequeue(out pending))
    {
        // Nothing queued: wait one millisecond, then poll again.
        var idlePause = Task.Delay(1);
        return idlePause.ContinueWith(_ => ProcessQueue());
    }

    // Hand the event to the consumer and continue pumping once it finishes,
    // except when the handler's task faulted.
    var handling = _streamEventReceived(pending);
    return handling.ContinueWith(_ => ProcessQueue(), TaskContinuationOptions.NotOnFaulted);
}