I’m currently working on a prototype CQRS/ES implementation with Event Store.
We will have multiple read models and read model generators. A generator uses a stream of events to generate the read model. Multiple event types might be involved in that, so I would think that it makes sense to subscribe them to the all stream and ignore the events we don’t handle.
I see two different approaches to how the read models can be generated:
A) A single subscription which invokes all the generators for the received event in parallel (i.e. Parallel.ForEach(GetGeneratorsForThisEvent(), …)). This has the drawback that all generators only work at the speed of the slowest generator. One might argue that it is a positive side-effect that all read models are always at the same version. However, if the read models can be at different versions, the logic becomes slightly more complicated: the subscription has to start at the minimum event position of all generators and only invoke those generators that are not already at a higher event position.
B) A subscription per generator. This would mean that each generator can work at its own speed. There might be many generators in the final project though.
Which approach is better suited - or which approach am I completely missing here?
Right now, I’m somewhere at a processing speed of ~100 events/second with variant A. It could be that I have a general flaw in my prototype though (for example: every event causes an EntityFramework DbContext.SaveChanges() in each generator). If I don’t invoke the generators, I’m well beyond 100k/s.
Regards,
urbanhusky
my two pennies…
If each “generator” uses a different db/storage type, I suggest a separate “generator”/subscription per db/storage.
If the data that you are denormalising lives in the same db and serves the same app, you can probably improve performance and reduce complexity with one “generator” for all “related” events. If this is the case, you can also distribute the workload across multiple copies of the same generator using competing consumer subscriptions.
Hope this helps
Riccardo
There are many variations on this. We use a single es-subscriber per process, dispatching to an in-memory queue per bounded context. These subscribers hold all the projections/generators/denormalizers and manage the transaction. This way different BCs can have different SLAs on read model staleness. For instance, reporting has a 1 minute buffer, while a user facing screen might have 100ms.
At startup the dispatcher queries all the BCs for their position, subscribes from the oldest one, and filters out events for the rest. This is pretty much the only complication with the above model. If we’d known all systems would run on decent hardware, we’d probably use an ES-subscription per BC, but some customers run on really low-end machines, so this has proven more stable and efficient.
Buffering is essential for throughput; a transaction per event will fall apart pretty quickly, for us at least.
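To make the startup logic concrete, here is a minimal Python sketch of a dispatcher that starts from the oldest checkpoint and skips events a consumer has already seen. It is only an illustration: the `Generator` class, `start_position`, `dispatch` and the integer positions are hypothetical names and simplifications, not part of any Event Store client API.

```python
from dataclasses import dataclass, field


@dataclass
class Generator:
    """Hypothetical read model generator with its own checkpoint."""
    name: str
    handled_types: set
    position: int = -1  # position of the last event seen, -1 means nothing yet
    applied: list = field(default_factory=list)

    def apply(self, position, event_type):
        # A real generator would update its read model here.
        self.applied.append((position, event_type))


def start_position(generators):
    """Subscribe from just after the oldest checkpoint of all generators."""
    return min(g.position for g in generators) + 1


def dispatch(generators, position, event_type):
    """Invoke only generators that are behind this position and handle this type."""
    for g in generators:
        if position <= g.position:
            continue  # this generator is already ahead, filter the event out
        if event_type in g.handled_types:
            g.apply(position, event_type)
        g.position = position  # advance the checkpoint even for ignored types


# Example: two generators that stopped at different positions.
events = ["UserCreated", "OrderPlaced", "UserRenamed", "OrderPlaced"]
users = Generator("users", {"UserCreated", "UserRenamed"}, position=2)
orders = Generator("orders", {"OrderPlaced"}, position=0)
generators = [users, orders]

start = start_position(generators)
for pos in range(start, len(events)):
    dispatch(generators, pos, events[pos])
```

In this example the subscription starts at position 1, `orders` catches up on the events it missed, and `users` receives nothing until the stream passes its own checkpoint.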
/Peter
Buffering to an in-memory queue will inevitably lead to an out of memory exception. We’re expecting millions, if not billions of events to process.
Buffering to a queue does not mean you will leave it unbounded.
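As a rough illustration of a bounded buffer, the following Python sketch uses the standard library `queue.Queue` with a `maxsize`. The reader blocks when the queue is full, so memory use stays capped no matter how many events are in the stream. The names `subscription_reader` and `projection_worker` and the bound of 1000 are made up for this example; a real Event Store subscription is not shown.

```python
import queue
import threading

# Bounded buffer: put() blocks once 1000 events are waiting (arbitrary example bound).
buffer = queue.Queue(maxsize=1000)
SENTINEL = object()  # marks the end of the stream for this demo
processed = []


def subscription_reader(events):
    """Stand-in for the subscription: pushes events, waiting when the buffer is full."""
    for event in events:
        buffer.put(event)  # blocks instead of growing without limit
    buffer.put(SENTINEL)


def projection_worker():
    """Stand-in for the read model side: drains the buffer at its own speed."""
    while True:
        event = buffer.get()
        if event is SENTINEL:
            break
        processed.append(event)


worker = threading.Thread(target=projection_worker)
worker.start()
subscription_reader(range(10_000))
worker.join()
```

The trade-off is that a slow consumer now slows down the reader rather than exhausting memory.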
My description was for a running system. We don’t have enough load to make it an issue.
When doing replays/rebuilds we have a special tool that just reads batches from ES.
YMMV, it was just an example, to illustrate that there are different solutions, depending on your requirements.
/Peter
The subscribers aren't (that) costly, so our vNext of readmodel generation has one subscriber per View/object type. Let's say we have an object called UserStats. It is of course a projection, but is stored as an object. It gets its own subscription. So would every other object. FooReview, AccountSummary etc etc. So the subscriptions are created according to relevance for write conflicts, i.e. partitioned per object type.
I’m most likely going to use a subscription for each generator. I don’t expect to gain any speed by invoking the corresponding generators in parallel for each event.
Using separate subscriptions also makes handling different read positions easier. There is a drawback of having to deal with event deserialisation for each subscription, but that could possibly be improved on with some caching and parallelisation.
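One possible shape for the caching idea, sketched in Python under the assumption that events carry a unique id and a JSON payload: a shared, size-limited cache keyed by event id, so that several subscriptions receiving the same event only parse it once. The `deserialize` function and the cache size are hypothetical choices for this example.

```python
import json
from functools import lru_cache


@lru_cache(maxsize=10_000)  # bounded, so old events are evicted
def deserialize(event_id, raw):
    """Parse the payload once per event; later callers get the cached object.

    The returned object is shared, so generators must treat it as read-only.
    """
    return json.loads(raw)


# Three subscriptions receive the same raw event.
raw_event = b'{"user": "urbanhusky", "action": "renamed"}'
results = [deserialize("event-1", raw_event) for _ in range(3)]
info = deserialize.cache_info()
```

With truly concurrent subscriptions the payload may occasionally be parsed more than once before the cache is populated, which is harmless but worth knowing when benchmarking.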
I’ve benchmarked this on my local development machine in a very rudimentary fashion.
Single subscription reads at ~100k events per second
Dispatching each event to 11 blank generators in parallel, asynchronously (so the total execution time should be merely the maximum execution time plus parallelisation overhead), drops the rate to a few hundred events per second (~400) for the entire subscription.
Dispatching to 11 blank generators synchronously (i.e. sequentially, so the total execution time is the sum of all execution times) yields 60k events per second. This is not really a useful metric because the generators did not do anything.
As soon as I hit SQL with Entity Framework with 5 simple generators, asynchronous execution is at ~300 events per second.
5 subscriptions with a single SQL EF generator behind each of them increases the rate to ~700 events per second.
Obviously my bottleneck here is SQL.
There are lots of patterns you can use to make that faster, such as batching. If you look at the subscription, it also tells you when you are live. Generally you want to batch when not live and do singles if you are caught up.
Sure, but that’s more a topic of optimisation then.
I’m shifting my focus to making SQL faster (ditch EF, benchmark at a lower level, and optimise there, as that is where the biggest performance impact is).
…I might explore NoSQL databases later, but I don’t want to focus on that just because the current approach failed. NoSQL might not even be an option I have available (management concerns etc.).