CatchUpSubscription when running multiple instances of an application

We normally run two instances of an application, and since CatchUpSubscription has no consumer group functionality, each event will be received by every subscriber in every instance of the application. I’m curious what others using CatchUpSubscriptions are doing - are you only running one instance of an application?

It would be quite trivial to implement logic on the client/subscriber side so that each event only gets processed once. But would this be considered an anti-pattern, and does it come with a performance penalty?

Hi Kristoffer,

It depends (of course) on what your use case is, but if you want to be able to scale the number of instances of your application, then it would simply require your application to store the checkpoint somewhere so that all the instances can read and write it.
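A minimal sketch of the shared-checkpoint idea, assuming a store that supports compare-and-set. `SharedCheckpointStore`, `try_advance` and `handle` are hypothetical names, and the in-memory dict stands in for whatever database all instances can reach.

```python
import threading


class SharedCheckpointStore:
    """In-memory stand-in for a checkpoint store shared by all instances."""

    def __init__(self):
        self._lock = threading.Lock()
        self._positions = {}  # subscription id -> last processed position

    def read(self, subscription_id):
        with self._lock:
            return self._positions.get(subscription_id, -1)

    def try_advance(self, subscription_id, expected, new):
        """Compare-and-set: move the checkpoint only if nobody else moved it."""
        with self._lock:
            if self._positions.get(subscription_id, -1) != expected:
                return False
            self._positions[subscription_id] = new
            return True


def handle(store, subscription_id, position, process):
    """Process the event at `position` only if it is the next one expected."""
    checkpoint = store.read(subscription_id)
    if position != checkpoint + 1:
        return False  # already handled elsewhere, or out of order
    process(position)
    return store.try_advance(subscription_id, checkpoint, position)
```

Two instances can both pass the position check before either advances the checkpoint, so `process` still has to be idempotent in this sketch.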

Have you looked at persistent subscriptions already (see "Persistent subscriptions" in the EventStoreDB Documentation)? They might solve your problem. Or is there another reason you want to use the CatchUp?

Cheers,

Peter

I didn’t read through the documentation well enough. I will check out persistent subscriptions. Thanks!

In many scenarios, in particular when using a subscription for projecting events to another database (update state snapshot), you want events to be processed in order. When running multiple consumers, that cannot be guaranteed, so persistent subscriptions don’t provide any guarantees about ordered event processing. Some consumer assignment strategies attempt to mitigate it, but it’s still not guaranteed. Consumer groups work fine on a partitioned log, but EventStoreDB has a single log.

Running a single instance of a catch-up subscription is fine in many cases, but you have to have properly defined SLAs for the projection lag and have established monitoring for SLA breaches.
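As a rough illustration of monitoring projection lag against an SLA, here is a small sketch. It assumes lag is measured as the time between an event being created and the projection finishing with it; `LagMonitor` and its fields are hypothetical names, and a real setup would emit a metric or alert instead of counting.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class LagMonitor:
    sla: timedelta      # maximum acceptable projection lag
    breaches: int = 0   # number of observations that exceeded the SLA

    def observe(self, event_created: datetime, processed_at: datetime) -> timedelta:
        """Record one projected event and return its lag."""
        lag = processed_at - event_created
        if lag > self.sla:
            self.breaches += 1  # placeholder for a metric or alert
        return lag
```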

For scaling out, you really need to evaluate the needs of your system. Of course, if you project events to another infrastructure and it takes, for example, 5ms to execute one operation, you will never be able to project more than 200 events per second. If you need to project more, you still have options:

  • Introduce batching. It might be that from those 5ms you’d have 4ms for the network round-trip, so if you make fewer calls things will be faster. Elastic is a good example, as it takes roughly the same time to call the index endpoint for 1 document and 100 documents. Batching, however, introduces a longer time gap, as you’d need to accumulate updates.
  • Shovel events linearly to a fast, partitioned log-based broker (Google PubSub with ordering key, PubSub Lite, Kafka) with a domain-specific partition key (customer id, order id, country, etc) and scale out the consumers using the capabilities of the broker.
  • Partition in-proc using something like https://eventuous.dev/docs/subscriptions/pipes/#partitioning-filter
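The arithmetic and the partitioning idea from the options above can be sketched as follows. This is an illustration only: the function names are made up, the batching bound assumes a batch call takes about as long as a single call, and CRC32 is just one possible stable hash for a partition key.

```python
import zlib
from collections import defaultdict


def max_events_per_second(latency_ms, batch_size=1):
    """Upper bound for a sequential projector making one call per batch."""
    return 1000.0 / latency_ms * batch_size


def partition_for(key, partitions):
    """Stable hash, so the same key always maps to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % partitions


def split_by_partition(events, partitions):
    """Group (key, payload) events per partition, preserving arrival order."""
    buckets = defaultdict(list)
    for key, payload in events:
        buckets[partition_for(key, partitions)].append((key, payload))
    return dict(buckets)
```

With 5ms per call, `max_events_per_second(5)` gives the 200 events per second ceiling mentioned above. Events sharing a key (say, an order id) stay in order within their partition, while different partitions can be processed concurrently.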

Long time no see, Kristoffer, btw :slight_smile:

Long time no see, indeed :slight_smile:

I actually have a couple of event sourced systems running in production today. And more to come - most likely. Much of it based on what I learned from you earlier, Alexey :slight_smile:

It’s basically built on Scylla and Kafka: we store all events in Scylla and publish them to Kafka. This is risky, I know, since the operation is not transactional, which is one of the reasons why I want to migrate to EventStore. And now that EventStore is available as a cloud solution in AWS, this is really getting interesting for us. Even though the price is a bit steep :stuck_out_tongue:

So right now I’m doing some research by refactoring one of these systems to EventStore. In the same process I’m looking into the projection side as well (infrastructure, event processing / projection management).

This is a system which is handling orders; placement of orders, accepting orders, closing orders, modifications of orders, cancellations etc. So we have Order as the aggregate root, very simplified - let’s say it looks something like this: Order(id, orderLines) where an order line maps to a specific product. And a couple of the projections that we have:

  • OrderProjection - which is more or less a reflection of the aggregate root (current state)
  • ProductTurnoverProjection - which keeps track of the accumulated turnover for each product.

Very high level how the projection management has been designed:

Projectionist(projectors):
  onEvent:
    projectors.forEach:
     fetch the projector's current position 
     if the event position matches the expected position: 
       projector.project
       projector.updateCheckpoint

Projector:
  project:
    fetch document to update (there can be multiple documents to update per event - but each document is updated sequentially using optimistic concurrency; each document is versioned)
    update document (if matches expected document version)
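
For readers who prefer something runnable, the flow above could look roughly like this in Python. It is a sketch under assumptions, not the actual implementation: the documents live in an in-memory dict, the event shape (`order_id`, `lines`) is invented, and a version conflict simply raises.

```python
class Projector:
    """Hypothetical projector keeping versioned documents and a checkpoint."""

    def __init__(self, name):
        self.name = name
        self.checkpoint = -1   # position of the last projected event
        self.documents = {}    # doc_id -> (version, state)

    def save(self, doc_id, expected_version, state):
        """Optimistic concurrency: write only if the version is unchanged."""
        version, _ = self.documents.get(doc_id, (0, None))
        if version != expected_version:
            raise RuntimeError(f"version conflict on {doc_id}")
        self.documents[doc_id] = (version + 1, state)

    def project(self, event):
        # Load-change-save: read the current document, apply, write back.
        doc_id = event["order_id"]
        version, state = self.documents.get(doc_id, (0, None))
        lines = list(state or []) + event.get("lines", [])
        self.save(doc_id, version, lines)


class Projectionist:
    def __init__(self, projectors):
        self.projectors = projectors

    def on_event(self, position, event):
        for projector in self.projectors:
            if position != projector.checkpoint + 1:
                continue       # duplicate or gap: skip this projector
            projector.project(event)
            projector.checkpoint = position
```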

This has worked really well. But I see a big issue: the Projectionist can spend ~200ms per event. With a partitioned Kafka topic the projection code can scale, and this works for us at the moment. But I really need to address the projection management, and perhaps the infrastructure we use on the read side, which is Scylla. I assume it gets troublesome because of the many reads and writes per event, which are more or less required since Scylla does not support transactions. We are also pushing OrderProjection (current state) into Elastic so that we are able to do more querying.

Another side note to further complicate:

  • The ProductTurnoverProjector depends on getting the current state from the OrderProjection (which is then required to have projected the incoming event beforehand). If this were not the case, all the projectors could run in parallel.

So I’m a bit curious whether you have any input on this flow, or on databases suitable for the read side. I believe you’re a fan of RavenDB? :slight_smile: It is both queryable and supports transactions. But it is quite expensive, right? It might also be that we would be better off designing our projections differently.

I saw you mention in another post here that you have an event-sourced subscription handling ~1M events per hour. That’s quite impressive! There can’t be much interaction with infrastructure in the event handling there?

This post got both a bit long, and a bit off topic :stuck_out_tongue: But any input is appreciated :smiley:

Hi Kristoffer,

We are actually working with RavenDB for projections. What is really cool about Raven is that you can create indexes on data that act like materialized views. So whenever data changes, the indexes are updated in the background (which means they can be stale for a short while). It is also possible to subscribe to Raven events. So when you have some sort of caching mechanism on the read side, Raven events can be used to invalidate the cache.

Cheers,

Peter

The problem with Raven, which is also reflected in the projector flow you described, is that the main flow for Raven is load-change-save. In ABAX they moved away from Raven long ago in favour of MongoDB. There, you can submit update operations with an upsert option, and in 99% of cases, it allows building idempotent projections that don’t care about the current document state. Of course, the requirement there would be that the update doesn’t manipulate the existing document values, but overrides them instead.
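To illustrate the difference from load-change-save, here is a sketch of a projection that only overrides values. It uses a plain dict as a stand-in for a collection, and `upsert_set` imitates what an update with `$set` and the upsert option does in MongoDB. The event fields (`order_id`, `total`) are hypothetical.

```python
def upsert_set(collection, doc_id, fields):
    """Create the document if missing, then overwrite the given fields."""
    doc = collection.setdefault(doc_id, {"_id": doc_id})
    doc.update(fields)
    return doc


def project_order_accepted(collection, event):
    # The event carries absolute values, so the write never needs to read
    # the current document state; replaying it yields the same document.
    return upsert_set(collection, event["order_id"], {
        "status": "accepted",
        "total": event["total"],
    })
```

Applying the same event twice leaves the document unchanged, which is what makes the projection idempotent. An update that increments or appends to an existing value would not have that property.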

I know that it’s not always possible to predict what information will be needed in the event for such a model to work. But when you find out that your events are too thin, I’d rather enrich them in a new version, so that all future projecting happens this way, and create an upcaster as a fallback for historical events. We normally don’t replay history very often. There is also a frequently seen fallacy here: that when you need a new projection, you should let it process all the historical events. However, in many cases you don’t really want all the ancient stuff in the new read model. If you remember ABAX trips, it’s a large data set accumulated over 19 years, but 90% of the queries go 3 months back in time max and the remaining 10% go one year back. So any new read model can choose the period of history it needs to rebuild, or even start afresh and declare that the new feature is only available from now on, with no history.
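A small sketch of both ideas, assuming a hypothetical `OrderPlaced` event whose v2 adds a precomputed `total`. The field names and the `in_rebuild_window` helper are invented for illustration.

```python
from datetime import datetime, timedelta


def upcast_order_placed(event):
    """Lift a hypothetical v1 OrderPlaced event to the enriched v2 shape."""
    if event.get("version", 1) >= 2:
        return event  # already enriched, nothing to do
    return {
        **event,
        "version": 2,
        # v2 carries the total, so projections need not derive it from state
        "total": sum(l["price"] * l["quantity"] for l in event["lines"]),
    }


def in_rebuild_window(event_time, now, window):
    """True if the event is recent enough for the new read model to care."""
    return now - event_time <= window
```

A new read model could then skip every historical event for which `in_rebuild_window` is false, for example with a 90-day window, and run the rest through the upcaster before projecting.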

Regarding the cache, I’d rather build the cache as a read model, especially if it’s persisted (Redis and the like). Another option is to seed the cache from another read model like you do, but update it from a separate, independent subscription.