This dispatcher uses the ClientAPI to receive messages and then publishes to an event bus.
https://gist.github.com/pdoh00/4744120
I welcome any feedback.
Thanks
Phil
This dispatcher uses the ClientAPI to receive messages and then publishes to an event bus.
https://gist.github.com/pdoh00/4744120
I welcome any feedback.
Thanks
Phil
The obvious weakness here is that it is only able to dispatch from the “live stream” of events.
I think the most common and useful scenario would be to “catch up” with the events from the last checkpoint and then switch to the subscription channel as Greg suggested earlier.
Regards,
Alexey.
I don’t quite understand. Is that because the dispatcher may be down when events are still populating the event store? Basically missing events. Or is the live stream volatile?
Thanks
Phil
This looks good however it is missing a bit of code.
Right now it will subscribe and give messages but what happens if I
lose my connection for some period? We can imagine a network gnome
unplug the cable. It will only receive live subscriptions and is
missing the bitof code to ensure that everything will see a message.p
before b
To do this you need to make a differentiation between live messages
and historical messages. I have put it up before but basically the
solution is:
Subscribe -> queue
Asynchronously start reading my history
In each event check if its my tail of queue
if reached tail process queue and move to live
Greg
Cool. Thanks for the feedback. I'll update the code and repost.
Thanks
Phil
Hi Phillip,
Szymon Pobiega has a nice blog post exactly about persistent subscription. See also my comment to that post. The mechanics he describes is almost the same as Greg suggests, with a slight difference that Szymon does reading of history events until nothing is left, then subscribes and buffers events, then does some more reading of history and only then switches to online events processing from live subscription. The way Greg proposes is prone to unnecessary memory usage, especially if history is long and there is a lot of writing to the stream you are subscribing to.
But the very first thing that caught my attention: you use not thread-safe Queue from multiple threads. That’s very bad. I’d suggest using ConcurrentQueue, which is perfect for you. Also, there is no need in creating separate Thread just to dispatch events, you could use ThreadPool, while ensuring that dispatching is strictly one-threaded and lock-free. See code in https://github.com/EventStore/EventStore/blob/master/src/EventStore/EventStore.Core/Bus/QueuedHandlerThreadPool.cs for how we can do that. Pay attention to two methods: ReadFromQueue and Publish and their usage of _isRunning and proceed variables.
Thanks Anrii for the info. Very helpful. I will have a look at the blog post and the code.
Thanks
Phil
re memory. with queue you can say exactly how much you are willing to read.
queue.Enqueue(message);
if(queue.Length == max) queue.Dequeue();
New gist posted at https://gist.github.com/pdoh00/4744120. Andrii thanks for all the pointers. I have tried to take them into account with this latest revision.
Few issues still:
In ReadHistoricalEvents() I am double reading some events. I think it has something to do with me incorrectly using the slice.NextPosition incorrectly. Any feedback would be appreciated. Line 122
Currently it only reads history from the StreamPosition.Start. I think some kind of mechanism to write the last processed event to non-volatile memory could allow for only a partial rebuild of the read store. Right now if the dispatcher crashed I think to bring it back up I would either have to rebuild the read store or make all my event handlers idempotent. I think I would prefer to make sure events are fired only once as opposed to hoping all the event handlers get implemented correctly. With that said rebuilding the entire read store may be so fast that I really don’t care.
I think I have the _liveProcessingGate.Wait in StopDispatching() wrong because it ends up throwing the TimeoutException. Line 96
Thanks
Phil
re: double reading.
This is because when you send in you are saying "give me the next n
events from this point forward". This is how it is for two reasons.
1) think about what happens with the first event in the system.
2) the system can be scavenged during your walk of all the events. The
logical position that you send may not exist anymore. The backend will
handle this case and properly get you the next one.
Greg
Phillip,
If you read from (0,0), you’ll get n events from that point (but no event will ever be at 0,0).
If you read from, say, (143, 0), you’ll get n events from that point (including 143, 0 if there is an event there).
So if you read from your last processed position, you need to ‘ignore’ the event IF it’s at the same position as your last seen. However, it doesn’t always follow that you will always get a duplicate (scavenging makes this interesting!), so you do have to check.
Here is my current version of this (which doesn’t deal with reconnections etc, so it’s not really that durable at the moment hence the class name) - https://gist.github.com/jen20/ec6cd45bac755979e16a - it’s pretty similar in many respects.
Cheers,
James
Is a durable reader something we should add to the client API?
You give us an ICheckpoint?
Ok…the event dispatcher is finally working. It’s posted at
https://gist.github.com/pdoh00/4744120
Thanks to Andrii Nakryiko, James Nugent and greg Young for all the input. Some of the stuff would have taken me much longer to solve on my own. Also thanks to Szymon Pobiega for the algorithm steps for history catch up logic.
I have only done some testing so far and will continue to test/enhance/post. There are still some \TODO: in the code especially concerning dropped connection. It does what it is supposed to in the Happy Path scenario though.
Thanks
Phil
Absolutely. It would save everyone from implementing it again and again.
Cheers,
Alexey.
I think I am being a bit dense here, but I am not sure I understand the ICheckpoint in the context of the durable reader. Is the ICheckpoint the object that would be receiving the events?
Checkpoint is to save where you have received to (so in case of power outage we can bring you forward from that point)
So the pipeline is
GetEventStore saves event =>
GetEventStore Durable Reader publishes to my implementation of some IBus (or the like) =>
I track the event Position(check point) as I receive them in my bus(somewhere non volatile) =>
OnCrash() I give the check point to the GetEventStore Durable Reader which I previously stored =>
GetEventStore Durable Reader starts publishing from check point Position
Is that about right?
Hi Phillip,
Sorry for late answer. Here is my reworked variant of you dispatcher with some comments: https://gist.github.com/anakryiko/4968492
That is not production-ready code as it lacks in some details like StopDispatching doesn’t wait for everybody to stop doing their work, also there are possibilities for race conditions, etc. But as example of the algorithm itself (including restoring after subscription drop) should do, though algorithm itself seems more complicated than it seems to be (that doesn’t mean I did the simplest possible solution I went slightly different way than you in some places, but overall idea seems ok.
As for durable _lastProcessed position. You have to either remember _lastProcessed position atomically with the result of event at _lastProcessed position (so you can’t have _lastProcessed set to some position, but result of processing last event is not stored, and vice versa). The other option is to make sure your handling of events is idempotent, so you don’t crash or err when processing the same event twice. If you have handling idempotency, then you can always first save result of processing event and then store _lastProcessed in non-volatile memory. In that case, when crash occurs between you saving result and saving _lastProcessed position, on restart you will handle event at _lastProcessed again (as _lastProcessed at that time will point to previous event). Hope that makes sense to you.
I’ve found ways to simplify things slightly and magically most of previous problems were fixed (seems like StopDispatching can wait for end of processing, also many race conditions are gone now, hopefully).
But, please, be warned that I just compiled that code, haven’t event run it. Updated version of code is at the same place: https://gist.github.com/anakryiko/4968492
That looks good Andrii. Thanks for taking the time to add comments as well. I think your revisions have made the code more elegant…and correct :). I’ll do some testing with it today.
One question I have is around starting the live subscription. One reason I had the synchronization was to prevent the pull from starting while the first batch of historical events was still being processed. In the event that there are a lot of historical events I thought that the live pull could fill up memory before the catch up was mostly complete. I may be missing that in yours, but it appears that the first reads of historical event do not block the live pull.
Thanks
Phil