There is something not really easy to get right with Position/StreamPosition but maybe I’m missing something.
When calling ReadStreamAsync / ReadAllAsync the position passed is the position of the first event we’d like to receive.
For the first call, it’s ok, since we can pass 0 (Position.Start, StreamPosition.Start).
Now, lets say we read some events. We can get the position of the last one in the EventData. let’s call it P.
If we do a new read from position P, we get the event at position P again . The reader code has to protect against this duplicate.
It should be possible to either get P+1 (it’s doable for StreamPosition since it’s an incremental int, but not for Position), or to ask a ReadStreamAsync/ReadAllAsync starting after given position.
This would be especially usefull for snapshots. We’d be able to store the snapshot along the position of the last processed event, and the reads would then start after this position.
How do you manage this on your side ?
Internally, in the server, the responses are sliced by pages, and the datastructure contains a NextEventNumber that can be used to request the next page.
This was also the case in the TCP client, that contained a NextEventNumber. But the GRPC client, using IAsyncEnumerable (which is overall easier to use) cannot return this NextEventNumber.
having an option in Read methods to provide a position and request for events After this position would do the trick. The only difficulty is for the Position 0… To get Event at Position 0 with the After flag, you have to request at Position -1…
This would be actually very similar to Position handling in Subscriptions:
Subscription basics | EventStoreDB Documentation
Can you go into more detail about your particular use case please? Without knowing more about it I would suggest just use the subscriptions API if you really need this.
My use case works like projections, but synchronous. This is also the case with snapshots.
I’d like to do:
after saving new events, Id like to refresh projections synchronously:
let snapshotPosition, snapshotState = loadSnapshot
let mutable position= snapshotPosition// this is the last event position
let mutable state = snapshotTest
let catchup() =
for e in store.ReadAll(stream,position) // it will return the last event again :/
state <- evolve state e
version <- e.Position // in the end it will be the last event pos again
It would work with something like:
let snapshotPosition, snapshotState = loadSnapshot
let mutable position= snapshotPosition// this is the last event position
let mutable state = snapshotTest
let catchup() =
for e in store.ReadAll(stream,position,Option.ExcludeStart) // it will return the event just after the last known :)
state <- evolve state e
version <- e.Position // in the end it will be the last event pos again
This is what subscriptions do internally, but they are async. And for simple scenarios, doing synchronous projections is faaaaaar easier.
The current situation also make snapshot versioning difficult. Because when we store the last event number in the snapshot, reading from this version will return the last event (already aggregated in the snapshot) again. So there is this +1 to add when reading but not at Start.
Being able to do store.ReadStream(stream, snapshotVersion, Option.ExcludeStart)
would make it more direct…
I don’t know yet the best name for this option: ExcludeStart, Next ?
It makes read composable so I think its a good point. Today it’s not the case. You cannot read a stream, and use the information in the result to request new events without getting some twice.