Subscription doesn't receive events.

Hello,

I have found an issue with catch-up subscriptions. When I subscribe to events just after the EventStore server has started, new events are sometimes not delivered to that subscription.

The subscription is not dropped; it just waits indefinitely for events. This was tested on an in-memory EventStore using the following program:

```csharp
using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using EventStore.ClientAPI;
using EventStore.ClientAPI.SystemData;

class Program
{
    static void Main(string[] args)
    {
        // Delays of 1..10 ms, then 15..155 ms in steps of 5 ms.
        for (int i = 1; i <= 10; ++i)
            Test(i);
        for (int i = 1; i < 30; ++i)
            Test(10 + 5 * i);

        Console.ReadLine();
    }

    private static ManualResetEvent connected = new ManualResetEvent(false);

    private static void Test(int delay)
    {
        connected.Reset();

        // Kill any running node, then start a fresh in-memory one.
        var old = Process.GetProcessesByName("EventStore.ClusterNode");
        foreach (var p in old)
            p.Kill();
        Thread.Sleep(1000);

        var path = @"C:\Projects\EventStore";
        ProcessStartInfo info = new ProcessStartInfo();
        info.Arguments = "--mem-db";
        info.FileName = path + @"\EventStore.ClusterNode.exe";
        Process.Start(info);

        ConnectionSettings cs = ConnectionSettings.Create()
            .KeepReconnecting()
            .KeepRetrying()
            .SetHeartbeatInterval(TimeSpan.FromMinutes(10))
            .SetHeartbeatTimeout(TimeSpan.FromMinutes(11))
            .WithConnectionTimeoutOf(TimeSpan.FromMinutes(10))
            .SetOperationTimeoutTo(TimeSpan.FromMinutes(10))
            .Build();
        IPEndPoint ip = new IPEndPoint(new IPAddress(new byte[] { 127, 0, 0, 1 }), 1113);
        var connection = EventStoreConnection.Create(cs, ip);
        connection.Connected += connection_Connected;
        connection.ConnectAsync().Wait();
        connected.WaitOne();

        Thread.Sleep(delay);

        var received = false;
        var dropped = false;
        EventStoreAllCatchUpSubscription subscription = null;

        // Poll for ~5 s, re-subscribing whenever the drop callback has cleared 'subscription'.
        for (int i = 1; i < 100; ++i)
        {
            if (subscription == null)
            {
                subscription = connection.SubscribeToAllFrom(Position.Start, false,
                    (s, e) =>
                    {
                        received = true;
                    },
                    null,
                    (arg1, arg2, arg3) =>
                    {
                        dropped = true;
                        subscription = null;
                    },
                    new UserCredentials("admin", "changeit"));
            }
            Thread.Sleep(50);
        }

        if (received)
            Console.Write("Events received, ");
        else
            Console.Write("*NO* events received, ");

        if (dropped)
            Console.Write("dropped, ");
        else
            Console.Write("not dropped, ");
        Console.WriteLine("delay " + delay);
    }

    static void connection_Connected(object sender, ClientConnectionEventArgs e)
    {
        connected.Set();
    }
}
```

Here is an example output showing when the error appears:

The problem usually occurred for subscriptions with an initial delay of around 40-100 ms. For subscriptions established after about 2 seconds (when EventStore has completed initialization, based on its output), the error never happened. EventStore is started manually because this scenario is used in unit tests.

We have found a workaround: we check whether the subscription has received any events (including system events). If no events appear within ~200 ms, the subscription is closed and established again. Is there another way to make sure that the subscription is established correctly and will return events?
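A minimal sketch of that workaround, written against a caller-supplied delegate so it runs without a server. SubscriptionWatchdog, SubscribeUntilFirstEvent and the shape of the subscribe delegate are hypothetical names for illustration, not part of EventStore.ClientAPI; the delegate is assumed to start a subscription, invoke the given callback for every event, and return an action that stops the subscription.

```csharp
using System;
using System.Threading;

static class SubscriptionWatchdog
{
    // Hypothetical helper. 'subscribe' starts a subscription that calls the
    // supplied callback for every event and returns an action that stops it.
    public static Action SubscribeUntilFirstEvent(
        Func<Action, Action> subscribe,
        TimeSpan firstEventTimeout,
        int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            // Signalled by the callback, which may run synchronously or on any thread.
            // Not disposed, because a late callback may still signal it.
            var firstEvent = new ManualResetEventSlim(false);
            Action stop = subscribe(() => firstEvent.Set());

            // Keep this subscription if anything arrives before the deadline.
            if (firstEvent.Wait(firstEventTimeout))
                return stop;

            // Nothing arrived in time: close it and establish a new one.
            stop();
        }
        throw new TimeoutException(
            "No events received after " + maxAttempts + " subscription attempts.");
    }
}
```

With the real client, the delegate could wrap the SubscribeToAllFrom(Position.Start, false, ...) call from the program above and return an action that calls Stop() on the resulting subscription. Since reading from Position.Start replays existing (including system) events, a healthy subscription should signal quickly.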

Cheers,

Roman.

This code is pretty interesting. What are you trying to do here exactly?

```csharp
for (int i = 1; i < 100; ++i)
{
    if (subscription == null)
    {
        subscription = connection.SubscribeToAllFrom(Position.Start, false,
            (s, e) =>
            {
                received = true;
            },
            null,
            (arg1, arg2, arg3) =>
            {
                dropped = true;
                subscription = null;
            },
            new UserCredentials("admin", "changeit"));
    }
    Thread.Sleep(50);
}
```

In case I wasn’t clear, what I meant by “interesting” is that your code is highly dependent on assumptions about when we call back synchronously vs. asynchronously, and how quickly we may call you.

Subscription will be null the second time through if we:

a) call back synchronously

b) call back asynchronously within 50ms
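The dependence on callback timing can be removed by letting the callbacks only record what happened and having the test wait on those records with a single deadline, instead of polling a shared variable every 50 ms. A minimal sketch; SubscriptionProbe and Outcome are hypothetical names, not part of EventStore.ClientAPI.

```csharp
using System;
using System.Threading;

enum Outcome { Received, Dropped, TimedOut }

sealed class SubscriptionProbe
{
    // Set from the client's callback threads, waited on by the test thread.
    private readonly ManualResetEvent received = new ManualResetEvent(false);
    private readonly ManualResetEvent dropped = new ManualResetEvent(false);

    // Call these from the event-appeared and subscription-dropped callbacks.
    public void OnEventAppeared() => received.Set();
    public void OnDropped() => dropped.Set();

    // Returns Received or Dropped once either has been signalled (Received wins
    // if both are), or TimedOut. It works the same whether the callbacks fired
    // synchronously before this call or asynchronously during it.
    public Outcome WaitForOutcome(TimeSpan timeout)
    {
        int index = WaitHandle.WaitAny(new WaitHandle[] { received, dropped }, timeout);
        if (index == WaitHandle.WaitTimeout)
            return Outcome.TimedOut;
        return index == 0 ? Outcome.Received : Outcome.Dropped;
    }
}
```

In the test, the subscription would be created once, passing (s, e) => probe.OnEventAppeared() and (arg1, arg2, arg3) => probe.OnDropped() as the callbacks, and a single probe.WaitForOutcome(TimeSpan.FromSeconds(5)) would replace the polling loop.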

I would be interested in seeing what “i” is in your test output.

```
count = 2, Events received, dropped, delay 1
count = 2, Events received, dropped, delay 2
count = 3, Events received, dropped, delay 3
count = 2, Events received, dropped, delay 4
count = 2, Events received, dropped, delay 5
count = 1, Events received, not dropped, delay 6
count = 2, Events received, dropped, delay 7
count = 3, Events received, dropped, delay 8
count = 1, Events received, not dropped, delay 9
count = 1, Events received, not dropped, delay 10
count = 1, Events received, not dropped, delay 15
count = 2, Events received, dropped, delay 20
count = 3, Events received, dropped, delay 25
count = 1, Events received, not dropped, delay 30
count = 3, Events received, dropped, delay 35
count = 3, Events received, dropped, delay 40
count = 2, Events received, dropped, delay 45
count = 2, Events received, dropped, delay 50
count = 2, Events received, dropped, delay 55
count = 2, Events received, dropped, delay 60
count = 1, Events received, not dropped, delay 65
count = 1, Events received, not dropped, delay 70
count = 1, Events received, not dropped, delay 75
count = 1, Events received, not dropped, delay 80
count = 2, Events received, dropped, delay 85
count = 2, Events received, dropped, delay 90
count = 3, Events received, dropped, delay 95
count = 2, Events received, dropped, delay 100
count = 2, Events received, dropped, delay 105
count = 2, Events received, dropped, delay 110
count = 1, Events received, not dropped, delay 115
count = 2, Events received, dropped, delay 120
count = 2, Events received, dropped, delay 125
```

On a side note (though I think it’s more likely to be the answer), if you have a simple connection problem, it would cause a “bad subscription” in your test with these settings:

```csharp
KeepReconnecting().
KeepRetrying().
SetHeartbeatInterval(TimeSpan.FromMinutes(10)).
SetHeartbeatTimeout(TimeSpan.FromMinutes(11)).
WithConnectionTimeoutOf(TimeSpan.FromMinutes(10)).
```

The one I would be most worried about is https://github.com/EventStore/EventStore/blob/dev/src/EventStore.ClientAPI/ConnectionSettingsBuilder.cs#L302, but all of them can cause such an issue. You are testing whether you get events or an error within 5 seconds, while your timeouts are measured in minutes. If we hit any of those timeouts, you will deem the subscription “broken” when it’s just doing what you told it to do.
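To put numbers on that mismatch, here is a rough comparison of the test's observation window with the time a dead connection can go unnoticed. It assumes a simplified model (not a statement of the client's exact algorithm) in which a silently dead connection is detected after at most one heartbeat interval of silence plus one heartbeat timeout; TimeoutBudget and its methods are hypothetical helpers.

```csharp
using System;

static class TimeoutBudget
{
    // How long the test watches a subscription: iterations x sleep per iteration.
    public static TimeSpan ObservationWindow(int iterations, TimeSpan sleepPerIteration)
    {
        return TimeSpan.FromTicks(sleepPerIteration.Ticks * iterations);
    }

    // Simplified model (assumption): a dead connection is noticed after at most
    // one heartbeat interval of silence plus one heartbeat timeout.
    public static TimeSpan WorstCaseDetection(TimeSpan heartbeatInterval, TimeSpan heartbeatTimeout)
    {
        return heartbeatInterval + heartbeatTimeout;
    }

    // True if a dead connection would surface as a drop inside the window.
    public static bool DetectionFits(TimeSpan window, TimeSpan detection)
    {
        return detection <= window;
    }
}
```

For the first program, ObservationWindow(99, TimeSpan.FromMilliseconds(50)) is about 4.95 s of sleeping, while WorstCaseDetection(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(11)) is 21 minutes, so DetectionFits is false: under this model a dead connection cannot show up as a drop before the test reaches its verdict.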

Try turning on client logging in your test. I would love to see a log from a failed run (I can’t get a failure here); my guess is we will see the above.

Cheers,

Greg

Hi,

I modified my example program a little; here is the code:

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Threading;
using EventStore.ClientAPI;
using EventStore.ClientAPI.SystemData;

class Program
{
    // ILogger that appends timestamped lines to clientLog.txt.
    private class Logger : ILogger
    {
        private static StreamWriter file = File.CreateText("clientLog.txt");

        public Logger()
        {
            file.AutoFlush = true;
        }

        private string ExMessage(Exception ex)
        {
            if (ex == null) return "";
            return "EXCEPTION: " + ex.Message;
        }

        private string FormatMessage(string format, params object[] args)
        {
            var result = "[" + DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss.fff") + "] ";
            // With no arguments the format string is written as-is, so placeholders such as {0} stay visible.
            if (args != null && args.Length > 0)
                return result + String.Format(format, args);
            return result + format;
        }

        public void Debug(Exception ex, string format, params object[] args)
        {
            file.WriteLine(FormatMessage(format, args) + ExMessage(ex));
        }

        public void Debug(string format, params object[] args)
        {
            file.WriteLine(FormatMessage(format, args));
        }

        public void Error(Exception ex, string format, params object[] args)
        {
            file.WriteLine(FormatMessage(format, args) + ExMessage(ex));
        }

        public void Error(string format, params object[] args)
        {
            file.WriteLine(FormatMessage(format, args));
        }

        public void Info(Exception ex, string format, params object[] args)
        {
            file.WriteLine(FormatMessage(format, args) + ExMessage(ex));
        }

        public void Info(string format, params object[] args)
        {
            file.WriteLine(FormatMessage(format, args));
        }

        public void Flush()
        {
            file.Flush();
        }
    }

    static void Main(string[] args)
    {
        var rnd = new Random(DateTime.Now.Millisecond);

        // Repeat with random delays until one run neither receives events nor is dropped.
        var result = false;
        while (!result)
        {
            var delay = rnd.Next(1, 150);
            Console.WriteLine("Test started, delay " + delay.ToString());
            result = TestFailed(delay);
        }
        Console.WriteLine("Test Failed!");
        Console.ReadLine();
    }

    private static ManualResetEvent connected = new ManualResetEvent(false);

    private static bool TestFailed(int delay)
    {
        connected.Reset();

        // Kill any running node, then start a fresh in-memory one.
        var old = Process.GetProcessesByName("EventStore.ClusterNode");
        foreach (var p in old)
            p.Kill();
        Thread.Sleep(1000);

        var path = @"C:\Projects\BRuNET\EventStore";
        ProcessStartInfo info = new ProcessStartInfo();
        info.Arguments = "--mem-db";
        info.FileName = path + @"\EventStore.ClusterNode.exe";
        Process.Start(info);

        var logger = new Logger();
        logger.Info("=======================================", new String[0]);
        ConnectionSettings cs = ConnectionSettings.Create()
            .EnableVerboseLogging()
            .UseCustomLogger(logger)
            .KeepReconnecting()
            .KeepRetrying()
            .SetHeartbeatInterval(TimeSpan.FromSeconds(250))
            .SetHeartbeatTimeout(TimeSpan.FromSeconds(250))
            .WithConnectionTimeoutOf(TimeSpan.FromSeconds(250))
            .SetOperationTimeoutTo(TimeSpan.FromSeconds(250))
            .Build();
        IPEndPoint ip = new IPEndPoint(new IPAddress(new byte[] { 127, 0, 0, 1 }), 1113);
        var received = false;
        var dropped = false;
        var connection = EventStoreConnection.Create(cs, ip);
        connection.Connected += connection_Connected;
        connection.ConnectAsync().Wait();
        connected.WaitOne();

        Thread.Sleep(delay);

        EventStoreAllCatchUpSubscription subscription = null;
        subscription =
            connection.SubscribeToAllFrom(Position.Start, false,
                (s, e) =>
                {
                    received = true;
                },
                null,
                (arg1, arg2, arg3) =>
                {
                    dropped = true;
                    subscription = null;
                },
                new UserCredentials("admin", "changeit"));

        // Give the subscription ~5 s to deliver an event or report a drop.
        for (var i = 0; i < 100; i++)
        {
            Thread.SpinWait(50);
            Thread.Sleep(50);
        }

        // Failure: neither an event nor a drop was observed.
        var result = (!received && !dropped);
        if (subscription != null)
            subscription.Stop();
        connection.Close();
        logger.Flush();
        return result;
    }

    static void connection_Connected(object sender, ClientConnectionEventArgs e)
    {
        connected.Set();
    }
}
```

I can’t reproduce the error with short heartbeat intervals (around 200 ms). With an interval of 250 s, the error is reproduced. I have captured the log of one failed attempt; see the attached log file. The failed attempt is at the end, after the last separator line of ‘======’.

BTW, while I was capturing logs, I found cases where the logger was called with a format string containing placeholders but an empty argument array. One such message was: "Catch-up Subscription to {0}: hooking to connection.Connected". Those messages are included in the log file, so you can check them there.
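On the logger side, a sketch of a formatting helper that tolerates such calls: it formats only when String.Format accepts the arguments, and otherwise returns the raw format text instead of throwing (String.Format raises FormatException when a placeholder index exceeds the supplied arguments). LogFormatting and SafeFormat are hypothetical names; the method could stand in for the String.Format call inside FormatMessage in the Logger class above.

```csharp
using System;

static class LogFormatting
{
    // Formats like String.Format but never throws for a mismatched format/args pair:
    // with no arguments, or when String.Format rejects them, the raw format string
    // is returned so the message is still logged.
    public static string SafeFormat(string format, params object[] args)
    {
        if (format == null) return string.Empty;
        if (args == null || args.Length == 0) return format;
        try
        {
            return string.Format(format, args);
        }
        catch (FormatException)
        {
            return format;
        }
    }
}
```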

Cheers,

Roman

clientLog.txt (284 KB)


So you are hitting a case where your connection is in a dead state and you are waiting on timeouts to occur, but you have made them very long. How is this a bug?

OK, it seems that is the case.

The connection is broken, but we don’t know about it yet, so the subscription isn’t dropped yet.

We had extended the timeouts for debugging purposes.

Thank you for your time.

Now we know exactly how it works.

Regards

Roman

On Tuesday, 31 March 2015 11:41:45 UTC+2, Greg Young wrote: