Dispatching events

Hi Guys,

Does EventStore provide any capability of dispatching events to somewhere?

I remember J. Oliver’s EventStore was able to do it and I remember Greg saying that being able to migrate from JO to ES is kind of a big thing :slight_smile:

If not, how would you implement it? I understand that it can be a separate service looking at “all events” stream and dispatching it to wherever we need. But how exactly it can be implemented right?

Send another “system” event EventDispatched(eventId) + build a projection to store the last dispatched event id? Or have a projection of all the events that are to be dispatched?

When this service crashes after the event is dispatched to the queue but before the EventDispatched published, would you say that it doesn’t happen often + it is “at least once dispatching” (so dispatch it again and let consumers deal with idempotency)? Or would yo do something else here?

What are the commonly used patterns here?

Cheers,

Alexey.

Normally checkpointing + atleast once will be used here unless you are dealing with something where you can easily make transactional the handling + the checkpoint writing (writing to sql is a good example) in which case it will be transactional not atleast once.

For how to read it… Basically you mark a checkpoint (what you know you have seen and processed before). You subscribe and put all your messages into an in memory queue (can be bounded to avoid memory overflows). Then you start from your last known position requesting history. For each event you receive in history check if that event is the tail of your queue. If yes eat through the queue and you are now “live” else repeat history reading process.

HTH,

Greg

We’ll post some code and docs for this either today or tomorrow morning. The basic pattern is “subscribe then catch up from last seen point”. As Greg says, if you can store the last seen point transactionally with whatever your handler is doing (e.g. SQL, RavenDB will allow this) then you get transactional rather than at least once.

There is a gotcha : subscribing to a stream ( or SubscribeToAllStreams) affetcs only new events, because you do not provide the Position since you need messages from, and writing the code to reconciliate from the startup phase and then switching to the push notification is very hard.

I was working on a feature where you would have specified that Position to the subscribing api, but the event store should then have kept a checkpoint for every subscription, and do the chasing itself.

I found it easier to just go polling the stream asking for batches of messages, like the chat example does (and save the checkpoint Position on whatever you want, maybe with events id to do client side dedup if something goes wrong on the batch. You can implement an exponential backoff to lower the pressure on eventstore. )

I don’t think that this polling executed from just few servers (publising messages on bus or doing projections) would affect performances too much, so i’m asking for feedback on that solution to the autors :smiley:

Valerio

The code isn’t that hard once you wrap your head around it. It’s just tricky the first time. Would have a stream that knew how to do it in e client API be useful?

Also I wouldn’t worry too much about the load from your subscriptions unless you are thousands/second,

I prototyped it but i kept finding edge cases so i stopped :smiley:

I thouth that if is not a performance problem for the event store, having an application-server polling for all-events is not a problem and would keep the code very simple. Do you confirm that is a viable option ?

Could you elaborate more about “Would have a stream that knew how to do it in e client API be useful?”, do you mean being able to subscribe to a stream specifying the starting position ?

Thanks, Valerio

“I thouth that if is not a performance problem for the event store, having an application-server polling for all-events is not a problem and would keep the code very simple. Do you confirm that is a viable option ?”

Its not really going to cause a performance problem, its more so that it will increase latency due to polling as opposed to pushing. If you prefer to poll likely you are better off using the atompub API since it also supports intermediary caching etc etc.

So the “trick” to dealing with live subscriptions. Subscribe first. Yopur subscription takes messages and puts them into a queue (can be bounded for size). Now start reading your history. As you move in your history check each event to see if it is the same as the tail of your queue. If it is read through your whole queue (and now process on the fly) if not process and repeat.

“Could you elaborate more about “Would have a stream that knew how to do it in e client API be useful?”, do you mean being able to subscribe to a stream specifying the starting position ?”

var reader = new EventStoreStreamReader(connection, “MyStream”, new FileCheckPoint());

eg in the client API this exists.

looks like my response got mangled somehow. Retry:

“I thouth that if is not a performance problem for the event store, having an application-server polling for all-events is not a problem and would keep the code very simple. Do you confirm that is a viable option ?”

Sure this is how atom feeds work. If I were doing it I would probably just use the atompub API (it also supports conditional gets/intermediary caching etc etc).

“Could you elaborate more about “Would have a stream that knew how to do it in e client API be useful?”, do you mean being able to subscribe to a stream specifying the starting position ?”

var reader = new ESSubscriptionReader(connection, “myStream”, new fileCheckPoint()); eg its built into the client API.

As for how it works its not that “hard” but a touch tricky. So what you do is…

  1. Subscribe to the stream (your subscription writes any messages to a queue, your queue can be bounded)

  2. start reading your history …

  3. foreach event in history check if tail of queue = event

yes -> process queue and you are caught up just listen to subscription.

no -> process event, move checkpoint

  1. all history is read. is queue empty?

yes -> process queue forward from here

no -> something is really wrong here.

Make sense?

Cheers,

Greg

Yep ! it’s about what i spiked, but i was worried about boundaries and buffer sizes : if i have a long history (fromallevents) the subscription buffer is going to fill, the eventstore sending queue is going to fill. I do not have measured it but since you say it is not a concern i’m fine :smiley:

I guess i can always process all events untill i decide to stop (do the most part), store the checkpoint, then Subscribe to the stream, and read history from that checkpoint like you said.

Thanks, Valerio

I think you are misunderstanding a bit.

“I guess i can always process all events untill i decide to stop (do the most part), store the checkpoint, then Subscribe to the stream, and read history from that checkpoint like you said.”

You can’t! do this (race conditions). You must subscribe first.

" i have a long history (fromallevents) the subscription buffer is going to fill, the eventstore sending queue is going to fill"

To handle this you simply bound the size of the queue

HandleSubscription(Event e) {

lock(myqueue) {

myqueue.Enqueue(e);

if(myqueue.Count > max) myqueue.dequeue();

}

}

Oh i could not see the race condition (my thoughts are always about the $all stream). Is the race condition between $all being updated and client being notified ?

Valerio

Race ->

Get Newest returns up to 15 …

Get Newest Events Since 15 -> returns none

Someone writes new one (16)

Subscribe

I will never get 16 (first new one will be 17).

Oh no, that’s not what i meant, i was thinking this to reduce the amount of data in flight.

Get Newest returns up to x and process them.

Subscribe

Get history since x

and follow you workflow.

Valerio

Ah ok yes that can work.

Interesting stuff. Maybe it would make sense to have these kinds of recipes on the EventStore Github wiki or something?

@Greg Is there not another race condition with the steps you posted?

Subscribe

Get events to 15

Process events 1-15

Someone writes event 16

Get events since 15

Process event 16

We have finished catching up -> start processing real time queue

Event 16 appears ?delayed for some reason? over subscription

We process 16 a second time

Depends how subscription works exactly I suppose, can you confirm if it is possible?

Thats just atleast once messaging. The other way prevented that from happening but most systems are already idempotent

Yes it isn’t a problem provided your event processing is idempotent, if not it is easy to deal with anyway by temporarily storing event id’s you see while catching up.

Just wondered if it is possible.

The inconvenience with at least one delivery + idempotent clients is that every client should now have this “already seen event ids” thing.

Also it is sometimes a big question of for how long already processed event ids should be stored because sometimes something can be (and therefore will be) down for a significant amount of time.

Another problem can be the atomicity of the checkpoint: you can process a message and die before writing a checkpoint or you can write a checkpoint and die processing a message… So you still may have an “at least once” but on the client (in case of using RabbitMQ, etc).

It would be really interesting to discuss how such atomicity can be achieved without using DTC. I know Udi was saying that there are ways to do it, but haven’t explained them yet…