I have added code to use an async transaction when writing more than 500 events to a stream. However, the code is failing at:
var transaction = await Connection.StartTransactionAsync(streamName, expectedVersion)
``
The error message I get is:
Message = “Thread was being aborted.”
" at EventStore.ClientAPI.Transport.Tcp.ProtobufExtensions.Deserialize[T](ArraySegment
1 data)\r\n at EventStore.ClientAPI.ClientOperations.OperationBase
2.InspectPackage(TcpPackage package)\r\n at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()\r\n at ServiceStack.EventStore.Repository.EventStoreRepository.d__12.MoveNext() in C:\Development\ServiceStack.EventStore\ServiceStack.EventStore\Repository\EventStoreRepository.cs:line 88"
Any ideas?
My code is as follows:
public async Task Save(Aggregate aggregate)
{
var headers = new Dictionary<string, object>
{
{AggregateClrTypeHeader, aggregate.GetType().Name}
};
var streamName = getStreamName(aggregate.GetType(), aggregate.Id);
var newEvents = aggregate.Changes.ToList();
var originalVersion = aggregate.State.Version - newEvents.Count;
var expectedVersion = originalVersion == InitialVersion
? ExpectedVersion.NoStream
: originalVersion - 1;
var eventsToSave = newEvents.Select(@event => ToEventData(@event, headers)).ToList();
if (eventsToSave.Count < WritePageSize)
{
try
{
await Connection.AppendToStreamAsync(streamName, expectedVersion, eventsToSave);
}
catch (Exception e)
{
log.Error(e);
}
}
else
{
try
{
using (var transaction = await Connection.StartTransactionAsync(streamName, expectedVersion))
{
var position = 0;
while (position < eventsToSave.Count)
{
var pageEvents = eventsToSave.Skip(position).Take(WritePageSize);
try
{
await transaction.WriteAsync(pageEvents);
}
catch (Exception e)
{
log.Error(e);
}
position += WritePageSize;
}
await transaction.CommitAsync();
}
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
aggregate.ClearCommittedEvents();
}
``