Best Practice on Populating a Read Model from EventStore

Given the experience people here have in developing event-sourced systems, what is the recommended way to populate a read model from Event Store?

If, for example, I bring up a Docker container with Redis to hold the current state of orders, what is the recommended way (1) to bring the Redis instance to a non-stale state and (2) to ensure that the read model is kept up to date?

It seems pretty clear to me that a catch-up subscription is the way to go insofar as it can read events from the beginning of a stream and fires a callback when you have caught up. However, where I am not so clear is what I should be reading from: a projection? Can I get state from a projection without needing to replay events?

I built "synchronizer" processes that are responsible for keeping read models (relational and NoSQL databases) up to date. They subscribe to domain events with catch-up subscriptions (no need to use projections currently in my scenario), and they also store the EventStore position of the last processed event. That way I don't have to replay old events.

The way we have it now:

We have n different handlers, one per type of read model.

These all implement an IMessageHandler interface, and at startup the handler base class uses reflection to extract all the event types the handler can process.

The message processor subscribes to the all stream. It has a list of handlers that it feeds with events. For every handler, it checks whether the received event is among the valid ones and whether it has already been processed (processed event ids are stored in memory by the handlers). Additionally, every handler has its own stream with maxCount 1, where it puts a checkpoint entry after successfully handling an event.
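As a rough illustration of the handler arrangement described above, here is a minimal Python sketch. It is not the poster's code: the class names, the `valid_types` attribute (standing in for the reflection step) and the `checkpoint` field (standing in for the handler's own maxCount 1 stream) are all assumptions made for the example.

```python
class Handler:
    """One handler per read model; subclasses list the event types they accept."""
    valid_types = ()  # assumption: declared explicitly instead of found by reflection

    def __init__(self):
        self.processed_ids = set()  # processed event ids, held in memory only
        self.checkpoint = None      # stand-in for a single-entry checkpoint stream

    def offer(self, event):
        """Handle the event if it is a valid type and has not been seen before."""
        if event["type"] not in self.valid_types:
            return False
        if event["id"] in self.processed_ids:
            return False
        self.handle(event)
        self.processed_ids.add(event["id"])
        self.checkpoint = event["position"]  # overwritten after every success
        return True


class OrderListHandler(Handler):
    valid_types = ("OrderPlaced",)

    def __init__(self):
        super().__init__()
        self.orders = []

    def handle(self, event):
        self.orders.append(event["order_id"])


handler = OrderListHandler()
events = [
    {"id": "a", "position": 0, "type": "OrderPlaced", "order_id": "o1"},
    {"id": "b", "position": 1, "type": "ProductRenamed"},
    {"id": "a", "position": 0, "type": "OrderPlaced", "order_id": "o1"},  # redelivery
]
accepted = [handler.offer(e) for e in events]
```

Only the first event is accepted here: the second is not a valid type for this handler, and the third is a redelivery of an id already processed.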

We use DocumentDB in Azure to store the view models. A handler has a method that takes the event, creates or loads the view model document, updates properties on it, and saves it back.

There's a rebuild function. In parallel with the live subscription to the all stream, it opens a new subscription from the start and rebuilds all the view models in memory. Once it has caught up, the original all subscription is closed and all the view models kept in memory are upserted, effectively replacing the current ones. Then the original subscription to the all stream is started again from where it left off (reading the checkpoint).

It works well. I had concerns about the model getting out of sync, but I haven't noticed any of that yet.

The rebuilding (well, in fact the upserting, which is the heavy part) can however become lengthy as the db grows. I will be looking at specifying individual view models that I suspect are out of sync: I would still rebuild everything in memory, but then upsert only the models suspected of being out of sync.
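The rebuild step can be pictured roughly as follows. This is only a sketch under simplifying assumptions: events are plain tuples, the document database is a Python dict, and the `apply` and `rebuild` names are invented for the example. Stopping and restarting the live subscription is left out.

```python
def apply(view_models, event):
    """Fold one event into a dict of view models keyed by entity id."""
    kind, entity_id, name = event
    if kind in ("Registered", "Renamed"):
        view_models.setdefault(entity_id, {})["name"] = name


def rebuild(all_events, store):
    """Replay from the start into memory, then upsert everything in one pass."""
    rebuilt = {}
    for event in all_events:
        apply(rebuilt, event)
    for entity_id, doc in rebuilt.items():
        store[entity_id] = doc  # the upsert replaces whatever document was stored
    return len(rebuilt)


events = [
    ("Registered", "u1", "Ann"),
    ("Renamed", "u1", "Anna"),
    ("Registered", "u2", "Bob"),
]
store = {"u1": {"name": "stale value"}}  # a document that has drifted out of sync
upserted = rebuild(events, store)
```

Upserting only suspected documents, as mentioned above, would amount to filtering `rebuilt.items()` by a set of ids before writing.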

Do you use the same classes to populate the read model with events as you use in the write model?

We see ourselves having, for example, a product service with an event-sourced Product aggregate and an order service with an event-sourced Order aggregate (and so on). We would want a read model (Redis or ElasticSearch) to hold the latest state of the orders with a description of the products ordered. Would you say that reusing these aggregates to build the read model makes sense?

Normally no. The state required in the write model tends to be different than in the read model.

That makes sense.

I guess where I am unclear is whether the state of the read model is built up in code by subscribing to events, or whether it can be derived from some kind of stateful projection in EventStore.

Built from events. You could do the same in an ES projection, but you likely want more than just key/value querying semantics over the resulting state.

Thanks for your detailed response, Jarchin.

A handler has a method that takes the event, creates or loads the view model document, updates properties on it, and saves it back.

So, like rehydrating aggregates by replaying events in the write model, you are also effectively rebuilding the view models by replaying events?

That’s right.

It's amazing how fast it is now to implement new functionality, and the possibilities for building views are endless.

The documents are schema free, so I can decide at any time to change a view model entirely: I just let the builder listen to other events or use other properties of the events. Then I change the mapping in the frontend servers and the UI accordingly.

I started out by letting every handler carry out updates on several documents. It's obviously risky, but I wanted to keep the number of handlers low. With the possibility to rebuild, it is not something critical. However, I'm moving away from that.

All events are handled sequentially in one thread. The subscriber dumps the events into a queue and the builder works through it at its own pace.

When running scenario tests where I simulate activity (20 users register, do certain things, etc.) it takes a little while for the read model to catch up. I'm not sure how performant this is. Having a couple of hundred users at the same time looks like it would cause problems. The issue is that there is one upsert per event. I was thinking that maybe I should do a periodic upsert instead, every 10 s or so, of changed entities only, keeping them all in memory and checkpointing after the upsert. Maybe there's some better solution to it.
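The periodic-upsert idea could look something like the sketch below. It is an illustration only, not the poster's code: `BatchingBuilder`, the dict standing in for the document database, and the explicit `flush()` call (which a timer would make every 10 s or so) are assumptions. Existing documents are not loaded from the store, to keep the example short.

```python
class BatchingBuilder:
    def __init__(self, store):
        self.store = store            # stand-in for the document database
        self.cache = {}               # view models held in memory
        self.dirty = set()            # ids changed since the last flush
        self.pending_checkpoint = None
        self.saved_checkpoint = None
        self.upserts = 0

    def handle(self, position, entity_id, changes):
        """Apply an event in memory only and remember that the entity changed."""
        self.cache.setdefault(entity_id, {}).update(changes)
        self.dirty.add(entity_id)
        self.pending_checkpoint = position

    def flush(self):
        """Upsert changed documents only, then move the checkpoint forward."""
        for entity_id in self.dirty:
            self.store[entity_id] = dict(self.cache[entity_id])
            self.upserts += 1
        self.dirty.clear()
        # The checkpoint is saved only after the upserts, so a crash before this
        # line replays the batch instead of losing it.
        self.saved_checkpoint = self.pending_checkpoint


store = {}
builder = BatchingBuilder(store)
for pos in range(100):
    builder.handle(pos, "user-1", {"clicks": pos + 1})
builder.flush()
```

One hundred events for the same entity collapse into a single upsert here. The trade-off is that the read model lags by up to one flush interval, and replayed batches need handlers that tolerate seeing an event twice.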

"

I started out by letting every handler carry out updates on several
documents. It's obviously risky, but I wanted to keep the number of
handlers low. With the possibility to rebuild, it is not something
critical. However, I'm moving away from that.
All events are handled sequentially in one thread. The subscriber
dumps the events into a queue and the builder works through it at its
own pace.
"

A queue/thread per view is the natural partition point, and would likely solve many performance problems.

For performance, subscribing to or handling events on a per-view basis is probably ideal if you can do that.
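A queue and thread per view might be arranged as in this minimal sketch. The view names and the `run_view` function are made up for the example, and each worker only counts events where a real one would update its view and save its own checkpoint.

```python
import queue
import threading


def run_view(name, inbox, results):
    """Drain one view's queue in order; None is the shutdown signal."""
    count = 0
    while True:
        event = inbox.get()
        if event is None:
            break
        count += 1  # a real worker would update its view model here
    results[name] = count


views = ["orders_by_customer", "product_catalog"]
inboxes = {name: queue.Queue() for name in views}
results = {}
threads = [
    threading.Thread(target=run_view, args=(name, inboxes[name], results))
    for name in views
]
for t in threads:
    t.start()

# The subscriber fans every event out to each view's queue.
for event in range(5):
    for inbox in inboxes.values():
        inbox.put(event)
for inbox in inboxes.values():
    inbox.put(None)
for t in threads:
    t.join()
```

Each view still sees events in order, but a slow view no longer holds up the others.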

For me, all my event handlers subscribe to $all, and I built a consumer that chooses which events to handle based on a hash of the stream id. It is just a simple (StreamId.ToHash() % instance count) == instance id check. You can see the code here: https://github.com/volak/Aggregates.NET/blob/master/src/Aggregates.NET.Consumer/Internal/CompetingSubscriber.cs

It allows me to set up 12 handlers for the same set of view models, with each handling a small piece of the stream. Of course it doesn't work 100% well if you depend on ordering, but there is also a built-in retry mechanism with a strong UOW pattern to prevent mangling the database when events are processed out of order, and when things are running smoothly I rarely see any retries.
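The hash check can be sketched in a few lines of Python. This is not the code from the linked repository: the function names are hypothetical, and `zlib.crc32` is used as the hash only because Python's built-in `hash()` for strings varies between processes, which would break the partitioning.

```python
import zlib


def owner(stream_id: str, instance_count: int) -> int:
    """Map a stream id to the consumer instance responsible for it."""
    return zlib.crc32(stream_id.encode("utf-8")) % instance_count


def should_handle(stream_id: str, instance_id: int, instance_count: int) -> bool:
    return owner(stream_id, instance_count) == instance_id


streams = [f"order-{i}" for i in range(1000)]
handled_by = [
    [s for s in streams if should_handle(s, instance, 12)]
    for instance in range(12)
]
```

Every stream lands on exactly one of the 12 instances, so events within a stream stay ordered relative to each other even though ordering across streams is lost.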

Projections should be persisted once they are built.
Given that there are a lot of deployments per day when using continuous delivery, you don't want to replay all events (there could be millions).

To continue building the read model after a process restart, you can store some kind of "position", for example a global sequence number (no idea how to do it without a global sequence, though).

In my implementation the read model building went through a few steps of evolution.

  1. Just listen to the "All" sequence, and pass each event to all interested handlers

Here the most obvious problem is with processing events in memory.

Suppose we are processing event number 255, and two handlers, A and B, are interested in it.

If we just do it like this:

foreach (var handler in handlers) handler.Process(e);

then if A fails, B never gets to process the event.

If we wrap the processing in a try/catch, or run a thread per handler, a slightly different problem appears:

foreach (var handler in handlers) handler.ProcessAsync(e);

A is OK but B fails, yet the position is marked as processed (the read model is recorded as caught up to position 255), which is not true.

If B then tries to push the event to a dead letter queue, that is not guaranteed to complete, for example if the process is exiting.

So the second stage was to create a second version of the processor:

  2. Do the processing in two stages.

In the first stage, one handler reads the "All" sequence and forwards each event to a secondary event store, inserting it into as many streams as there are views to be built (one event per view). The writes are idempotent, so the code

foreach (var view in views) view.Enqueue(e);

may fail, but that's OK: after a restart or reconnection the event can be idempotently written (or not) to the stream again.

The second-stage handlers each have their own stream and save their positions independently.

Also, with a second ES the processing can be done on a different server.

The secondary handlers also log per-view dead events in per-view dead streams (for debugging). I sketched out the idea of secondary processing here: http://blog.devarchive.net/2015/11/how-to-process-sequence-of-events.html
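The two-stage arrangement could be sketched as below. This is a simplification, not the implementation from the linked post: `ViewStream` is a hypothetical in-memory stand-in for a per-view stream in the secondary store, idempotency is keyed on the event number, and advancing the position after a failed event is an assumption made for the example.

```python
class ViewStream:
    """Per-view stream with idempotent appends and its own position."""

    def __init__(self):
        self.events = {}    # event number -> event
        self.position = -1  # last event number this view has processed
        self.dead = []      # numbers of events the view failed on, for debugging

    def enqueue(self, number, event):
        # Writing the same event number again is a no-op.
        self.events.setdefault(number, event)

    def process(self, handler):
        pending = sorted(n for n in self.events if n > self.position)
        for number in pending:
            try:
                handler(self.events[number])
            except Exception:
                self.dead.append(number)
            self.position = number  # each view tracks its position independently


views = {"A": ViewStream(), "B": ViewStream()}

# First stage: fan out. The second pass simulates a retry after a reconnect.
for attempt in range(2):
    for number, event in enumerate(["e0", "e1"]):
        for view in views.values():
            view.enqueue(number, event)

# Second stage: each view processes its own stream.
seen_by_a = []
views["A"].process(seen_by_a.append)


def failing_handler(event):
    raise ValueError(event)


views["B"].process(failing_handler)
```

B's failures end up in B's dead list and do not affect A's position, which is the property the single shared position in the first version could not give.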

The system became stable, but the most uncomfortable problem still exists:

You need to rebuild the read model after schema changes.

So I have an idea for a third rewrite.

It will differ from the previous version by inserting one more processing stage between the per-view streams and the original stream.

So the next version will have 3 stages:

  1. a main handler writes to the secondary ES, with one stream per event type

  2. per-view intermediate handlers maintain the per-view streams, taking the event-type streams as input (the types needed are known from the projections themselves)

  3. per-view handlers work as before

So now, when the schema changes, you increment a "projection version". This version is used to locate the correct per-view stream (its name will include the version).

The input to the third-stage handler will then be 2 channels:

a) the "main" handler provider, reading all secondary ES events; whoever is up to date will use these

b) the per-view stream; if it has not kept up with the main one, it will be rebuilt from the per-event-type streams

This way, once you change the read model schema and mark its version with a higher number, the system will catch up very quickly once started.
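To make the versioning idea concrete, here is a small sketch. Everything in it is assumed for illustration: the `view-<name>-v<version>` and `type-<event type>` naming schemes, the dict standing in for the secondary store, and events reduced to `(global number, type)` tuples.

```python
def view_stream_name(view: str, version: int) -> str:
    """The projection version is part of the per-view stream name."""
    return f"view-{view}-v{version}"


def open_view_stream(store, view, version, wanted_types):
    """Return the per-view stream, rebuilding it if this version has none yet."""
    name = view_stream_name(view, version)
    if name not in store:
        # Merge the per-event-type streams back into global order.
        merged = sorted(
            (e for t in wanted_types for e in store.get(f"type-{t}", [])),
            key=lambda e: e[0],
        )
        store[name] = merged
    return store[name]


store = {
    "type-OrderPlaced": [(0, "OrderPlaced"), (2, "OrderPlaced")],
    "type-OrderShipped": [(1, "OrderShipped")],
    "type-ProductAdded": [(3, "ProductAdded")],
    "view-orders-v1": [(0, "OrderPlaced"), (1, "OrderShipped"), (2, "OrderPlaced")],
}
v1 = open_view_stream(store, "orders", 1, ["OrderPlaced", "OrderShipped"])
v2 = open_view_stream(
    store, "orders", 2, ["OrderPlaced", "OrderShipped", "ProductAdded"]
)
```

Bumping the version from 1 to 2 leaves the old stream untouched and builds the new one from only the event types the new schema cares about, rather than from the whole log.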

One might think that 3-stage processing would have high latency,

but I use secondary "fast" channels to trigger the next stage sooner (http://blog.devarchive.net/2015/11/reducing-latency-in-cqrs-applications.html)

In summary, though, I think the way I chose has led me to re-create a map-reduce framework, so I am trying to find an existing alternative that provides good, reliable map-reduce (because I am still not sure about the performance implications of using a home-grown processor).

Hi David,

something to add…

As we know, the Aggregate is the Domain context boundary where behaviors are exposed. In my scenario the aggregates don't expose any public properties or other objects. In my unit tests I only test the events that are generated by the aggregate's public behaviors.

Talking about read models, as an example, I can build a synchronizer for ElasticSearch and another one for Neo4J. Each of those synchronizers uses special DTO classes that are shaped and decorated according to the specific requirements of these different storages, and they match each view, or part of a view if you work with a composite UI.

I'm also saving the EventStore Position (long commitPosition, long preparePosition) of the last synced event, in order to avoid replaying from the start any time I restart the synchronizer process. In ElasticSearch the position is just another document in the same index. In Neo4J it's a node. Each synchronizer saves and reads its own Position.

Hope this helps.

Riccardo

Thanks, Riccardo.

I’m planning to implement a first pass at something that can generate read models this week. So, I’ll see how it goes!

Thanks to all who have responded!

These are my thoughts about a solution to populate a read model which I’ve based on this and previous discussions on the Google Group. Some people mention using a queue to process the events but it seems that this solution (based on a previous comment from Greg) is more pull based and in less need of throttling events.

Does this seem sensible?

And #4:

Once all events less than the stored event number have been processed, switch to a catch-up subscription and, for each subsequent event, load the appropriate view model from the data store, apply the event, and persist it with the new state.

But the more I think about this the more concerns I have.

Greg, if I follow your advice here, wouldn't that mean I am just rehydrating the same aggregates as in my write model?

No, read models are different.

Your "write model" state is a projection. Your "read model" state is also a projection. They may look similar, or they may look vastly different (the usual case).
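One way to see the point that both states are projections is to fold the same events twice with different functions. The event shapes and function names below are invented for illustration and are not from the thread.

```python
from functools import reduce

events = [
    {"type": "OrderPlaced", "order_id": "o1", "product": "p1", "qty": 2},
    {"type": "OrderPlaced", "order_id": "o2", "product": "p1", "qty": 1},
    {"type": "OrderCancelled", "order_id": "o2"},
]


def write_state(state, e):
    """What the write side needs to enforce its rules: which orders are open."""
    if e["type"] == "OrderPlaced":
        return state | {e["order_id"]}
    if e["type"] == "OrderCancelled":
        return state - {e["order_id"]}
    return state


def read_state(state, e):
    """What a screen needs: a displayable row per order."""
    state = dict(state)
    if e["type"] == "OrderPlaced":
        state[e["order_id"]] = {
            "product": e["product"],
            "qty": e["qty"],
            "status": "placed",
        }
    elif e["type"] == "OrderCancelled":
        state[e["order_id"]] = {**state[e["order_id"]], "status": "cancelled"}
    return state


open_orders = reduce(write_state, events, frozenset())
order_list = reduce(read_state, events, {})
```

The write-side fold keeps only what is needed to make decisions, while the read-side fold keeps whatever the query needs, including rows for cancelled orders that the write side has already dropped.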

Are there any code samples out there that you would point to as showing how it should/could be done?

I think I’ve definitely taken a wrong turning in that I am just reading all events from an aggregate stream back into a viewmodel object which is just giving me - an aggregate.

A catch-up subscription (from all or from a stream) is probably the easiest. Just dispatch the events as they arrive and keep a checkpoint (the subscription takes a checkpoint when you subscribe).
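The shape of that approach, with a checkpoint passed in on subscribe and events dispatched by type, might look like this sketch. `subscribe_from` is a hypothetical in-memory stand-in, not the EventStore client API, and the event types are made up.

```python
def subscribe_from(log, checkpoint, on_event, on_live):
    """Replay everything after `checkpoint` (None means from the start),
    then signal that the subscriber has caught up."""
    start = 0 if checkpoint is None else checkpoint + 1
    for position in range(start, len(log)):
        on_event(position, log[position])
    on_live()


view = {"total": 0}
state = {"checkpoint": None, "live": False}

# Dispatch table: event type -> function that updates the view.
handlers = {
    "Deposited": lambda e: view.update(total=view["total"] + e["amount"]),
    "Withdrawn": lambda e: view.update(total=view["total"] - e["amount"]),
}


def dispatch(position, event):
    handler = handlers.get(event["type"])
    if handler:
        handler(event)
    state["checkpoint"] = position  # advanced even for event types we ignore


def went_live():
    state["live"] = True


log = [{"type": "Deposited", "amount": 10}, {"type": "Withdrawn", "amount": 3}]
subscribe_from(log, state["checkpoint"], dispatch, went_live)

# After a restart only the new event is replayed, because the checkpoint is reused.
log.append({"type": "Deposited", "amount": 5})
subscribe_from(log, state["checkpoint"], dispatch, went_live)
```

In a real system the checkpoint would be persisted alongside the view, ideally in the same write, so the two cannot drift apart.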