CQRS - How do I replay an entire category of streams to fill a new projection

One of the points of CQRS that has been brought up many times is the ability to replay events in order to fill a new projection. Are there any examples showing how this works? The documentation talks about it vaguely, and I’ve seen the simple CQRS example project, but I’m not seeing any examples of these concepts using EventStore. I have a few ideas, but many of them seem to overlap, which worries me, so I’d rather ask for a practical example. I’m working in .NET. I’d like to see how the following things work so that I can adopt them:

  • Denormalizers sitting on my query side waiting for events to come in so that they can be persisted to the db
  • Subscribing to these events by category because I don’t think subscribing to Account-1234 is useful. I need to subscribe to Account-*
  • Which subscription type should I use for something like this? Persistent? Volatile?
  • Creating a new projection that will be stored in my query db
  • I thought I would just have to create a new structure to be saved on the query side and use the event data to build it, but should I use the built-in projections functionality, or is either way fine?
  • Replaying events for a category so that the new projection (or simple denormalized view) can be filled.
  • How would I do this in a production environment? Do I need to write a utility for this? How would you make it a one time operation?

And then some other questions

  • I’m using NServiceBus. Should I treat events on the bus as separate from events on the event stream? I thought maybe there wouldn’t be a case where an event occurred that wasn’t the result of a command (resulting in an event on the event store), but Sagas use events to track each step of a long-running process. It seems like I should keep them separate, which should also reduce the need to share event objects.
    TIA

Enable the $by_category projection and use a catch-up subscription
(SubscribeToStreamFrom) on $ce-account.

Oh, and many times you will just prefer SubscribeToAllFrom.

So I’d need to know the last seen version which is 0 for a new projection, but what about existing projections? Am I expected to store the version? Where? In the query database?

It tells you the # of each event you receive; you then pass the same #
back on a restart.

Well, I think I get that part; if the last version I’ve seen is 10, I’d say “catch me up from version 10 and on”. So if I started the application back up, my subscribers would need to know the last seen version. Does that mean I should store the last seen version of the stream in the database? If I do, is that for every ID? That’s where I’m getting lost. You recommended just listening to every event, and I’m assuming you mean to just shuttle each event off to its respective handler, but then how would I know what version I’m even asking for? I must be misunderstanding you, because this seems very vague and jumbled up.

For example, here is the example for catchup subscription that I’m looking at: https://github.com/EventStore/EventStore.Samples.Dotnet/blob/master/CatchupSubscription/Program.cs

But this oversimplifies the stream name. If the format is Account-1234, Account-1235 and so on, how am I supposed to know what version to start from if each stream is going to have its own set of versions?

For a SQL read model you can define a separate table for pointers and update it transactionally, together with the read model. The by_category stream has its own pointer, e.g. one for all account-* streams. The global stream has a pointer for all streams.
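As a rough illustration of the "pointer table updated transactionally" idea, here is a minimal sketch in Python using the standard-library sqlite3 module as a stand-in for whatever SQL database holds the read model. The table names (account_balance, checkpoint), the function names and the event shape (an account id plus an amount) are all made up for this example; nothing here is EventStore API.

```python
import sqlite3

def open_read_model(path=":memory:"):
    """Create the hypothetical read-model table and the pointer table."""
    db = sqlite3.connect(path)
    db.execute("CREATE TABLE IF NOT EXISTS account_balance ("
               "account_id TEXT PRIMARY KEY, balance INTEGER NOT NULL)")
    db.execute("CREATE TABLE IF NOT EXISTS checkpoint ("
               "subscription TEXT PRIMARY KEY, position INTEGER NOT NULL)")
    db.commit()
    return db

def load_checkpoint(db, subscription):
    """Return the last stored position, or None if nothing was ever processed."""
    row = db.execute("SELECT position FROM checkpoint WHERE subscription = ?",
                     (subscription,)).fetchone()
    return row[0] if row else None

def apply_event(db, subscription, position, account_id, amount):
    """Update the view and the pointer in one transaction."""
    with db:  # commits both writes together, or rolls both back on error
        db.execute("INSERT OR IGNORE INTO account_balance (account_id, balance) "
                   "VALUES (?, 0)", (account_id,))
        db.execute("UPDATE account_balance SET balance = balance + ? "
                   "WHERE account_id = ?", (amount, account_id))
        db.execute("INSERT OR REPLACE INTO checkpoint (subscription, position) "
                   "VALUES (?, ?)", (subscription, position))
```

On startup the service would call load_checkpoint and hand the result to the catch-up subscription as its starting point. Because the view row and the pointer are written in the same transaction, a crash cannot leave one updated without the other.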

I apologize, I’m still fairly new to this and I’m looking for a process model for this. I’m only getting bits of answers, so I’m trying to figure out how to ask it better in hopes of getting a more complete answer. We won’t be starting with SQL. Our read models will be starting as document databases (Mongo, specifically) and possibly adding a SQL db later on as we find a need for it.

Conceptually, I imagined that maybe I would store the “last seen version” in the database, which I’m assuming is what you’re referring to when you say “pointers”. That would mean that I have to query the database for these numbers before subscribing to the event store, so that I know what to request in the catch-up subscription. You’re saying by_category has a different pointer and the global stream has ‘a’ pointer for all streams, but I don’t understand what that means. Each stream has its own set of versions, correct? So account-1234 may have versions 1…5 and account-4321 may only have 1…3. How would I even subscribe to that? If I subscribed to a single account (which is useless), then I could say “gimme everything after 3 since that’s the last one I saw”, but if I subscribed to a category, there’s no common starting point, so how does that work? Would I have to just request everything? That seems off.

I watched Greg’s 6-hour class on CQRS… well, at least about 4 hours of it, and I felt like I had a pretty good grasp on the concept until I tried to implement event sourcing with EventStore. I’m struggling to align the concepts with the implementation. I really wish I could whiteboard with someone who is versed in this.

Some assumptions I’ve made so far, in my proofing:

  1. In a new service, the query database knows about accounts, so each time an (any) event comes in that deals with accounts, I use the event info to update its respective record. The entity id is included with the event, so I know what record in which collection to update. It makes sense to just listen to all events and then sort of dispatch them to their respective handler as I identify them in code.
  2. If I’m subscribed to all, I’m assuming I can’t use a catch-up subscription since I don’t have a way to effectively communicate what I’m catching up to.
  3. I’m starting to feel like this is an inadequate approach after reading about “delivered once guaranteed” and so on. But, I’ll get back to this later, I think.
  4. Later on, when I create a new collection that gets its data from account events, for example a running count – I’ll need to replay all events for all accounts in order to backfill the new view (I’ve been calling them projections up until this point, so I don’t know if I’m conflating ideas here).
  5. This is where I start to get confused. So, existing subscriptions just update the database because they’re competing consumers; it just pops the latest event, so we should always be in sync (which I’m starting to question), but this new one needs a full replay of the entire category – but only once, then it can go back to popping the latest event.
    It might be helpful to just look at a sample project that has implemented these things with EventStore, if anyone can point me to one. Everything I’ve seen up to this point has been a piece of the puzzle but hasn’t demonstrated a full implementation, covering a practical production scenario. The conversation so far has been incredibly vague and I’m starting to feel really dense.

In a new service

on a single node you can subscribe to *all* events or you can
subscribe to a given stream of events.

all events gives back a uint64_t p, uint64_t c key.
streams give back a uint32_t (4.0 will be uint64_t).

if subscribing to all you hand back the key tuple<p,c> to the subscription.
if subscribing to a stream you hand back the single key to the subscription.

The best way to consider this is that the stream (and the store as a
whole) is an incrementing log. In a stream events get monotonically
assigned positions within the stream eg:

event A 1
event B 2
event C 3
event D 4
event E 5

You are remembering the last one you saw and telling it where to start
from again. So if I have seen 2 and then go down, when I come back up I
say I last saw 2. It will start with event #3.
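The "remember the last one you saw" rule can be sketched in a few lines of Python. This is only a toy model of the five-event stream above, with a made-up read_after function; it is not the EventStore client.

```python
# The stream from the example above: (number within the stream, event).
STREAM = [(1, "event A"), (2, "event B"), (3, "event C"),
          (4, "event D"), (5, "event E")]

def read_after(stream, last_seen):
    """Yield events numbered after last_seen; None means 'I have seen nothing'."""
    for number, event in stream:
        if last_seen is None or number > last_seen:
            yield number, event

# A subscriber that went down after seeing #2 resumes at #3.
resumed = list(read_after(STREAM, 2))
```

A brand-new projection is the same call with None as the last seen number, which is what makes a full replay and a restart the same operation.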

Does that make more sense?

“all events gives back a uint64_t p, uint64_t c key”
“The best way to consider this is that the stream (and the store as a
whole) is an incrementing log.”

So this is like an index in the log? So I can think of this like a point in time, and it’ll return all events (regardless of which stream) that occurred after that point in time? (trying to visualize it)

imagine all is one big log and there is an index into that log for streams.
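To make the "one big log plus indexes" picture concrete, here is a small in-memory sketch in Python. The TinyLog class and its method names are invented for illustration and say nothing about how EventStore is implemented internally. The category is assumed to be the part of the stream name before the first dash.

```python
from collections import defaultdict

class TinyLog:
    """Toy model: one global log plus per-stream and per-category indexes."""

    def __init__(self):
        self.all = []                         # the one big log
        self.by_stream = defaultdict(list)    # stream name -> global positions
        self.by_category = defaultdict(list)  # category -> global positions

    def append(self, stream, event_type):
        position = len(self.all)              # global position of this event
        self.all.append((stream, event_type))
        self.by_stream[stream].append(position)
        self.by_category[stream.split("-", 1)[0]].append(position)
        return position

    def read_category(self, category, after=None):
        """Yield (number, stream, event_type); number counts within the category."""
        index = self.by_category.get(category, [])
        start = 0 if after is None else after + 1
        for number in range(start, len(index)):
            stream, event_type = self.all[index[number]]
            yield number, stream, event_type

log = TinyLog()
log.append("Account-1234", "Opened")
log.append("Order-1", "Placed")
log.append("Account-4321", "Opened")
log.append("Account-1234", "Deposited")
```

The point of the sketch is that the category index carries its own numbering, independent of each account's own versions. A subscriber to the whole Account category therefore needs to remember only one number, not one per account.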

Got it.

So if I start a new view/projection, I’ll obviously need everything. How would I approach that as a one time operation? The last seen index/tuple wouldn’t apply, so I’m assuming I’d send 0 in so that raises a couple questions

  1. How do I do this as a one-time operation? Do I just reset the saved pointer to zero?
  2. That sounds like it would replay the entire event store, and subsequently all handlers would be “re-run” as well. Is that normal? That sounds like it would cause problems

the position is per handler. this is a client driven subscription

So are you saying to have each handler subscribe individually now, or something more like have a global subscriber dispatch all the events to their handler and just have the handler ignore the event if the version of the event is lower than the last seen?

Both are possibilities. You have said nothing about what the situation
looks like (what projections there are, how big they are, how many
there are, whether they are running on the same machines, etc.). This
isn't a right-or-wrong thing; there are trade-offs. Having a single
dispatcher supporting multiple projections that replay separately is a
bit more complex to get started with. It can be more network friendly
(if hosting many on a single node) but adds complexity when
rebuilding a single projection. A subscription per projection is much
easier in terms of replays but can be quite bad from a network perspective.
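One possible shape for the single-dispatcher option, sketched in Python under stated assumptions: the log is a plain list whose index is the position, and Projection, Dispatcher and run are hypothetical names. Checkpoints live in memory here; a real service would persist them next to each read model.

```python
class Projection:
    def __init__(self, name, handler, checkpoint=None):
        self.name = name
        self.handler = handler        # called once per event this projection needs
        self.checkpoint = checkpoint  # last position handled; None = never ran

class Dispatcher:
    def __init__(self, projections):
        self.projections = projections

    def start_position(self):
        """Last position every projection has seen; None if any must start over."""
        checkpoints = [p.checkpoint for p in self.projections]
        if any(c is None for c in checkpoints):
            return None
        return min(checkpoints)

    def dispatch(self, position, event):
        for p in self.projections:
            # Skip projections that already handled this position.
            if p.checkpoint is None or position > p.checkpoint:
                p.handler(event)
                p.checkpoint = position

def run(log, dispatcher):
    """Read the log once, starting after the oldest checkpoint."""
    start = dispatcher.start_position()
    first = 0 if start is None else start + 1
    for position in range(first, len(log)):
        dispatcher.dispatch(position, log[position])
```

With this shape, adding a new projection means registering it with no checkpoint: the next run replays from the beginning, the existing projections ignore everything at or below their own checkpoint, and only the new one is filled. The cost is that one slow or reset projection drags the shared read back to its position, which is the rebuild complexity mentioned above.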

I haven’t said much about the details because these are brand new services, so there’s only the one default denormalized view for each entity/category so far, but I was just looking into the future to figure out a game plan for how to approach this as it presents itself (because it will in short order). But that does add some perspective; one complicates the other. I think I can work with that. I like the single subscriber idea, but it sounds like I’ll have to figure out a way to persist the last seen index. I was hoping others who have solved this would be able to share an example.