Determine if event in (catch-up) subscription is last event

Hi,
I’m trying to implement some buffering and batching for catch-up subscriptions so that I can hopefully improve my processing performance.
My plans are to set up subscriptions to the all events stream, depending on the positions of the read side projections (cqrs/es) and utilise what TPL Dataflow offers to implement buffering and batching.
The areas I’m trying to improve are determining whether an event is used in my projection and the actual deserialisation. I expect a performance increase if I can transform multiple events in parallel (which I will test to verify).
How can I determine if an event in a catch-up subscription is currently the last event in the stream? Are there some properties or other information available?
I can’t simply toggle this on liveProcessingStarted, because I might want to buffer and batch during live processing as well, if my projections are slower than the rate at which new events are generated.
Slow projections might be things like generating data for a DWH, where eventual consistency could be expected to be more relaxed.

Regards,
urbanhusky

I might be wrong here, but in this setting (reading an event or waiting for the server to dispatch events to your subscription) you only have eventual consistency.

There is still a chance that, while the server is sending you an event, a new one has just arrived. So you cannot guarantee at the event level that it’s actually the last one.

I don’t need a guarantee. The reason why I want to know if there are any other pending events is so that I don’t block waiting for new events when I’m at the head. I can trigger the current batch as completed if there are no pending events. I would get the next events anyway.

I guess this depends on how, and if, events are buffered in the ES client. What alternatives could I have? Timeout?
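The timeout idea could be sketched roughly as follows. This is a language-neutral illustration in Python, not Event Store client or TPL Dataflow code; `collect_batch`, `max_size` and `idle_timeout` are hypothetical names, and `events` is assumed to be a local queue that the subscription callback fills.

```python
import queue


def collect_batch(events, max_size=100, idle_timeout=0.05):
    """Block for the first event, then gather more until full or idle."""
    batch = [events.get()]  # waits indefinitely for at least one event
    while len(batch) < max_size:
        try:
            batch.append(events.get(timeout=idle_timeout))
        except queue.Empty:
            break  # nothing arrived in time: treat the batch as complete
    return batch
```

The trade-off is latency: at the head of the stream every batch waits up to `idle_timeout` before it is released.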

This is what liveProcessing is intended for (the subscription itself
handles buffering/batching internally).

Even if we were to tell you that it was "the last event" the chances
are good by the time you are receiving it that it is *not* the last
event due to buffering.

It seems all you need here is to read through the whole stream until you’ve reached the end. A batched read is enough for that: the read result tells you whether you’re at the head of the stream. Why bother with a subscription in this case?

I need a subscription because I have to stay up-to-date too.
I guess what I need is a limited, blocking buffer that transforms items in parallel but consumes FIFO.

If new events arrive, they get added to the buffer (the operation blocks if the buffer is full).
All events in the buffer are transformed into their corresponding POCOs in parallel (or maybe just sequentially, if there is nothing to be gained from running many deserialisers in parallel: N for each projection, but M projections…).
These POCOs are then consumed in the same order as they were added to the buffer.
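A minimal sketch of such a buffer, in Python purely for illustration (this is not TPL Dataflow; `run_pipeline`, `deserialise` and `handle` are hypothetical names, and JSON payloads are an assumption). Each event is submitted to a worker pool and its future is placed in a bounded queue, so results come out in arrival order even if workers finish out of order.

```python
import json
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # sentinel marking the end of the input


def deserialise(raw):
    """Stand-in for turning a raw event payload into a POCO-like object."""
    return json.loads(raw)


def run_pipeline(raw_events, handle, buffer_size=64, workers=4):
    """Deserialise events in parallel, pass them to `handle` in arrival order."""
    pending = queue.Queue(maxsize=buffer_size)  # bounded: put() blocks when full

    def produce(pool):
        for raw in raw_events:
            # Futures are enqueued in arrival order, so consumption stays FIFO.
            pending.put(pool.submit(deserialise, raw))
        pending.put(_DONE)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        producer = threading.Thread(target=produce, args=(pool,), daemon=True)
        producer.start()
        while True:
            item = pending.get()
            if item is _DONE:
                break
            handle(item.result())  # waits only if this event is not ready yet
        producer.join()
```

Error handling is left out: if `handle` raises, the producer is simply abandoned. In .NET the same shape would presumably come from a bounded, order-preserving transform stage feeding a single consumer.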

The primary reason why I want to do this is so that I can deserialise into objects while I’m busy waiting for the projection to finish processing previous events.
So, this is no longer an Event Store question. Thanks :)

What is your message throughput? I would guess that deserialization is
a tiny cost compared to your I/O.

To make a very simple subscription why not just iterate over
readstreameventsforward/readalleventsforward? If we are talking just a
few clients this will not cause too much load and you can read
frequently. If you fall behind this is what a subscription does
anyways.

My biggest issue is catching up from scratch, when I have to rebuild parts of the read model. Every 10-100ms of additional delay between events quickly adds up to long rebuild times and low throughput.
We’re going to source existing data for events so we can expect to start out with millions of events already.

I’m most likely going to use TPL Dataflow with a TransformBlock (high maximum degree of parallelism, queue size > 1) for the deserialisation, consumed by the actual projection.
This lets me deserialise events while I’m waiting for I/O.

I can use this for both read forward and subscriptions. I’ll have to add batching I/O later (i.e. commit unit of work every X events or if queue is empty).
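The "commit every X events or if the queue is empty" rule might look like this. It is an illustrative Python sketch with hypothetical names (`project_in_batches`, `apply`, `commit`, `STOP`), assuming a local queue of already deserialised events. Note that "empty" refers only to the local buffer, not to the head of the stream on the server.

```python
import queue

STOP = object()  # hypothetical sentinel used to end the loop


def project_in_batches(events, apply, commit, batch_size=100):
    """Apply events one by one; commit every `batch_size` events or when idle."""
    uncommitted = 0
    while True:
        event = events.get()  # blocks only when nothing is buffered
        if event is STOP:
            break
        apply(event)
        uncommitted += 1
        # Flush when the batch is full, or when nothing else is buffered
        # locally, so a quiet stream never leaves work uncommitted.
        if uncommitted >= batch_size or events.empty():
            commit()
            uncommitted = 0
    if uncommitted:
        commit()  # flush whatever is left when stopping
```

During a rebuild the queue is rarely empty, so commits happen every `batch_size` events; at the head of the stream each event is committed as soon as it has been applied.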

ReadForward is async. It really shouldn't be that hard to set up
readforwardasync->buffer->deserialize->io async.
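As a rough sketch of that pipeline shape (Python asyncio for illustration only, not the Event Store client API): `read_page` is a hypothetical async function returning a page of raw events, the next position, and an end-of-stream flag; `project` is a hypothetical async handler that does the I/O; JSON payloads are an assumption.

```python
import asyncio
import json


async def read_all_forward(read_page, start=0, page_size=500):
    """Yield raw events page by page until the reader reports end of stream."""
    position = start
    while True:
        events, position, at_end = await read_page(position, page_size)
        for raw in events:
            yield raw
        if at_end:
            break


async def catch_up(read_page, project, page_size=500, buffer_size=1000):
    """Read ahead into a bounded buffer while the projection awaits its I/O."""
    buffer = asyncio.Queue(maxsize=buffer_size)

    async def fill():
        async for raw in read_all_forward(read_page, page_size=page_size):
            await buffer.put(raw)  # suspends the reader when the buffer is full
        await buffer.put(None)     # None marks the end of the stream

    reader = asyncio.create_task(fill())
    while (raw := await buffer.get()) is not None:
        await project(json.loads(raw))  # deserialise, then asynchronous I/O
    await reader
```

Reading continues in the background whenever `project` is waiting on I/O, which is the main gain when rebuilding from millions of events.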