Looking for some feedback on an Event Dispatcher

If you are worried about memory growth when putting events into a queue to hold, you can bound the size of the queue, e.g.:

void HandleLiveData(Event e) {
    if (catchingUp) {
        currentData.Enqueue(e);
        if (currentData.Count > limit) currentData.Dequeue();
    }
}

Ahh…I know you said that before and I didn’t understand. Now that I have been working with the code I see what you mean. The dequeued event will end up being processed from the pull queue, so there is no worry that it would be lost.

Thanks

Phil

Yep exactly.

In RecoverSubscription I disable processing from _liveEventsQueue by setting _livePublishingAllowed = false;
Then I have to wait until the remaining enqueued events are processed by waiting on the _liveDone event. There is no need to do this if you are just starting, but you need to wait for _liveQueue to stop publishing when the connection was dropped and you try to reconnect, so that _lastProcessed won't change in the middle of reading historical events. Other race conditions could apply if we don't ensure that live processing is stopped.
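The gating described above can be sketched roughly like this. This is a minimal illustration, not the actual gist code: the field names (_livePublishingAllowed, _liveDone, _liveQueue) mirror the discussion, but the event type and publish callback are placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

class DispatcherSketch
{
    private readonly ConcurrentQueue<string> _liveQueue = new ConcurrentQueue<string>();
    private volatile bool _livePublishingAllowed = true;
    // Starts set: no live publishing is in flight initially.
    private readonly ManualResetEventSlim _liveDone = new ManualResetEventSlim(true);

    public void Enqueue(string evt) => _liveQueue.Enqueue(evt);
    public int QueueCount => _liveQueue.Count;

    // Runs on the live-publishing side: drain the queue while allowed.
    public void ProcessLiveQueue(Action<string> publish)
    {
        _liveDone.Reset();
        try
        {
            string evt;
            while (_livePublishingAllowed && _liveQueue.TryDequeue(out evt))
                publish(evt); // publish on the bus, then update _lastProcessed
        }
        finally
        {
            _liveDone.Set(); // signal that live publishing has stopped/drained
        }
    }

    // Runs on reconnect, before re-reading historical events.
    public void RecoverSubscription()
    {
        _livePublishingAllowed = false; // stop the live pump
        _liveDone.Wait();               // wait until it has actually stopped
        // ... now safe to read historical events from _lastProcessed ...
        _livePublishingAllowed = true;
    }
}
```

The key point is that RecoverSubscription does not just flip the flag; it also blocks on _liveDone so _lastProcessed cannot change underneath the historical read.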

If you go the way Greg describes, where you subscribe simultaneously with reading historical events, it seems like you have to do more careful synchronization, because you could end up dequeuing an event (when the queue is full) in one thread while trying to pop an event from the live queue in another thread. I haven't thought through this algorithm thoroughly, so I can't tell which way is simpler, though both approaches could be used, of course. The approach you used seems more "sequential" in nature, so it is easier to reason about and requires less threading awareness, at least for me.

I see now. My mistake. I didn’t realize that ReadHistoricalEventsFrom would not return until no more slices were pulled (meaning the historical events are nearly complete).

I added

//Prevent live queue memory explosion.
if (!_livePublishingAllowed && _liveQueue.Count > LIVE_QUEUE_SIZE_LIMIT)
{
    ResolvedEvent throwAwayEvent;
    _liveQueue.TryDequeue(out throwAwayEvent);
}

to the HandleEventAppeared method, but it may be unnecessary unless there is a possibility that the second call to ReadHistoricalEventsFrom could return some amount of events that would take a lot of processing. That seems unlikely to me though.

Thanks

Phil

With that code you are asking for missing events. It could happen that neither ReadHistoricalEventsFrom nor the live message processing will process some of the events. I doubt business logic will tolerate this :)

It is possible to add prevention of live queue overflow (which will be needed if you process received messages more slowly than they arrive over the subscription), but that requires a more careful approach.

Hmm. I guess I am not seeing where the missing event would occur. Is it that one or more of the thrown-away live events might not get read from the pull, just as the thread switches?

Thanks

Phil

Yes, you could discard a live event exactly when you have already finished the pull but haven't yet started processing live events. The probability is very low, but still not zero.
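One "more careful approach" along the lines hinted at above, as a hedged sketch (this is not from the gist; the names LIVE_QUEUE_SIZE_LIMIT and _lastProcessed mirror the discussion, and a plain long stands in for the real Position struct): only discard live events from the front of the queue whose position the historical read has already covered. Those are guaranteed duplicates of already-published events, so nothing can be lost even if the trim races with the end of the pull.

```csharp
using System.Collections.Concurrent;

// Illustrative event carrying a global position; the real ResolvedEvent
// exposes a Position struct instead of a plain long.
struct LiveEvent
{
    public long Position;
    public LiveEvent(long pos) { Position = pos; }
}

class LiveQueueTrimmer
{
    public const int LIVE_QUEUE_SIZE_LIMIT = 2; // tiny limit for illustration
    public readonly ConcurrentQueue<LiveEvent> LiveQueue = new ConcurrentQueue<LiveEvent>();
    public long LastProcessed = -1; // stands in for _lastProcessed

    // Discard only events the historical read has already published;
    // anything past LastProcessed is kept, so no event can go missing.
    public void Trim()
    {
        LiveEvent head;
        while (LiveQueue.Count > LIVE_QUEUE_SIZE_LIMIT
               && LiveQueue.TryPeek(out head)
               && head.Position <= LastProcessed)
        {
            LiveQueue.TryDequeue(out head); // duplicate of a published event
        }
    }
}
```

Note that with this rule the queue can still grow past the limit when the events at the front are all newer than LastProcessed; that is the price of never dropping an unpublished event.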

Would anyone care to provide some context how you would use this dispatcher? It’s not clear to me where this resides.

You could do something like this. Just start up EventStore.SingleNode.exe first:

private static readonly IPEndPoint TcpEndPoint = new IPEndPoint(IPAddress.Loopback, 1113);

var esDispatcher = new GetEventStoreEventDispatcher(EventStoreConnection.Create(), TcpEndPoint, evtBus);
System.Threading.Tasks.Task.Factory.StartNew(() => esDispatcher.StartDispatching());

Sorry… just re-read your question and am not sure whether you were asking specifically how to start it, as opposed to how to use it and where it resides.

The dispatcher is basically listening to the GetEventStore for events. As events come in they are deserialized and Published on the IEventBus that you provide. If you have read the entirety of the thread you will notice there is logic for catching up to the live stream by getting historical events published first.

You are responsible for handling the events as they come off the bus. The dispatcher just makes sure the event bus Publishes all the events that are added to the GetEventStore.

The code I posted could go into your application bootstrap if you want it to run in-proc, but it could also be added to a service that runs in a separate process.

Thanks, that’s what I figured. The part that has me a bit confused is who is catching up? The dispatcher? All subscribers? EventStore knows the last position?

I understand what the goal of this is; it's just not clear how it's doing that.

Regards.

The dispatcher and the event handlers downstream are catching up. The GetEventStore could have been receiving events and persisting them just fine even when no dispatcher was listening.

In that scenario when the dispatcher is turned on it should publish all the events that weren’t previously published, so that the handlers can update the read store.

Depending on whose gist you look at, it may have some lastRead-event persisting logic beyond Position.Start. The one that Andrii and I posted recently does not persist that lastReadEvent information, so if you restart the dispatcher it will publish all the events in the GetEventStore streams.

Ok, that makes more sense. The part I couldn't find isn't there. :)

Is it something you just haven’t addressed yet? I would think it’s an important part.

You say “when the dispatcher is turned on it should publish all the events that weren’t previously published”, then “if you restart the dispatcher it will publish all the events in the GetEventStore streams”. Is this not contradictory? Forgive my ignorance.

Sorry I didn’t articulate that very well.

If you are looking at the code at https://gist.github.com/pdoh00/4744120 you will see that the ctor sets _lastProcessed = new Position(-1, -1);

This means that each time you start this dispatcher up, it will read all events from the very beginning. If you wanted to be more efficient, you would save _lastProcessed to non-volatile storage after processing each event. If you brought the dispatcher down, or it crashed, you would restart it, passing in the persisted last-processed event position; that way it would not read from the very beginning. For me, I just read from the very beginning and rebuild my entire read store, since I don't have many events and it is very quick to rebuild the whole thing.

In summary if you want “when the dispatcher is turned on it should publish all the events that weren’t previously published” to be true you need to persist the _lastProcessed and pass it into the ctor as a parameter.
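As a hedged sketch of that persistence step (not part of the gist; the file name, comma format, and (-1, -1) sentinel are illustrative assumptions, with -1,-1 mirroring the ctor's `new Position(-1, -1)`):

```csharp
using System.IO;

// Illustrative store for the last processed position. In the real dispatcher
// you would call Save after each published event and feed Load's result into
// the GetEventStoreEventDispatcher ctor on startup.
static class PositionStore
{
    const string FileName = "lastProcessed.txt"; // assumed location/format

    public static void Save(long commit, long prepare) =>
        File.WriteAllText(FileName, commit + "," + prepare);

    public static (long commit, long prepare) Load()
    {
        if (!File.Exists(FileName))
            return (-1, -1); // no saved position: read from the very beginning
        var parts = File.ReadAllText(FileName).Split(',');
        return (long.Parse(parts[0]), long.Parse(parts[1]));
    }
}
```

A flat file is just the simplest illustration; writing the position into the same store the handlers update (as mentioned later in the thread) is what makes the whole thing transactional.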

Cheers

Phil

There we go! Thanks.

Interesting that you only refer to read stores. What about subscribers in other sub-domains that process these? Maybe my definition of read store differs.

In my domain I would be processing more than a billion events in less than a calendar year.

I think the number of events to process, as well as how the events are processed by listeners, plays into how you proceed. Your system will need to be idempotent, either by ensuring you never publish the same event twice or by ensuring that all your downstream handlers ignore duplicate events. It sounds like in your case you may want to do the former, by persisting the _lastProcessed variable and only publishing from that point forward when the dispatcher starts.
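The latter option, deduplicating on the subscriber side, can be sketched like this. This is a minimal in-memory illustration only (the class name and callback shape are made up for the example); in production the seen-id set would itself need durable, transactional storage to survive restarts.

```csharp
using System;
using System.Collections.Generic;

// Wraps a downstream handler and ignores events it has already seen,
// keyed by the event's unique id.
class DedupingHandler
{
    private readonly HashSet<Guid> _seen = new HashSet<Guid>();
    private readonly Action<Guid> _inner;

    public DedupingHandler(Action<Guid> inner) { _inner = inner; }

    // Returns true if the event was handled, false if it was a duplicate.
    public bool Handle(Guid eventId)
    {
        if (!_seen.Add(eventId)) return false; // already processed: skip
        _inner(eventId);
        return true;
    }
}
```

With a billion events a year, an unbounded set of ids is clearly not viable as-is; a persisted high-water mark like _lastProcessed plus a small window of recent ids is the usual compromise.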

Yes, I think that’s the only way. For much of my system events are not idempotent (unless I am out to lunch). I will need to track event id’s on the subscriber to ensure duplicates are ignored.

Your dispatcher is a great help. Now that I understand what it does and doesn't do, it gives me a running start.

If you store the _lastProcessed transactionally with whatever the handlers are doing, you are transactional (not idempotent).

Is this good, bad, or just an observation?