Client subscription and resubscribe issues?

I’m writing a subscriber to all events using the .NET API and Event Store 3.5.0. I’m having issues getting durability built into my subscriber. I’m passing a method into the SubscriptionDropped parameter when I do my connection.SubscribeToAllFrom. All works great for happy path. Events appear and get processed as expected. To test durability and recoverability, I’m shutting down Event Store and starting it back up. When I do this, my SubscriptionDropped method gets called as expected and I perform a SubscribeToAllFrom again. However, it seems like even though the initial subscription is “dropped”, it’s still handling events that appear. I’m seeing this via Debug output where the last checkpoint record is being written multiple times on different managed threads after an event occurs. Is there something I need to be doing before resubscribing after a subscription drop?

My Debug output looks like this (commentary in bold):

(Initial subscribe on startup)

ReadModelEventSubscriber (1) subscribing to all events

(Shutting down and restarting Event Store)

ReadModelEventSubscriber (11) subscription dropped with reason ConnectionClosed. Exception: Connection was closed.

ReadModelEventSubscriber (11) subscribing to all events

ReadModelEventSubscriber (11) is now resubscribed after the subscription dropped

(Shutting down and restarting Event Store a second time)

ReadModelEventSubscriber (9) subscription dropped with reason ConnectionClosed. Exception: Connection was closed.

ReadModelEventSubscriber (9) subscribing to all events

ReadModelEventSubscriber (9) is now resubscribed after the subscription dropped

(I send a single event and the event is handled three times and the same checkpoint is written three times)

ReadModelEventSubscriber (10) writing checkpoint 2599714 for FieldDetailsChangedEvent

ReadModelEventSubscriber (12) writing checkpoint 2599714 for FieldDetailsChangedEvent

ReadModelEventSubscriber (13) writing checkpoint 2599714 for FieldDetailsChangedEvent

I’m trying to get verbose client logging to go along with this, but presumably I’m doing something wrong here. Despite running settingsBuilder.EnableVerboseLogging().UseDebugLogger() before creating the connection, I do not get any extra Debug output beyond what I’m writing with Debug.WriteLine.

Do you have a Debuglogger setup there is also a console logger.

I’m running from within Visual Studio and expecting the output to show up in the Output window under “Debug”. Console logger doesn’t show anything since I’m running as a website under iisexpress.

https://github.com/EventStore/EventStore/blob/release-v3.6.0/src/EventStore.ClientAPI/ConnectionSettingsBuilder.cs#L74
UseDebugLogger() on your connection settings will write to the debug
window.

Debug logger is pretty simple:
https://github.com/EventStore/EventStore/blob/release-v3.6.0/src/EventStore.ClientAPI/Common/Log/DebugLogger.cs

I found similar behaviour last week when testing a CatchupSubscription, where I assumed I had to restart the subscription, : However when I took out any processing from the SubscriptionDropped event, then bounced the database, the event was fired, and when the database came up the subscription continued on. Maybe Greg can confirm what is expected, and whether connection handling or timeouts are relevant here.

Clarified this a bit. There is a timeout, before which bringing back the event store recovers the subscription, however if you go past this then the Drop event is fired and it needs recreating. So, I have not had your issue, sorry.

I was able to build a small sample to demonstrate the issue (attached Program.cs). It’s got a hardcoded position in it for the stream to cut down on the output clutter, but otherwise should demonstrate the issue.

  1. I run this console app that subscribes to All with a catchup subscription.
  2. I wait for a $stats event to come through after live processing has started and I see it only handled once, as expected.
  3. I close my local Event Store application to simulate an outage and immediately see the “Resubscribing…” message.
  4. I start back up Event Store right back up and wait.
  5. Eventually, I see two “live processing started” messages from two different subscribers show up.
  6. Then I see a new $stats menu that appears and is handled twice.
    It’s worth noting that when I try this with a volatile subscription (SubscribeToAllAsync instead of SubscribeToAllFrom), I don’t see the same issue.

Program.cs (1.9 KB)

Nobody will be able to reproduce your behaviour due to:

_connection.SubscribeToAllFrom(new Position(76047864, 76047864) is
specific to your db. I will try getting something else working.

Attached is updated with Position.End.

Program.cs (1.85 KB)

Hopefully I’m not being too spammy here. I tried updating the sample (attached Program.cs) to at least account for and workaround the extra subscription callbacks that keep getting fired by stopping the subscription, however Stop() seems to have no effect. Even if I let it run for a while, the EventAppeared keeps getting fired off multiple times per event. On the plus side, running it as a console app did enable me to get verbose logging output which was failing under IISExpress. I’ve attached the verbose console output here. A find for “2016” quickly brings you through all the lines the app is writing out (as opposed to the verbose logging).

Program.cs (2.91 KB)

debug.log (66.8 KB)

I can see why you would expect that the connection will have been dropped, so I am following this thread. From reading around, this week I have taken the approach of making the connection a persistent connection. There is no longer any timeout, so the connection and catch up subscription happily wait around until the event store comes up again and then resume. This is working well in a windows service, and while the event store is down I am logging the retry messages so I can see what is happening. I found it really helpful to log all the subscription and connection events.

To be more precise, The type of subscription I am using for this is SubscribeToStreamFrom as I preferred to lessen the network traffic by doing a simple projection to filter the events I am interested in, and the persistent connection is set up by connectionSettingsBuilder.KeepReconnecting().KeepRetrying();

Looking at this now

So I looked through the code. What is happening is it is appropriately
dropping the subscription after the retry limits are hit. The code
then makes another subscription. However the first subscription seems
to still be getting events after it dropped which is why you are
receiving two. I will have to look through a bit to figure out why
this part is happening.

Also if you use up all of your connection retries this code will
infinitely loop calling subscribe on a disposed socket.

Object name: 'ES-89a7cf32-e73e-43ec-990b-fe6eea548dba'..
Subscription dropped with reason CatchUpError. Resubscribing...
Subscribing to all events
[11,14:36:58.580,DEBUG] Catch-up Subscription to <all>: starting...
[09,14:36:58.580,DEBUG] Catch-up Subscription to <all>: running...
[09,14:36:58.580,DEBUG] Catch-up Subscription to <all>: pulling events...
[09,14:36:58.580,DEBUG] EventStoreConnection
'ES-89a7cf32-e73e-43ec-990b-fe6eea548dba': enqueueing message
EventStore.ClientAPI.Internal.StartOperationMessage..
[09,14:36:58.580,DEBUG] Catch-up Subscription to <all>: dropping
subscription, reason: CatchUpError System.ObjectDisposedException:
Cannot access a disposed object.
Object name: 'ES-89a7cf32-e73e-43ec-990b-fe6eea548dba'..

Likely in this case we should throw an exception on the subscribe.

This code works as you would expecct.

    public class Program
    {
        static IEventStoreConnection _connection;
        static EventStoreAllCatchUpSubscription _subscription;

        static void Main(string[] args)
        {
            var sb = ConnectionSettings.Create();
            sb.SetDefaultUserCredentials(new UserCredentials("admin",
"changeit")).EnableVerboseLogging().UseConsoleLogger();
            _connection = EventStoreConnection.Create(sb.Build(), new
IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
            _connection.ConnectAsync().Wait();

            Subscribe();
            Console.WriteLine("Running. Press any key to exit...");
            Console.ReadKey(true);
        }

        private static void Subscribe()
        {
            Console.WriteLine("Subscribing to all events");
            _subscription = (EventStoreAllCatchUpSubscription)
_connection.SubscribeToAllFrom(Position.End, false, EventAppeared,
LiveProcessingStarted, SubscriptionDropped);
        }

        private static void
LiveProcessingStarted(EventStoreCatchUpSubscription obj)
        {
            Console.WriteLine($"{DateTime.UtcNow}: Live processing
started on managed thread {Thread.CurrentThread.ManagedThreadId}");
        }

        private static void
EventAppeared(EventStoreCatchUpSubscription subscription,
ResolvedEvent resolvedEvent)
        {
            Console.WriteLine($"{DateTime.UtcNow}: Received event
{resolvedEvent.OriginalPosition}
({resolvedEvent.OriginalEvent.EventType}) on managed thread
{Thread.CurrentThread.ManagedThreadId}");
        }

        private static void
SubscriptionDropped(EventStoreCatchUpSubscription subscription,
SubscriptionDropReason reason, Exception ex)
        {
            Console.WriteLine($"Subscription dropped with reason
{reason}. Resubscribing...");
            _subscription.Stop();
            Subscribe();
        }
    }

If you use the overload to pass a timeout to the _subscription.Stop(); call then this code always times out.

It seems strange to me that the subscription still seems to have a state of dropped=1 even after its reconnected…