Hi all,
I hope you do not mind having a look at the code below. It broadcasts events to handlers that may take a long time to run. Is this a scalable way of achieving that? Any suggestions for improving the code would be very much appreciated. Thanks!
/// <summary>
/// Opens a catch-up subscription over all events and forwards each non-system event
/// to the handler registered for its deserialized CLR type.
/// </summary>
public class EventStoreBroadcaster
{
    private readonly Dictionary<Type, Action<object>> _eventHandlerMapping;
    private IEventStoreConnection _connection;

    // Position of the last event seen by EventOccured; used as the checkpoint when
    // (re)subscribing so a recovered subscription does not replay from the beginning.
    private Position? _latestPosition;

    /// <summary>
    /// Creates a broadcaster that starts reading from <c>Position.Start</c>.
    /// </summary>
    public EventStoreBroadcaster()
    {
        _latestPosition = Position.Start;
        _eventHandlerMapping = CreateEventHandlerMapping();
    }

    /// <summary>
    /// Builds the lookup from concrete event type to the delegate that handles it.
    /// </summary>
    /// <returns>A dictionary keyed by the exact runtime type of the event.</returns>
    private static Dictionary<Type, Action<object>> CreateEventHandlerMapping()
    {
        // Handlers are looked up by the event's exact runtime type, so a direct cast
        // cannot fail here and will not silently pass null the way `as` would.
        return new Dictionary<Type, Action<object>>
        {
            { typeof(FakeEvent1), o => FakeEvent1Listener.Handle((FakeEvent1)o) },
            { typeof(FakeEvent2), o => FakeEvent2Listener.Handle((FakeEvent2)o) }
        };
    }

    /// <summary>
    /// Connects to the event store and subscribes once the connection is established.
    /// </summary>
    public void Start()
    {
        ConnectToEventstore();
    }

    /// <summary>
    /// Obtains a connection and registers the catch-up subscription on its
    /// <c>Connected</c> event.
    /// </summary>
    private void ConnectToEventstore()
    {
        // Capture the connection in a local so the callback subscribes on the connection
        // that raised the event, even if _connection has since been replaced by a restart.
        var connection = EventStoreConnectionWrapper.Connect();
        _connection = connection;

        // NOTE(review): if Connect() can complete the connection before this handler is
        // attached, the Connected event would be missed - confirm against the wrapper.
        connection.Connected += (sender, args) =>
            connection.SubscribeToAllFrom(
                _latestPosition,
                false,
                EventOccured,
                LiveProcessingStarted,
                HandleSubscriptionDropped);
    }

    /// <summary>
    /// Subscription callback: dispatches the event, then records its position as the
    /// checkpoint for any later resubscribe.
    /// </summary>
    /// <param name="eventStoreCatchUpSubscription">The subscription that delivered the event (unused).</param>
    /// <param name="resolvedEvent">The event delivered by the subscription.</param>
    private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription,
        ResolvedEvent resolvedEvent)
    {
        Dispatch(resolvedEvent);

        // Advance the checkpoint for every event seen, including skipped ones. The
        // checkpoint reflects events that were dispatched; it says nothing about whether
        // an asynchronous handler has finished its work.
        _latestPosition = resolvedEvent.OriginalPosition ?? _latestPosition;
    }

    /// <summary>
    /// Filters out system events, deserializes the payload and invokes the matching handler.
    /// </summary>
    /// <param name="resolvedEvent">The event to dispatch.</param>
    private void Dispatch(ResolvedEvent resolvedEvent)
    {
        var recordedEvent = resolvedEvent.OriginalEvent;

        // Event types and stream ids starting with "$" are skipped.
        if (recordedEvent.EventType.StartsWith("$", StringComparison.Ordinal) ||
            recordedEvent.EventStreamId.StartsWith("$", StringComparison.Ordinal))
        {
            return;
        }

        var eventToHandle = EventSerialization.DeserializeEvent(recordedEvent);
        if (eventToHandle == null)
        {
            return;
        }

        // Single lookup; events without a registered handler are ignored.
        Action<object> handler;
        if (_eventHandlerMapping.TryGetValue(eventToHandle.GetType(), out handler))
        {
            handler(eventToHandle);
        }
    }

    /// <summary>
    /// Subscription callback invoked when the subscription is dropped; restarts the
    /// broadcaster for drop reasons that may be recoverable.
    /// </summary>
    /// <param name="subscription">The dropped subscription (unused).</param>
    /// <param name="dropReason">Why the subscription was dropped.</param>
    /// <param name="ex">The exception associated with the drop, if any (unused).</param>
    private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception ex)
    {
        if (dropReason == SubscriptionDropReason.UserInitiated)
        {
            return;
        }

        // TODO: ProcessingQueueOverflow - wait and reconnect, probably with back off.
        // It is not in the recoverable set below, so it currently does not restart.

        if (SubscriptionDropMayBeRecoverable(dropReason))
        {
            // NOTE(review): Start() obtains a connection again; whether the previous one
            // needs closing depends on EventStoreConnectionWrapper - confirm.
            Start();
        }
    }

    /// <summary>
    /// Tells whether a drop reason is one after which resubscribing is attempted.
    /// </summary>
    /// <param name="dropReason">The reason reported by the subscription.</param>
    /// <returns><c>true</c> for Unknown, SubscribingError, ServerError and ConnectionClosed.</returns>
    private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
    {
        return dropReason == SubscriptionDropReason.Unknown ||
               dropReason == SubscriptionDropReason.SubscribingError ||
               dropReason == SubscriptionDropReason.ServerError ||
               dropReason == SubscriptionDropReason.ConnectionClosed;
    }

    /// <summary>
    /// Subscription callback for the switch to live processing; intentionally does nothing.
    /// </summary>
    /// <param name="eventStoreCatchUpSubscription">The subscription that went live (unused).</param>
    private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)
    {
    }
}
/// <summary>
/// Common base type of the sample events in this file; it declares no members here.
/// </summary>
public class BaseFakeEvent
{
// some data
}
/// <summary>
/// First sample event type, derived from <see cref="BaseFakeEvent"/>; it declares no members here.
/// </summary>
public class FakeEvent1 : BaseFakeEvent
{
// some more data
}
/// <summary>
/// Second sample event type, derived from <see cref="BaseFakeEvent"/>; it declares no members here.
/// </summary>
public class FakeEvent2 : BaseFakeEvent
{
// some more data
}
/// <summary>
/// Sample handler for <see cref="FakeEvent1"/> that stands in for a slow consumer.
/// </summary>
public class FakeEvent1Listener
{
    /// <summary>
    /// Fire-and-forget handler: control returns to the caller at the first await and the
    /// simulated work (a ten second delay) finishes later. Because the method is
    /// <c>async void</c>, the caller cannot await it or observe its exceptions.
    /// </summary>
    /// <param name="eventToHandle">The event to process; not inspected by this placeholder.</param>
    public static async void Handle(FakeEvent1 eventToHandle)
    {
        // Stand-in for long-running work.
        var simulatedDuration = TimeSpan.FromSeconds(10);
        await Task.Delay(simulatedDuration);
    }
}
/// <summary>
/// Sample handler for <see cref="FakeEvent2"/> that stands in for a slow consumer.
/// </summary>
public class FakeEvent2Listener
{
    /// <summary>
    /// Fire-and-forget handler: control returns to the caller at the first await and the
    /// simulated work (a ten second delay) finishes later. Because the method is
    /// <c>async void</c>, the caller cannot await it or observe its exceptions.
    /// </summary>
    /// <param name="eventToHandle">The event to process; not inspected by this placeholder.</param>
    public static async void Handle(FakeEvent2 eventToHandle)
    {
        // Stand-in for long-running work.
        var simulatedDuration = TimeSpan.FromSeconds(10);
        await Task.Delay(simulatedDuration);
    }
}