Efficient rebuilding of read model

I’m trying out Event Store and have created a stream per aggregate (each aggregate has several different event types), i.e.:

Foo-1

Foo-2

Foo-3

I’m trying to create a read model from these events using Elasticsearch. What’s the most efficient way to rebuild the read model from this data?

I’ve looked at using catchup subscriptions on the category “Foo”, but the problem is that I receive the events in time order; I don’t get each stream separately. I could process them all in memory and then flush on liveProcessingStarted, but this could take up a lot of memory. I really want a way to process each stream separately in memory, then flush to disk.

Any advice for this?

Cheers,

Jon

Ah, I just thought of a different way to do it:

  1. Use ReadStreamEventsForward to get all events (since the last known position)

  2. Partition them in memory

  3. Process each partition separately

  4. Subscribe using catchup from the last known position after processing

Does this sound like a good idea?
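Roughly something like this, I think (an untested sketch against the .NET ClientAPI of the time; newer clients use long positions and a settings object, and FlushToElastic is just a placeholder for the bulk write):

```csharp
using System.Collections.Generic;
using EventStore.ClientAPI;

static class Rebuild
{
    // Sketch of steps 1-4: read the category stream forward, partition per
    // originating stream, process each partition, then catch up from there.
    public static void Run(IEventStoreConnection conn)
    {
        var byStream = new Dictionary<string, List<ResolvedEvent>>();
        var next = 0;                                   // or resume from a stored checkpoint

        StreamEventsSlice slice;
        do
        {
            // 1. read everything since the last known position ($ce-Foo requires
            //    the by-category system projection to be running)
            slice = conn.ReadStreamEventsForwardAsync("$ce-Foo", next, 500, true).Result;
            foreach (var e in slice.Events)
            {
                // 2. partition in memory by the original Foo-* stream
                var streamId = e.Event.EventStreamId;
                if (!byStream.TryGetValue(streamId, out var events))
                    byStream[streamId] = events = new List<ResolvedEvent>();
                events.Add(e);
            }
            next = slice.NextEventNumber;
        } while (!slice.IsEndOfStream);

        // 3. process each partition separately, one bulk write per aggregate
        foreach (var partition in byStream)
            FlushToElastic(partition.Key, partition.Value);     // placeholder

        // 4. switch to a catchup subscription from the last processed position
        conn.SubscribeToStreamFrom("$ce-Foo", next == 0 ? (int?)null : next - 1, true,
            (sub, e) => FlushToElastic(e.Event.EventStreamId, new List<ResolvedEvent> { e }));
    }

    static void FlushToElastic(string streamId, List<ResolvedEvent> events)
    {
        // placeholder: fold the events into the aggregate's document and index it
    }
}
```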

Hmmm, that method could be problematic too. Maybe I should just process events in memory and flush after a set number of events.

Anyone have any strategies/examples to share?

Do you only have Foo aggregates?

No, but this read model is only interested in Foos

Do you have system projections running? If so, you can use the by-category system projection; all events from streams in the Foo category will be in there.

Yes, I have that working, but I want to avoid flushing to disk for every catchup event when there might be many events for each aggregate. I’d like to process all past events in memory, only flushing when it’s caught up to the present.
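What I have in mind is roughly this (an untested sketch against the .NET ClientAPI of the time; newer clients use long checkpoints and a settings object, and FlushToElastic is a placeholder for the bulk write):

```csharp
using System.Collections.Generic;
using EventStore.ClientAPI;

static class CatchUpThenFlush
{
    // Sketch: buffer per aggregate during catch-up, do one big flush when the
    // subscription goes live, then write live events as they arrive.
    public static void Run(IEventStoreConnection conn)
    {
        var buffer = new Dictionary<string, List<ResolvedEvent>>();
        var live = false;

        conn.SubscribeToStreamFrom(
            "$ce-Foo",
            (int?)null,                         // lastCheckpoint: replay from the start
            true,                               // resolveLinkTos
            (sub, e) =>
            {
                if (live)
                {
                    FlushToElastic(e.Event.EventStreamId, new List<ResolvedEvent> { e });
                    return;
                }
                var id = e.Event.EventStreamId;
                if (!buffer.TryGetValue(id, out var events))
                    buffer[id] = events = new List<ResolvedEvent>();
                events.Add(e);                  // everything stays in memory...
            },
            sub =>
            {
                foreach (var kv in buffer)      // ...until liveProcessingStarted fires
                    FlushToElastic(kv.Key, kv.Value);
                buffer.Clear();
                live = true;
            });
    }

    static void FlushToElastic(string streamId, List<ResolvedEvent> events)
    {
        // placeholder: fold the events into the aggregate's document and index it
    }
}
```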

Without knowing your volumes, I’d say that’s problematic. Configurable thresholds and throttling of events depending on memory usage are what got me out of major performance issues.

Flexible buffering really helps. If you’re on .NET, Rx is a good shortcut; IObservable maps very nicely onto EventStore subscriptions.

This sounds interesting. Do you have any code to share?

I’m wondering if I could use projections somehow. I only care about the current state of the data.

Sorry, not at liberty to share production code, but just subscribe passing in the last checkpoint, return an IObservable, and you’re off. From there you can use Rx operators like Buffer, which takes both a timespan and a max buffer size.

Also, it might be worth checking out Reactive Trader on GitHub (https://github.com/AdaptiveConsulting/ReactiveTrader); they’ve taken the IObservable route all the way to the UI.

It’s a shame Rx isn’t in the BCL; you’d avoid some of those ‘not another framework’ comments, and it’s really just the real-time version of LINQ…
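To give a flavour of it (just a sketch, not our production code; the ClientAPI overloads differ between versions and FlushToElastic is a placeholder for the bulk write):

```csharp
using System;
using System.Reactive.Linq;
using EventStore.ClientAPI;

static class RxBuffering
{
    // Sketch: expose a catchup subscription as an IObservable, then let Rx batch it.
    public static IObservable<ResolvedEvent> ObserveCategory(IEventStoreConnection conn, int? lastCheckpoint)
    {
        return Observable.Create<ResolvedEvent>(observer =>
        {
            var sub = conn.SubscribeToStreamFrom(
                "$ce-Foo",
                lastCheckpoint,                 // last event number already projected, or null
                true,                           // resolveLinkTos
                (s, e) => observer.OnNext(e));
            return () => sub.Stop();            // disposing the subscription stops the catchup
        });
    }

    public static IDisposable Project(IEventStoreConnection conn)
    {
        // Flush whenever 1000 events have arrived or 5 seconds have passed,
        // whichever comes first.
        return ObserveCategory(conn, null)
            .Buffer(TimeSpan.FromSeconds(5), 1000)
            .Subscribe(batch => FlushToElastic(batch));
    }

    static void FlushToElastic(System.Collections.Generic.IList<ResolvedEvent> batch)
    {
        // placeholder: group by e.Event.EventStreamId and bulk index the affected documents
    }
}
```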

http://seabites.wordpress.com/2012/08/29/the-money-box/

https://github.com/thefringeninja/derp.inventory/tree/master/Derp.Inventory.Web/Projections/Raven

This solution works, but it’s not optimal if the events are evenly distributed across aggregates: each batch of updates might only have one event per aggregate.

I thought of another solution: what if we use projections to store the current state of each aggregate, but only keep the latest snapshot and also store the original stream position (from the category)? This way, when we are rebuilding the read model we can read the stream that contains all the current snapshots, then, once processed, start reading from the original category stream at the highest stream position we saw whilst copying the snapshots to the read model.

Is this a good idea? Is there a better way? Is it possible to delete a previous event via a snapshot?
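The rebuild side of that idea might look something like this (a sketch only; the Foo-snapshots stream, the snapshot shape and the Elasticsearch helpers are all hypothetical):

```csharp
using System.Text;
using EventStore.ClientAPI;
using Newtonsoft.Json;

static class SnapshotRebuild
{
    // Hypothetical shape of a snapshot event written by such a projection.
    class FooSnapshot
    {
        public long CategoryEventNumber;    // position in $ce-Foo this snapshot reflects
        public object State;                // current state of the aggregate
    }

    // Sketch of the rebuild: copy all snapshots to the read model, remember the
    // highest category position seen, then continue from there on $ce-Foo.
    public static void Run(IEventStoreConnection conn)
    {
        var highest = -1L;
        var next = 0;
        StreamEventsSlice slice;
        do
        {
            slice = conn.ReadStreamEventsForwardAsync("Foo-snapshots", next, 500, false).Result;
            foreach (var e in slice.Events)
            {
                var snapshot = JsonConvert.DeserializeObject<FooSnapshot>(
                    Encoding.UTF8.GetString(e.Event.Data));
                WriteDocumentToElastic(snapshot.State);                 // placeholder
                if (snapshot.CategoryEventNumber > highest)
                    highest = snapshot.CategoryEventNumber;
            }
            next = slice.NextEventNumber;
        } while (!slice.IsEndOfStream);

        // Catch up on anything newer than the newest snapshot.
        conn.SubscribeToStreamFrom("$ce-Foo",
            highest < 0 ? (int?)null : (int)highest,
            true,
            (s, e) => ApplyToElastic(e));                               // placeholder
    }

    static void WriteDocumentToElastic(object state) { /* placeholder */ }
    static void ApplyToElastic(ResolvedEvent e) { /* placeholder */ }
}
```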

Out of interest, why would it use a lot of memory to process them all in memory before you commit to Elasticsearch? Is your document growing with every event?

The document doesn’t grow, but the number of aggregates does. This is listening to multiple aggregate streams via a category.

Why not use windows? E.g. process 200k events in memory, then handle your updates, then process the next 200k, and so on.

That’s the same as the Rx buffering. It would be good to handle all events per aggregate as one chunk, then move on to the next aggregate, then bulk insert into Elasticsearch every x aggregates. If we use windows, then we might be inserting the same aggregate several times even though there have been no new events for it.

Maybe I’m over-thinking this. I should probably measure some real numbers; it’s possible all my use cases will fit into memory.

If you just want to read all the streams then that’s probably the best thing to do; however, you then likely won’t be able to sensibly serve everything off one catchup subscription doing your own dispatch.

This sounds like over-optimisation; Elasticsearch should be able to deal with a bunch of updates every few thousand events without a problem.

James

You can do this relatively easily. I imagine you have an event for the creation of the aggregate, yes? Say FooCreated.

Read from $et-FooCreated; when you get an event, start reading its stream to the end, doing your projection. Then move on to the next, and so on.

This will, however, introduce some complications in terms of switching from this mode to a real-time mode. The way to do this is to read the last event from $all at the time you start your replay. When you have read through all of your aggregates (but never events past the marked point), switch to $all or your by-category stream and only read from that position forward.

This is how foreach projections work, by the way.

Cheers,

Greg
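In code, that replay-then-switch approach might look roughly like this (an untested sketch against the .NET ClientAPI of the time; $et-FooCreated requires the by-event-type system projection, ProjectFooStream and ApplyLiveEvent are placeholders, and the “never read past the mark” guard is only noted as a comment):

```csharp
using EventStore.ClientAPI;

static class ReplayPerAggregate
{
    public static void Run(IEventStoreConnection conn)
    {
        // Mark where the store is "now" before the replay starts.
        var tail = conn.ReadAllEventsBackwardAsync(Position.End, 1, false).Result;
        var mark = tail.Events.Length > 0 ? tail.Events[0].OriginalPosition.Value : Position.Start;

        // Walk $et-FooCreated: one resolved link per Foo aggregate, in creation order.
        var next = 0;
        StreamEventsSlice created;
        do
        {
            created = conn.ReadStreamEventsForwardAsync("$et-FooCreated", next, 100, true).Result;
            foreach (var e in created.Events)
            {
                var fooStream = e.Event.EventStreamId;    // the Foo-* stream the Created event lives in
                ProjectFooStream(conn, fooStream);        // read that stream to the end, build its document
                // (a full implementation would stop at events written after `mark`)
            }
            next = created.NextEventNumber;
        } while (!created.IsEndOfStream);

        // Switch to real time: continue from the mark on $all (or the by-category stream).
        conn.SubscribeToAllFrom(mark, false, (s, e) => ApplyLiveEvent(e));
    }

    static void ProjectFooStream(IEventStoreConnection conn, string stream) { /* placeholder */ }
    static void ApplyLiveEvent(ResolvedEvent e) { /* placeholder */ }
}
```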

We haven’t paid any attention to which aggregate produced the events; this is handled at the projection level. Besides, some events produce several rows in one read model, some are aggregated, etc., so I’ve never seen any reason to optimise at that level. But that depends entirely on your use case, of course.

/Peter

Hi Jonathan,

We are using RavenDB as our read store and ran into similar problems - needing to batch the writes. A write per event was fine with a small amount of data, but over time full rebuilds have become painful. We also started to have some batch jobs that dump a heap of events into the system, often updating both many small documents and some aggregated value in one big document; being able to write in batches makes this much faster.

The model we have taken is to have two sets of in-memory queues for catch-up.

Events are taken from an EventStore subscription (we just subscribe to all for the moment). All events are run through a grouping function that routes them to the queue(s) for the appropriate destination document(s).

The processor for the per document queues does the following:

  1. Grab the next batch of events

  2. Load the document (possibly from cache)

  3. Apply the events that are queued for that document.

  4. Put the document on the write queue.

The write queue then gets batches of documents and writes them in one round trip to Raven (in your case it would be Elasticsearch).

So in this way we are getting events applied to documents in batches as well as documents written in batches.

The code is open source - you can see the main piece here:

https://github.com/adbrowne/Eventful/blob/master/src/Eventful.RavenDb/BulkRavenProjector.fs
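That code is F#; a very rough C# sketch of the same two-queue shape, aimed at your Elasticsearch case, might look like this (GroupByDocument, LoadDocument, Apply and BulkWrite are placeholders, and real routing, caching and error handling are omitted):

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using EventStore.ClientAPI;

class TwoQueueProjector
{
    // Bounded queues: when either fills up, Add() blocks and the subscriber slows down.
    readonly BlockingCollection<ResolvedEvent> _incoming = new BlockingCollection<ResolvedEvent>(100000);
    readonly BlockingCollection<object> _writes = new BlockingCollection<object>(10000);

    // Hook this up as the catchup subscription's eventAppeared callback.
    public void OnEventAppeared(EventStoreCatchUpSubscription sub, ResolvedEvent e)
        => _incoming.Add(e);                                   // back-pressure happens here

    // Stage 1: group events per destination document and apply each group in one go.
    public void RunDocumentProcessor()
    {
        var batch = new List<ResolvedEvent>();
        foreach (var e in _incoming.GetConsumingEnumerable())
        {
            batch.Add(e);
            if (batch.Count < 1000 && _incoming.Count > 0)
                continue;                                      // keep filling while events are flowing

            foreach (var group in GroupByDocument(batch))
            {
                var doc = LoadDocument(group.Key);             // possibly from a cache
                foreach (var ev in group)
                    Apply(doc, ev);
                _writes.Add(doc);                              // hand the updated document to stage 2
            }
            batch.Clear();
        }
    }

    // Stage 2: write documents to the store in batches, one round trip each.
    public void RunWriter()
    {
        var docs = new List<object>();
        foreach (var doc in _writes.GetConsumingEnumerable())
        {
            docs.Add(doc);
            if (docs.Count >= 500 || _writes.Count == 0)
            {
                BulkWrite(docs);                               // e.g. one Elasticsearch bulk request
                docs.Clear();
            }
        }
    }

    // --- placeholders ---
    IEnumerable<IGrouping<string, ResolvedEvent>> GroupByDocument(IEnumerable<ResolvedEvent> events)
        => events.GroupBy(e => e.Event.EventStreamId);         // naive routing: one document per stream
    object LoadDocument(string key) => new object();
    void Apply(object doc, ResolvedEvent e) { }
    void BulkWrite(IReadOnlyList<object> docs) { }
}
```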

Using this method we have taken a full production rebuild from 30 mins down to 3 mins and there is still plenty of room for improvement. In particular the queue that does the grouping needs some optimization. One nice thing we have found is that during a rebuild we can set the consumer of the Raven queue to write batches infrequently (maybe once every few seconds) and it just means bigger batches in both queues.

Also, each queue is bounded, which should cover your concerns about memory: if either queue fills up, the subscriber is slowed down, and you can then adjust the queue sizes to match the memory available in your environment.

The code is not quite in production yet but should be sometime in the next two weeks.

The interface is quite simple and I am hoping to support things other than Raven. Neo4j is probably next, but Elasticsearch is definitely on the list of things I would like to support.

cheers

Andrew