For example, I’ve been playing with some CatchUpSubscription settings such as maxLiveQueueSize and readBatchSize, but I must admit that I’m not 100% sure how they work. I did notice that they are a quick way to break the subscription with no errors being reported. At one point, I set the live queue size to 10000 and the read batch size to 1000, and the subscription would consistently read only 1000 events and then quit. When I lowered it to 500, it started working fine again. But now that I’ve moved this to a different environment, it’s only reading 500 events and then quitting.
Is there a guide out there that explains them? The docs don’t seem to say anything about it. If not, any help right here in the group would be helpful. Thanks.
If a subscription quits, it tells you a reason. What is the reason?
There’s no reason because it’s not actually getting disconnected or closing, it’s just stopping – no more events are being delivered. There are several hundred thousand events in this stream, but it stops at 500. If I restart the service, it’ll process the next 500 and then stop, and so on. If I up the batch size to 1000, then it’ll process 1000 and stop. If I restart, it’ll process the next 1000 and then stop, and so on.
But more to the point, I’m searching for understanding so that I can better troubleshoot; it could very well be something else downstream that’s hanging and not outputting any errors, but I need to be able to ensure that my EventStore setup and usage is correct lest I spend my day chasing my tail.
Are you acking them? Can you put up your config + your subscription code? There are many people running without such issues.
Do I need to ack? How do I do that? Here’s the setup code, nothing special:
```csharp
var settings = new CatchUpSubscriptionSettings(
    liveQueueSize,
    readBatchSize,
    enableVerboseClientLogging,
    resolveLinkTos);

var sub = _eventStore.SubscribeToStreamFrom(
    streamName,
    startIndex,
    settings,
    EventAppeared);

_disposableActionCollection.Add(() => sub.Stop());
```
There is a subscriptionDropped parameter that you need to set in order to get the reason for a subscription failing or dropping.
It would look something like the following:
```csharp
var sub = _eventStore.SubscribeToStreamFrom(
    streamName,
    startIndex,
    settings,
    EventAppeared,
    subscriptionDropped: (subscription, dropReason, exception) =>
    {
        // Log out drop reason and exception
    });
```
Ok I only had the disconnected and close events with logs on them. I’ll add this in and see if I get anything useful. Still looking to understand live queue size and batch size and how they could affect everything.
The read batch size says how many events will be read per request while in polling (catch-up) mode. The live queue size is how many events to buffer in the live, volatile part of the subscription.
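To make that concrete, here is an illustrative construction of the settings object (the parameter names below follow the v4-era .NET client's CatchUpSubscriptionSettings constructor; the specific values are just examples, not recommendations):

```csharp
// readBatchSize: events fetched per read request while catching up
//   (reading historical events from the stream).
// maxLiveQueueSize: bound on how many live-pushed events the client
//   buffers once the subscription has caught up; if the consumer falls
//   behind and this buffer overflows, the subscription is dropped.
var settings = new CatchUpSubscriptionSettings(
    maxLiveQueueSize: 10000,
    readBatchSize: 500,
    verboseLogging: false,
    resolveLinkTos: true);
```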
Ok so the problem that I seem to be tracking right now is downstream. I’m trying to denormalize events to MongoDB (you may remember this from an earlier question), but the MongoDB C# driver is being overloaded with connections. To help prevent this, I tried to throttle EventStore from handing me events too quickly by placing a semaphore in my denormalizer. That denormalizer processes events using a pattern in one of your examples (DynamicWrapper) which calls Process on the child class. That Process method makes async calls to MongoDB but obeys the throttle.
Does the polling you speak of wait until all EventAppeared handlers have completed? In other words, if the batch size is 100, does it wait for all handlers to process their events before polling for the next 100 events? That being said, here’s what I’m thinking at this moment. Is it possible that this.AsDynamic().Process(theEvent, index);
may actually be returning before the downstream async task has completed, basically ignoring the semaphore? If so, how would I even fix that?
I think I was told at one point that events were delivered as tasks through the client. The EventAppeared action that is provided to the subscription as a handler returns void, which I found curious and which makes it difficult to avoid async void in my denormalizers. I’m now wondering if that was by design. Is that action wrapped in a task? Should I just make everything downstream synchronous? Should I provide an async void method as a delegate to the subscription so that I can async the rest of the way down the pipe? What’s the intended practice here?
Do NOT use async void here - you will not be able to guarantee ordering. What you can do instead is put a ConcurrentQueue in the middle, then write a class that reads from this concurrent queue and dispatches using async correctly.
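A minimal sketch of that pattern (AsyncEventDispatcher and the handler delegate are hypothetical names, not Event Store API): the synchronous EventAppeared callback only enqueues, and a single consumer loop awaits each async handler before dequeuing the next, which preserves ordering.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Bridges a synchronous subscription callback to async handlers.
// A single consumer loop awaits each event before taking the next,
// so events are processed one at a time, in order.
public class AsyncEventDispatcher
{
    private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
    private readonly Func<object, Task> _handler;

    public AsyncEventDispatcher(Func<object, Task> handler)
    {
        _handler = handler;
        Task.Run(ConsumeLoop);
    }

    // Call this from the subscription's EventAppeared callback;
    // it never blocks and never needs async void.
    public void Enqueue(object resolvedEvent)
    {
        _queue.Enqueue(resolvedEvent);
        _signal.Release();
    }

    private async Task ConsumeLoop()
    {
        while (true)
        {
            await _signal.WaitAsync();
            if (_queue.TryDequeue(out var evt))
                await _handler(evt); // sequential await => ordering preserved
        }
    }
}
```

Because only the consumer loop ever awaits, downstream code can be fully async without any async void creeping into the subscription handler itself.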
I think it would be good to have an alternative api, that handles async and batching.
I just rewrote my projections to handle this, and it took a while to figure out a way that felt robust and efficient (and it’s not even properly tested yet, so not exactly done). My previous solution used Rx, which worked great, except that there was no way to handle backpressure, making it hard to control memory during catch-up.
/Peter
I don’t recommend rx here either because you will still be blocking a thread.
Interesting. Do you have an example of this queue and reader that you’d be willing to share?
Event Store actually has an implementation of this internally; see the various QueuedHandler implementations here: https://github.com/EventStore/EventStore/tree/release-v4.0.2/src/EventStore.Core/Bus
So how does that work then? What tells the client API to start polling for more events? For example, does it wait for the batch to be completely handled first (i.e. if the batch is 500, does it wait for all 500 invocations of the EventAppeared handler to return before asking for the next batch)?
I’m trying not to overload the mongo c# driver. It defaults the max connection pool size to 100, I have the event store subscription batch size set to match that (100) but I’m not sure if that’s necessary. In addition to that I have a semaphore in my denormalizer that is also set to 100 which I’m also not sure is necessary. Looking for advice on tuning these.
Just pause the thread of the handler at a queue size > n. The client connection handles everything else.
Oh wait, maybe I misunderstood. Were you saying that ES has a queue implementation so that I don’t need to worry about it, or were you pointing me at the ES queue implementation as an example of how I would implement it on my side?
As an example that you can just grab. ES does it internally anyway in the client API, but if you want more control, just drop them in a queue and block on the subscription callback.
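One way to sketch that "block on the subscription callback" advice is with a bounded BlockingCollection (BoundedEventQueue and its capacity of 500 are illustrative choices, not anything from the Event Store client): when the queue is full, the enqueue call blocks the subscription's delivery thread, which gives you backpressure during catch-up.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Backpressure sketch: the subscription callback blocks when the
// bounded queue is full, pausing event delivery until the consumer
// drains some events.
public class BoundedEventQueue
{
    // Add() blocks once 500 items are queued.
    private readonly BlockingCollection<object> _queue =
        new BlockingCollection<object>(boundedCapacity: 500);

    public BoundedEventQueue(Func<object, Task> handler)
    {
        Task.Run(async () =>
        {
            // GetConsumingEnumerable blocks until items are available
            // and yields them in FIFO order.
            foreach (var evt in _queue.GetConsumingEnumerable())
                await handler(evt); // sequential => ordering preserved
        });
    }

    // Call from EventAppeared; blocks the subscription thread when full.
    public void Enqueue(object resolvedEvent) => _queue.Add(resolvedEvent);
}
```

This also removes the need for the semaphore in the denormalizer: the queue bound, rather than the MongoDB driver's connection pool, becomes the throttle.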