Hi,
I found that when use many thread pool thread synchronously append event to a single connection will cause event store connection disconnect frequently.
The following is the example code:
for( int i=0; i<100; i++ )
{
Task.Run
(
var beginTime = DateTime.Now();
connection.AppendToStreamAsync(Guid.NewGuid.ToString(), ExpectedVersion.Any, new EventData(Guid.NewGuid(), “String”, true, data, null)).Wait();
Console.WriteLine(DateTime.Now().Substract(beginTime).TotalMilliseconds.ToString());
);
}
``
I use console to output the duration of this method call, i found the duration became more and more longer, and at that time, the connection will disconnect randomly.
To keep connection stable, i increased server side heartbeat timeout value, it will be work fine, but the speed of event appending is very slow.
At first, I thought this problem maybe caused by thread pool exhausted, but it’s no use to increase MaxThreads of ThreadPool.
Is there something wrong with threadpool? so i shift code from threadpool thread to normal thread.
The following is the example code:
for( int i=0; i<100; i++ )
{
Task.Run
(
var thread = new Thread
(
()=>
{
var beginTime = DateTime.Now();
connection.AppendToStreamAsync(Guid.NewGuid.ToString(), ExpectedVersion.Any, new EventData(Guid.NewGuid(), “String”, true, data, null)).Wait();
Console.WriteLine(DateTime.Now().Substract(beginTime).TotalMilliseconds.ToString());
}
);
);
}
``
Fortunately,the above code execute very fast and the event store connection never disconnect again.
I don’t think using thread pool thread is the root cause of this problem, maybe something wrong with event store client API.
I download the source code of event store client API and debug it, at last, i found the following code is the root cause of this problem. (The following code is under class EnqueueMessage in project EventStore.ClientAPI).
public void EnqueueMessage(Message message)
{
Ensure.NotNull(message, “message”);
_messageQueue.Enqueue(message);
if (Interlocked.CompareExchange(ref _isProcessing, 1, 0) == 0)
ThreadPool.QueueUserWorkItem(ProcessQueue);
}
private void ProcessQueue(object state)
{
do
{
Message message;
while (_messageQueue.TryDequeue(out message))
{
Action handler;
if (!_handlers.TryGetValue(message.GetType(), out handler))
throw new Exception(string.Format(“No handler registered for message {0}”, message.GetType().Name));
handler(message);
}
Interlocked.Exchange(ref _isProcessing, 0);
} while (_messageQueue.Count > 0 && Interlocked.CompareExchange(ref _isProcessing, 1, 0) == 0);
}
``
The above code will delay event message processing when use many threadpool thread synchronously append event, the following graph figure out it.
I changed the code as following, the problem is solved.
public void EnqueueMessage(Message message)
{
Ensure.NotNull(message, “message”);
_messageQueue.Enqueue(message);
if (Interlocked.CompareExchange(ref _isProcessing, 1, 0) == 0)
{
var thread = new Thread(ProcessQueue);
thread.IsBackground = true;
thread.Start();
}
}
``