my take on an Event Dispatcher

Well, I just now saw all the replies to my prior thread; it seems Google Groups doesn’t respect my setting to send immediate email for threads I start… I’ve already started the work below, which I’ll try to explain, and then we can pick it apart :)

I took a shot at writing my own event dispatcher that will push events out to RabbitMQ. I know this has been discussed as overkill, but I like what Rabbit offers for my situation (the ability to round-robin messages off a queue, etc.).

The dispatcher based on Phil’s work is here:

And if you care to see the Rabbit Publisher, it’s here:

I am going for only-once publishing to Rabbit, with individual components handling their own subscriptions, and if catchup is required, they would have to handle that on their own, speaking directly to EventStore (at least that’s my current thinking).

I don’t intend to start at 0,0 and pump all my messages through this, I will just switch over to it live, so I only need catchup for if this dispatcher itself goes down, therefore I bypassed the initial historical read from 0,0 on first launch.

I am also doing some funky things with the messages themselves. I wanted to have them be JSON all the way down in Rabbit, but I didn’t want my dispatcher to have to know about all possible event types, and I wanted to preserve EventStore info in the message, such as stream id, number, etc. (for catching up later). So, I use naive JSON serialization to create a full JSON version of my contract, which on the subscriber side will be EventMessage.
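To make the envelope idea concrete, here is a minimal sketch in Python (the thread itself is about .NET code). The function `wrap_event` and the field names `streamId`, `eventNumber`, `eventType` and `data` are hypothetical and only illustrate the shape; the actual contract is whatever the linked publisher defines.

```python
import json


def wrap_event(stream_id: str, event_number: int, event_type: str, data_json: bytes) -> str:
    """Wrap a stored event's raw JSON body in an envelope that keeps its store coordinates."""
    envelope = {
        # Hypothetical field names; they carry the info a subscriber needs to catch up later.
        "streamId": stream_id,
        "eventNumber": event_number,
        "eventType": event_type,
        # The body is parsed as generic JSON, so the dispatcher never needs the concrete event class.
        "data": json.loads(data_json),
    }
    return json.dumps(envelope)


message = wrap_event("order-42", 7, "MyApp.Events.OrderShipped", b'{"orderId": 42}')
```

The subscriber can then deserialize `data` into its own type, while the dispatcher stays ignorant of the event classes.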

I’m using a convention (similar to MassTransit) of Exchange per Type.FullName, so on the subscriber side I can subscribe and create a new queue bound to that exchange.
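A rough sketch of that routing convention, using a toy in-memory broker rather than a real RabbitMQ client. `InMemoryBroker`, `exchange_for` and the event type names are all made up for illustration; the only thing taken from the post is "one exchange per type full name, one queue per subscriber bound to it".

```python
from collections import defaultdict, deque


class InMemoryBroker:
    """Toy stand-in for a broker with fanout exchanges; not a RabbitMQ client."""

    def __init__(self):
        self.bindings = defaultdict(list)  # exchange name -> queues bound to it

    def bind_queue(self, exchange: str) -> deque:
        # Each subscriber gets its own new queue bound to the exchange.
        queue = deque()
        self.bindings[exchange].append(queue)
        return queue

    def publish(self, exchange: str, message: str) -> None:
        # Fanout: every queue bound to the exchange receives a copy.
        for queue in self.bindings[exchange]:
            queue.append(message)


def exchange_for(event_type_full_name: str) -> str:
    # Convention from the post: the exchange is named after the event type's full name.
    return event_type_full_name


broker = InMemoryBroker()
shipped_queue = broker.bind_queue(exchange_for("MyApp.Events.OrderShipped"))
broker.publish(exchange_for("MyApp.Events.OrderShipped"), "shipped #42")
broker.publish(exchange_for("MyApp.Events.OrderCancelled"), "cancelled #43")
```

Only the message published to the exchange the queue is bound to ends up in `shipped_queue`; the cancellation goes to an exchange with no bindings and is dropped.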

Probably the ugliest thing is that I am storing lastProcessed in an insane way currently (json to file, real-time), so as James mentioned, it’s not transactional.

Wondering what you all think, and how you’d recommend storing lastProcessed in a safer way? File System Transaction?
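One common way to make the file write itself safer is the write-to-temp-then-rename pattern, sketched below in Python. This is only an illustration under assumptions: the file layout and the names `save_checkpoint` and `load_checkpoint` are hypothetical, and lastProcessed is assumed to be a (commit, prepare) position pair like the "0,0" mentioned above. It keeps the checkpoint file from ever being half-written, but it does not make the write transactional with the publish to Rabbit, so a crash between the two can still produce a duplicate.

```python
import json
import os
import tempfile


def save_checkpoint(path: str, commit_position: int, prepare_position: int) -> None:
    """Atomically replace the checkpoint file so readers never see a partial write."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must live in the same directory so the final rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".checkpoint-")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump({"commit": commit_position, "prepare": prepare_position}, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before the rename
        os.replace(tmp_path, path)  # atomic swap of old file for new
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_checkpoint(path: str):
    """Return (commit, prepare), or None if no checkpoint has been saved yet."""
    try:
        with open(path) as f:
            data = json.load(f)
    except FileNotFoundError:
        return None
    return data["commit"], data["prepare"]
```

Saving the checkpoint only after a successful publish gives at-least-once behaviour: after a crash the dispatcher may resend the last event, but it will not skip one.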



I was using RabbitMQ + Event Store as well (I had switched from NEventStore and just didn’t have time to get rid of RMQ). I made a stream with a max length of 5 events and just wrote the Position of the last processed event into it. Still not transactional, but I was not too worried about the incredibly rare duplicate.

Interesting approach… you say you were using RabbitMQ, what do you do now?

I quit :)

“I don’t intend to start at 0,0 and pump all my messages through this, I will just switch over to it live, so I only need catchup for if this dispatcher itself goes down, therefore I bypassed the initial historical read from 0,0 on first launch.”

Why not just use a catch-up subscription in the client API, which handles all of this for you? E.g. everything is handled (it also handles switching between push and pull, etc.).
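For readers unfamiliar with the idea, a catch-up subscription can be pictured roughly like the single-threaded Python sketch below. `InMemoryLog` and `subscribe_with_catch_up` are hypothetical names and this is not the Event Store client API; a real client also has to deal with events arriving while the historical read is still in progress, which this toy version sidesteps.

```python
class InMemoryLog:
    """Toy append-only log that can also push new events to live subscribers."""

    def __init__(self):
        self.events = []
        self.live_subscribers = []

    def append(self, event):
        self.events.append(event)
        position = len(self.events) - 1
        for callback in self.live_subscribers:
            callback(position, event)


def subscribe_with_catch_up(log, last_checkpoint, handler):
    """Replay everything after last_checkpoint, then keep receiving live events."""
    start = 0 if last_checkpoint is None else last_checkpoint + 1
    # Pull phase: read the history the subscriber has not seen yet.
    for position in range(start, len(log.events)):
        handler(position, log.events[position])
    # Push phase: from here on, new events are delivered as they are appended.
    log.live_subscribers.append(handler)


log = InMemoryLog()
for event in ["a", "b", "c"]:
    log.append(event)

seen = []
subscribe_with_catch_up(log, last_checkpoint=0, handler=lambda pos, ev: seen.append((pos, ev)))
log.append("d")  # arrives through the live (push) path
```

Here the subscriber had already processed position 0, so it receives "b" and "c" from history and "d" live. Passing `last_checkpoint=None` would start from the beginning.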

Also be very careful about using a queue like this for dispatching to projections, especially as it becomes next to impossible to replay a projection (I have watched many teams fall into this trap; I talk about it in my polyglot data talk, which is currently online). Also, as a heads up, competing consumers will soon be supported internally for load balancing etc. :)


Ooh! Competing consumer support would be great! Do you have an ETA?

That would make me much less likely to bother with Rabbit. I’d probably use ES for commands and events.

And yes, I planned to handle catch up at the component level directly to event store. Was really just using rabbit for competing consumer stuff for live events.

But that raises another question - how do you handle competing consumers when catching up and/or rebuilding a readmodel?

You can use competing consumers easily if you are willing to accept the possibly out of order nature of events between consumers.

My guess is it will be in the Sept “Ouro’s birthday” release in mid-to-late September, with support over both TCP and HTTP.

I am also looking forward to competing consumers. That will change some things for me. Do you have any details on what the client surface area will be like?

It will look very similar to a subscription. Basically you can set up arbitrary groups (identified by a string name). You can then have as many clients as you want (though I will likely limit this to something sane like 50) reading from a given group. There is also a method for acking a message, of course.

The arbitrary groups… is that the hash method to prevent consumers from handling messages for the same aggregate? I was considering using competing consumers for command handlers (as another poster mentioned previously), and it would obviously be bad for multiple nodes to handle the same aggregate at the same time. I assume it will gracefully handle node death. (My first dummy thought was to partition the command stream into 3 partitions, one for each node, but if one failed, that would leave 1/3 of aggregates not getting serviced.) Or if not handled in the box, what do you recommend to handle this?

Really, a single active and the rest passive would probably do for my purposes.

No, the group is a group of subscribers (basically they share server-side state). Imagine that you have three subscribers connected to a subscriber group: all three will start receiving messages (load balanced). If one dies, the other two will continue to share the load. This is the same way MQ systems work with competing consumers; it is unaware of things like aggregates.
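The behaviour described here can be illustrated with a small in-memory Python sketch. `SubscriberGroup`, its methods and the group name are hypothetical and are not the planned client API; plain round-robin is assumed as the load-balancing strategy, and acking is left out.

```python
class SubscriberGroup:
    """Toy competing-consumers group: each message goes to exactly one connected consumer."""

    def __init__(self, name: str):
        self.name = name          # groups are identified by a string name
        self.consumers = set()
        self._dispatched = 0      # shared server-side state: how many messages were handed out

    def connect(self, consumer_id: str) -> None:
        self.consumers.add(consumer_id)

    def disconnect(self, consumer_id: str) -> None:
        self.consumers.discard(consumer_id)

    def dispatch(self, message) -> str:
        """Hand the message to the next consumer in round-robin order and return its id."""
        if not self.consumers:
            raise RuntimeError("no consumers connected to group " + self.name)
        ordered = sorted(self.consumers)
        target = ordered[self._dispatched % len(ordered)]
        self._dispatched += 1
        return target


group = SubscriberGroup("readmodel-updaters")
for consumer_id in ("a", "b", "c"):
    group.connect(consumer_id)

before_failure = [group.dispatch(n) for n in range(3)]
group.disconnect("b")  # one consumer dies
after_failure = [group.dispatch(n) for n in range(3, 7)]
```

Before the failure the three consumers share the messages; afterwards the remaining two carry the whole load. Nothing in the group knows which aggregate a message belongs to, so two messages for the same aggregate can land on different consumers.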

Okay, perhaps that’s not what I need. I really just need to tie my command service to the GES cluster node so that it knows when it is the active one.

This is very easy to do, actually. I have a card up on my board to write a blog post on how to do this. You can very easily take advantage of our clustering/elections to make your code work the same way (projections do this, as an example).

Okay, I will be looking out for this.

WRT ES dispatcher - what is the take on this:

Is that the work that is the result of this thread or is there some more code elsewhere better suited?

What is the general consensus on publishing these events to RabbitMQ or similar for picking up by readmodels further down the line (as opposed to directly subscribing to the events from within the readmodel and dispatching them directly)?

The RMQ approach is one I am familiar and comfortable with (from previous ES implementations) - GES seems to remove the need for it - hence my question on opinions. I did see Greg mention above that this might come with some stumbling blocks, which adds to the question I guess.


My question would be how do you replay with a queue in the middle?

So the problem comes from when I add a new readmodel or want to rehydrate an existing one?

Is there an example of an event dispatcher that subscribes direct? I guess I could rip the relevant code from the above if I’m feeling lazy.

I’m sure I could figure this out myself but my fear is that I’d miss something important.

With a new subscription, just forget your checkpoint and you are done (start from the beginning); with a queue, I need a control channel to tell something else to resend all the messages.
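The difference can be shown with a toy Python sketch. The `Projection` class and the integer "events" are made up for illustration; the point is only that a reader of a log can replay by resetting its own checkpoint, while a drained queue has nothing left to hand out.

```python
from collections import deque


class Projection:
    """Toy read model that sums event amounts and remembers how far it has read."""

    def __init__(self):
        self.checkpoint = None  # position of the last processed event
        self.total = 0

    def catch_up(self, log) -> None:
        start = 0 if self.checkpoint is None else self.checkpoint + 1
        for position in range(start, len(log)):
            self.total += log[position]
            self.checkpoint = position

    def reset(self) -> None:
        # "Forget your checkpoint": the next catch_up starts from the beginning.
        self.checkpoint = None
        self.total = 0


log = [5, 10, 20]          # the store keeps every event
projection = Projection()
projection.catch_up(log)
first_total = projection.total

projection.reset()
projection.catch_up(log)   # full replay, no help needed from anyone else
replayed_total = projection.total

queue = deque(log)         # a queue hands each message out once
while queue:
    queue.popleft()
messages_left_to_replay = len(queue)
```

With the queue, rebuilding the read model would need some other component to be told to publish everything again, which is the control channel mentioned above.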

Are there any examples of a dispatcher that doesn’t use a queue?