Deferred Processing

Can I defer a message in an event stream for later consumption (in a rational way)?

*Why do this with GES?*

I want to make my system CQRS/ES structured, and I want the underlying systems to be as simple as possible. It is clear to me that I can do what I want with EventStore+MessageQueue, but I’d love to have an EventStore-only solution.

Why do I want to defer messages?

I envision a scenario where I’m putting in “Buy” orders. The command to buy can be sent in a number of times, but I want to reject/fail buys that take the user’s cash balance below zero.

So far my modelling looks like (BuyOrderCreatedEvent)->(BuyOrderExecutedEvent|BuyOrderRejectedEvent)->(BuyOrderRecordedEvent).

If the sequence is Created->Executed->Recorded, Created->Executed->Recorded, Created->Executed->Recorded, there is no problem. But Created, Created, Created -> Executed, Executed, Executed -> Recorded, Recorded, Recorded might create a state where the resultant cash balance goes below zero.
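To make the two orderings concrete, here is a minimal Python sketch. The function name, the amounts and the rule "approve if the amount fits the balance the handler can see" are all assumptions for illustration; nothing here touches GES.

```python
def final_balance(order_amounts, starting_balance, interleaved):
    """Return the cash balance after all buy orders have been handled.

    Hypothetical model: the Created handler approves an order when its amount
    fits the balance it can see; Recorded is what actually debits the balance.
    """
    balance = starting_balance
    if interleaved:
        # Created, Created, Created: every check sees the same stale balance.
        executed = [amount for amount in order_amounts if amount <= balance]
        # Recorded, Recorded, Recorded: the debits only land afterwards.
        for amount in executed:
            balance -= amount
    else:
        # Created -> Executed -> Recorded, one order at a time.
        for amount in order_amounts:
            if amount <= balance:
                balance -= amount
    return balance


orders = [60, 60, 60]
print(final_balance(orders, 100, interleaved=False))  # 40
print(final_balance(orders, 100, interleaved=True))   # -80
```

With a starting balance of 100, the one-at-a-time ordering rejects the second and third order, while the interleaved ordering approves all three and overdraws the account.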

For this reason, I think the handler that transitions between Created and Executed|Rejected should only process after there are no orders that are between Executed and Recorded; only the first BuyOrderCreatedEvent can be processed until it has a corresponding Rejected/Recorded event processed.

With a Message Queue, I would check that I’m up to date on processing messages for the aggregate, then only process the Created message if there are no Executed messages dangling. If there is a dangler, I would tell the Queue to defer the message and send it to me in a few seconds, then move forward in the queue until I got a BuyOrderRecordedEvent to clear the BuyOrderExecutedEvent.
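A rough in-memory sketch of that defer loop, assuming a plain `collections.deque` in place of a real broker. The message shape `(kind, user_id, order_id)` and the function `drain` are made up for the example, and the downstream "Recorded" step is simulated by appending a message to the same queue.

```python
from collections import deque


def drain(created_messages):
    """Handle Created messages, deferring any whose user has an order in flight.

    Returns order ids in the order their Created message was accepted.
    """
    queue = deque(created_messages)
    in_flight = {}  # user_id -> order id that is Executed but not yet Recorded
    accepted = []
    while queue:
        kind, user_id, order_id = queue.popleft()
        if kind == "created":
            if user_id in in_flight:
                # Dangling Executed: put the message at the back of the queue,
                # standing in for "send it to me again in a few seconds".
                queue.append((kind, user_id, order_id))
                continue
            in_flight[user_id] = order_id
            accepted.append(order_id)
            # Simulate the rest of the workflow eventually recording the order.
            queue.append(("recorded", user_id, order_id))
        elif kind == "recorded":
            # Only the matching Recorded clears the in-flight marker.
            if in_flight.get(user_id) == order_id:
                del in_flight[user_id]
    return accepted


messages = [("created", "u1", "A"), ("created", "u1", "B"), ("created", "u2", "C")]
print(drain(messages))  # ['A', 'C', 'B']
```

Note the side effect: deferring B lets C (a different user) overtake it, which is harmless here because the constraint is per user.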

Is my modelling incorrect, is there a way of deferring events in GES, or should I just bite the bullet and pull a Message Queue into my solution?



It has occurred to me that I can simulate some message queue behavior with projections.

Worst idea: a projection that adds links to a stream when the primary event occurs, and then reposts them when the time expires on ‘defer’ messages in a second stream. Can a projection re-trigger when time changes? Or would I bring in processing against one of the $stats streams in order to re-trigger? (I believe the correct answer is “NEVER DO THAT JOSH”, but I’m just thinking out loud)

Probably better: have the projection stream implement the business logic of when a BuyOrderCreatedEvent can be processed (i.e. it doesn’t show up in the projection unless it is the next and only BuyOrderCreatedEvent that has no subsequent events in the workflow).
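The rule such a projection would encode can be sketched as a pure function over one user's events. This is Python for illustration only, not the GES projection API; the lowercase event kinds and the function name are assumptions, and Rejected is treated as ending the workflow for gating purposes.

```python
def next_processable(events):
    """Return the order id whose Created event may be processed now, or None.

    `events` is an ordered list of (kind, order_id) tuples for a single user,
    with kind in {"created", "executed", "rejected", "recorded"}.
    """
    waiting = []       # orders that are Created and nothing else, oldest first
    in_flight = set()  # orders that are Executed but not yet Recorded
    for kind, order_id in events:
        if kind == "created":
            waiting.append(order_id)
        elif kind in ("executed", "rejected"):
            if order_id in waiting:
                waiting.remove(order_id)
            if kind == "executed":
                in_flight.add(order_id)
        elif kind == "recorded":
            in_flight.discard(order_id)
    # Nothing is exposed while another order is between Executed and Recorded.
    if in_flight or not waiting:
        return None
    return waiting[0]


history = [("created", "A"), ("executed", "A"), ("created", "B")]
print(next_processable(history))                       # None
print(next_processable(history + [("recorded", "A")])) # B
```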

I’m going to go poke around on the DDDCQRS group and see if I can find any modeling advice (and if anyone has any RTFM links they’d like to send me, I’d be thankful!).


Sure. Why not say “I’m not processing event 17 until I get a close of correlation Id 12”, then when you get the other event just read event 12 back?
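One way to read that suggestion, sketched in Python with a list standing in for the stream (event number = list index). The dict keys, the `"closed"` kind (standing in for Rejected/Recorded) and the function name are assumptions for the example, not anything GES provides.

```python
def handle_stream(stream):
    """Walk the stream once, parking blocked Created events by event number.

    Returns the event numbers of Created events in the order they were handled.
    """
    open_by_user = {}  # user -> correlation id currently being worked on
    parked = {}        # blocking correlation id -> event numbers to revisit
    processed = []

    def start(event_number):
        event = stream[event_number]  # "read the event back" by its number
        blocker = open_by_user.get(event["user"])
        if blocker is not None:
            # "Not processing this event until correlation <blocker> closes."
            parked.setdefault(blocker, []).append(event_number)
            return
        open_by_user[event["user"]] = event["correlation"]
        processed.append(event_number)

    for number, event in enumerate(stream):
        if event["kind"] == "created":
            start(number)
        elif event["kind"] == "closed":
            if open_by_user.get(event["user"]) == event["correlation"]:
                del open_by_user[event["user"]]
            # Revisit whatever was waiting on this correlation, oldest first.
            for parked_number in parked.pop(event["correlation"], []):
                start(parked_number)
    return processed


stream = [
    {"kind": "created", "user": "u1", "correlation": "A"},
    {"kind": "created", "user": "u1", "correlation": "B"},
    {"kind": "created", "user": "u2", "correlation": "X"},
    {"kind": "closed", "user": "u1", "correlation": "A"},
    {"kind": "closed", "user": "u2", "correlation": "X"},
    {"kind": "closed", "user": "u1", "correlation": "B"},
]
print(handle_stream(stream))  # [0, 2, 1]
```

The only state the handler keeps is a small map of parked event numbers; the events themselves stay in the stream and are re-read when their blocker closes.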

Beyond that, why not just process synchronously (that would get rid of the problem)? Or simply pause processing at a finer grain, e.g. “synchronous per order context”.

You could of course also just delay any of them by putting them in another stream.

Ok. I think I’ve got it. I’ve got event streams that are partitioned by event type and aggregate:





My process handler waits for BuyOrderCreatedEvent and then begins processing. It won’t handle an additional BuyOrderCreatedEvent for userId until it’s gotten a BuyOrderRejectedEvent or BuyOrderRecordedEvent for that userId. Then it will start again.

In this case, I have the choice of either implementing some code to keep track of which of the events in the stream need to be returned to later for processing, or I can plop RabbitMQ or something in between eventstore and my process handler to track that for me. This also would give me an infrastructural solution for future events.

Is that close?


ps. I have a natural urge to make GES a golden hammer, and I’m trying to learn where GES does more than I think it does (once I understand it), and when I should be reaching for screwdrivers and sawzall.

There is some support for this on the competing branch (competing consumers)