I recently was working on a class to manage Eventstore connections. This code was just a proof of concept and was never deployed to any environment. I was not confident about the concurrency/timing of connections.
Concepts
- Should act like a static connection, reusing the same connection as long as it is good
- There is a list of connections that are good
- When connection is closed, it should be removed from the collection. It is added to a queue and a background worker works on removing them from the list. this is the part that is more complex than it probably needs to be. I wanted to off load the work of removing collections of the main processing thread. Good? Bad? ¯_(ツ)_/¯
- When any condition that would make the connection unusable, it should be removed from the pool of valid connections. Right now, it looks like I only subscribed to the Closed event. It should listen to the other events too.
- There is some metrics code collection that I cannot include, This code used to collect metrics on time required to open, number of open connections, times connections are closed etc.
- There is another interface implementation that is not provided, IEventStoreConnectionFactory. This is used to create the event store connection anyway the user wants.
- Client code should ask for a connection and use it for the shortest possible time frame as to avoid having the connection closed/disposed and obtaining an exception
- Client code still needs to implement retry logic when the connection is closed/disposed because the connection could be closed/disposed right after getting the connection.
I would be interested if anyone else has an implementation to share or if anyone wants to collaborate on creating a re-usable set of classes anyone could use in their project.
public interface IEventStoreConnectionFactory
{
IEventStoreConnection Create();
}
public interface IEventStoreConnectionManager
{
Task GetConnectionAsync();
}
internal class EventStoreConnectionManager : IEventStoreConnectionManager, IDisposable
{
private readonly object _syncRoot = new object();
private readonly IEventStoreConnectionFactory _connectionFactory;
private List _openConnections;
private ConcurrentQueue _closedConnections;
///
/// Initializes a new instance of the class.
///
/// The connection factory.
///
/// is null.
///
public EventStoreConnectionManager(IEventStoreConnectionFactory connectionFactory)
{
if (connectionFactory == null) throw new ArgumentNullException(“connectionFactory”);
_connectionFactory = connectionFactory;
_openConnections = new List();
_closedConnections = new ConcurrentQueue();
}
public async Task GetConnectionAsync()
{
IEventStoreConnection connection = Get();
if (connection != null)
{
return connection;
}
connection = _connectionFactory.Create();
connection.Closed += ConnectionClosed;
using (EventStoreMetrics.OnConnectToEventStore())
{
await connection.ConnectAsync();
EventStoreMetrics.OnConnectionOpened();
}
SaveConnection(connection);
return connection;
}
private void ConnectionClosed(object sender, ClientClosedEventArgs e)
{
EventStoreMetrics.OnConnectionClosed();
lock (_syncRoot)
{
// the connection could be closed before it is added to the collection
if (_openConnections.Remove(e.Connection))
{
return;
}
}
// save the closed connection for later processing
_closedConnections.Enqueue(e.Connection);
if (Interlocked.CompareExchange(ref _isRemoveProcessingRunning, 1, 0) == 0)
{
ThreadPool.QueueUserWorkItem(RemoveClosedConnections);
}
}
private void SaveConnection(IEventStoreConnection connection)
{
lock (_syncRoot)
{
_openConnections.Add(connection);
}
}
private int _isRemoveProcessingRunning;
private void RemoveClosedConnections(object state)
{
do
{
IEventStoreConnection connection;
while (_closedConnections.TryDequeue(out connection))
{
lock (_syncRoot)
{
_openConnections.Remove(connection);
}
}
Interlocked.Exchange(ref _isRemoveProcessingRunning, 0);
} while (0 < _closedConnections.Count && Interlocked.CompareExchange(ref _isRemoveProcessingRunning, 1, 0) == 0);
}
private IEventStoreConnection Get()
{
lock (_syncRoot)
{
if (_openConnections.Count == 0)
{
return null;
}
return _openConnections[0];
}
}
public void Dispose()
{
lock (_syncRoot)
{
var connections = _openConnections.ToArray();
_openConnections.Clear();
foreach (var connection in connections)
{
try
{
connection.Close();
connection.Dispose();
}
catch
{
}
}
}
}
}