eventstore broadcaster to event handlers

Hi all,

I hope you do not mind having a look at the code below. It broadcasts a bunch of events to handlers that potentially have long running times. Would this be a scalable way of achieving this? Any suggestions to improve the code would be very much appreciated. Thanks!

public class EventStoreBroadcaster

{

private IEventStoreConnection _connection;

private readonly Position? _latestPosition;

private readonly Dictionary<Type, Action> _eventHandlerMapping;

public EventStoreBroadcaster()

{

_latestPosition = Position.Start;

_eventHandlerMapping = CreateEventHandlerMapping();

}

private static Dictionary<Type, Action> CreateEventHandlerMapping()

{

return new Dictionary<Type, Action>

{

{typeof (FakeEvent1), o => FakeEvent1Listener.Handle(o as FakeEvent1)},

{typeof (FakeEvent2), o => FakeEvent2Listener.Handle(o as FakeEvent2)}

};

}

public void Start()

{

ConnectToEventstore();

}

private void ConnectToEventstore()

{

_connection = EventStoreConnectionWrapper.Connect();

_connection.Connected +=

(sender, args) => _connection.SubscribeToAllFrom(_latestPosition, false, EventOccured, LiveProcessingStarted, HandleSubscriptionDropped);

}

private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription,

ResolvedEvent resolvedEvent)

{

if (resolvedEvent.OriginalEvent.EventType.StartsWith("$") ||

resolvedEvent.OriginalEvent.EventStreamId.StartsWith("$"))

return;

var eventToHandle = EventSerialization.DeserializeEvent(resolvedEvent.OriginalEvent);

if (eventToHandle == null) return;

var eventType = eventToHandle.GetType();

if (_eventHandlerMapping.ContainsKey(eventType))

{

_eventHandlerMappingeventType;

}

}

private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception ex)

{

if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)

{

//TODO: Wait and reconnect probably with back off

}

if (dropReason == SubscriptionDropReason.UserInitiated)

return;

if (SubscriptionDropMayBeRecoverable(dropReason))

{

Start();

}

}

private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)

{

return dropReason == SubscriptionDropReason.Unknown || dropReason == SubscriptionDropReason.SubscribingError ||

dropReason == SubscriptionDropReason.ServerError || dropReason == SubscriptionDropReason.ConnectionClosed;

}

private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)

{

}

}

public class BaseFakeEvent

{

// some data

}

public class FakeEvent1 : BaseFakeEvent

{

// some more data

}

public class FakeEvent2 : BaseFakeEvent

{

// some more data

}

public class FakeEvent1Listener

{

public async static void Handle(FakeEvent1 eventToHandle)

{

// simulate some long running stuff

await Task.Delay(TimeSpan.FromSeconds(10));

}

}

public class FakeEvent2Listener

{

public async static void Handle(FakeEvent2 eventToHandle)

{

// simulate some long running stuff

await Task.Delay(TimeSpan.FromSeconds(10));

}

}

Would persistent subscriptions be better for this so you could have multiple nodes as well?

Thanks. I am only aware of Live-only and Catch-up subscriptions. Can you please provide some more details? Thus far, I think catch-up subscriptions are the way forward in my scenario.

C

Persistent subscriptions are on dev. They implement a competing consumer model (server controlled subscription state). They are generally better for when you want to do a task based on an event and support having say n consumers on 5 servers working on the same subscription. Here are overview docs waiting acceptance on docs.

https://github.com/EventStore/docs.geteventstore.com/pull/68/files

Thanks. I will try to digest this (-:

However, if I should want to use Catch-up subscriptions, is the code provided ok?

Thanks.

C

You probably dont want to use catch up subscriptions for doing tasks
(as discussed in the linked document). The reason why is you can only
have a single subscriber (what if it goes down?)

I intent to put this into a windows service … or is this not ok? It can automatically restart (will use persist _latestPosition to start at the the correct position).

C

Persistent subscriptions lets you have say 2 windows services (one
goes down and no problem)

I assume your long running tasks are idempotent?

I will ensure that they are (-: Thanks.