Batched live processing for subscriptions

It is really unlikely to fall behind once you are caught up - unless your projection writes are really really slow.

Well, it depends on the frequency/distribution of events and the speed of your writes. Having a way to know if you are caught up at all times would be useful in some circumstances. Maybe you need to do regular batch imports of events, for example.

Once you are live, just put them in a local queue; then you know whether you are caught up :slight_smile:

That is what Rx Buffer and TPL Dataflow BufferBlock do, but you still have to decide how long to wait before flushing and how long to wait between events. If there were an IsEndOfStream flag, we could flush immediately. Even better if EventStore supported batched subscriptions.

EventStore DOES support batched subscriptions (read forward), and there is an EOF flag associated with them.

IsEndOfStream would be meaningless on a live pushed subscription because, at the time of sending, it would always be true.
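To make the pull side concrete, here is a minimal sketch of reading forward in pages and stopping when the EOF flag is set. The `Slice<T>` type, the in-memory `ReadForward` helper and `CatchUp` are hypothetical stand-ins written for illustration; they are not the client API's own types, only modelled on the read-forward-with-EOF behaviour described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one page of a forward read (not the client's real type).
public sealed class Slice<T>
{
    public IReadOnlyList<T> Events { get; }
    public int NextEventNumber { get; }
    public bool IsEndOfStream { get; }

    public Slice(IReadOnlyList<T> events, int nextEventNumber, bool isEndOfStream)
    {
        Events = events;
        NextEventNumber = nextEventNumber;
        IsEndOfStream = isEndOfStream;
    }
}

public static class BatchedReader
{
    // Hypothetical in-memory "stream" so the sketch runs without a server.
    public static Slice<T> ReadForward<T>(IReadOnlyList<T> stream, int start, int count)
    {
        var events = stream.Skip(start).Take(count).ToList();
        int next = start + events.Count;
        return new Slice<T>(events, next, next >= stream.Count);
    }

    // Pulls pages until the reader reports end of stream; each page is one batch.
    // Returns the position to resume from.
    public static int CatchUp<T>(
        Func<int, int, Slice<T>> read, int start, int batchSize,
        Action<IReadOnlyList<T>> handleBatch)
    {
        int position = start;
        while (true)
        {
            var slice = read(position, batchSize);
            if (slice.Events.Count > 0)
                handleBatch(slice.Events);   // flush immediately, no timer needed
            position = slice.NextEventNumber;
            if (slice.IsEndOfStream)
                return position;
        }
    }
}
```

The last page is usually a partial batch, and the EOF flag is what tells the reader it can flush it straight away instead of waiting for more.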

OK, I’m a bit confused by how the TCP client actually works. Does it have its own client-side queue? What happens if events are saved to EventStore faster than the client can handle them?

Also, what happens if you save multiple events in a transaction? Wouldn’t IsEndOfStream be false until the last event is sent?

Yes, there is a client-side queue, which is bounded. If the client cannot handle the rate, it will be dropped by the server to avoid service degradation, and the client has to fall back to pulling instead.

EndOfStream is only set for reads as far as I’m aware, not on push subscriptions. Transactions are a write-only concern so have no impact here.

James

Well, there are multiple levels (let’s discuss live subscriptions first), e.g. SubscribeTo (not SubscribeFrom).

Internally there is an event that occurs when you append to a stream. https://github.com/EventStore/EventStore/blob/dev/src/EventStore.Core/Messages/StorageMessage.cs#L224

This event gets picked up by a subscription service, which keeps track of which connections are subscribed to what (note: not discussing competing consumers here, they are different): https://github.com/EventStore/EventStore/blob/dev/src/EventStore.Core/Services/SubscriptionsService.cs It will then route an EventAppeared message to your given connection: https://github.com/EventStore/EventStore/blob/dev/src/EventStore.Core/Services/SubscriptionsService.cs#L267

Connections have a sizable buffer where data is queued before it is put on the TCP socket; the data is stored here to be forwarded: https://github.com/EventStore/EventStore/blob/dev/src/EventStore.Core/Services/Transport/Tcp/TcpConnectionManager.cs#L349 If a buffer gets too big (e.g. the socket can’t keep up), the socket will be closed.

On the other side…

The client then receives the messages and they get raised back to the client code (one item at a time, on the thread pool): https://github.com/EventStore/EventStore/blob/dev/src/EventStore.ClientAPI/ClientOperations/SubscriptionOperation.cs#L248 They are queued here as well (we should bound this queue too; just put up a card for it). Most people, however, read straight off into their own client queue, where they can control it.

This is for live subscriptions. A catch-up subscription, if it gets too many messages or drops its connection etc., will fall back to polling (e.g. readeventsforward) until it is caught up, at which point it switches to a live push subscription (and does this internally, so you don’t have to see any of the work involved).

Cheers,

Greg

https://github.com/EventStore/EventStore/pull/246

Thanks for the explanation.

Couldn’t the ExecuteActions method on SubscriptionOperation pass a bool to eventAppeared to indicate if there are any more items in the queue? Or why not just store ResolvedEvents on the queue and send them all to eventAppeared as an array?

I guess I could write this into my own queue, but it seems like it would be nice if EventStore did it for me :slight_smile:
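For what it is worth, the "are there any more items?" flag is easy to compose on the client side over your own queue. A minimal sketch, assuming a single consumer draining a `ConcurrentQueue`; `QueueDrain` and `DrainWithHasMore` are hypothetical names, not part of the client API or of ExecuteActions.

```csharp
using System;
using System.Collections.Concurrent;

public static class QueueDrain
{
    // Dequeues everything currently available and tells the handler whether
    // more items were waiting at the moment each one was handed over.
    // Returns the number of items handled.
    public static int DrainWithHasMore<T>(ConcurrentQueue<T> queue, Action<T, bool> handler)
    {
        int handled = 0;
        while (queue.TryDequeue(out var item))
        {
            // Snapshot only: a producer may enqueue right after this check.
            handler(item, !queue.IsEmpty);
            handled++;
        }
        return handled;
    }
}
```

The flag is only a snapshot of the local queue, so `false` means "nothing else is waiting right now", which is enough to decide when to flush a partial batch.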

“Couldn’t the ExecuteActions method on SubscriptionOperation pass a bool to eventAppeared to indicate if there are any more items in the queue? Or why not just store ResolvedEvents on the queue and send them all to eventAppeared as an array?”

This sounds like code very specific to your scenario. It is trivial to compose such behaviours (that is the goal).

OK, so I got round to having a play with this. I ended up not using Rx or TPL Dataflow; I think they would just complicate the situation. I copied the idea from the EventStore client API itself.

On eventAppeared I store the event in a BlockingCollection, then start a new thread to execute a batch delegate. New events coming in are stored by the original thread in the BlockingCollection while the batch is being processed; when the batch is finished, it just executes a new batch with all the stored-up events. I give the BlockingCollection a max size equal to the desired batch size, so it blocks on add once the batch size is reached. In this situation the original eventAppeared delegate will block and the client API queuing will kick in. With this solution we don’t need to worry about whether the subscription is live or not.

Hope that makes sense. Here’s a gist of the (boiled down) code. Would appreciate any thoughts on this approach.

Cheers,

Jon
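The gist itself is not reproduced in this thread. As a rough illustration of the approach Jon describes (a bounded BlockingCollection, an Interlocked flag so only one batch runs at a time, and ThreadPool.QueueUserWorkItem for the worker), here is a minimal sketch. `BatchingQueue<T>` and all of its members are hypothetical names; this is not Jon's code, and it has no dependency on the EventStore client.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: call Add from the eventAppeared callback.
public sealed class BatchingQueue<T>
{
    private readonly BlockingCollection<T> pending;
    private readonly Action<IReadOnlyList<T>> processBatch;
    private readonly int batchSize;
    private int running; // 0 = idle, 1 = a batch worker is active

    public BatchingQueue(int batchSize, Action<IReadOnlyList<T>> processBatch)
    {
        this.batchSize = batchSize;
        this.processBatch = processBatch;
        // Bounded: Add blocks once batchSize items are waiting, which pushes
        // back-pressure onto whoever is calling Add.
        pending = new BlockingCollection<T>(batchSize);
    }

    public void Add(T item)
    {
        pending.Add(item);
        TryStartWorker();
    }

    private void TryStartWorker()
    {
        // Only the caller that flips 0 -> 1 gets to queue a worker.
        if (Interlocked.CompareExchange(ref running, 1, 0) == 0)
            ThreadPool.QueueUserWorkItem(_ => Drain());
    }

    private void Drain()
    {
        try
        {
            while (true)
            {
                var batch = new List<T>(batchSize);
                while (batch.Count < batchSize && pending.TryTake(out var item))
                    batch.Add(item);
                if (batch.Count == 0) break;   // nothing left: yield the thread
                processBatch(batch);           // whatever arrived so far is one batch
            }
        }
        finally
        {
            Interlocked.Exchange(ref running, 0);
        }
        // An item may have been added between the last TryTake and clearing
        // the flag; restart a worker if so.
        if (pending.Count > 0) TryStartWorker();
    }
}
```

Batches are whatever has accumulated while the previous batch was running, so the code behaves the same during catch-up (full batches) and when live (small batches). An exception thrown by the batch delegate is not handled in this sketch.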

Refactored into an extension method on IEventStoreConnection.

What happens when the collection is empty? Does your thread spin or wait on a handle? I can’t imagine this being more efficient or less code than using an ActionBlock from TDF. The ActionBlock is both a concurrent queue and a worker, but it uses tasks on the thread pool (configurable) instead of tying up a thread indefinitely.

Okay, I actually looked at the gist, so never mind the questions, but I still stand by my statements. Using those components, the stuff that is easiest to get wrong is done for you.

I’m not sure how you can achieve the same thing using ActionBlock. In the EventAppeared delegate we need to know if a batch is currently being processed, that’s what the usage of Interlocked is doing. You’d still need to do the same thing if you used ActionBlock as we need a way to signal back to the queuing thread that the batch is complete.

My code only creates a thread when there is work to do and yields back when it’s finished, I don’t see how Rx or TPL will help here. I guess I could replace the ThreadPool.QueueUserWorkItem with an ActionBlock, but I’m not sure that adds much value.

My changes to your code below (not tested). Note that I got rid of the subscription being returned to the batch processor. The only time I ever use the subscription is to stop it, and I have always done that from the one returned from a SubscribeXXX method. The only other thing you can check on the sub is whether it’s subscribed to all (should be obvious from code semantics) or the stream it’s subscribed to (stream is also on the event).

I added live processing awareness so it will wait for full batches in catchup, but trigger partial batches if the subscription is live but nothing is currently waiting.

```csharp
using System;
using System.Threading.Tasks.Dataflow;
using EventStore.ClientAPI;

class BatchedEventSubscriber
{
    readonly IEventStoreConnection connection;
    readonly string streamId;
    readonly int? lastProcessedVersion;
    readonly bool resolveLinks;
    readonly Action<ResolvedEvent[]> batchAppeared;
    readonly ActionBlock<ResolvedEvent[]> batchRunner;
    readonly BatchBlock<ResolvedEvent> batcher; // BatchBlock is generic
    volatile bool isLive;

    public BatchedEventSubscriber(IEventStoreConnection connection, string streamId, int? lastProcessedVersion, bool resolveLinks, Action<ResolvedEvent[]> batchAppeared, int batchSize = 500)
    {
        this.connection = connection;
        this.streamId = streamId;
        this.lastProcessedVersion = lastProcessedVersion;
        this.resolveLinks = resolveLinks;
        this.batchAppeared = batchAppeared;

        // batcher groups events into arrays of batchSize; batchRunner runs
        // the user delegate on each array, one at a time.
        batchRunner = new ActionBlock<ResolvedEvent[]>(batchAppeared);
        batcher = new BatchBlock<ResolvedEvent>(batchSize);
        batcher.LinkTo(batchRunner);
    }

    public EventStoreStreamCatchUpSubscription Subscribe()
    {
        return connection.SubscribeToStreamFrom(
            streamId,
            lastProcessedVersion,
            resolveLinks,
            EventAppeared,
            LiveProcessingStarted
        );
    }

    void LiveProcessingStarted(EventStoreCatchUpSubscription subscription)
    {
        // From here on, partial batches may be flushed.
        isLive = true;
    }

    void EventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent)
    {
        batcher.Post(resolvedEvent);

        bool doPartialBatch;
        lock (batchRunner) // lock is for batchRunner.InputCount, batcher.OutputCount, whose thread safety is unknown
            // Untested caveat: OutputCount counts completed batches not yet taken
            // by batchRunner, not buffered events, so this check may need revisiting.
            doPartialBatch = isLive && batchRunner.InputCount == 0 && batcher.OutputCount != 0;

        if (doPartialBatch)
            batcher.TriggerBatch();
    }
}
```

All of the constructor parameters should be method parameters on subscribe and not stored in the class, but I didn’t change that.

Not sure if the lock is needed, but I put it in there just in case.

InputCount can still be 0 when a batchRunner is busy, but the worst that happens is it queues a small batch behind the current one. If you were live processing 500 per second, then you’d probably just want to cut out the live part and leave it batching the full 500 every time.

Most of the constructor parameters. :slight_smile:

Cool, I like the code, didn't know about InputCount and OutputCount. However, this is around 10-15% slower than the Interlocked + ThreadPool.QueueUserWorkItem version.