Recieving the same events twice via EventStoreStreamCatchUpSubscription

We are implementing features and often “spin up” different environments for system- and user-testing, and about ½ of the time we do that (by appending a collection of events while catchup-subscribing), the same event is recieved twice.

They arrive basically simultaniously in 2 different workerthreads, calling the same method-handle, causing WrongExpectedVersionException…

What am I doing wrong?

Thanks in advance

/Julian

Code:

//Interface implemented by handlers

public interface ICatchUpSubscriptionsFromSpecificStream

{

IEnumerable Subscriptions { get; }

string StreamName { get; }

int? LastCheckpoint { get; }

Action OnCatchedUp { get; }

}

public interface ISubscription

{

Action HandleAction { get; }

Type RelevantType { get; }

string RelevantTypeName { get; }

string SubscriberHint { get; }

}

//In EventstoreStreamer (My event-dispatcher)

public void Start()

foreach (var catchupSubscriber in catchUpSubscribers) //my ICatchUpSubscriptionsFromSpecificStream’ers

{

var stream = catchupSubscriber.StreamName;

var checkpoint = catchupSubscriber.LastCheckpoint;

Action onCatchedUp;

if (catchupSubscriber.OnCatchedUp == null)

{

onCatchedUp = () => { };

}

else

{

onCatchedUp = catchupSubscriber.OnCatchedUp;

}

var handlers = catchupSubscriber.Subscriptions.ToArray();

var processor = new CatchUpSubscriptionProcessor(handlers, consumerBridge, _serializer, onCatchedUp, stream, _log);

_eventStoreConnection.SubscribeToStreamFrom(stream, checkpoint, true, (s, e) =>

processor.Process(e), (s) => onCatchedUp(),

subscriptionDropped: (subscription, reason, arg3) => { },

userCredentials: _credentials);

}

internal class CatchUpSubscriptionProcessor

{

private readonly string _streamName;

private readonly ILog _logger;

public Action OnCatchedUp { get; private set; }

public CatchUpSubscriptionProcessor(IEnumerable subscriptions, IEventConsumerBridge consumerBridge, IJsonSerializer serializer, Action onCatchedUp, string streamName, ILog logger)

: base(subscriptions, consumerBridge, serializer)

{

if (subscriptions == null) throw new ArgumentNullException(“subscriptions”);

if (consumerBridge == null) throw new ArgumentNullException(“consumerBridge”);

if (serializer == null) throw new ArgumentNullException(“serializer”);

if logger;== null) throw new ArgumentNullExceptionlogger;

_consumerBridge = consumerBridge;

_serializer = serializer;

_subscriptions = subscriptions;

_streamName = streamName;

_logger = logger;

OnCatchedUp = onCatchedUp;

}

public override void Process(ResolvedEvent re)

{

_logger.Trace(string.Format(“Recieved “{0}” from stream “{1}””, re.Event.EventType, re.OriginalStreamId));

//Only deserialize event if we are interested in the type at all

SubscriberEvent deserializedEvent = null;

foreach (var handler in _subscriptions.Where(x => x.RelevantTypeName == re.Event.EventType))

{

ChangeTracker.ClearTracking();

deserializedEvent = deserializedEvent ?? MapSubscriberEvent(re);

try

{

_logger.Trace(string.Format(“Handling “{0}” in subscriber “{1}””, re.Event.EventType, handler.SubscriberHint));

_consumerBridge.ConsumeEvent(deserializedEvent, handler.HandleAction);

}

catch (Exception exception)

{

_logger.Exception(exception, string.Format(“An exception ocurred while handling “{0}” in subscriber “{1}””, re.Event.EventType, handler.SubscriberHint));

throw;

}

}

}

protected SubscriberEvent MapSubscriberEvent(ResolvedEvent re)

{

var domainEvent = _serializer.Deserialize(Encoding.UTF8.GetString(re.Event.Data));

var correlation = GetCorrelationFromMetaData(re);

return new SubscriberEvent(Guid.NewGuid(), domainEvent, re.OriginalEventNumber, re.OriginalStreamId, re.Event.EventNumber, re.Event.EventStreamId, correlation);

}

protected Guid GetCorrelationFromMetaData(ResolvedEvent re)

{

var metaData = (Dictionary<string, object>)_serializer.Deserialize(Encoding.UTF8.GetString(re.Event.Metadata));

if (!metaData.ContainsKey(EventStoreConstants.CorrelationIdHeader))

return Guid.Empty;

Guid correlation;

if (Guid.TryParse(metaData[EventStoreConstants.CorrelationIdHeader].ToString(), out correlation))

return correlation;

return Guid.Empty;

}

}

What is in your logs around this point? Also how are you measuring
that you received it twice (doesn't seem to be any code here for that)

In general unless you are storing your checkpoint atomically with your
handler catchup subscriptions will be at least once either way.

I’ll get the exact log on monday, but I remember the log something it like his:
Recieved “RoomWasAllocatedForRoomstay” from stream "BoundedContext_Reservations

Recieved “RoomWasAllocatedForRoomstay” from stream "BoundedContext_Reservations

Handling “RoomWasAllocatedForRoomstay” in subscriber "RoomAssignmentDomainService

Handling “RoomWasAllocatedForRoomstay” in subscriber "RoomAssignmentDomainService

Retrieving “RoomAssignmentAR-d08d1bcd57994ab9a460d8ff1b74017e” from repository

*Saving changes to "*RoomAssignmentAR-d08d1bcd57994ab9a460d8ff1b74017e"

Retrieving “RoomAssignmentAR-d08d1bcd57994ab9a460d8ff1b74017e” from repository

*Saving changes to "*RoomAssignmentAR-d08d1bcd57994ab9a460d8ff1b74017e"

*Error: Concurrency Violation : WrongExpectedVersionException - Expected version 2 *

I have no code measuring the phenomenon, I saw it by breaking on exceptions in debugging and examining the threads at the time. Besides the stream with the WrongExpectedVersionException, was another stream on it’s way out of the consumerBridge after having succesfully processed the event. Exploring the callstacks I saw that the ResolvedEvent.EventId’s for the 2 threads where identical.

I guess i could store 2 checkpoints in the subscriber with a lock around it - the first being a “being processed” upon entry (to guard against doubles like this), and the second upon exit as a checkpoint for re-subscribing - But my understanding is that only the second should be necessary when recieving events via EventStoreStreamCatchUpSubscription?

I’ll definitely reproduce and doublecheck the log on monday and get back with something factual.

Thanks

/Julian

Looks like I should look closer at what happens when the subscriptions catch up. The stacktraces (for 2 simultanious threads having the same event) are actually different - i missed this before:    ReadEventsTill != ProcessLiveQueue
Is there a race-condition somewhere around "onCatchedUp" (besides in my own code probably) that I should guard against?


**Thread A:**
>	Xpectra.ReadServices.RoomAssignment.dll!Xpectra.ReadServices.RoomAssignment.Assignment.RoomAssignmentEventHandler.RoomstayPlanned(Xpectra.Events.Reservations.RoomstayPlanned roomstayPlanned, Spectra.Domain.Subscriptions.EventSubscriptionMetadata eventSubscriptionMetadata) Line 129	C#
 	Spectra.Domain.dll!Spectra.Domain.Subscriptions.EventSubscription.WithOriginMetadata.AnonymousMethod__0(Spectra.Domain.Subscriptions.SubscriberEvent se) Line 76	C#
 	Spectra.EventStore.dll!Spectra.EventStore.Subscriptions.InThreadEventConsumer.ConsumeEvent(Spectra.Domain.Subscriptions.SubscriberEvent event, System.Action<Spectra.Domain.Subscriptions.SubscriberEvent> handler) Line 11	C#
 	Spectra.EventStore.dll!Spectra.EventStore.GetEventStore.CatchUpSubscriptionProcessor.Process(EventStore.ClientAPI.ResolvedEvent re) Line 44	C#
 	Spectra.EventStore.dll!Spectra.EventStore.GetEventStore.EventstoreStreamer.Start.AnonymousMethod__3(EventStore.ClientAPI.EventStoreCatchUpSubscription s, EventStore.ClientAPI.ResolvedEvent e) Line 86	C#
 	EventStore.ClientAPI.dll!EventStore.ClientAPI.EventStoreStreamCatchUpSubscription.TryProcess(EventStore.ClientAPI.ResolvedEvent e)	Unknown
 	EventStore.ClientAPI.dll!EventStore.ClientAPI.EventStoreStreamCatchUpSubscription.ReadEventsTill(EventStore.ClientAPI.IEventStoreConnection connection, bool resolveLinkTos, EventStore.ClientAPI.SystemData.UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)	Unknown
 	EventStore.ClientAPI.dll!EventStore.ClientAPI.EventStoreCatchUpSubscription.RunSubscription.AnonymousMethod__0(object _)	Unknown
 	mscorlib.dll!System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(object state)	Unknown
 	mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)	Unknown
 	mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)	Unknown
 	mscorlib.dll!System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()	Unknown
 	mscorlib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch()	Unknown
 	mscorlib.dll!System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()	Unknown


**Thread B:**
>	Xpectra.ReadServices.RoomAssignment.dll!Xpectra.ReadServices.RoomAssignment.Assignment.RoomAssignmentEventHandler.RoomstayPlanned(Xpectra.Events.Reservations.RoomstayPlanned roomstayPlanned, Spectra.Domain.Subscriptions.EventSubscriptionMetadata eventSubscriptionMetadata) Line 129	C#
 	Spectra.Domain.dll!Spectra.Domain.Subscriptions.EventSubscription.WithOriginMetadata.AnonymousMethod__0(Spectra.Domain.Subscriptions.SubscriberEvent se) Line 76	C#
 	Spectra.EventStore.dll!Spectra.EventStore.Subscriptions.InThreadEventConsumer.ConsumeEvent(Spectra.Domain.Subscriptions.SubscriberEvent event, System.Action<Spectra.Domain.Subscriptions.SubscriberEvent> handler) Line 11	C#
 	Spectra.EventStore.dll!Spectra.EventStore.GetEventStore.CatchUpSubscriptionProcessor.Process(EventStore.ClientAPI.ResolvedEvent re) Line 44	C#
 	Spectra.EventStore.dll!Spectra.EventStore.GetEventStore.EventstoreStreamer.Start.AnonymousMethod__3(EventStore.ClientAPI.EventStoreCatchUpSubscription s, EventStore.ClientAPI.ResolvedEvent e) Line 86	C#
 	EventStore.ClientAPI.dll!EventStore.ClientAPI.EventStoreStreamCatchUpSubscription.TryProcess(EventStore.ClientAPI.ResolvedEvent e)	Unknown
 	EventStore.ClientAPI.dll!EventStore.ClientAPI.EventStoreCatchUpSubscription.ProcessLiveQueue()	Unknown
 	EventStore.ClientAPI.dll!EventStore.ClientAPI.EventStoreCatchUpSubscription.EnsureProcessingPushQueue.AnonymousMethod__1(object _)	Unknown
 	mscorlib.dll!System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(object state)	Unknown
 	mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)	Unknown
 	mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)	Unknown
 	mscorlib.dll!System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()	Unknown
 	mscorlib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch()	Unknown
 	mscorlib.dll!System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()	Unknown

I’ll close this and ask new (more focused) question regarding the other “problems”

Thanks

Ok - seems the problem is not only one but several problems.
I fixed a bug related to ambient context for change tracking, which (so far) has resolved my WrongExpectedVersionException - everything seems to be in order regarding EventStoreStreamCatchUpSubscription, as expected the problem is on our end.

The problem is defenetly related to my projections ( https://groups.google.com/d/topic/event-store/VQr_KEUP8wQ/discussion ) - The subscription is definitly working as expected…
… The log looks like this:
17:02:19 Trace Process in …\Spectra.EventStore\GetEventStore\CatchUpSubscriptionProcessor.cs, line 35 Recieved “RoomFutureAvailabilityChanged” from stream “Read_RoomAssignment_Candidates”

17:02:19 Trace Process in …\Spectra.EventStore\GetEventStore\CatchUpSubscriptionProcessor.cs, line 51 Handling “RoomFutureAvailabilityChanged” in subscriber “C:\Code\Xpectra\source\ReadServices\Xpectra.ReadServices.RoomAssignment\Candidate\RoomCandidateEventHandler.cs”

17:02:19 Debug Process in …\Spectra.EventStore\GetEventStore\CatchUpSubscriptionProcessor.cs, line 52 Event has Id “56a1820e-ddcd-49d4-9687-c398721d71a5”, and is Number “101” in the stream “Read_RoomAssignment_Candidates”

17:02:19 Trace Process in …\Spectra.EventStore\GetEventStore\CatchUpSubscriptionProcessor.cs, line 35 Recieved “RoomFutureAvailabilityChanged” from stream “Read_RoomAssignment_Candidates”

17:02:19 Trace Process in …\Spectra.EventStore\GetEventStore\CatchUpSubscriptionProcessor.cs, line 51 Handling “RoomFutureAvailabilityChanged” in subscriber “C:\Code\Xpectra\source\ReadServices\Xpectra.ReadServices.RoomAssignment\Candidate\RoomCandidateEventHandler.cs”

17:02:19 Debug Process in …\Spectra.EventStore\GetEventStore\CatchUpSubscriptionProcessor.cs, line 52 Event has Id “56a1820e-ddcd-49d4-9687-c398721d71a5”, and is Number “102” in the stream “Read_RoomAssignment_Candidates”

17:02:19 Trace Process in …\Spectra.EventStore\GetEventStore\CatchUpSubscriptionProcessor.cs, line 35 Recieved “RoomFutureAvailabilityChanged” from stream “Read_RoomAssignment_Candidates”

17:02:19 Trace Process in …\Spectra.EventStore\GetEventStore\CatchUpSubscriptionProcessor.cs, line 51 Handling “RoomFutureAvailabilityChanged” in subscriber “C:\Code\Xpectra\source\ReadServices\Xpectra.ReadServices.RoomAssignment\Candidate\RoomCandidateEventHandler.cs”

17:02:19 Debug Process in …\Spectra.EventStore\GetEventStore\CatchUpSubscriptionProcessor.cs, line 52 Event has Id “56a1820e-ddcd-49d4-9687-c398721d71a5”, and is Number “103” in the stream “Read_RoomAssignment_Candidates”