This is a question about how to run MSTest tests against EventStore without running into a deadlock.
Background:
A single command handler receives commands from an external source.
- It appends each command to the command stream.
- It has a catch-up subscription to the same command stream, where it listens for ack/nack messages. It sets up two tasks, one that times out after x seconds and one that blocks on resetevent.WaitOne() (set when the ack/nack is received from the subscription), then waits for either to finish and returns the ack/nack to the external source.
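The ack/nack wait described above can be sketched roughly as follows. This is only a minimal illustration, not the code under test: it swaps the reset event for a TaskCompletionSource so that nothing blocks a thread, and the names AckWaiter and WaitForAck are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, assumed for illustration only.
public static class AckWaiter
{
    // Returns true (ack) or false (nack) once the signal task completes,
    // or null if the timeout elapses first.
    public static async Task<bool?> WaitForAck(Task<bool> ackSignal, TimeSpan timeout)
    {
        Task timeoutTask = Task.Delay(timeout);

        // Wait for whichever finishes first: the ack/nack signal or the timeout.
        Task first = await Task.WhenAny(ackSignal, timeoutTask);
        if (first == timeoutTask)
            return null;

        // The signal task has already completed, so this await does not wait.
        return await ackSignal;
    }
}
```

In this sketch the subscription callback would own a TaskCompletionSource<bool> and call TrySetResult(true) or TrySetResult(false) when the matching ack/nack appears; the handler awaits WaitForAck(tcs.Task, timeout).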
1-n other processors each have a catch-up subscription to the command stream.
- Each reads the next command and, if it is among the commands valid for that processor, handles it in an idempotent way (mostly resulting in appending events to a stream).
- When the command is handled, the processor appends an ack/nack message to the command stream. The ack/nack has a deterministic GUID derived from the GUID of the command it handled.
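One way to derive such a deterministic GUID is to hash the command's GUID together with a fixed tag. The sketch below is an assumption about how this could be done, not the derivation actually used here; the class name AckIds and the "ack" tag are hypothetical, and the result is not an RFC 4122 name-based UUID, only a stable 16-byte value.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, assumed for illustration only.
public static class AckIds
{
    // Maps a command id to the same ack id every time it is called.
    public static Guid For(Guid commandId)
    {
        byte[] tag = Encoding.UTF8.GetBytes("ack");
        byte[] id = commandId.ToByteArray();

        // Concatenate the tag and the command id bytes.
        byte[] input = new byte[tag.Length + id.Length];
        Buffer.BlockCopy(tag, 0, input, 0, tag.Length);
        Buffer.BlockCopy(id, 0, input, tag.Length, id.Length);

        using (var md5 = MD5.Create())
        {
            // An MD5 hash is 16 bytes, which is exactly the size of a Guid.
            return new Guid(md5.ComputeHash(input));
        }
    }
}
```

Handling the same command twice then yields the same ack id, so a repeated ack can be recognised as a duplicate.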
Additionally, each processor might have one catch-up subscription to the $all stream, to let events cascade through the system.
Question:
Has anyone had deadlock trouble when testing EventStore with MSTest?
Details:
When testing this with MSTest I have a hard time getting it to work. It has to do with threading and the async/await pattern, and I was hoping someone could help me understand how this should be done with EventStore, because I’m obviously doing something wrong.
This is the test code:
```csharp
UserProcessor _processor;
List _evts = new List();
TestUtils _util;
Writer _writer;

public UserTests()
{
    _util = new TestUtils(_evts);
    _processor = Setup.Get<UserProcessor>();
    _processor.EnqueueMsg += _util._processor_EnqueueMsg;
    _writer = Setup.Get<Writer>();
}

[TestMethod]
public async Task AcknowledgesCmd()
{
    await _processor.Start();
    var userId = Guid.NewGuid();
    var cmd = new CreateUser(userId, "MrUser");
    var msg = MsgEnvelope.For(cmd, Guid.NewGuid(), Guid.NewGuid());
    var headers = msg.GetHeaders();
    var data = GetFromSingle((Cmd)msg.GetBody(), headers);
    var success = await _writer.Append(data, Streams.Cmds); // Writing to stream is no problem.
    _util.WaitFor<AckCmd>(c => c.HandledId == cmd.Id);
}
```
Obviously, there’s a lot more code to this, where things can go awry…
I’ve had problems with deadlocks when using .Result on Task-returning methods, so I’ve changed everything to await all the way up to the test method. Still, when execution gets down to the last async method, the one on the ES connection, it deadlocks.
I’ll go from the top down to where it locks (my synchronisation management with the reset events might be a smell; feel free to suggest a better solution).
_processor.Start() calls this:
```csharp
public async Task Start()
{
    Task.Run(() => RunAsync()); // RunAsync() is blocking
    await _cmdService.Start(); // starts subscription to cmd stream
}
```
The subscription delivers each event to this callback:
```csharp
private void EventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent)
{
    try
    {
        if (!_isSubscribed)
            subscription.Stop();
        if (resolvedEvent.OriginalStreamId.Contains("$")) return;
        MsgEnvelope evt;
        var couldParse = StoredMsgToEnvelope.TryParse<IMessage>(resolvedEvent, out evt);
        if (!couldParse) return;
        EnqueueMsg(this, new EnqueueInfo(evt));
        // SNIP
```
EnqueueMsg leads to this:
```csharp
void _cmds_ReceiveCmd(object sender, EnqueueInfo e)
{
    var msg = (MsgEnvelope)e.Msg;
    if (!_handler.ValidCommands.Any(t => t.AssemblyQualifiedName == msg.ClrType)) return;
    _cmdJobs.Enqueue(msg);
    TriggerQueue();
}

void TriggerQueue()
{
    if (_inProcess || _stopRequested) return;
    _auto.Set();
    _manual.Reset();
}

public async Task RunAsync() // was started in Start() along with subscription, described above.
{
    while (!_stopRequested)
    {
        _manual.WaitOne();
        _auto.WaitOne();
        _inProcess = true;
        try
        {
            MsgEnvelope nextMsg;
            while (_cmdJobs.TryPeek(out nextMsg))
            {
                Cmd cmd = (Cmd)nextMsg.GetBody();
                Guid userId = ((dynamic)cmd).UserId;
                bool couldSave = false;
                int count = 0;
                do
                {
                    var res = await _handler.Handle(nextMsg); // comes to this point
                    couldSave = await _cmdService.Acknowledge(cmd, res);
                } while (!couldSave && count < 10);
                _cmdJobs.TryDequeue(out nextMsg);
            }
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "{0}", "Error in UserManager when handling cmd.");
        }
        _inProcess = false;
        _manual.Set();
    }
}
```
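For comparison, a worker loop that waits without blocking a thread could look like the sketch below. It is an assumption-laden alternative, not the code under test: it replaces the two reset events with SemaphoreSlim.WaitAsync, the class name AsyncWorkQueue is hypothetical, and unlike the loop above it dequeues a job before handling it rather than peeking first.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical queue, assumed for illustration only.
public class AsyncWorkQueue<T>
{
    private readonly ConcurrentQueue<T> _jobs = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    // Called by the producer, e.g. from a subscription callback.
    public void Enqueue(T job)
    {
        _jobs.Enqueue(job);
        _signal.Release(); // one release per queued job
    }

    // Handles jobs one at a time until the token is cancelled.
    public async Task RunAsync(Func<T, Task> handle, CancellationToken stop)
    {
        try
        {
            while (true)
            {
                // Asynchronous wait: no thread is parked while the queue is empty.
                await _signal.WaitAsync(stop).ConfigureAwait(false);

                T job;
                if (_jobs.TryDequeue(out job))
                    await handle(job).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal way to stop the loop.
        }
    }
}
```

Because RunAsync returns to its caller at the first await, it can be started without Task.Run and stopped by cancelling the token.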
The _handler.Handle method has this call:
```csharp
var result = await GetFrom(cmd);
```
It needs to retrieve the aggregate from the repository before applying the cmd, so it does this:
```csharp
x = await _repository.GetById(cmd.UserId);
```
which comes to this:
```csharp
public async Task<TEventSourced> GetById<TEventSourced>(Guid id, int version) where TEventSourced : class, IAggregate
{
    if (version <= 0)
        throw new InvalidOperationException("Cannot get version <= 0");
    var streamName = _eventSourcedIdToStreamName(typeof(TEventSourced), id);
    var eventSourced = ConstructAggregate<TEventSourced>();
    var sliceStart = 0;
    StreamEventsSlice currentSlice;
    do
    {
        var sliceCount = sliceStart + ReadPageSize <= version
            ? ReadPageSize
            : version - sliceStart + 1;
        currentSlice =
            await _connection.ReadStreamEventsForwardAsync(streamName, sliceStart, sliceCount, false); // DEADLOCK
        // SNIP
```
This is where it deadlocks…
It doesn’t deadlock when I debug the project on its own, only when it runs from the MSTest test. Is this something anyone has experienced? How do you get around it?