I’m trying out EventStore and have created a stream per aggregate (each aggregate has several different event types), i.e.:
Foo-1
Foo-2
Foo-3
I’m trying to create a read model from these events using Elasticsearch. What’s the most efficient way to rebuild the read model from this data?
I’ve looked at using catch-up subscriptions on the category “Foo”, but the problem with this is that I receive the events in time order; I don’t get each stream separately. I could process them all in memory and then flush on liveProcessingStarted, but this could take up a lot of memory. I really want a way to process each stream separately in memory, then flush to disk.
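The per-stream buffering described above could be sketched roughly like this. This is a hypothetical illustration, not the EventStore client API: the event shape, the `flush` callback (e.g. a bulk write to Elasticsearch), and the handler names are all assumptions.

```python
from collections import defaultdict

# Hypothetical sketch: buffer category events grouped by source stream during
# catch-up, then flush each aggregate's events as one chunk once the
# subscription reports it is live. Write-through after that point.
class PerStreamBuffer:
    def __init__(self, flush):
        self.buffers = defaultdict(list)  # stream id -> ordered events
        self.flush = flush                # e.g. a bulk write to the read store
        self.live = False

    def on_event(self, stream_id, event):
        if self.live:
            # Past the catch-up phase: write through immediately.
            self.flush({stream_id: [event]})
        else:
            self.buffers[stream_id].append(event)

    def on_live_processing_started(self):
        # Catch-up finished: flush everything buffered, grouped per stream.
        self.live = True
        self.flush(dict(self.buffers))
        self.buffers.clear()
```

The memory concern raised below still applies: nothing is released until liveProcessingStarted fires, so the whole history sits in `buffers` at once.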
Do you have system projections running? If so, you can use the by-category system projection: all events from streams in the category Foo will be in there.
Yes, I have that working, but I want to avoid flushing to disk for every catchup event when there might be many events for each aggregate. I’d like to process all past events in memory, only flushing when it’s caught up to the present.
Without knowing your volumes, I’d say that’s problematic. Configurable thresholds and throttling of events depending on memory usage are what got me out of major performance issues.
Flexible buffering really helps. If you’re on .NET, Rx is a good shortcut; IObservable maps very nicely to EventStore subscriptions.
Sorry, I’m not at liberty to share production code, but just subscribe passing in the last checkpoint, return an IObservable, and you’re off. From there you can use Rx operators like Buffer, which takes both a timespan and a max buffer size.
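The semantics of Rx’s Buffer(timespan, count) can be shown in plain code. This is a minimal stand-in, not the Rx implementation: a batch is emitted when either `max_size` events have accumulated or `max_age` seconds have passed since the first event in the batch. The clock is injectable so the behaviour is deterministic.

```python
import time

# Plain-Python sketch of Buffer-by-count-or-time. `emit` receives each batch
# (e.g. hand it to a bulk insert). Single-threaded: time is only checked when
# an event arrives, unlike real Rx which also flushes on a timer.
class Buffer:
    def __init__(self, max_size, max_age, emit, clock=time.monotonic):
        self.max_size, self.max_age = max_size, max_age
        self.emit, self.clock = emit, clock
        self.items, self.started = [], None

    def push(self, item):
        if not self.items:
            self.started = self.clock()   # batch starts with its first event
        self.items.append(item)
        if len(self.items) >= self.max_size or self.clock() - self.started >= self.max_age:
            self.flush()

    def flush(self):
        if self.items:
            self.emit(self.items)
            self.items = []
```

As the next post points out, batching by count/time alone says nothing about which aggregate the events belong to.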
This solution works, but it’s not optimal if the events are evenly distributed across aggregates: each batch of updates might only contain one event per aggregate.
I thought of another solution: what if we use projections to store the current state of each aggregate, keeping only the latest snapshot and also storing the original stream position (from the category stream)? When rebuilding the read model we can read the stream containing all the current snapshots; once that’s processed, we start reading the original category stream from the highest stream position we saw while copying the snapshots to the read model.
Is this a good idea? Is there a better way? Is it possible to delete a previous event via a snapshot?
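The snapshot idea above could look roughly like this. Everything here is an assumption for illustration: the snapshot stream name, the `category_position` field on each snapshot, and the `read_stream(name, from_position)` reader are hypothetical, not a real client API.

```python
# Hypothetical sketch of a snapshot-first rebuild. Each snapshot is assumed to
# carry the category-stream position it was taken at; rebuild loads all
# snapshots first, then replays the category stream from just past the highest
# position seen, so no event is applied twice.
def rebuild(read_stream, apply_snapshot, apply_event):
    high_water = -1
    for _, snap in read_stream("$snapshots-Foo"):       # assumed snapshot stream
        apply_snapshot(snap["state"])
        high_water = max(high_water, snap["category_position"])
    for _, event in read_stream("$ce-Foo", from_position=high_water + 1):
        apply_event(event)
    return high_water
```

One subtlety: if snapshots are taken at different category positions, replaying from the single highest position could re-apply events already baked into older snapshots, so per-snapshot positions would need checking on apply.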
Out of interest, why would it use a lot of memory to process them all in memory before you commit to Elasticsearch? Is your document growing with every event?
That’s the same as the Rx buffering. It would be better to handle all events per aggregate as one chunk, then move on to the next aggregate, then bulk insert to Elasticsearch every x aggregates. If we use time windows, we might be inserting the same aggregate several times even though there have been no new events.
Maybe I’m over-thinking this. I should probably measure some numbers, it’s possible all my use cases will fit into memory.
If you just want to read all the streams then that’s probably the best thing to do - however you then likely won’t be able to sensibly serve everything off one catch up subscription doing your own dispatch.
This sounds like over-optimisation; Elasticsearch should be able to deal with a batch of updates every few thousand events without a problem.
You can do this relatively easily. I imagine you have an event for the creation of the aggregate, yes? Say, FooCreated.
Read from $et-FooCreated; when you get an event, start reading its stream to the end, doing your projection. Then move on to the next one, etc.
This will, however, introduce some complications in switching from this mode to a real-time mode. The way to do this is to read the position of the last event in $all at the time you start your replay. When you have read through all of your aggregates (but never past the marked point), switch to $all or your by-category stream and read only from that position forward.
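The replay loop described above could be sketched as follows. The stream names ($et-FooCreated, per-aggregate streams) come from the text; the `read_stream` reader and the assumption that each event carries its $all position are hypothetical stand-ins for the client API.

```python
# Sketch of per-aggregate replay with a marked cutoff. For each FooCreated
# event, replay that aggregate's stream end to end, but skip anything written
# to $all after the replay began; the live subscription picks those up after
# the caller switches to $all (or by-category) from the cutoff onward.
def rebuild_per_aggregate(read_stream, cutoff, project):
    for created in read_stream("$et-FooCreated"):
        stream = created["stream_id"]
        for event in read_stream(stream):
            if event["all_position"] > cutoff:
                break  # written after replay started; the live phase handles it
            project(stream, event)
    return cutoff  # caller subscribes from here for real-time mode
```

This gives exactly the per-aggregate chunking asked for earlier: each stream is projected as one contiguous unit before moving to the next.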
We haven’t paid any attention to which aggregate produced the events; this is handled at the projection level. Besides, some events produce several rows in one read model, some are aggregated, etc., so I’ve never seen any reason to optimise at that level. But that depends entirely on your use case, of course.
We are using RavenDB as our read store and ran into similar problems - needing to batch the writes. A write per event was fine with a small amount of data, but over time full rebuilds became painful. We also started to have some batch jobs that would dump a heap of events into the system, often updating both many small documents and some aggregated value in a big document - being able to write in batches makes this much faster.
The model we have taken is to have two sets of in memory queues for catch up.
Events are taken from EventStore subscription (we just subscribe to all for the moment). All events are run through a grouping function that routes events to the queue(s) for the appropriate destination document(s).
The processor for the per document queues does the following:
Grab the next batch of events
Load the document (possibly from cache)
Apply the events that are queued for that document.
Put the document on the write queue.
The write queue then gets batches of documents and writes them in one round trip to Raven (in your case it would be elastic).
So in this way we are getting events applied to documents in batches as well as documents written in batches.
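The two-queue design above can be condensed into a single-threaded sketch. This is an illustration of the flow, not the open-source code: the `route`/`load`/`apply`/`bulk_write` hooks are hypothetical, and real queues would be bounded and concurrent.

```python
from collections import defaultdict, deque

# Sketch of the two-stage batching pipeline: a grouping stage routes events to
# per-document queues; a processor folds each document's queued events into the
# document; a write queue batches finished documents into one bulk round trip.
def run_pipeline(events, route, load, apply, bulk_write, batch_size):
    per_doc = defaultdict(deque)
    for event in events:                     # grouping stage
        for doc_id in route(event):          # one event may touch several docs
            per_doc[doc_id].append(event)

    write_queue = []
    for doc_id, queued in per_doc.items():   # per-document processor
        doc = load(doc_id)                   # possibly from cache
        while queued:
            doc = apply(doc, queued.popleft())
        write_queue.append((doc_id, doc))
        if len(write_queue) >= batch_size:   # write stage: one bulk round trip
            bulk_write(write_queue)
            write_queue = []
    if write_queue:
        bulk_write(write_queue)
```

Raising `batch_size` during a rebuild mirrors the trick mentioned below of writing batches infrequently: fewer, bigger round trips to the store.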
The code is open source - you can see the main piece here:
Using this method we have taken a full production rebuild from 30 mins down to 3 mins and there is still plenty of room for improvement. In particular the queue that does the grouping needs some optimization. One nice thing we have found is that during a rebuild we can set the consumer of the Raven queue to write batches infrequently (maybe once every few seconds) and it just means bigger batches in both queues.
Also, each queue is bounded, which should cover your concerns about memory: if either queue fills up, the subscriber is slowed down, and you can adjust the queue sizes to match the memory available in your environment.
The code is not quite in production yet but should be sometime in the next two weeks.
The interface is quite simple and I am hoping to support things other than Raven. Neo4j is probably next, but Elasticsearch is definitely on the list of things I would like to support.