Load Balancing using persistent subscriptions

I tried to read through a lot of discussions about ordering but answers were not clear, so i’m asking:
imagine that i have three streams:

  1. profile-user_id_1
  2. profile-user_id_2
  3. profile-user_id_3

we use $by_category projection and project all streams into $ce-profile
now we can run two persistent consumers in one consumer group(using PINNED strategy) to load balance loads. so there is two clear questions:

  1. is there any chance that i get out of order events for one stream? (i.e. profile-user_id_1)
  2. if yes, what can i do to prevent that?

@sinoohe.mkh

  1. Yes, as stated in docs, persistent subscriptions do not guarantee ordering. It’s the best effort, e.g. if the event delivery has to be retried, then events may get out of order. Persistent Subscriptions implement “Competing Consumers” pattern, read more on its benefits in limitations: https://docs.microsoft.com/en-us/azure/architecture/patterns/competing-consumers.
  2. If you need a strict ordering guarantee, I recommend you use Catch-up subscriptions. They do guarantee sequential processing. Then you can define your own strategy for scaling and handling retries, etc.

You can also consider dealing with the ordering issues strategies. I explained some of them in my talk: https://www.youtube.com/watch?v=0pYmuk0-N_4.

@oskar.dudycz thanks for your response.

but if we use catch up, how can we do load balancing? i could not find a proper way to partition aggregate ids and consume them in parallel. do you have an example or article that i can follow?

They don’t have a built-in partitioning mechanism. You’d need to route them on your own. Could you explain why you need to have load balancing and sharding based on aggregate ids? Having more details, I could give a better suggestion.

For my take around scaling, you can check https://event-driven.io/en/how_to_scale_projections_in_the_event_driven_systems/.

so imaging we have N aggregates:

  • profile-user_id_1
  • profile-user_id_2
  • profile-user_id_3
  • profile-user-id_N

so each event related to user_id_1 will be written to stream profile-user_id_1. so we need to have load balancing in reading from datastore to create our read models into MongoDB.(if we don’t have load balancing we have to consume all events in sequence, and our read model for aggregates not having a lot of events will be updated with huge delay)

So do you think we can do load balancing using catch subscription?

ordering is important for us in each aggregate, so we expect to get events from profile-user_id_1 in order always

balancing in reading from datastore

I assume you mean EventStoreDB here .

if we don’t have load balancing we have to consume all events in sequence

I would also suggest you make measurement and set targets performance criteria befor ging into a more complicated design.

assuming Catch-up Subscription.
not all events : you essentially have 1 ordered sequence of events.
=> and you need to checkpoint from time to time (i.e you keep the last EventNumber you saw )
when restarting that process , it will read that checkpoint and ask to catch-up from that position.
That checkpoint can be kept inside mongo itself :

I see how checkpoint works, but how do you run multiple consumer and do the partitioning?
I basically want to partition streams and subscribe to the changes in parallel

Are you sure you need it ?
I.e is it not fast enough with a catch-up subscription and a single thread of execution ?

if not , after you measured & it does not meet your requirements
you can fan-out internally or have multiple process doing the catch-up and configured to only process certain ranges . (that’s the simplest thing to do )

1 Like

Did you consider batching? This can/should be more effective and easier to maintain than load balancing.

Also as Yves, suggested, did you already verify that sequential processing will be too slow? If yes, then what’s the biggest bottleneck?

we have too many streams, so one thread will not be performant enough. We want more threads to handle a set of streams in parallel.

about batching: batching doesn’t help us to load balance between our nodes in cluster.

If you do batch, you might not need load balancing. Typically from my experience, the bottleneck lies more in the multiple tiny operations overhead like networking, connection management etc.

You can also spin up multiple catch up subscriptions. If you have consistent stream names or event type names, you can do sharding based on that. You can also consider using filtered catch-up subscriptions that filter out the unwanted database events. See more: https://developers.eventstore.com/clients/grpc/subscriptions.html#server-side-filtering

we have too many streams, so one thread will not be performant enough. We want more threads to handle a set of streams in parallel.

How many ?
have you measured / tried out how many streams you need to do in // and how fast you can read ?
do you know where the bottelneck is , is it reading form eventStore, writing to Mongo , the deserialisation of events , the (de)serialisation of documents ?

1 Like

My underlying question is

  • do you have an issue today or are you in exploration phase ?
2 Likes

We’re at exploration phase, we are considering using ESDB to see if that fits with our needs and then we can switch to it.

we currently have 200k aggregates in one aggregate type

the read is fast, the read is not the bottleneck

at the time of consuming events when we want to update the read model, we have to call another downstream to fetch more details and then update the read model. the downstream call might be slow in some cases.
by being able to consume streams concurrently, we’ll sure that we’ll not lag too far behind.

so as i previously described above, my stream names are like:

  • profile-user_id_1
  • profile-user_id_2
  • profile-user_id_3
  • profile-user-id_N

how can i do partitioning based on stream name?
one option is to partition the events at the time of writing to eventstoredb and prefix the partition number with stream name(i.e profile-{partition-0}-user_id_1
but i don’t think that’s the solution because if we increase/decrease number of consumers in future, we have to deal with past data too.

about the fanout internally, do you mean we run one consumer, consume one by one message and publish the massage to some broker(for example using google pubsub) and setting partitioning key and then running multiple consumers?

We’re at exploration phase, we are considering using ESDB to see if that fits with our needs and then we can switch to it.

It’s always worth setting certain expectations, then verifying potential options. Assuming that a single writer won’t be enough without benchmarking may give you wrong conclusions. It may be too slow, but until you benchmark it, you won’t know.

at the time of consuming events when we want to update the read model, we have to call another downstream to fetch more details and then update the read model. the downstream call might be slow in some cases.
by being able to consume streams concurrently, we’ll sure that we’ll not lag too far behind.
we currently have 200k aggregates in one aggregate type

The number of aggregates is not an issue by itself, as EventStoreDB scales well with the number of streams. It’s critical to gather information about the “working set” so how many of those streams are accessed actively and in what distribution, and the expected throughput.

at the time of consuming events when we want to update the read model, we have to call another downstream to fetch more details and then update the read model. the downstream call might be slow in some cases.
by being able to consume streams concurrently, we’ll sure that we’ll not lag too far behind.

I think that you should try then to focus on optimising the downstream call, as that sounds like a bottleneck. Could you provide the details of this call? We could try to provide better guidance. What type of database are you using for read model?

one option is to partition the events at the time of writing to eventstoredb and prefix the partition number with stream name(i.e profile-{partition-0}-user_id_1

I think that you should work on your stream design. Why would you want to partition stream? In general streams should be short, so there shouldn’t be partitioned? Could you explain us your stream design? Typically, then lowest partition level is per stream, not per stream partition.

the downstream service is not in our control, and we can do nothing about it. that’s why we want parallel processing.

because in each aggregate id, we want to receive messages in order. i said one option is to partition the stream names, but do you have any other solution?

@sinoohe.mkh, unfortunately, it’s hard to give you more detailed guidance if we don’t know more details. Performance optimisation should be always done knowing the specific of the case. If we don’t know the answers to our questions then I can only give general guidance.

Is that a 3d party service ?
Do check if it’s not rate limited, I’ve seen people hit hard not taking that into account and //ing things will not help , on the contrary .

if the 3d party is indeed rate limited / slow …
here are a few options:

  • can you cache the data ?
  • can you update later ? (so you update your read model as fast as possible with the info you have, then another subcription is updating it with the info from that 3d party service
1 Like

that 3d party service is not in our control and we have 100ms delay which can make delay in consumption.

thanks for the answers and help
we expected a feature like consumer groups from apache kafka
https://www.tutorialspoint.com/apache_kafka/apache_kafka_consumer_group_example.htm
to be a native feature in ESDB

that 3d party service is not in our control and we have 100ms delay which can make delay in consumption.

As Yves, asked do you need to update that together with read model, or can you do it in the separate process?

we expected a feature like consumer groups from apache kafka
https://www.tutorialspoint.com/apache_kafka/apache_kafka_consumer_group_example.htm
to be a native feature in ESDB

We have persistent subscriptions for the built-in scaling horizontally. Tho, it’s a different mechanism than Kafka Consumer groups. However, Kafka consumer groups won’t help you if you want to distribute the workload from a single partition. There will be only a single consumer that will be processing the specific partition. Unless you put your stream events in different partitions then you won’t be able to scale it. If you do, you lose ordering guarantees.

What I’m trying to suggest here is to consider the general design, as tools can be helpful and better in specific cases, but to scale the solution properly, it’s sometimes unavoidable to change the initial design. Tools by themselves won’t solve the root cause of the scaling issues.