Competing consumers

Hi there!

This topic was already discussed here a while back but up until now I haven’t found a viable solution to the problem:

We would like to host 2 (or more) instances of a service that reads messages from a stream and then performs some work on them. Ideally the instances would share the load of incoming messages, and if one goes down, the rest would take over. Messages should only be handled once. As Event Store currently doesn’t care how many subscribers are reading from a stream, the responsibility for checkpointing and failover handling falls to us. I think Greg mentioned a while back that there was some work under way to better support such a scenario with ES, so my first question is: does anybody know what the status of this is?

Has anybody found an easy solution that doesn’t involve implementing your own clustering logic (heartbeat messages, finding a quorum, etc.)? :wink:

Regards,

Sebastian

+1 here. I still can’t get around this without doing the stuff you mentioned. Currently this is the single point of failure in my system.

What is your goal? Do you need to load balance, or just HA?

In the HA case you could make do with a master/slave process pair using a distributed coordination service like ZooKeeper or Serf. For ZooKeeper this is quite easy - the Curator project has ready-made recipes for leader election.
In the load-balancing case you would still use the coordination service but split processing between nodes according to some semantic identifier in the events (e.g. client-id, job-id) using something like consistent hashing. You wouldn’t have to implement gossip or distributed coordination protocols, but you’d still have to implement the load balancing part.
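For the leader-election half, a minimal sketch with Curator’s LeaderLatch recipe could look like this (the ZooKeeper address and latch path are just placeholders, nothing specific to Event Store):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SubscriberElection {
    public static void main(String[] args) throws Exception {
        // Connect to ZooKeeper (address is an assumption; adjust for your ensemble).
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // All service instances race for the same latch path; exactly one wins.
        LeaderLatch latch = new LeaderLatch(client, "/es-subscribers/leader");
        latch.start();

        // Block until this instance becomes the leader, then start the subscription.
        latch.await();
        System.out.println("Became leader - start reading the stream from the last checkpoint.");

        // ... run the subscriber; if this process dies, another instance's
        // await() returns and it takes over.
    }
}
```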

Of course hosting a service inside a node provides this for you. We are however working on this. I should have time to look through things next week over the TCP API. Is this something people would be interested in initially testing?

Thanks for the great brain teaser, Vadim! Of course I have heard of ZooKeeper in the past, but never took a closer look at what it actually does. I’m currently researching possible alternatives to ZooKeeper, and in my opinion etcd (https://github.com/coreos/etcd/blob/master/Documentation/api.md) looks even more promising as it exposes its API over HTTP :slight_smile: Like! It has also had Windows support since v0.3.

@Greg could you please elaborate a bit? I’m not sure I understood you 100% :wink:

Regards

Sebastian

Event Store already provides a quorum algorithm. If you hosted ES then you could easily handle moving subscribers from node to node without issue (this is how projections work). We are however nearing this capability (possibly next week, time constraints allowing, if you guys want to be early beta testers).

Terrific news, Greg. If you exposed such functionality over HTTP I would definitely be interested in beta testing! Right now I don’t use the TCP API at all.

Would love to help out with testing this. Our current leader election/failover solution is something we are looking to replace, and if Event Store can provide something itself to help out, that would be wonderful.

Hi Andrew,

could you please describe your current solution and why you are looking for a replacement?

Thanks!

Sebastian

Leader election is already implemented in Event Store (see the cluster node). Do you also need competing consumers?

Hi Sebastian,

At the moment our consumers coordinate by the leader writing a heartbeat to a document in our database and the non-leaders polling to make sure the leader is still running. It works for the moment and doesn’t need any extra infrastructure (i.e. ZooKeeper), but I think coordinating by writing to and polling the database is clearly not an ideal solution.
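Roughly, the pattern boils down to a lease on a shared document; a minimal sketch (the class, field names and lease interval are illustrative only - a real version would persist this in the shared database, not in memory):

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative sketch of the heartbeat-and-poll coordination described above.
public class HeartbeatLease {
    static class LeaderDoc {          // the shared "leader" document
        String nodeId;
        Instant heartbeatAt;
    }

    static final Duration LEASE = Duration.ofSeconds(15); // arbitrary lease length

    // Leader: refresh the heartbeat on every poll interval.
    static void heartbeat(LeaderDoc doc, String me) {
        doc.nodeId = me;
        doc.heartbeatAt = Instant.now();
    }

    // Non-leader: poll the document and take over only if the lease has expired.
    static boolean tryTakeOver(LeaderDoc doc, String me) {
        if (doc.heartbeatAt.plus(LEASE).isBefore(Instant.now())) {
            doc.nodeId = me;
            doc.heartbeatAt = Instant.now();
            return true;
        }
        return false;
    }
}
```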

Before this thread I was starting to look into zookeeper.

cheers

Andrew

Hi Greg,

I don’t think that competing consumers are really necessary for what we are doing at the moment.

At the moment we have the leader subscribe to all events and coordinate updating them in RavenDB - RavenDB is doing most of the work.

Should we need to scale out the subscribers, my plan was to group streams using some sort of hash on the stream name and to have one consumer per group. In this scenario it would be great if we could elect one leader per group (i.e. multiple leaders at one time).

Competing consumers could probably handle this as well, but the pattern I see in our system is that events tend to come in related groups - by splitting events across multiple consumers, what ends up happening is contention for the same Raven document. My hope is that by instead having a single stream always delivered to the same consumer, we can cut down on this.
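To make the grouping concrete, the assignment could be as simple as hashing the stream name into a fixed number of groups (just a sketch, not our production code; CRC32 and the group count are arbitrary choices):

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Sketch: assign each stream to one of N consumer groups by hashing its name,
// so every event of a given stream is always handled by the same consumer.
public class StreamGrouping {
    static int groupFor(String streamName, int groupCount) {
        CRC32 crc = new CRC32();
        crc.update(streamName.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % groupCount);
    }

    public static void main(String[] args) {
        // e.g. with 3 consumers, each elected leader of exactly one group
        System.out.println(groupFor("customer-42", 3));
        System.out.println(groupFor("order-17", 3));
    }
}
```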

cheers

Andrew

Have you considered just using Event Store for this? As a very simple example, you could host Event Store with a message written very occasionally (with memdb). The node has an internal event called “became master”; if a node goes down we will shift nodes, and a different node would then get “became master”. We have discussed doing this for outside services but have not done an example of it yet.

Andrew, how do you manage checkpoint positions when you read streams, e.g. in a catch-up scenario? If you have some kind of master/slave setup you somehow need to ensure you will not read the same events twice, unless that is acceptable in your scenario.

This might be even trickier in a competing-consumers scenario - the checkpoint becomes a piece of shared and very frequently updated state (with locking).

Hi Raimondas,

Events go to two places in our system at the moment: RavenDB, and published to RabbitMQ.

For RavenDB we use the document metadata to track which events have been processed. In the scenario where the slave replays a few messages, they will just be ignored once the document is loaded and found to have already had that event applied.

For RabbitMQ you always have the possibility that a message will be received twice because of how RabbitMQ works, so any consumers there already need to be written to deal with at-least-once delivery.
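Either way the consumers end up idempotent. In code, the RavenDB side boils down to a version check before applying (a simplified sketch; the field names are illustrative, not our actual metadata keys):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of idempotent event application: the document remembers
// the last event number it has applied, so replayed events are ignored.
public class IdempotentProjection {
    static class Document {
        long lastAppliedEventNumber = -1;   // tracked in document metadata
        Map<String, Object> state = new HashMap<>();
    }

    static void apply(Document doc, long eventNumber, Map<String, Object> eventData) {
        if (eventNumber <= doc.lastAppliedEventNumber) {
            return; // already applied - a replay after failover, ignore it
        }
        doc.state.putAll(eventData);        // do the actual update
        doc.lastAppliedEventNumber = eventNumber;
    }
}
```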

cheers

Andrew

Andrew,

I am a bit confused, perhaps we should take this offlist for a longer discussion?

Greg

@Greg

Are you suggesting starting a new in-memory Event Store process for every subscriber? That seems a bit heavyweight, as every ES process consumes at least 40 MB of memory.

@Andrew

I like your approach. For scenarios where you already have a DB, I think it makes sense to store the current leader and checkpoints in it.

No, you can run many subscribers through one node.

So let’s see if I understood you correctly: I have three servers, and on each of them one ES cluster node running. If I only wanted leader election for my services, I could deploy three instances of each service and then piggyback on the leader election that is happening in ES anyway? Sounds interesting! My only concern with this approach is that it is not documented and not part of the public API, so it could change in the future without notice. If it were part of the public API, it would definitely be the way to go for me.

BTW: Thanks everyone for the great input! :slight_smile:

Regards,

Sebastian

This is true. We have been discussing freezing part of the internal API around this and making it “public”. This is exactly how projections work, btw (as well as some new services which are not yet released). Check out https://github.com/EventStore/EventStore/blob/dev/src/EventStore/EventStore.Core/Helpers/IODispatcher.cs