@yves.lorphelin @yves.reynhout just to make sure I got it right I created a JUnit test to demonstrate it:
// Create some events
JsonMapper mapper = new JsonMapper();
for (int i = 0; i < 10; i++) {
    try {
        WriteResult result = client.appendToStream("stream1",
                EventData.builderAsJson(UUID.randomUUID(), "someEventType", mapper.writeValueAsBytes(new EventRecord(1, "foobar"))).build(),
                EventData.builderAsJson(UUID.randomUUID(), "someEventType", mapper.writeValueAsBytes(new EventRecord(2, "foobar"))).build(),
                EventData.builderAsJson(UUID.randomUUID(), "someEventType", mapper.writeValueAsBytes(new EventRecord(3, "foobar"))).build()
        ).orTimeout(10, TimeUnit.SECONDS).join();
        LogManager.getLogger().info("Write complete");
    } catch (Exception e) {
        LogManager.getLogger().error("Write failed", e);
    }
}
// Create many reactive stream clients which don't send enough requests
System.out.println("Starting slow subscribers");
CountDownLatch countDownLatch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
    Flux.from(client.readStreamReactive("stream1")).doOnNext(msg -> countDownLatch.countDown()).subscribe(new SlowSubscriber());
}
System.out.println("Waiting for slow subscribers");
countDownLatch.await();
System.out.println("Starting fast subscriber");
ReadMessage lastmessage = Flux.from(client.readStreamReactive("stream1")).blockLast();
System.out.println(lastmessage.getLastStreamPosition());
System.out.println("Done");
The SlowSubscriber requests 1 event and, once it receives it, does nothing (i.e. sends no more requests), to simulate a slow subscriber. If you run the above, it creates 1000 threads that are blocked in this method in ReadSubscription:
public void onNext(ReadMessage message) {
    lock.lock();
    while (requested.get() == 0 && !terminated.get()) {
        hasRequested.awaitUninterruptibly();
    }
    if (!terminated.get()) {
        subscriber.onNext(message);
        requested.decrementAndGet();
    }
    lock.unlock();
}
in the awaitUninterruptibly() call. While this "works", it creates 1000 threads, which is not great.
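For reference, the SlowSubscriber in the test is roughly the following. This is a sketch rather than the exact class: it is written against java.util.concurrent.Flow so it compiles with the JDK alone, whereas the test passes it to Flux.subscribe, which expects an org.reactivestreams.Subscriber<ReadMessage> (same four methods). The received counter is only there for illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a subscriber that signals demand for one item and then stalls.
// Assumption: the real class implements org.reactivestreams.Subscriber instead.
final class SlowSubscriber<T> implements Flow.Subscriber<T> {
    // Illustrative counter, not needed for the reproduction itself
    final AtomicInteger received = new AtomicInteger();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Ask for exactly one item and never ask again
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        // Deliberately no further request(n): the publisher now has zero demand
        received.incrementAndGet();
    }

    @Override
    public void onError(Throwable t) {
        // Ignored in this sketch
    }

    @Override
    public void onComplete() {
        // Never reached in the test, since only one item is ever requested
    }
}
```

With zero outstanding demand after the first event, the next event the server pushes is what parks the thread in the loop above.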
It would be better to be able to provide a custom thread pool to the ConnectionState builder:
void connect(InetSocketAddress addr) {
    this.closeChannel();
    NettyChannelBuilder builder = NettyChannelBuilder
            .forAddress(addr)
            .maxInboundMessageSize(MAX_INBOUND_MESSAGE_LENGTH)
            .intercept(settings.getInterceptors());
but then the client can properly hang as it runs out of threads to handle requests, so it would require a change to ReadSubscription so that it is not blocking. Specifically, the client needs to send requests to the server when needed so that if the server sends data it doesn't get stuck in that loop. And then it becomes possible to have a truly reactive client, rather than the current fake one.
Looking at the code, it should be possible to fix this. In the setup in AbstractRead, beforeStart should call requestStream.disableAutoRequestWithInitial(0), which switches the call to manual flow control. When requests come into the ReadSubscription, it should forward them to requestStream, but only in chunks of some buffer size (configurable, with a reasonable default, say 512). When onNext is called, you then never have to check whether the client is ready and can just pass on the data. If the client requested fewer than 512 items, you could always request 512 from the server and have ReadSubscription hold a queue that temporarily buffers items until the client has requested them. Then the client can request Long.MAX_VALUE (which still works, because you only send chunks of 512 to the server) or 1 at a time, and still perform well.
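To make the idea concrete, here is a minimal sketch of the bookkeeping I have in mind. It is not the client's code: ChunkedDemandBridge and its fields are made-up names, upstreamRequest stands in for requestStream.request(n), and downstream stands in for subscriber.onNext. Cancellation, errors, completion and deep re-entrancy are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

// Hypothetical helper: forwards demand to the server in fixed chunks and
// buffers surplus items, so the gRPC callback thread never has to block.
final class ChunkedDemandBridge<T> {
    private final int chunkSize;               // e.g. 512
    private final IntConsumer upstreamRequest; // stand-in for requestStream.request(n)
    private final Consumer<T> downstream;      // stand-in for subscriber.onNext(item)
    private final Queue<T> buffer = new ArrayDeque<>();
    private long demand;   // items the subscriber asked for but has not received
    private int inFlight;  // items requested from the server but not yet received

    ChunkedDemandBridge(int chunkSize, IntConsumer upstreamRequest, Consumer<T> downstream) {
        this.chunkSize = chunkSize;
        this.upstreamRequest = upstreamRequest;
        this.downstream = downstream;
    }

    // Called from Subscription.request(n)
    synchronized void request(long n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        // Saturate at Long.MAX_VALUE, which is treated as unbounded demand
        demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
        drain();
        requestMoreIfNeeded();
    }

    // Called from the gRPC observer; never waits for the subscriber
    synchronized void onNext(T item) {
        inFlight--;
        buffer.add(item);
        drain();
        requestMoreIfNeeded();
    }

    // Hand buffered items to the subscriber while it still has demand
    private void drain() {
        while (demand > 0 && !buffer.isEmpty()) {
            if (demand != Long.MAX_VALUE) demand--;
            downstream.accept(buffer.poll());
        }
    }

    // Ask the server for one more chunk only once the previous chunk is used up,
    // so the buffer never holds more than chunkSize items
    private void requestMoreIfNeeded() {
        if (demand > 0 && buffer.isEmpty() && inFlight == 0) {
            inFlight = chunkSize;
            upstreamRequest.accept(chunkSize);
        }
    }
}
```

A slow subscriber that requests 1 item costs at most one buffered chunk and no parked thread, and a subscriber that requests Long.MAX_VALUE still only ever has one chunk outstanding with the server.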
Does that make sense? Do you think this is fixable?
ps. As a bonus, it would be FANTASTIC if it were possible to configure the reactive stream not to call onCompleted() when it runs out of events in the stream, and instead to send new events as they appear. This would replace the need for the Subscription API, which has similar issues with blocking threads. Then you have a perfect API for reactive streaming and scalable clients.