EventStore + MSTests deadlocking?

So, this is a question related to how to run the MSTests with EventStore, without running into deadlock.

Background:

A single command handler receives commands from external source

  • It appends to command stream

  • It has a catchup subscription to the same command stream

where it listens for ack/nack messages. It sets up 2 tasks - one with timeout x seconds, and one with resetevent.WaitOne(), that is set when the ack/nack is received from subscription - then waits for any and returns ack/nack to the external source.

1-n other processors each has a catchup subscription to command stream.

  • It reads next command, if among commands valid for processor: handles it in an idempotent way (mostly resulting in appending events to a stream).

  • When handled, it appends an ack/nack message to command stream.

ack/nack has deterministic guid derived from the guid of the command it handled.

Additionally, the processors might have on catchup subscription each to the $all stream, as to let events cascade through the system.

Question:

Has anyone had deadlock troubles with testing EventStore in MSTests?

Details:

Okay. when testing this in MSTests I have a hard time getting it to work. It’s about the threading and async/await pattern, but I was hoping someone could help me understand how this should be done with EventStore, because I’m obviously doing something wrong.

This is the test code:

UserProcessor _processor;
List _evts = new List();
TestUtils _util;
Writer _writer;

    public UserTests()
    {
        _util = new TestUtils(_evts);

        _processor = Setup.Get<UserProcessor>();
        _processor.EnqueueMsg += _util._processor_EnqueueMsg;

        _writer = Setup.Get<Writer>();
    }

    [TestMethod]
    public async Task AcknowledgesCmd()
    {
        await _processor.Start();

        var userId = Guid.NewGuid();
        var cmd = new CreateUser(userId, "MrUser");
        var msg = MsgEnvelope.For(cmd, Guid.NewGuid(), Guid.NewGuid());
        var headers = msg.GetHeaders();
        var data = GetFromSingle((Cmd)msg.GetBody(), headers);

        var success = await _writer.Append(data, Streams.Cmds); // Writing to stream is no problem.

        _util.WaitFor<AckCmd>(c => c.HandledId == cmd.Id);
    }

``

Obviously, there’s a lot more code to this, where things can go awry…

I’ve had problems with deadlocks using .Result on Task-methods. So I’ve changed it all to await all the way up to the test method. Still though,

when getting down to the last async method, one on the ES connection, it’s a deadlock.

I’ll start from above and down to where it locks: (my synchronisation management with the resetevents might be a smell, feel free to suggest better solution)

_processor.Start() calls this:

    public async Task Start()
    {
        Task.Run(() => RunAsync()); // RunAsync() is blocking
        await _cmdService.Start(); // starts subscription to cmd stream
    }

``

private void EventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent)
{
try
{
if (!_isSubscribed)
subscription.Stop();

            if (resolvedEvent.OriginalStreamId.Contains("$")) return;

            MsgEnvelope evt;
            var couldParse = StoredMsgToEnvelope.TryParse<IMessage>(resolvedEvent, out evt);

            if (!couldParse) return;

            EnqueueMsg(this, new EnqueueInfo(evt));

// SNIP

``

EnqueueMsg leads to this:

  void _cmds_ReceiveCmd(object sender, EnqueueInfo e)
    {
        var msg = (MsgEnvelope)e.Msg;

        if (!_handler.ValidCommands.Any(t => t.AssemblyQualifiedName == msg.ClrType)) return;
        _cmdJobs.Enqueue(msg);

        TriggerQueue();
    }

    void TriggerQueue()
    {
        if (_inProcess || _stopRequested) return;
        _auto.Set();
        _manual.Reset();
    }

    public async Task RunAsync() // was started in Start() along with subscription, described above.
    {
        while (!_stopRequested)
        {
            _manual.WaitOne();
            _auto.WaitOne();
            _inProcess = true;
            try
            {
                MsgEnvelope nextMsg;
                while (_cmdJobs.TryPeek(out nextMsg))
                {
                    Cmd cmd = (Cmd)nextMsg.GetBody();
                    Guid userId = ((dynamic)cmd).UserId;

                    bool couldSave = false;
                    int count = 0;
                    do
                    {
                        var res = await _handler.Handle(nextMsg); // comes to this point
                        couldSave = await _cmdService.Acknowledge(cmd, res);
                   
                    } while (!couldSave && count < 10);

                    _cmdJobs.TryDequeue(out nextMsg);
                }
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "{0}", "Error in UserManager when handling cmd.");
            }
            _inProcess = false;
            _manual.Set();
        }
    }

``

the _handler.Handle-method has this call:

var result = await GetFrom(cmd);

``

which needs to retrieve aggregate from repository before applying cmd, so it does this:

x = await _repository.GetById(cmd.UserId);

``

which comes to this:

public async Task GetById(Guid id, int version) where TEventSourced : class, IAggregate
{
if (version <= 0)
throw new InvalidOperationException(“Cannot get version <= 0”);
var streamName = _eventSourcedIdToStreamName(typeof(TEventSourced), id);
var eventSourced = ConstructAggregate();
var sliceStart = 0;
StreamEventsSlice currentSlice;
do
{
var sliceCount = sliceStart + ReadPageSize <= version
? ReadPageSize
: version - sliceStart + 1;
currentSlice =
await _connection.ReadStreamEventsForwardAsync(streamName, sliceStart, sliceCount, false); // DEADLOCK
// SNIP

``

where it deadlocks…

It doesn’t deadlock when I debug the project for itself, it’s just from the MSTest. So…Is this something anyone has experienced? How do you get around it?

Okay, I have found, maybe not the error, but at least a solution to the situation:

In the RunAsync() method, changing from this

var res = await _handler.Handle(nextMsg);
couldSave = await _cmdService.Acknowledge(cmd, res);

``

to this

var res = _handler.Handle(nextMsg).Result;
couldSave = Acknowledge(cmd, res).Result;

``

made the sudden heartbeat timeout stop. Awaiting any of those would lead to inevitable nonrecoverable drop.

Awaiting a task within a task has been mentioned in many places, also here in this group by João B, to be redundant.

Now, it would be interesting to know exactly why this caused the subscription to drop and not reconnect. Some sort of deadlock, but how so?

Anyhow, that’s that. Maybe saves some time for the guy to trip on it.

This was posted in the wrong thread (I had another one here with almost same question).
It wasn’t the solution to deadlocks in MSTests. It was though to the deadlocks when debugging.

In MSTests, it will deadlock regardless of await / Result.

So, question withstands: Has anyone else experienced this, and do you have workarounds?

Have you tried await someTask.ConfigureAwait(false) ?

Yup, have tried that and it didn’t work.

Did you ever get further?

I’m noticing some odd behavior dealing with await/async in a number of areas.

I’m working up a decent repro test program… Might have it done tomorrow…