Batched live processing for subscriptions

I’m building a C# client using the TCP interface that will efficiently store a projection in a view model database (e.g. a document or graph db). I can easily batch the events before the subscription catches up, by flushing when the liveProcessingStarted delegate fires.
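For reference, roughly what I’m doing during catch-up, simplified (`SaveBatch` and the subscription arguments here are placeholders, not the exact client signature):

```csharp
// Buffer events while catching up; flush when live processing starts.
// SaveBatch is a placeholder for the view-model write.
var buffer = new List<ResolvedEvent>();

var subscription = connection.SubscribeToAllFrom(
    lastCheckpoint,
    resolveLinkTos: false,
    eventAppeared: (sub, evt) =>
    {
        buffer.Add(evt);
        if (buffer.Count >= 1000) // cap memory on a long catch-up
        {
            SaveBatch(buffer);
            buffer.Clear();
        }
    },
    liveProcessingStarted: sub =>
    {
        SaveBatch(buffer); // flush the remainder before going live
        buffer.Clear();
    });
```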

However, there is no way I can see to easily batch events when in “live processing” mode. The scenario I am envisaging is that lots of events have been generated quicker than they can normally be processed, and we now want the client to batch.

To facilitate this, would it be possible to add the position in the stream, plus the last known event number, at the point the event is sent? That way we could make an intelligent decision about whether to batch. Alternatively it could be a flag like EventSlice.IsEndOfStream, or even just sending all the events in a pre-configured batch size.

The only way I can see to do this at the moment would be to wait some period of time to see if another event comes in, before processing.

Cheers,

Jon

I’ve been playing with the .NET TPL Dataflow library lately, and they have BatchBlock just for this purpose.

Just keep posting all the events to the BatchBlock as you receive them and when the count meets the minimum batch size, it will send a batch of them to your handling method as an array. You may not even need to worry whether you are live or catchup.

In case a full batch was taking too long to fill, you could also trigger partial batches on a timer, as mentioned here.

Or, after processing a batch, you could check OutputCount to decide whether to go ahead with a partial batch by calling TriggerBatch().
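A rough sketch of the timer idea (untested; the batch size of 50, the 100 ms interval, and `ProcessBatch` are all placeholders for whatever fits your projection):

```csharp
// Batch events 50 at a time, but flush partial batches every 100 ms
// so a quiet stream never leaves events sitting in the block.
var batcher = new BatchBlock<ResolvedEvent>(50);
var processor = new ActionBlock<ResolvedEvent[]>(batch => ProcessBatch(batch));
batcher.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

// TriggerBatch is safe to call when the block is empty; it just emits
// whatever has accumulated so far.
var flushTimer = new Timer(
    _ => batcher.TriggerBatch(),
    state: null,
    dueTime: TimeSpan.FromMilliseconds(100),
    period: TimeSpan.FromMilliseconds(100));
```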

I haven’t played with BatchBlock yet, so I’m just spit-balling. :)

For easier use, link your BatchBlock to an ActionBlock. Here is an example (scroll down to Chunky vs Chatty)

http://www.mindscapehq.com/blog/index.php/2012/04/18/go-with-the-dataflow/

Interesting code snippet:

```csharp
// Pipeline: transform each name, batch into groups of 5, then email.
var businessifier = new TransformBlock<string, string>(s => "Dear " + s);
var batcher = new BatchBlock<string>(5);
var emailer = new ActionBlock<string[]>(s => EmailToBoss(s)); // action now takes an array of strings
// ...
businessifier.LinkTo(batcher);
batcher.LinkTo(emailer);
```

Awesome :) I’ll definitely switch to this approach.

However, I’d also like to know if my suggestions would be a good idea. Specifically, setting a batch size on the subscription, as this would minimise network calls.

Not a GES guy, but I doubt any network traffic would be saved by batching on the TCP client. The connection is held open and messages are pushed from the server AFAIK.

Yes, I guess you are right. However, it would still be useful as it would mean no need for the timer or manual batching.

Do you have any code available?

I’m not using batching myself, but here is a rough draft (not tested):

```csharp
private static void Process(ResolvedEvent[] batch)
{
    // do work
}

static void Main()
{
    // set up events to be batched, 50 at a time
    var batcher = new BatchBlock<ResolvedEvent>(50);
    // set up processing of the batch
    var processor = new ActionBlock<ResolvedEvent[]>(Process);
    // link batcher to processor, including completion
    batcher.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

    // start the subscription and have it post to the batcher
    var subscription = eventStoreConnection.SubscribeToAllFrom(..., eventAppeared: (sub, evt) => batcher.Post(evt));
    // let it run for a while
    Thread.Sleep(10000);
    // shut it down (catch-up subscriptions are stopped, not unsubscribed)
    subscription.Stop();
    // if you want to wait for queued messages to finish before stopping
    batcher.TriggerBatch();      // flush any partial batch
    batcher.Complete();          // stop taking messages; completion propagates to processor
    processor.Completion.Wait(); // wait for queued batches to be processed
}
```

batcher will queue up messages until it has 50, then it will send those 50 to processor. processor in turn sticks each batch into a queue, then runs the Process method on them, one batch at a time.

Have a look at Rx (Reactive Extensions) while you’re at it; it’s very well suited for these kinds of tasks. (Nothing wrong with Dataflow either.)

/Peter

Any thoughts on rx vs Dataflow?

They can do pretty much the same thing, but I’m biased towards Rx: a more functional style, plus you get a lot for free, like windowing, buffering, streams of streams, error handling, etc. I get the impression there are more resources on the net for it as well. Dataflow is more like your basic Lego blocks. But it’s very much down to taste and opinion…

/Peter

I’ve never used rx, only given it a cursory look, so I can’t do a comparison. The main thing I have used in TPL dataflow is the action block, because it’s a drop-in replacement for a standard queue + worker. It’s great for processing messages, and I add one as a private field to use for those types of components.

Found this, answers in comments.

http://stackoverflow.com/questions/14403054/what-are-the-differences-between-tpl-dataflowtdf-a-reactive-extensions

I’m gonna try both and see which I like. Rx looks nice:

```csharp
// Flush a batch every 500 events, or when the stream goes quiet for 100 ms.
var gapBetweenEvents = subscription.Throttle(TimeSpan.FromMilliseconds(100));

subscription
    .Window(() => gapBetweenEvents)
    .Select(x => x.Buffer(500))
    .Switch()
    .Subscribe(handleEvents);
```

This batches every 500 events, unless we don’t receive an event for 100ms, in which case it flushes.

There’s an overload on Buffer that takes a TimeSpan and a count; that’s all you need for your desired behaviour. (To be honest I’m not sure how your code would behave. I wouldn’t use either Throttle or Window for event processing, but it might work due to the way you’ve composed them.)
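For example (assuming `events` is the subscription exposed as an `IObservable<ResolvedEvent>`, and `handleEvents` takes the resulting list):

```csharp
// Emit a batch when 500 events have arrived, or when 100 ms have
// passed since the last batch, whichever comes first.
events
    .Buffer(TimeSpan.FromMilliseconds(100), 500)
    .Where(batch => batch.Count > 0) // the timer produces empty lists on a quiet stream
    .Subscribe(handleEvents);
```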

/Peter

This is the reason I only gave rx a cursory look. I am very comfortable with lambdas, but looking at this code, the language of it does not make it obvious to me what is happening.

It’s not quite the same behaviour. With the buffer overload, you have to make the timeout longer than the time to receive the batch, otherwise the timer will always trip. With the throttle, you can make the timeout much shorter, because if you receive an event within that period, it will reset. This gives you the best of both worlds, speed for individual events or batched for when you get bulk events.

Actually, you might want to use the buffer overload in the code I posted for the case where events are being received at a rate similar to the throttle.

@Jonathan Curtis,
Let us know which way you go and why. I will be making a similar decision soon and I’d rather you do all the work :). Just kidding, but I’d like to hear your thoughts. I’ve been reading up on TPL, and Dataflow blocks look pretty damn sweet.

r

OK, interesting! I’ve always gone for a max latency rather than an optimal batch size, but haven’t really measured the effects.

Normally you go for a heuristic that tries to optimize both. An example of dynamic batching can actually be seen in the Event Store storage writer. Basically the rules are quite simple: if your queue is empty, write; else watch how long operations are taking and dynamically make the batches bigger while your queue has items in it. This can be done on the outside by checking whether you are caught up: if caught up, write immediately, else batch.
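In pseudo-C#, the heuristic reads something like this (all names are made up; in particular the client doesn’t expose a caught-up flag, so `IsCaughtUp` here is hypothetical):

```csharp
// Sketch of the write-immediately-or-batch heuristic.
// pending, Write, ReceiveNext, DequeueMany, IsCaughtUp and
// AdjustBatchSize are all placeholders.
while (true)
{
    pending.Enqueue(ReceiveNext());

    if (pending.Count == 1 && IsCaughtUp())
    {
        // Queue was empty and we're live: write immediately for low latency.
        Write(pending.Dequeue());
    }
    else if (pending.Count >= batchSize)
    {
        // We're behind: flush a whole batch, and let the batch size grow
        // while writes are slow and the queue keeps filling.
        Write(DequeueMany(pending, batchSize));
        batchSize = AdjustBatchSize(lastWriteDuration, pending.Count);
    }
}
```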

I can’t see a way with the TCP connection to know if you are behind once live processing has fired.