Does EventStore provide any capability for dispatching events somewhere?
I remember J. Oliver’s EventStore was able to do it, and I remember Greg saying that being able to migrate from JO to ES is kind of a big thing.
If not, how would you implement it? I understand that it can be a separate service watching the “all events” stream and dispatching events wherever we need them. But how exactly should it be implemented?
Send another “system” event, EventDispatched(eventId), plus build a projection to store the last dispatched event id? Or have a projection of all the events that are still to be dispatched?
When this service crashes after the event is dispatched to the queue but before EventDispatched is published, would you say that it doesn’t happen often and accept “at least once” dispatching (so dispatch it again and let consumers deal with idempotency)? Or would you do something else here?
Normally checkpointing + at-least-once will be used here, unless you are dealing with something where you can easily make the handling and the checkpoint writing transactional (writing to SQL is a good example), in which case it will be transactional, not at-least-once.
As for how to read it… Basically you mark a checkpoint (what you know you have seen and processed before). You subscribe and put all incoming messages into an in-memory queue (it can be bounded to avoid memory overflows). Then you start requesting history from your last known position. For each event you receive from history, check whether it is the tail of your queue. If yes, eat through the queue and you are now “live”; if not, repeat the history-reading process.
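The merge step of that procedure can be sketched in a few lines. This is only an illustrative Python sketch, not the real client API: it assumes events compare equal by id, that `history` is a batch already read from the store, and that `live_queue` holds everything the push subscription has buffered since subscribing.

```python
from collections import deque

def catch_up(history, live_queue, handle):
    """Process history until it overlaps the oldest buffered live event,
    then drain the live buffer: at that point you are 'live'.
    Returns True when caught up, False if history ran out first
    (re-read history from the new position and try again)."""
    for event in history:
        if live_queue and live_queue[0] == event:
            # history has reached the live buffer: eat through the queue
            while live_queue:
                handle(live_queue.popleft())
            return True
        handle(event)
    return False  # no overlap yet; repeat the history-reading process
```

For example, with history ending at event 5 and events 5, 6, 7 buffered by the subscription, every event is handled exactly once and the reader goes live.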
We’ll post some code and docs for this either today or tomorrow morning. The basic pattern is “subscribe, then catch up from the last seen point”. As Greg says, if you can store the last seen point transactionally with whatever your handler is doing (SQL or RavenDB will allow this, for example), then you get transactional rather than at-least-once behaviour.
There is a gotcha: subscribing to a stream (or SubscribeToAllStreams) affects only new events, because you do not provide the Position from which you need messages, and writing the code to reconcile the startup phase and then switch to the push notifications is very hard.
I was working on a feature where you would specify that Position to the subscription API, but the event store would then have to keep a checkpoint for every subscription and do the chasing itself.
I found it easier to just poll the stream, asking for batches of messages, like the chat example does (and save the checkpoint Position wherever you want, maybe with event ids to do client-side dedup if something goes wrong in a batch). You can implement an exponential backoff to lower the pressure on EventStore.
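That polling loop with checkpointing and exponential backoff can be sketched like this (Python; `read_batch`, `save_checkpoint`, and the integer positions are hypothetical stand-ins for whatever client and checkpoint storage you use):

```python
import time

def poll_loop(read_batch, save_checkpoint, handle, start=0,
              base_delay=0.1, max_delay=5.0, stop=lambda: False):
    """Poll for batches from position `start`; back off exponentially
    while the stream is quiet, reset the delay as soon as events arrive."""
    pos, delay = start, base_delay
    while not stop():
        batch = read_batch(pos)      # events at positions >= pos
        if batch:
            for event in batch:
                handle(event)
            pos += len(batch)
            save_checkpoint(pos)     # checkpoint once per batch
            delay = base_delay       # busy: poll again quickly
        else:
            time.sleep(delay)        # idle: back off
            delay = min(delay * 2, max_delay)
    return pos
```

Checkpointing once per batch rather than per event is the trade-off mentioned above: on a crash the whole batch may be redelivered, which is where the client-side dedup by event id comes in.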
I don’t think that this polling, executed from just a few servers (publishing messages on a bus or doing projections), would affect performance too much, so I’m asking the authors for feedback on that solution.
The code isn’t that hard once you wrap your head around it. It’s just tricky the first time. Would having a stream that knew how to do it in the client API be useful?
I prototyped it but I kept finding edge cases, so I stopped.
I thought that if it is not a performance problem for the event store, having an application server polling for all events is not a problem and would keep the code very simple. Do you confirm that it is a viable option?
Could you elaborate more on “Would having a stream that knew how to do it in the client API be useful?” Do you mean being able to subscribe to a stream specifying the starting position?
“I thought that if it is not a performance problem for the event store, having an application server polling for all events is not a problem and would keep the code very simple. Do you confirm that it is a viable option?”
It’s not really going to cause a performance problem; it’s more that it will increase latency, polling as opposed to pushing. If you prefer to poll, you are likely better off using the atompub API, since it also supports intermediary caching etc.
So the “trick” to dealing with live subscriptions: subscribe first. Your subscription takes messages and puts them into a queue (which can be bounded in size). Now start reading your history. As you move through your history, check each event to see if it is the same as the tail of your queue. If it is, read through your whole queue (and from now on process on the fly); if not, process it and repeat.
“Could you elaborate more on ‘Would having a stream that knew how to do it in the client API be useful?’ Do you mean being able to subscribe to a stream specifying the starting position?”
var reader = new EventStoreStreamReader(connection, "MyStream", new FileCheckPoint());
looks like my response got mangled somehow. Retry:
“I thought that if it is not a performance problem for the event store, having an application server polling for all events is not a problem and would keep the code very simple. Do you confirm that it is a viable option?”
Sure, this is how atom feeds work. If I were doing it I would probably just use the atompub API (it also supports conditional GETs, intermediary caching, etc.).
“Could you elaborate more on ‘Would having a stream that knew how to do it in the client API be useful?’ Do you mean being able to subscribe to a stream specifying the starting position?”
var reader = new ESSubscriptionReader(connection, "myStream", new FileCheckPoint()); — i.e. it’s built into the client API.
As for how it works, it’s not that “hard”, but a touch tricky. So what you do is…
Subscribe to the stream (your subscription writes any messages to a queue; the queue can be bounded)
start reading your history…
for each event in history, check if tail of queue = event
yes -> process the queue and you are caught up; just keep listening to the subscription.
Yep! That’s about what I spiked, but I was worried about boundaries and buffer sizes: if I have a long history (from all events) the subscription buffer is going to fill, and the event store sending queue is going to fill. I haven’t measured it, but since you say it is not a concern, I’m fine.
I guess I can always process all events until I decide to stop (do the most part), store the checkpoint, then subscribe to the stream and read history from that checkpoint like you said.
“I guess I can always process all events until I decide to stop (do the most part), store the checkpoint, then subscribe to the stream and read history from that checkpoint like you said.”
You can’t do this! (race conditions). You must subscribe first.
“If I have a long history (from all events) the subscription buffer is going to fill, the event store sending queue is going to fill”
To handle this you simply bound the size of the queue
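Bounding the queue can be as simple as giving the live buffer a fixed capacity and applying back-pressure on the push side. A sketch of the idea (the 5-second timeout and the resubscribe policy are illustrative assumptions, not anything EventStore prescribes):

```python
import queue

def make_live_buffer(maxsize=1000):
    """A bounded live-event buffer: the subscription callback blocks briefly
    when the consumer lags, instead of letting memory grow without limit."""
    buf = queue.Queue(maxsize=maxsize)

    def on_live_event(event):
        try:
            buf.put(event, timeout=5)  # back-pressure on the push side
        except queue.Full:
            # consumer is hopelessly behind: drop the subscription and
            # redo the subscribe-then-catch-up dance from the checkpoint
            raise RuntimeError("live buffer overflow; resubscribe and catch up")

    return buf, on_live_event
```

The catch-up reader drains `buf` while the subscription keeps filling it; if the bound is ever hit for long, re-subscribing and catching up again from the checkpoint is the recovery path.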
Oh, I could not see the race condition (my thoughts are always about the $all stream). Is the race condition between $all being updated and the client being notified?
Yes. It isn’t a problem provided your event processing is idempotent; if not, it is easy to deal with anyway by temporarily storing the event ids you see while catching up.
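That “temporarily store event ids” trick can be as small as a set you consult only until you are live. A sketch, assuming events carry a unique id and that once live the subscription delivers each event once, in order:

```python
class CatchUpDedup:
    """Drop duplicates that arrive both via the history read and the live
    subscription during the overlap; once live, the id set is no longer
    needed and can be cleared to free memory."""

    def __init__(self, handle):
        self.handle = handle
        self.seen = set()
        self.live = False

    def on_event(self, event_id, payload):
        if not self.live:
            if event_id in self.seen:
                return  # duplicate from the catch-up overlap
            self.seen.add(event_id)
        self.handle(payload)

    def go_live(self):
        self.live = True
        self.seen.clear()
```

This sidesteps the “how long do we keep ids” question below for the catch-up case, because the set only has to cover the overlap window, not the whole history.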
The inconvenience with at-least-once delivery + idempotent clients is that every client now has to maintain this “already seen event ids” thing.
Also, it is sometimes a big question how long already-processed event ids should be stored, because sometimes something can be (and therefore will be) down for a significant amount of time.
Another problem can be the atomicity of the checkpoint: you can process a message and die before writing the checkpoint, or you can write the checkpoint and die processing the message… So you may still end up with “at least once”, but on the client side (in case of using RabbitMQ, etc.).
It would be really interesting to discuss how such atomicity can be achieved without using DTC. I know Udi was saying that there are ways to do it, but he hasn’t explained them yet…