CatchUp Subscriptions in a (scalable) Microservices Environment

Hello dear community,

I am pretty new to EventStoreDB, and I am excited to learn more about it. In fact, I am proud to cover EventStoreDB in my Master’s thesis, for which I am going to implement a PoC.

The proof of concept covers, among other things, how event sourcing and CQRS can be applied to an (event-driven) microservices architecture.

Let’s suppose we have a Write Microservice and a Read Microservice. The Write Microservice uses an instance of EventStoreDB. The Read Microservice uses an instance of MongoDB. Now, the main question is: What is the optimal approach for projecting events from the Write Microservice to the Read Microservice? As I see it, there are several options, each with different trade-offs:

  1. Message Queues: In literature, you often read about message queues that transport events from the write side to the read side. However, I also read about the out-of-order problem, especially when using multiple instances of the read microservices.

  2. CatchUp Subscriptions: The Read Microservice could use a catch-up subscription to project the data from the event store to the read model. However, what if I use several instances of the read microservice (sharing the same database)? The other read microservices would also create their own catch-up subscription, resulting in a lot of network traffic. Furthermore, the out-of-order processing problem also occurs here…

  3. A separate “projection process” or microservice (currently my favorite): In this approach, there is another simple process in between the write and read microservice; let’s call it the “projector”. The projector runs one catch-up subscription and saves the position of the last processed event. The projector populates the query database. The main advantage that I see with this approach is that we now can scale the read microservice without any problems, as the projection logic is not part of the read microservice anymore.

  4. Persistent Subscriptions: Currently not an option, as events can get out of order.

What do you think about these options? Furthermore, any best practices on projecting the read model so that it allows scaling of the read microservice?

Thanks in advance for any answer.

Best regards
Domenic

Option 3, with the small change of transactionally storing the position with the target read model, is your best bet.
-Chris
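To make the suggestion concrete, here is a minimal sketch of storing the position in the same transaction as the read model change. It uses SQLite only so that it runs anywhere; the table names, the event shape (`position`, `account`, `amount`) and the `project` function are hypothetical and not taken from any EventStoreDB client.

```python
import sqlite3

def init(conn):
    # Read model and checkpoint live in the same database.
    conn.execute("CREATE TABLE IF NOT EXISTS accounts "
                 "(id TEXT PRIMARY KEY, balance INTEGER NOT NULL)")
    conn.execute("CREATE TABLE IF NOT EXISTS checkpoints "
                 "(subscription TEXT PRIMARY KEY, position INTEGER NOT NULL)")
    conn.commit()

def load_checkpoint(conn, subscription):
    row = conn.execute("SELECT position FROM checkpoints WHERE subscription = ?",
                       (subscription,)).fetchone()
    return row[0] if row else -1  # -1 means "nothing projected yet"

def project(conn, subscription, event):
    """Apply one event and advance the checkpoint atomically."""
    if event["position"] <= load_checkpoint(conn, subscription):
        return False  # already projected, skip the redelivered event
    with conn:  # commits on success, rolls back if anything raises
        conn.execute("INSERT OR IGNORE INTO accounts (id, balance) VALUES (?, 0)",
                     (event["account"],))
        # Deliberately non-idempotent update: applying it twice would double-count.
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
                     (event["amount"], event["account"]))
        conn.execute("INSERT OR REPLACE INTO checkpoints (subscription, position) "
                     "VALUES (?, ?)", (subscription, event["position"]))
    return True
```

On restart, the projector would read `load_checkpoint(...)` and resume its catch-up subscription from that position, so the read model and the position can never disagree.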

Hi @chris.condron , thanks a lot for your reply.

So this means that the latest position is stored in the target read model together with the change itself, using one transaction? What if the target read model does not support transactions?

(Sorry if this is a noob question)

You only need transactional checkpointing if your projections are not idempotent. I consider non-idempotent projections an anti-pattern and advise people to aim for idempotent ones.
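As a rough illustration of what "idempotent" means here, the sketch below uses a plain dictionary as the read model. The event types and fields (`EmailChanged`, `ItemAdded`, `item_id`, `quantity`) are made up for the example. Each handler overwrites or upserts by key instead of accumulating, so replaying events from an older checkpoint, in order, ends in the same state.

```python
def apply(read_model, event):
    """Idempotent handlers: re-applying an event leaves the state unchanged."""
    doc = read_model.setdefault(event["stream"], {"email": None, "items": {}})
    if event["type"] == "EmailChanged":
        # Overwrite with the value carried by the event.
        doc["email"] = event["email"]
    elif event["type"] == "ItemAdded":
        # Upsert by item id instead of appending to a list.
        doc["items"][event["item_id"]] = event["quantity"]
    return read_model
```

With handlers like these, the checkpoint can be written after the update (or only every N events) without a shared transaction: if the process dies in between, the events since the last stored position are simply applied again.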

The question itself, however, is a large topic, as there are quite a few factors not mentioned in the question.

The need for scale

I often see people spinning up multiple service instances for redundancy rather than scale. A single instance of the command service can handle hundreds to potentially thousands of concurrent requests. Do you expect a load that would require the command side to scale out beyond that?

When you scale the write side, are you sure that the database that supports command handling (ESDB in this case) would handle the load you expect? Scaling out the database consumer often moves the problem elsewhere. The main bottleneck on the command side is usually the speed of read-append operations with the events database.

I always use catch-up subscriptions for projectors, and I normally run them as a single-instance workload. I am not sure what the benefit is of having multiple services projecting to the same read model database, except for redundancy. The redundancy concern is valid, but in most cases the issue is mitigated by a proper backplane and observability, so the failed workload gets restarted. If you manage to run multiple instances using a control plane that coordinates the active and stale workloads, you’d still have only one of them doing the work at a time, as you want to preserve the global order of projected events.

If you want to scale out to improve the speed of projecting events, you might want to check where the bottleneck is first. If the target database is not fast enough, you won’t achieve anything by scaling projections out, as the database won’t go any faster. If you experience speed issues caused by projecting events one by one, you can use target database optimisations such as batched updates. If the issue is the time of a single roundtrip to the database and you can’t afford to batch, you can partition projections in-proc as I did here (it’s not easy, I admit).
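The in-proc partitioning idea can be sketched roughly as follows. This is not the implementation referred to above, just a minimal illustration: events are hashed by stream name onto a fixed number of worker threads, so events of one stream stay in order while different streams are projected concurrently. The function names and the event shape are assumptions.

```python
import queue
import threading
import zlib

def partition_of(stream_id, partitions):
    # crc32 is stable between runs, unlike the built-in hash() for strings.
    return zlib.crc32(stream_id.encode("utf-8")) % partitions

def run_partitioned(events, handle, partitions=4):
    """Fan events out to worker threads; one stream always maps to one worker."""
    queues = [queue.Queue() for _ in range(partitions)]

    def worker(q):
        while True:
            event = q.get()
            if event is None:  # sentinel: no more events for this worker
                break
            handle(event)

    threads = [threading.Thread(target=worker, args=(q,)) for q in queues]
    for t in threads:
        t.start()
    for event in events:
        queues[partition_of(event["stream"], partitions)].put(event)
    for q in queues:
        q.put(None)
    for t in threads:
        t.join()
```

Order is only preserved per stream, not globally, and the partitions progress at different speeds, so a real projector has to be careful not to checkpoint past an event that some partition has not handled yet.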

Scaling out

If you are certain that the projection needs to be scaled out to multiple instances, there are a few ways to do it.

One way is to project events to a partitioned message log like Kafka. Then you can have a consumer group with the number of projectors matching the partition count. Each partition will then run independently and will have its own checkpoint. You can also use brokers with ordering support, like Amazon SNS FIFO topics or Google PubSub with an ordering key.

Another way is to spin up multiple projector instances where each one of them would subscribe to $all with a server-side filter. It would require that the stream name contains enough information for creating a non-conflicting filter in each of the subscriptions. This way you can also use follower nodes, so the read load gets distributed across multiple EventStoreDB nodes as well.
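The "non-conflicting filter" requirement can be checked locally before wiring up real subscriptions. The sketch below does not call any EventStoreDB client; it only simulates what a stream-name filter per projector instance would select, assuming a hypothetical `<category>-<id>` naming scheme and made-up projector names.

```python
import re

# Hypothetical filters, one per projector instance.
FILTERS = {
    "projector-orders": re.compile(r"^order-"),
    "projector-customers": re.compile(r"^customer-"),
}

def owners(stream_name):
    """Names of all projector instances whose filter matches the stream."""
    return [name for name, pattern in FILTERS.items() if pattern.search(stream_name)]

def route(events):
    """Simulate which instance would receive which event positions."""
    handled = {name: [] for name in FILTERS}
    for event in events:
        matched = owners(event["stream"])
        if len(matched) > 1:
            # Two instances would project the same event: the filters conflict.
            raise ValueError("conflicting filters for " + event["stream"])
        for name in matched:
            handled[name].append(event["position"])
    return handled
```

Each instance still sees its events in log order and keeps its own checkpoint; events matching no filter are simply not delivered to any projector.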

With the last one you might have the network traffic concern, but, honestly, I am yet to see network traffic being a bottleneck. The most obvious bottleneck is always the database, not the network. In many cases you can force workload affinity so the network is in fact virtual.

Finally, I don’t really understand the “out of order” concern for catch-up subscriptions as they always run linearly.

Thanks a lot for your detailed answer :slight_smile:

Yeah, thanks a lot for the advice.


Your reply covers a lot of useful advice and hints. However, I think there were a few small misunderstandings, which might be due to my rather short description, for which I am sorry.

Never (in my question) did I intend to scale the command side.

I also never wanted to scale a projection itself. Rather, the question is which “process” should be responsible for projections, the read microservice or a separate process that is in between the write microservice and the read microservice? Eventually, I want to be able to scale the read microservices.

I don’t think I ever said that catch-up subscriptions are out of order. I know that they are not. However, when using message brokers like Kafka or RabbitMQ with several consumers, events can actually get out of order, as the messages (events) are distributed to several instances. And this is the only thing I stated, if I remember correctly.

Kafka won’t deliver messages out of order within one partition, so you need to choose the partition key correctly.

Scaling out the command side makes total sense as it is stateless. The subscription itself, as I mentioned, is harder to scale out, and the question is still if you need to do it. I described several ways to scale out the projection itself if you need it.

If you need to scale out the query API, there’s no issue with that.

Thanks @alexey.zimarev for your reply.

I quickly drew and attached the figure below, illustrating the approach I want to follow in my proof of concept.

[Figure: projection]

Maybe I was not clear enough in the beginning:
The main goal was always to scale out the query API, as I expect more queries than commands in my domain.

This is the reason why I wanted the “projection process” to be separated; simply because it does not need to scale.

I got headaches thinking about including the projection process in the “Query API”. At first, I thought we could mix those two, but I eventually realized that this might not be the best approach.

We actually came up with option 3 as the best solution as well. We have been using it successfully in production for about a year now. The only downside is that we cannot scale that process, as you mentioned. Our read and write sides are completely scalable, though. For each subscription, we store its position alongside the read model.

Thanks a lot for your reply :slight_smile: Can you tell me a bit more about how you store the position? Do you store it in the read model or somewhere else? If the latter, do you do it transactionally? Are your events idempotent?

Our microservices containing the read models are Ruby on Rails API applications. We store the read models themselves in a Postgres DB and the stream positions for each subscription in Redis. But it might actually be better to store them alongside the read models in Postgres, because then the stream positions would not get out of sync when restoring the read models from a DB backup, which we have already had to do a few times. Most of our events are idempotent, but at the moment we do not assume they are ;-). We update the subscription position inside the loop that receives the events for the subscription, just before notifying the subscriber.

Hey @nico.ritsche thanks a lot for your reply. Your explanations helped me a lot :slight_smile:

Our first attempt at redundancy had a read store per node, but we decided not to go into production like that (this mainly had to do with Windows/SQLExpress). Instead, we have multiple instances of the projection writer, but only one at a time is allowed to write. This is reasonably simple to do with optimistic concurrency in the read store, if it’s supported (we use SQL Server and MariaDB).

(The checkpoint is stored transactionally with the read model, which is by far the easiest solution. We also have local read models using Lucene, with the checkpoint in text files, which is not optimal.)
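One way the "only one writer at a time" rule could be enforced with optimistic concurrency is a compare-and-set on the checkpoint row. The sketch below is a guess at the general shape, not the setup described above: it uses SQLite for portability, and the table names, event fields and the `LostRace` exception are all hypothetical.

```python
import sqlite3

class LostRace(Exception):
    """Another writer instance has already advanced the checkpoint."""

def init(conn):
    conn.execute("CREATE TABLE totals (id TEXT PRIMARY KEY, total INTEGER NOT NULL)")
    conn.execute("CREATE TABLE checkpoint "
                 "(id INTEGER PRIMARY KEY CHECK (id = 1), position INTEGER NOT NULL)")
    conn.execute("INSERT INTO checkpoint (id, position) VALUES (1, -1)")
    conn.commit()

def project(conn, expected_position, event):
    """Write only if the stored checkpoint is still the one this writer last saw."""
    with conn:  # commits on success, rolls back when LostRace is raised
        cur = conn.execute(
            "UPDATE checkpoint SET position = ? WHERE id = 1 AND position = ?",
            (event["position"], expected_position))
        if cur.rowcount != 1:
            raise LostRace("checkpoint moved past %d" % expected_position)
        conn.execute("INSERT OR IGNORE INTO totals (id, total) VALUES (?, 0)",
                     (event["key"],))
        conn.execute("UPDATE totals SET total = total + ? WHERE id = ?",
                     (event["amount"], event["key"]))
    return event["position"]
```

A writer that gets `LostRace` knows another instance is active; it can back off, reload the checkpoint and retry later, which gives redundancy without two instances ever applying the same event.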

I had a terrible experience with storing the checkpoint in Redis, as the FLUSHALL operation is too easy to execute. Your observation is correct: it’s always better to store the checkpoint in the same place as the read model. If the whole thing is gone, it will be reprojected.

A bit late, but here are my usual suggestions:

  • Not related to microservices, doesn’t scale out, but makes things a lot faster: partition in-proc https://eventuous.dev/docs/subscriptions/pipes/#partitioning-filter
  • Shovel to a log-based partitioned broker like Kafka or Google PubSub (with ordering key). Then, you also get partitions, and consumers that can consume and project in parallel.
  • Multiple subscribers: the same executable, but each instance configured to process a single partition. Events that don’t belong to a given partition are ignored. This can be done with either a client-side or a server-side filter on the subscriber.