We are implementing features and often “spin up” different environments for system and user testing, and about ½ of the time we do that (by appending a collection of events while catch-up subscribing), the same event is received twice.
The two copies arrive basically simultaneously on 2 different worker threads, both calling the same handler method, which causes a WrongExpectedVersionException…
What am I doing wrong?
Thanks in advance
/Julian
Code:
//Interface implemented by handlers
/// <summary>
/// Describes one catch-up subscription against a single stream: which stream to read,
/// where to resume from, which handlers to run, and what to do once caught up.
/// The dispatcher's Start method opens one subscription per implementation of this interface.
/// </summary>
public interface ICatchUpSubscriptionsFromSpecificStream
{
// Handlers to run for events from the stream; the dispatcher materializes this with ToArray().
// NOTE(review): the element type appears to have been lost in the paste (presumably ISubscription) — confirm.
IEnumerable Subscriptions { get; }
// Name of the stream handed to SubscribeToStreamFrom.
string StreamName { get; }
// Checkpoint handed to SubscribeToStreamFrom as the position to resume from.
// NOTE(review): null presumably means "read from the start of the stream" — confirm against the client docs.
int? LastCheckpoint { get; }
// Invoked from the subscription's live-processing-started callback; may be null,
// in which case the dispatcher substitutes a no-op.
Action OnCatchedUp { get; }
}
/// <summary>
/// A single event handler registration: the callback to run plus the event type it is interested in.
/// </summary>
public interface ISubscription
{
// Callback handed to the consumer bridge (ConsumeEvent) together with the deserialized event.
// NOTE(review): declared as a parameterless Action here; a generic argument may have been lost in the paste — confirm.
Action HandleAction { get; }
// CLR type of the event this subscription handles (not read by the processor code in this file).
Type RelevantType { get; }
// Event type name; the processor compares it for equality with the recorded event's EventType
// to decide whether this subscription should receive the event.
string RelevantTypeName { get; }
// Free-text identification of the subscriber, used only in trace/exception log messages.
string SubscriberHint { get; }
}
//In EventstoreStreamer (My event-dispatcher)
/// <summary>
/// Opens one catch-up subscription per registered catch-up subscriber.
/// For each subscriber a dedicated <c>CatchUpSubscriptionProcessor</c> is created and every
/// event delivered by the subscription is forwarded to its <c>Process</c> method.
/// </summary>
public void Start()
{
    foreach (var catchupSubscriber in catchUpSubscribers) //my ICatchUpSubscriptionsFromSpecificStream’ers
    {
        var stream = catchupSubscriber.StreamName;
        var checkpoint = catchupSubscriber.LastCheckpoint;

        // Subscribers may leave OnCatchedUp unset; substitute a no-op so the callback can be invoked unconditionally.
        Action onCatchedUp = catchupSubscriber.OnCatchedUp ?? (() => { });

        var handlers = catchupSubscriber.Subscriptions.ToArray();
        var processor = new CatchUpSubscriptionProcessor(handlers, consumerBridge, _serializer, onCatchedUp, stream, _log);

        _eventStoreConnection.SubscribeToStreamFrom(
            stream,
            checkpoint,
            true,
            (s, e) => processor.Process(e),
            s => onCatchedUp(),
            subscriptionDropped: (subscription, reason, exception) =>
            {
                // A drop was previously swallowed silently. It is now at least recorded so that
                // a dead subscription can be diagnosed; the subscription is NOT re-established here.
                var message = string.Format("Catch-up subscription to stream \"{0}\" was dropped: {1}", stream, reason);
                if (exception != null)
                {
                    _log.Exception(exception, message);
                }
                else
                {
                    _log.Trace(message);
                }
            },
            userCredentials: _credentials);
    }
}
/// <summary>
/// Receives events from a catch-up subscription and dispatches each one to every
/// subscription whose <c>RelevantTypeName</c> equals the event's type.
/// </summary>
// NOTE(review): the constructor chains to base(...) and Process is an override, yet no base type
// appears in the declaration, and _consumerBridge/_serializer/_subscriptions are not declared here.
// The base class (and possibly generic type arguments such as the element type of `subscriptions`)
// seem to have been lost when the code was pasted — restore them from the real source.
internal class CatchUpSubscriptionProcessor
{
    private readonly string _streamName;
    private readonly ILog _logger;

    /// <summary>Callback supplied by the subscriber for when the subscription has caught up.</summary>
    public Action OnCatchedUp { get; private set; }

    /// <summary>
    /// Creates a processor for one stream.
    /// </summary>
    /// <param name="subscriptions">Handlers that may receive events; must not be null.</param>
    /// <param name="consumerBridge">Bridge used to invoke a handler with an event; must not be null.</param>
    /// <param name="serializer">Serializer used for event data and metadata; must not be null.</param>
    /// <param name="onCatchedUp">Catch-up callback exposed through <see cref="OnCatchedUp"/>.</param>
    /// <param name="streamName">Name of the stream this processor belongs to.</param>
    /// <param name="logger">Logger for trace and exception messages; must not be null.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="subscriptions"/>, <paramref name="consumerBridge"/>,
    /// <paramref name="serializer"/> or <paramref name="logger"/> is null.
    /// </exception>
    public CatchUpSubscriptionProcessor(IEnumerable subscriptions, IEventConsumerBridge consumerBridge, IJsonSerializer serializer, Action onCatchedUp, string streamName, ILog logger)
        : base(subscriptions, consumerBridge, serializer)
    {
        if (subscriptions == null) throw new ArgumentNullException("subscriptions");
        if (consumerBridge == null) throw new ArgumentNullException("consumerBridge");
        if (serializer == null) throw new ArgumentNullException("serializer");
        if (logger == null) throw new ArgumentNullException("logger");

        _consumerBridge = consumerBridge;
        _serializer = serializer;
        _subscriptions = subscriptions;
        _streamName = streamName;
        _logger = logger;
        OnCatchedUp = onCatchedUp;
    }

    /// <summary>
    /// Dispatches one resolved event to all matching subscriptions, in enumeration order.
    /// An exception thrown by a handler is logged and then rethrown, so later handlers do not run.
    /// </summary>
    /// <param name="re">The event as delivered by the subscription.</param>
    public override void Process(ResolvedEvent re)
    {
        _logger.Trace(string.Format("Recieved \"{0}\" from stream \"{1}\"", re.Event.EventType, re.OriginalStreamId));

        //Only deserialize event if we are interested in the type at all
        SubscriberEvent deserializedEvent = null;
        foreach (var handler in _subscriptions.Where(x => x.RelevantTypeName == re.Event.EventType))
        {
            ChangeTracker.ClearTracking();

            // Deserialize lazily and at most once; all matching handlers share the same instance.
            deserializedEvent = deserializedEvent ?? MapSubscriberEvent(re);
            try
            {
                _logger.Trace(string.Format("Handling \"{0}\" in subscriber \"{1}\"", re.Event.EventType, handler.SubscriberHint));
                _consumerBridge.ConsumeEvent(deserializedEvent, handler.HandleAction);
            }
            catch (Exception exception)
            {
                _logger.Exception(exception, string.Format("An exception ocurred while handling \"{0}\" in subscriber \"{1}\"", re.Event.EventType, handler.SubscriberHint));
                throw;
            }
        }
    }

    /// <summary>
    /// Builds the <c>SubscriberEvent</c> handed to handlers: the deserialized event payload
    /// (decoded as UTF-8), a freshly generated id, the original and resolved event numbers and
    /// stream ids, and the correlation id read from the event metadata.
    /// </summary>
    /// <param name="re">The event as delivered by the subscription.</param>
    protected SubscriberEvent MapSubscriberEvent(ResolvedEvent re)
    {
        var domainEvent = _serializer.Deserialize(Encoding.UTF8.GetString(re.Event.Data));
        var correlation = GetCorrelationFromMetaData(re);
        return new SubscriberEvent(Guid.NewGuid(), domainEvent, re.OriginalEventNumber, re.OriginalStreamId, re.Event.EventNumber, re.Event.EventStreamId, correlation);
    }

    /// <summary>
    /// Reads the correlation id from the event's metadata (decoded as UTF-8 and deserialized
    /// to a dictionary).
    /// </summary>
    /// <param name="re">The event as delivered by the subscription.</param>
    /// <returns>
    /// The parsed correlation id, or <see cref="Guid.Empty"/> when the metadata deserializes to
    /// null, the header is absent, its value is null, or the value is not a valid GUID.
    /// </returns>
    protected Guid GetCorrelationFromMetaData(ResolvedEvent re)
    {
        var metaData = (Dictionary<string, object>)_serializer.Deserialize(Encoding.UTF8.GetString(re.Event.Metadata));

        // Metadata that deserializes to null would otherwise fail with a NullReferenceException below.
        if (metaData == null)
            return Guid.Empty;

        // Single lookup instead of ContainsKey + indexer; also guards against a null header value.
        object rawCorrelation;
        if (!metaData.TryGetValue(EventStoreConstants.CorrelationIdHeader, out rawCorrelation) || rawCorrelation == null)
            return Guid.Empty;

        Guid correlation;
        if (Guid.TryParse(rawCorrelation.ToString(), out correlation))
            return correlation;
        return Guid.Empty;
    }
}