Reactive stream API with catchup semantics?

We were having problems with the regular catchup subscriptions throwing “consumer too slow” when we are doing rebuilds, and so wanted to switch to the reactive API (readStreamReactive), which on paper would solve it. Unfortunately it seems at the end of the stream it calls onComplete() on the subscriber instead of waiting for more events.

Is there a way to use the reactive streams API with catchup subscription semantics? If not, why? If yes, how?

thanks, Rickard

What Client is it ?
( I’m guessing Java)
ReadStream reads a stream until the end then closes . that’s the semantics of that operation.
typical usage: readStream, fold over the events, make some decision , append new events.
Catch-up semantics is , “give me everything from , … and keep sending any new events until the sucbscription is ended”
Can you describe what you do in those catch-up subscriptions ?

Building the read model for our application, specifically projecting events into Neo4j. During rebuilds some batches of events may take longer than others, and so the regular catchup subscription throws an exception. We were hoping that the reactive stream with backpressure would avoid that, since that is the typical semantics for reactive streams.

It is not defined in reactive streams itself that it has to close if there are no more events. That’s purely a design and implementation choice. I would consider that a fairly logical read option to provide on connect (close or wait for more events) actually.

If it was considered when adding reactive streams support to the API, then why was it decided not to support catch-up subscription semantics, i.e. simply wait for more events?

Hey @Rickard_Oberg, could you share what you did to make catch-up subscription reactive?

I think it is solvable by a separate product we are discussing now, which supposes to take away the pain of subscriptions that need to be always ready to receive things.

I was using the reactive stream API, hoping that it would have catch-up semantics (wait at end instead of close), but unfortunately that didn’t work, and there was no way to change it, hence this discussion.

Interesting. Is it more of a pull API like the Atom feed?

I’m asking you because we don’t expose a reactive streams api for catchup subscriptions. I was wondering what approach you did to see if I can help.

No, it’s more about an alternative way to push the events to you. Like we take care about managing the subscription with all the necessary bells and whistles, and you just need to comply with some API to receive the events (and checkpointing).

@alexey.zimarev any progress on this? We’re still having problem with this error, and seemingly no way around it.

Is there at least a way to configure what the timeout of this is? Again for reference, it’s the timeout that causes this exception:
“Operation timed out: Consumer too slow to handle event while live. Client resubscription required.”

If I could just increase that to a minute, that would solve it too.

To give you a sense of how bad it is my logs are currently showing the below. This is when an event producer creates events in eventstore, and the read model subscription throws these exceptions:

2023-10-20 17:26:33,210 [grpc-default-executor-25] WARN   c.e.c.c.s.e.s.EventsPublisher: EventStore aborted, reconnecting
io.grpc.StatusRuntimeException: ABORTED: Operation timed out: Consumer too slow to handle event while live. Client resubscription required.
	at io.grpc.Status.asRuntimeException(Status.java:539)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
2023-10-20 17:26:33,458 [grpc-default-executor-25] WARN   c.e.c.c.s.e.s.EventsPublisher: EventStore aborted, reconnecting
io.grpc.StatusRuntimeException: ABORTED: Operation timed out: Consumer too slow to handle event while live. Client resubscription required.
	at io.grpc.Status.asRuntimeException(Status.java:539)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

So on the first exception we reconnect, and then 200ms after that the new subscription times out again. So is the “we’re live, consumer is too slow” timeout 200ms? I feel like there’s something seriously wrong, but I can’t find any settings to change to fix this, and no notes in documentation about it. Please help :slight_smile:

We will announce the new tooling soon, it will be available as a preview component for all paying customers, but not in ES Cloud for some time. ES Cloud should get the feature somewhere around the year edge.

Concerning that particular issue, the best way to address it is to open an issue in the Java client repo, so the team can triage it and either fix it or route it to the ESDB team.

Looking some more at the docs, and what is happening when we get these issues, it feels like with the current client API it will never work, not easily anyway. Here’s why.

We are using a subscription so that we can have a read model being continuously updated. But when we do an import of events to a stream the “live subscription” handling will ALWAYS fail, because there are just too many events to process, temporarily, and since our SubscriptionListener.onEvent handler blocks while waiting for the read model to be updated, which is always going to be slower than the rate of events coming in during these situations, the connection fails with “consumer too slow”, we restart the subscription, fails again, etc.

But we can’t use the other read operations because they don’t have support continuous listening. With the reactive streams they stop when there are no more events, and with the normal reads they return empty if there are no more events to consumer, rather than wait.

If either reactive stream or normal read would have an option to “wait until you can return a result”, then all would be perfect, and there would be no blocking anywhere.

I really don’t understand how I’m supposed to use this client API at this point. It almost seems like I have to start with a reactive stream, switch to subscription when it ends, then switch back to reactive stream when I get “consumer too slow”, and then switch back to subscription again, and so on.

Is this really the client experience that is intended? Am I missing something here?

do you process the events one by one ?
can you process them in batches into the target store or shard the processing ?

in both cases you need a small infrastructure peice of code in that will either

  • wait for x events or time out
  • shard internally and have multiple handlers ( assuming 1 process & threading here )

The error you’re seeing typically occurs when the server is also having a lot of appends at the same time when you are caught up .
Is this the case here ?

All event processing in our code is batch oriented, using both reactive streams with batching and LMAX Disruptor in batch mode.

Yes, the specific case is that we are running a batch importer that quickly append a lot of new events into the stream, and yes the read model at that point is caught up.

For anyone who will hit this discussion, the issue will be fixed when this issue is completed Subscriptions using the enumerators to switch between live and catchup · Issue #4089 · EventStore/EventStore · GitHub