Support for non-blocking client?

I know this topic has come up before, but what are your current thoughts on providing a non-blocking client?

The issue with the current API will always be that threads get blocked in SubscriptionListener.onEvent, or some variation of it. So if you have 100 connections to EventStore, that will tie up 100 threads most of the time, since the client is usually slower at processing events than EventStore can deliver them.

The only way around this that I can think of is to properly implement Reactive Streams, so that the back-pressure goes all the way back to the EventStore server rather than being handled with blocking code (e.g. in ReadSubscription.onNext in Java). Without proper back-pressure there will always be a blocked thread somewhere, as far as I can tell.

Sooo… any plans to fix this? WDYT?

@yves.lorphelin @yves.reynhout just to make sure I got it right, I created a JUnit test to demonstrate it:

    // Create some events
    JsonMapper mapper = new JsonMapper();
    for (int i = 0; i < 10; i++) {
        try {
            WriteResult result = client.appendToStream("stream1",
                    EventData.builderAsJson(UUID.randomUUID(), "someEventType", mapper.writeValueAsBytes(new EventRecord(1, "foobar"))).build(),
                    EventData.builderAsJson(UUID.randomUUID(), "someEventType", mapper.writeValueAsBytes(new EventRecord(2, "foobar"))).build(),
                    EventData.builderAsJson(UUID.randomUUID(), "someEventType", mapper.writeValueAsBytes(new EventRecord(3, "foobar"))).build()
            ).orTimeout(10, TimeUnit.SECONDS).join();

            LogManager.getLogger().info("Write complete");
        } catch (Exception e) {
            LogManager.getLogger().error("Write failed", e);
        }
    }

    // Create many reactive stream clients that don't send enough requests
    System.out.println("Starting slow subscribers");
    CountDownLatch countDownLatch = new CountDownLatch(1000);
    for (int i = 0; i < 1000; i++) {
        Flux.from(client.readStreamReactive("stream1")).doOnNext(msg -> countDownLatch.countDown()).subscribe(new SlowSubscriber());
    }
    System.out.println("Waiting for slow subscribers");
    countDownLatch.await();
    System.out.println("Starting fast subscriber");
    ReadMessage lastMessage = Flux.from(client.readStreamReactive("stream1")).blockLast();
    System.out.println(lastMessage.getLastStreamPosition());

    System.out.println("Done");

The SlowSubscriber requests 1 event and, once it receives it, does nothing (i.e. issues no more requests), to simulate a slow subscriber; a sketch of it follows the snippet below. If you run the above it will create 1000 threads that are all blocked in this method in ReadSubscription:

    public void onNext(ReadMessage message) {
        lock.lock();
        while (requested.get() == 0 && !terminated.get()) {
            hasRequested.awaitUninterruptibly();
        }
        if (!terminated.get()) {
            subscriber.onNext(message);
            requested.decrementAndGet();
        }
        lock.unlock();
    }

in the awaitUninterruptibly() call. While this “works” it creates 1000 threads, which is not great.
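
For reference, SlowSubscriber boils down to something like this (a minimal sketch of what the test describes; the real class may differ in details):

    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    import com.eventstore.dbclient.ReadMessage;

    // Requests a single event and then goes silent, leaving the publisher
    // with no demand to fulfil.
    class SlowSubscriber implements Subscriber<ReadMessage> {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(1); // ask for exactly one event, then never again
        }

        @Override
        public void onNext(ReadMessage message) {
            // deliberately empty: no further request(n) calls
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    }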

It would be better to be able to provide a custom thread pool to the channel builder in ConnectionState:

    void connect(InetSocketAddress addr) {
        this.closeChannel();

        NettyChannelBuilder builder = NettyChannelBuilder
                .forAddress(addr)
                .maxInboundMessageSize(MAX_INBOUND_MESSAGE_LENGTH)
                .intercept(settings.getInterceptors());
        // ...
    }
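
For illustration, gRPC's channel builders already expose an executor(Executor) hook, so wiring in a user-supplied pool could look roughly like this (settings.getUserExecutor() is a made-up accessor for the sketch):

    NettyChannelBuilder builder = NettyChannelBuilder
            .forAddress(addr)
            // standard gRPC hook for supplying your own executor
            .executor(settings.getUserExecutor()) // hypothetical accessor
            .maxInboundMessageSize(MAX_INBOUND_MESSAGE_LENGTH)
            .intercept(settings.getInterceptors());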

but then the client can outright hang once it runs out of threads to handle responses, so it would also require a change to ReadSubscription so that it is non-blocking. Specifically, the client should only request data from the server when there is demand for it, so that whatever the server sends never gets stuck in that loop. At that point a truly reactive client becomes possible, rather than the current fake one.

Looking at the code, this should be possible to fix. In the setup in AbstractRead, beforeStart should call requestStream.disableAutoRequestWithInitial(0), which switches the gRPC call to manual flow control. When requests come into the ReadSubscription, it should forward them to requestStream, but only in chunks bounded by some buffer size (configurable, with a reasonable default of, say, 512). The onNext method then never has to check whether the client is ready; it just passes the data on. If the client requested fewer than 512 events, you could still request 512 from the server and have ReadSubscription keep a queue that temporarily buffers events until the client has requested up to 512 items. With that in place the client can request Long.MAX_VALUE and it still works (because only chunks of 512 go to the server), or request 1 at a time, and still perform well.
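
A rough sketch of what that non-blocking ReadSubscription could look like (the names here are invented, and a real implementation would need a properly serialized drain loop to be thread-safe; this only shows the flow-control shape):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicLong;

    import com.eventstore.dbclient.ReadMessage;
    import io.grpc.stub.ClientCallStreamObserver;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    // Sketch: a ReadSubscription that never blocks. The server is asked for
    // events in fixed-size chunks, and a queue bridges the gap between what
    // the server has delivered and what the subscriber has requested.
    class NonBlockingReadSubscription implements Subscription {
        private static final int CHUNK_SIZE = 512; // the configurable buffer size

        private final ClientCallStreamObserver<?> requestStream;
        private final Subscriber<? super ReadMessage> subscriber;
        private final Queue<ReadMessage> buffer = new ConcurrentLinkedQueue<>();
        private final AtomicLong demand = new AtomicLong();   // downstream demand
        private final AtomicLong inFlight = new AtomicLong(); // requested from server, not yet arrived

        NonBlockingReadSubscription(ClientCallStreamObserver<?> requestStream,
                                    Subscriber<? super ReadMessage> subscriber) {
            // beforeStart is assumed to have called
            // requestStream.disableAutoRequestWithInitial(0) to enable manual flow control
            this.requestStream = requestStream;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            demand.addAndGet(n); // overflow handling for Long.MAX_VALUE elided
            drain();
            topUpServerDemand();
        }

        // Called from the gRPC response observer's onNext; never parks a thread.
        void onNextFromServer(ReadMessage message) {
            inFlight.decrementAndGet();
            buffer.offer(message);
            drain();
            topUpServerDemand();
        }

        private void topUpServerDemand() {
            // Keep at most one chunk outstanding, independent of downstream demand,
            // so request(Long.MAX_VALUE) and request(1) both behave well.
            if (inFlight.get() == 0 && buffer.isEmpty()) {
                inFlight.addAndGet(CHUNK_SIZE);
                requestStream.request(CHUNK_SIZE);
            }
        }

        private void drain() {
            // Hand buffered events to the subscriber while it still has demand.
            while (demand.get() > 0) {
                ReadMessage next = buffer.poll();
                if (next == null) {
                    return;
                }
                demand.decrementAndGet();
                subscriber.onNext(next);
            }
        }

        @Override
        public void cancel() {
            requestStream.cancel("cancelled by subscriber", null);
        }
    }

The key property is that onNextFromServer never waits on the subscriber: if there is no demand yet, the event just sits in the bounded buffer.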

Makes sense? Is this something that is fixable, do you think?

P.S. As a bonus, it would be FANTASTIC if it were possible to configure the reactive stream to not call onCompleted() when it runs out of events in the stream, and instead send new events as they appear. This would remove the need for the Subscription API, which has similar issues with blocking threads. Then you would have a perfect API for reactive streaming and scalable clients.
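
Consuming it could then look something like this (both the option and the exact overload are hypothetical here, just to illustrate the idea):

    // Hypothetical "don't complete at end" flag: the publisher would keep
    // delivering newly appended events under normal back-pressure instead of
    // calling onCompleted() at the end of the stream.
    Flux.from(client.readStreamReactive("stream1", ReadStreamOptions.get().notifyNewEvents()))
        .subscribe(msg -> System.out.println(msg));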

PR with a suggestion for how to implement it:


Thanks @Rickard_Oberg!
You're in good hands with Yorick :slight_smile: