We currently have projections which filter events of interest for different denormalisers to process. We record the position in the stream of the last event processed by the denormaliser in a DB. This lets us work out how far behind the head of the stream the denormaliser is: we query Event Store for the stream, read the index at the head and compare it to the index in the DB. We use this to get notified if a consumer is slow or has stopped processing for some reason.
To avoid the write amplification caused by the projections, we are moving away from this model and having the catch-up subscriptions subscribe to the $all stream and filter on the client instead. This requires us to store not the event index but the commit position and prepare position in the $all stream.
We are now not sure how we can work out how far behind the head of the $all stream we are, as the commit and prepare positions are not sequential.
Any ideas on how we can solve this?
In fact, prepare and commit positions are the exact offsets in the event log. If you take the prepare position of $all/head and subtract the prepare position of the last processed event, you get exactly the number of bytes remaining to be processed. There’s no way to know the number of events in the $all stream though.
I’m not sure if commit and prepare positions are supposed to be opaque, so there is no guarantee that this will always be the case. Another caveat: if you have scavenged data, the number of bytes will still include it.
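To make the subtraction above concrete, here is a minimal sketch. The `Position` type and `bytes_behind` function are hypothetical names for illustration and not part of any Event Store client. It assumes positions can be treated as byte offsets, which (as noted above) may not be guaranteed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Position:
    """Hypothetical holder for an $all position (not a client library type)."""
    commit: int
    prepare: int


def bytes_behind(head: Position, checkpoint: Position) -> int:
    """Bytes of log between the last processed event and the head of $all.

    Assumes prepare positions are byte offsets in the log. The result
    still counts scavenged data and events the consumer would ignore.
    """
    return max(0, head.prepare - checkpoint.prepare)


if __name__ == "__main__":
    head = Position(commit=1_500, prepare=1_500)
    checkpoint = Position(commit=1_000, prepare=1_000)
    print(bytes_behind(head, checkpoint))
```

This gives a rough "distance to head" figure only. It says nothing about how many events sit in that range.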
Thanks. This is what I suspected. It doesn’t tell us whether any of the missed bytes are events that the denormaliser is interested in, though, so I’m not sure how useful it is.
Correct. It is the logical position. A reasonable representation of where you are might be worth adding, though (and isn’t much work).
Do you mean this is something that could be added to the Event Store API reasonably easily, to allow clients to have some sort of logical position upon which to make the sort of decision I alluded to?
I am not sure how such a value could be calculated at a reasonably low cost. I can give you “some idea” of it, but an exact number is actually very expensive to calculate and it can change while you are running through (think of a chunk ahead of you getting scavenged; you also have writes happening while you are catching up!).
Our initial thoughts were that either a single service, or each service on its own, subscribes to the $all stream, knows which events each denormaliser subscribes to, and keeps track of the effective ‘count’ of all events for that denormaliser. Each denormaliser then keeps its own count and we can compare the two.
But this is obviously fraught with issues around the indexes maintained by that service. Is it not possible to get a single logical index for all events in the $all stream? This wouldn’t be perfect but might go some way towards…
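A rough sketch of the counting idea described above, purely for illustration. `InterestCounter`, its methods and the event type names are all hypothetical. It keeps everything in memory, so it ignores the persistence and restart issues already mentioned.

```python
from collections import Counter
from typing import Dict, Set


class InterestCounter:
    """Counts, per denormaliser, events seen on $all versus events processed."""

    def __init__(self, interests: Dict[str, Set[str]]) -> None:
        # Maps denormaliser name -> event types it subscribes to.
        self._interests = interests
        self._expected: Counter = Counter()
        self._processed: Counter = Counter()

    def on_all_event(self, event_type: str) -> None:
        """Call for every event read from $all by the tracking service."""
        for name, types in self._interests.items():
            if event_type in types:
                self._expected[name] += 1

    def on_processed(self, name: str) -> None:
        """Call when a denormaliser reports it has handled one event."""
        self._processed[name] += 1

    def lag(self, name: str) -> int:
        """Events of interest seen so far that the denormaliser has not handled."""
        return self._expected[name] - self._processed[name]


if __name__ == "__main__":
    counter = InterestCounter({"orders-view": {"OrderPlaced", "OrderShipped"}})
    for event_type in ["OrderPlaced", "UserRegistered", "OrderShipped"]:
        counter.on_all_event(event_type)
    counter.on_processed("orders-view")
    print(counter.lag("orders-view"))
```

The comparison is only as good as the two counts, so both sides would need to start counting from the same point in $all.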
The $all stream and subscribing to $all does give you an exact index and a way of resuming a subscription, the same as a stream subscription. Why does this not work?
So in our current model, we can compare the event index of the denormaliser with the projection head to work out exactly how many events the denormaliser is behind, because the projected stream only contains events the denormaliser cares about.
In the new model with the $all stream, we only write the index if we process an event the denormaliser cares about (maybe this is something we could change, but I’m a little loath to write to the DB for every event, even if we are ignoring 1000 events in a row). Because we don’t know how many events have been written that the denormaliser needs to process, we don’t know if we are keeping up with processing. That is something we’d like to monitor, in case a service stops processing unexpectedly or, more likely, gets overwhelmed. And even if we store the current position in the $all stream on every event processed, we still won’t know if the difference represents 1 event or 109.
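One possible middle ground between writing on every event and writing only on interesting ones is to also checkpoint after a run of ignored events. This is only a sketch of that idea, not something proposed in the thread. `ThrottledCheckpoint`, `save` and `max_skipped` are hypothetical names, and `save` stands in for whatever writes the position to the DB.

```python
from typing import Callable, Tuple

# (commit position, prepare position) in $all; a plain tuple for illustration.
Position = Tuple[int, int]


class ThrottledCheckpoint:
    """Saves the $all position on interesting events, or after N ignored ones."""

    def __init__(self, save: Callable[[Position], None], max_skipped: int = 1000) -> None:
        self._save = save
        self._max_skipped = max_skipped
        self._skipped = 0

    def on_event(self, position: Position, interesting: bool) -> None:
        if interesting:
            # The denormaliser handled this event, so always record it.
            self._save(position)
            self._skipped = 0
            return
        self._skipped += 1
        if self._skipped >= self._max_skipped:
            # A long run of ignored events: record progress anyway.
            self._save(position)
            self._skipped = 0


if __name__ == "__main__":
    saved = []
    checkpoint = ThrottledCheckpoint(saved.append, max_skipped=3)
    for i in range(1, 8):
        checkpoint.on_event((i, i), interesting=(i == 5))
    print(saved)
```

This bounds how stale the stored position can get without a DB write per event. It still does not say how many of the remaining events are of interest.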
If you are reading from a stream then you just use the current position of the stream (largest event number) minus your current position. You can do roughly the same on $all. The additional reads to calculate this would, I assume, be one read every few seconds (to figure out where the head is), so it should be relatively cheap to keep calculated as well.
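As a sketch of the "one read every few seconds" idea: cache the head and refresh it only after an interval has passed. `HeadCache`, `events_behind` and the `read_head` callback are hypothetical. `read_head` stands in for whatever call returns the largest event number of the stream (or a position for $all).

```python
import time
from typing import Callable, Optional


class HeadCache:
    """Caches the head of a stream, re-reading at most once per interval."""

    def __init__(
        self,
        read_head: Callable[[], int],
        refresh_seconds: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._read_head = read_head
        self._refresh_seconds = refresh_seconds
        self._clock = clock
        self._value: Optional[int] = None
        self._read_at = 0.0

    def head(self) -> int:
        now = self._clock()
        if self._value is None or now - self._read_at >= self._refresh_seconds:
            # Cached value is missing or stale: do one read and remember it.
            self._value = self._read_head()
            self._read_at = now
        return self._value


def events_behind(head_event_number: int, last_processed_event_number: int) -> int:
    """Largest event number in the stream minus the consumer's current position."""
    return max(0, head_event_number - last_processed_event_number)


if __name__ == "__main__":
    cache = HeadCache(read_head=lambda: 120, refresh_seconds=5.0)
    print(events_behind(cache.head(), last_processed_event_number=100))
```

For a single stream this yields an event count. For $all the same shape applies, but the difference is a distance between positions, not a count of events.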