Query Event Stream by $CorrelationId For Batch Journaling

One of our use cases is resuming batch operations. For example, the system receives a message from a message broker to begin a long-running (sometimes hours-long) batch process.

If the job fails, it will be retried, but we want to resume the operation from the last known journal entry. How can I query the event store by the CorrelationId so we can calculate where to resume the batch operation? I’ve attached a code sample to help illustrate the use case.

retry_batch_message.rb (1.71 KB)

My answer might not be the one you’re looking for - especially since it requires elaboration of the implementation - but the tell-tale signs of some kind of state machine seem to be missing from the code. And without that state machine, it’s much harder to know whether something has been processed, and whether something should be processed again or should not be processed because it was already processed in the past.

So, rather than restart at the exact last journal entry, you should be able to process the same messages more than once, and the business logic (rather than the messaging tech) should know whether a message should be acted upon or disregarded.

So, it’s not a matter of querying event store, but of keeping track of the state of these operations, and knowing whether something has already been done.

It seems like there’s some kind of natural grouping of 5 messages that represents some kind of cohesive unit. I wonder if that unit is the state entity that the state machine works on.

Since I don’t know what that unit is (or if it is really a cohesive unit), I’m going to call it: Thing.

For every 5 messages, a new Thing is represented. So, your event streams would represent those individual Things. Their stream names might be: thing-1, thing-2, thing-3, etc.

When processing an inbound message for Thing #1, query the events from the thing-1 stream, create an instance of a Thing object for it, and copy relevant info from the thing-1 events into that object.

That object might have a method named “done?” or “closed?” or something along those lines.

So, when an inbound message comes in for a particular Thing, you can know whether or not that Thing is still in-process by deriving the flat, entity view of the Thing’s past events.
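A minimal sketch of that entity projection, in plain Ruby with no event store client. The `Event` struct, the `ItemProcessed` and `Closed` event types, and the `project` method are hypothetical names chosen for illustration; the array of events stands in for whatever a read of the thing-1 stream would return.

```ruby
# Hypothetical event shape: a type name plus a data hash.
Event = Struct.new(:type, :data)

# The "Thing" entity: a flat view derived from the events of one stream.
class Thing
  attr_reader :id, :processed_skus

  def initialize(id)
    @id = id
    @processed_skus = []
    @closed = false
  end

  # Copy the relevant info from a single event into the entity.
  def apply(event)
    case event.type
    when "ItemProcessed" then @processed_skus << event.data[:sku]
    when "Closed"        then @closed = true
    end
    self
  end

  def closed?
    @closed
  end

  def processed?(sku)
    @processed_skus.include?(sku)
  end

  # Build the entity by replaying the stream's events, oldest first.
  def self.project(id, events)
    events.reduce(new(id)) { |thing, event| thing.apply(event) }
  end
end

# Stand-in for the events read from the "thing-1" stream.
thing_1_events = [
  Event.new("ItemProcessed", { sku: "A-1" }),
  Event.new("ItemProcessed", { sku: "A-2" }),
  Event.new("Closed", {})
]

thing = Thing.project(1, thing_1_events)
puts thing.closed?            # prints true
puts thing.processed?("A-3")  # prints false
```

With this in place, a handler can rebuild the entity first and disregard an inbound message when `closed?` or `processed?` says the work was already done, so processing the same message twice is harmless.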

The problem I see with the implementation you provided is that an “InventoryItemUpdated” event is issued to a stream named “inventory”. It would seem more like the stream might be “inventoryUpdate” (which is what I believe this “Thing” is).

Furthermore, there should likely be a stream per individual inventory update. So, an “Updated” message should likely be written to the “inventoryUpdate-{id}” stream. An InventoryUpdate class would be the entity that is “projected” from the events in that particular stream (NOTE: I’m not using “projection” here the way it’s used within the EventStore database nomenclature. I’m talking about “entity projection” specifically).

The fly in the ointment might be that the notion of an Inventory Update is not reified from the origination point of those inbound messages, and that there’s no ID created by the originator for the Inventory Update. If that’s true, then these patterns are largely not applicable. But it also might mean that the design ethos of the system might not be ideally suited to event streams (but I don’t know nearly enough about your situation to say that as a fact).

Best,

Scott

Scott,

Thank you for your detailed response, I’ll follow up with additional details and clarification.

So, it’s not a matter of querying event store, but of keeping track of the state of these operations, and knowing whether something has already been done.

I was hoping to use EventStore for this, but you are correct that we are attempting to track state within the batch operation.

The “Thing” you’ve described represents an InventoryItem (with attributes sku, and quantity). The origination message represents an action/event in the system. Eg: User wants to push all InventoryItem items to external system.

The challenge we face is that there could be 12K (as an extreme) InventoryItems included in a single request. The receiving system is an external service (SOAP, REST, FTP, etc.), each with its own way to handle updates. For example 1 update per request, batch requests with an upper limit, batch request processed asynchronously. So ideally, we’d abstract these details and normalize the interface, which we have done.

The problem arises when we are dealing with large update batches and slow API responses. For example, suppose we have an update which includes 500 items, but the external system enforces updates in batches of 30. If there is a failure before all 500 items are updated, the operation will be retried from the beginning, regardless of where the error occurred during the operation.

In the example provided, the batch size of 5 is arbitrary. In reality this number will be small enough to give users feedback that the batch operation has not stalled.

Furthermore, there should likely be a stream per individual inventory update. So, an “Updated” message should likely be written to the “inventoryUpdate-{id}” stream. An InventoryUpdate class would be the entity that is “projected” from the events in that particular stream (NOTE: I’m not using “projection” here the way it’s used within the EventStore database nomenclature. I’m talking about “entity projection” specifically).

This seems to strike at the heart of my (lack of) understanding of how to use EventStore in a real application. What I’ve concluded from the documentation is that stream names would be based on an Aggregate/Entity, in this case InventoryItem. But what you are suggesting is that the stream would be named after the Aggregate + some unique identifier. Is this a general rule, or is it only applicable to this case? In other words, is it reasonable to expect EventStore to have hundreds of thousands of streams?

This would negate the need to query by the correlationId. I could instead use the entity ids contained in the batch to get the last event for each Entity, and check whether it matches the current operation. The downside here is the chatter required to gather all of the existing events, but I suppose we would need to do some testing to see if this has an impact.

I want to emphasize again that I don’t have a complete grasp of the solution and problem that you’re facing. I can speak with more confidence to the patterns than the particulars of your situation.

I should also say that I’m not in a position to conclude whether you’re doing things the “right way”. Again, I wouldn’t want to take such a strong stance until I felt as intimate with the business and the systems as you do. And it’s likely that such a level of knowledge transfer might take more time than I have.

To some specific points…

“InventoryItem” is a class of individual instances of an aggregate root. “inventoryItem-123” is a particular instance. The stream names are ideally named for the instance, and each instance is given its own stream. That’s how you can get the state of a single entity by directly querying that entity’s events.

PS: You can get all events for all instances of the InventoryItem class (or “category”) from the stream named “$ce-inventoryItem”. Note, though, that this feature (category streams) is not supported yet. They’re based on projections, and projections are not supported at this time (though they’re in-development, I understand).
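The naming convention described above can be sketched as a few small helpers. The function names are hypothetical, and the parsing assumes the category is everything before the first dash, which is an assumption of this sketch rather than something confirmed in this thread.

```ruby
# Stream for one instance, e.g. "inventoryItem-123".
def stream_name(category, id)
  "#{category}-#{id}"
end

# Category stream covering all instances, e.g. "$ce-inventoryItem".
def category_stream_name(category)
  "$ce-#{category}"
end

# Assumption: the category is the part before the first dash,
# so IDs that contain dashes themselves (such as UUIDs) still parse.
def category_of(stream_name)
  stream_name.split("-", 2).first
end

puts stream_name("inventoryItem", 123)          # prints inventoryItem-123
puts category_stream_name("inventoryItem")      # prints $ce-inventoryItem
puts category_of("inventoryUpdate-9f1c-44ab")   # prints inventoryUpdate
```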

There’s no issue with having one hell of a lot of unique streams and stream names as far as I know. It’s what the database is designed for. It probably feels a bit alien when thinking of the implications of having hundreds of thousands (or millions) of database tables in a more familiar database system, but EventStore is a different kind of thing.

There’s another big challenge to pre-conceptions that you might watch out for: Aggregate roots aren’t necessarily the “things” in your system. Or I should say, they are often the names of processes. The “things” in a system like this are in fact the major business operations. You might have aggregate roots that reflect the kinds of entities we have with ORM (which is largely the result of database modeling), but it’s largely a coincidence rather than a design objective.

So, rather than InventoryItem, you might in fact have InventoryUpdate. And that’s desirable. Or at least, it’s not out of the ordinary. The service that knows the inventory rules and data for an item might not know its price (for example).

Also, it’s normal to not have all the data for an InventoryItem in a single service/aggregate/boundary/component.

All that said, you might in fact have an InventoryItem. Again, I’m not familiar enough with the solution and the problem to be able to make a good judgement from this far away from it.

For the batching issue, you could receive those 12k updates, and break them into smaller batches or individual updates to reduce the risk of the whole batch needing to be re-processed. Once each individual update is processed, the batch would be considered complete/closed/done/etc. So, you might end up with a component that receives those batches from the outside, creates smaller batches (down to the size of 1), and another component that processes the individual updates. Each time an update completes, it records an event in the batch’s stream that the update was completed. Once all the individual updates are accounted for, the sub-batch replies to the larger batch that it was done.
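A rough sketch of that split-and-track idea, using the 500-item, 30-per-request numbers from earlier in the thread. `BatchUpdate`, `record_completed`, and `remaining` are hypothetical names; in a real system `record_completed` would be driven by the "update completed" events read from the batch’s stream rather than called directly.

```ruby
require "set"

# Hypothetical batch entity: knows which SKUs it expects and which are done.
class BatchUpdate
  def initialize(id, skus)
    @id = id
    @expected = skus.to_set
    @completed = Set.new
  end

  # Called once per "update completed" event read from the batch's stream.
  # Recording the same SKU twice has no further effect.
  def record_completed(sku)
    @completed << sku if @expected.include?(sku)
    self
  end

  # SKUs that still need to be sent, in their original order.
  def remaining
    (@expected - @completed).to_a
  end

  def done?
    remaining.empty?
  end
end

skus  = (1..500).map { |n| "SKU-#{n}" }
batch = BatchUpdate.new("batch-1", skus)

# The external system accepts at most 30 items per request.
sub_batches = skus.each_slice(30).to_a
puts sub_batches.size      # prints 17 (16 full sub-batches and one of 20)

# Pretend the first three sub-batches succeeded before a failure.
sub_batches.first(3).flatten.each { |sku| batch.record_completed(sku) }

# On retry, only the remaining items are re-sliced and sent.
retry_batches = batch.remaining.each_slice(30).to_a
puts batch.remaining.size  # prints 410
puts retry_batches.size    # prints 14
```

Because completion is tracked per item, a retry resumes from whatever is left instead of from the beginning of the batch.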

This is more a matter of building in reply semantics, and that’s a bit of work up front if you don’t have a generalized solution for that yet. You’d want to include something like a “replyStreamName” attribute in the metadata, and then use that stream name as the place to record the reply event. Technically, it should be a command that’s recorded, and then the original process should record its own events, rather than letting anything write into anything else’s event streams (but that’s a finer point).
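A minimal sketch of the reply-stream idea, with a hash standing in for the event store. The `STREAMS` hash, `write_event`, `handle_update_inventory`, and the metadata keys are all hypothetical. For brevity the handler writes the reply event directly to the reply stream, which is the shortcut the finer point above warns about; the stricter variant would record a command for the batch component instead.

```ruby
# In-memory stand-in for the event store: stream name => array of events.
STREAMS = Hash.new { |hash, name| hash[name] = [] }

def write_event(stream_name, type, data, metadata = {})
  STREAMS[stream_name] << { type: type, data: data, metadata: metadata }
end

# Handle one inventory update message, then record the outcome
# on the stream named in the message's metadata.
def handle_update_inventory(message)
  sku = message[:data][:sku]
  # ... the call to the external system would happen here ...
  metadata = message[:metadata]
  write_event(
    metadata[:reply_stream_name],
    "InventoryUpdated",
    { sku: sku },
    { correlation_id: metadata[:correlation_id] }
  )
end

message = {
  type: "UpdateInventory",
  data: { sku: "A-1", quantity: 7 },
  metadata: { reply_stream_name: "batchUpdate-42", correlation_id: "corr-1" }
}

handle_update_inventory(message)
puts STREAMS["batchUpdate-42"].size  # prints 1
```

The update handler never needs to know what a batch is; it only needs the name of the stream to reply to.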

Given that there are rules (policies) about how big the batch itself can be, this BatchUpdate appears even more to be a concern all on its own.

You can report back on the process by polling an interface (or using some push messaging) that can indicate the state of the process. There are good ways and bad ways to do this though. The most obvious - hanging a JSON API off the service that returns data - is the worst option. Updating some other data store with summary information about these BatchUpdate instances might be the better thing to do.
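As a rough illustration of the summary-store option, here is a sketch where a plain hash plays the role of the separate data store. All names are hypothetical, and the counter assumes each completion event is handled exactly once; a real version would need to guard against duplicates.

```ruby
# Stand-in for a separate summary data store, keyed by batch id.
SUMMARIES = {}

def start_summary(batch_id, total)
  SUMMARIES[batch_id] = { total: total, completed: 0, status: "in_progress" }
end

# Called whenever an "InventoryUpdated" event is seen for the batch.
# Assumes each event is delivered once (no duplicate protection here).
def on_inventory_updated(batch_id)
  summary = SUMMARIES.fetch(batch_id)
  summary[:completed] += 1
  summary[:status] = "done" if summary[:completed] >= summary[:total]
  summary
end

# What a polling client (or a push notifier) would read.
def progress(batch_id)
  summary = SUMMARIES.fetch(batch_id)
  "#{summary[:completed]}/#{summary[:total]} (#{summary[:status]})"
end

start_summary("batch-1", 3)
on_inventory_updated("batch-1")
puts progress("batch-1")  # prints 1/3 (in_progress)
2.times { on_inventory_updated("batch-1") }
puts progress("batch-1")  # prints 3/3 (done)
```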

I’m hypothesizing based on patterns and principles though. There are other factors that need to be considered that I don’t have (and again, might not be able to grasp).

If you can find a solution where you don’t need to query event store for the presence of a particular ID (or correlation ID), then you’ll be better off. Otherwise, you’ll end up doing a good deal of sequential scans to figure out whether or not to take some action or whether some action was already taken.

Also, EventStore’s replication might make it appear briefly that an event you just wrote is not readable yet. So, a read-modify-write approach might simply be impossible. That’s a more subtle thing, and it’s an edge case.

I don’t know if the http_eventstore gem supports leader election in EventStore’s cluster (which is when the cluster decides to select a new node to be the master, which it will do as it sees fit). The effect of this is that the master node you thought you were connected to is now a slave node, and it is susceptible to experiencing greater latency.

All of this means that you need to rely more on reply messages than on reading from EventStore and then making a decision as to whether to do (or not do) something. I.e., you can’t add to a stock level and then check if there’s sufficient stock, and if so, allocate it to an order (etc). You need to tell the system to reduce stock, and it will in turn reply to its caller with another message that says whether that stock reduction (allocation, etc) succeeded. And upon that reply, the order can proceed. This is why you might be able to purchase something from Amazon, and then get a message later saying that there was actually insufficient stock at the time that the order went through, and that the order is delayed/refunded/etc.

Finally, I think the metadata fields that start with “$” are for internal use. I was under the impression that these should not be used to store our application-specific stuff (like correlation IDs in our apps). I could have this wrong though.

So, I fear that I might just also be making things more confusing for you. I’m throwing a bunch of stuff into the mix that might make the whole thing feel burdensome. There may be other ways and other suggestions that others have. And others might also have a better picture of your problem/solution than I do.

Best,

Scott

"
Finally, I think the metadata fields that start with "$" are for internal use. I was under the impression that these should not be used to store our application-specific stuff (like correlation IDs in our apps). I could have this wrong though."

$CorrelationId and $CausationId are a bit special, since projections also honour them.

For the batching issue, you could receive those 12k updates, and break them into smaller batches or individual updates to reduce the risk of the whole batch needing to be re-processed. Once each individual update is processed, the batch would be considered complete/closed/done/etc. So, you might end up with a component that receives those batches from the outside, creates smaller batches (down to the size of 1), and another component that processes the individual updates. Each time an update completes, it records an event in the batch’s stream that the update was completed. Once all the individual updates are accounted for, the sub-batch replies to the larger batch that it was done.

Given that the initial batch is calculated, and doesn’t exist as an Aggregate, what I understand this to mean is that I’d create a stream based on this particular batch, and each time a member of the batch is updated (whether it be a sub-batch or an individual item) I’d post an event to the batch stream. This seems pretty much what I suggested in the OP, except the correlation ID is moved into the stream name. It seems like it is neither possible nor ideal to group events by a correlation ID.

In our case, if the batch was retried, we would need to scan the list of events to resume from the last successful location in the batch, but that is less of an issue than having to deal with eventual consistency across the cluster.

except the correlation ID is moved into the stream name. It seems like it is neither possible nor ideal to group events by a correlation ID.

Also, you might (likely) need to include the batch stream name in the metadata of each individual inventory update. When the inventory update message is processed, it will post some “InventoryUpdated” event to that reply stream (or send a command to the batch component to instruct it to record that event). This isn’t always the same as a correlation stream name (or ID). We tend to have correlation info as well as reply info in our metadata. But it’s possible that we’re doing it wrongly, as well :)

In our case, if the batch was retried, we would need to scan the list of events to resume from the last successful location in the batch, but that is less of an issue than having to deal with eventual consistency across the cluster.

Indeed. If you build up the batch entity in-memory (by querying its stream), you could add these individual inventory updates to a list that is a collection on the batch entity. If the individual item isn’t in that list, then it likely still needs to be processed.

However, its absence from that list can still be a matter of latency. The expected version header should protect you, but you’ll need to keep track of the version (position) of the last item in the stream. That position should be set as the expected version header when writing the processed event to the stream.
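The expected-version guard can be sketched with an in-memory stream that rejects stale writes. `InMemoryStream` and `WrongExpectedVersion` are hypothetical names, and the numbering used here (the version is the index of the last event, with -1 for an empty stream) is an assumption of this sketch; check how your client and server number versions before relying on it.

```ruby
class WrongExpectedVersion < StandardError; end

# In-memory stand-in for a single stream with optimistic concurrency.
class InMemoryStream
  attr_reader :events

  def initialize
    @events = []
  end

  # Version of the last event, or -1 when the stream is empty
  # (an assumption of this sketch).
  def version
    @events.size - 1
  end

  # Append only if the caller has seen the latest event.
  def append(event, expected_version:)
    unless expected_version == version
      raise WrongExpectedVersion,
            "expected #{expected_version}, stream is at #{version}"
    end
    @events << event
    version
  end
end

stream = InMemoryStream.new
stream.append({ type: "InventoryUpdated", sku: "A-1" }, expected_version: -1)

# A worker reads the stream and remembers the version it saw.
version_read = stream.version
seen_skus    = stream.events.map { |event| event[:sku] }

# Meanwhile another worker records A-2.
stream.append({ type: "InventoryUpdated", sku: "A-2" }, expected_version: 0)

# The first worker did not see A-2, so it tries to record it again.
begin
  stream.append({ type: "InventoryUpdated", sku: "A-2" },
                expected_version: version_read)
  conflict = false
rescue WrongExpectedVersion
  conflict = true  # re-read the stream, rebuild the entity, decide again
end

puts conflict            # prints true
puts stream.events.size  # prints 2
```

The stale write is rejected instead of producing a duplicate event, which is the protection described above for the case where an item’s absence from the list is only a matter of latency.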

If I’m confusing any of this, it’s likely because it sucks communicating something in natural language that is better described pictographically. Happy to do a screen share if that’s preferable.

Best,

Scott