Connection Handling Object Disposed

Environment Windows Server 2012 and Event Store 3.9.2

We have two types of behaviour expected with connections when the event store becomes unavailable

  1. Domain model building on request : Expected to time out if connectivity fails within about 30 seconds (to prevent requests building up after the normal request cancellation timeouts)

  2. Subscription reading : Expected to wait indefinitely until connectivity is available

Subscriptions are easy as we can set the keep retrying, no problem there

The domain connection exhibits interesting behaviour. We do not set it to keep retrying, and so get the default behaviour to retry 10 times.

The problem comes that after the 10 retries the static connection becomes disposed, and anyone else trying to get the connection gets “Object Disposed” messages, and just for fun, a series of further connection attempts getting this message seem to cause the application domain to get stuck, requiring a reset of the IIS Application Pool.

We have coded so that the next user to use the connection, on a relevant error messages (“ClusterException”, “ObjectDisposed” etc) tries to create a new connection in case the event store is available again, and that is avoiding the issue for us.

However, I have a nagging feeling that there might be something we have missed or misunderstood architecturally.

Anyone got input?

I recently was working on a class to manage Eventstore connections. This code was just a proof of concept and was never deployed to any environment. I was not confident about the concurrency/timing of connections.

Concepts

  • Should act like a static connection, reusing the same connection as long as it is good
  • There is a list of connections that are good
  • When connection is closed, it should be removed from the collection. It is added to a queue and a background worker works on removing them from the list. this is the part that is more complex than it probably needs to be. I wanted to off load the work of removing collections of the main processing thread. Good? Bad? ¯_(ツ)_/¯
  • When any condition that would make the connection unusable, it should be removed from the pool of valid connections. Right now, it looks like I only subscribed to the Closed event. It should listen to the other events too.
  • There is some metrics code collection that I cannot include, This code used to collect metrics on time required to open, number of open connections, times connections are closed etc.
  • There is another interface implementation that is not provided, IEventStoreConnectionFactory. This is used to create the event store connection anyway the user wants.
  • Client code should ask for a connection and use it for the shortest possible time frame as to avoid having the connection closed/disposed and obtaining an exception
  • Client code still needs to implement retry logic when the connection is closed/disposed because the connection could be closed/disposed right after getting the connection.
    I would be interested if anyone else has an implementation to share or if anyone wants to collaborate on creating a re-usable set of classes anyone could use in their project.

public interface IEventStoreConnectionFactory

{

IEventStoreConnection Create();

}

public interface IEventStoreConnectionManager

{

Task GetConnectionAsync();

}

internal class EventStoreConnectionManager : IEventStoreConnectionManager, IDisposable

{

private readonly object _syncRoot = new object();

private readonly IEventStoreConnectionFactory _connectionFactory;

private List _openConnections;

private ConcurrentQueue _closedConnections;

///

/// Initializes a new instance of the class.

///

/// The connection factory.

///

/// is null.

///

public EventStoreConnectionManager(IEventStoreConnectionFactory connectionFactory)

{

if (connectionFactory == null) throw new ArgumentNullException(“connectionFactory”);

_connectionFactory = connectionFactory;

_openConnections = new List();

_closedConnections = new ConcurrentQueue();

}

public async Task GetConnectionAsync()

{

IEventStoreConnection connection = Get();

if (connection != null)

{

return connection;

}

connection = _connectionFactory.Create();

connection.Closed += ConnectionClosed;

using (EventStoreMetrics.OnConnectToEventStore())

{

await connection.ConnectAsync();

EventStoreMetrics.OnConnectionOpened();

}

SaveConnection(connection);

return connection;

}

private void ConnectionClosed(object sender, ClientClosedEventArgs e)

{

EventStoreMetrics.OnConnectionClosed();

lock (_syncRoot)

{

// the connection could be closed before it is added to the collection

if (_openConnections.Remove(e.Connection))

{

return;

}

}

// save the closed connection for later processing

_closedConnections.Enqueue(e.Connection);

if (Interlocked.CompareExchange(ref _isRemoveProcessingRunning, 1, 0) == 0)

{

ThreadPool.QueueUserWorkItem(RemoveClosedConnections);

}

}

private void SaveConnection(IEventStoreConnection connection)

{

lock (_syncRoot)

{

_openConnections.Add(connection);

}

}

private int _isRemoveProcessingRunning;

private void RemoveClosedConnections(object state)

{

do

{

IEventStoreConnection connection;

while (_closedConnections.TryDequeue(out connection))

{

lock (_syncRoot)

{

_openConnections.Remove(connection);

}

}

Interlocked.Exchange(ref _isRemoveProcessingRunning, 0);

} while (0 < _closedConnections.Count && Interlocked.CompareExchange(ref _isRemoveProcessingRunning, 1, 0) == 0);

}

private IEventStoreConnection Get()

{

lock (_syncRoot)

{

if (_openConnections.Count == 0)

{

return null;

}

return _openConnections[0];

}

}

public void Dispose()

{

lock (_syncRoot)

{

var connections = _openConnections.ToArray();

_openConnections.Clear();

foreach (var connection in connections)

{

try

{

connection.Close();

connection.Dispose();

}

catch

{

}

}

}

}

}

We did a similar thing without building a full flyweight connection due to DI frameworks disposing things and due to some errors making it disposed. … Note the factory only creates a new connection if disposed ( an error also clears it ) . I use the name to trigger the Disposed Exception. Seems clumsy but best I could do without a connection state.

    public IEventStoreConnection Unwrap()
    {
        try
        {
            if (conn != null && string.IsNullOrEmpty(conn.ConnectionName) == false)
            {
                numOfFails = 0;
                return conn;
            }
        }
        catch (ObjectDisposedException)
        {
            conn = null;
            logger.LogWarning("cluster connection disposed recreating");
        }

this.conn = buildConnection();

return this.conn;

While there is always scope for improvement (https://github.com/EventStore/EventStore/issues/1694), my load testing shows no need to induce complexity of the nature you describe.

I’d hence strongly recommend against this. We do significant load using a single EventStoreConnection without any such pooling. Wrt preventing requests building up, there is a facility in 4.1.1 client https://github.com/EventStore/EventStore/pull/1582 which allows one to manage timeouts a lot better - there are also lots of important client resilience improvements I’d recommend making the investment to get access to if at all possible your side.

Specifically, the behavior where after 10 discovery attempts it fails is addressed by KeepDiscovering (which was added in 4.0 or 4.1, cant recall which), but can be approximated by setting SetMaxDsicoveryAttemtpsTo(int.MaxValue)

Bottom line is to set FailOnNoServerResponse, KeepDiscovering, SetQueueTimeoutTo, SetOperatonTimeoutTo and the max retries.

–Ruben