Greetings All,
I’d like to store my commands in EventStore in addition to my events and I’m curious how others are handling this. Does it make more sense to store all commands in a single stream or to have multiple streams partitioned by, say, command handler?
I’m also curious how others are dealing with recovering and executing commands that are “lost” due to some sort of infrastructure problem/bug (someone forgot to register the command handler) that would cause the command to not get executed. How do you handle resending the command when the system comes back online? I think that this is more of an issue when the command is issued not by the action of a user, but by a saga or workflow manager. For instance, I’m storing my sagas in and retrieving them from EventStore using the code below. Just after I save the saga, I dispatch the saga’s undispatched commands over my “commandBus”, which you can just pretend is an in-memory dictionary. What I’m worried about is the case where the sending/processing of a command is interrupted for some unforeseen reason. Should the saga keep track of command execution acknowledgements? If so, how does one handle waking up the saga to periodically check its state and resend the command if it is never reloaded due to other incoming messages?
/// <summary>
/// Contract for a saga that is persisted as a stream of events and that
/// queues commands for dispatch once it has been saved.
/// </summary>
public interface ISaga : IEntity
{
    // --- Versioning -------------------------------------------------------

    /// <summary>
    /// Version of the saga as loaded from the store. SaveAsync passes this
    /// value as the expected stream version when appending.
    /// </summary>
    int LoadedVersion { get; }

    /// <summary>
    /// Current version of the saga.
    /// NOTE(review): presumably LoadedVersion plus the number of uncommitted
    /// events — confirm against the implementation.
    /// </summary>
    int CurrentVersion { get; }

    // --- Events -----------------------------------------------------------

    /// <summary>
    /// Feeds an event read back from the store into the saga
    /// (GetByIdAsync calls this once per stored event).
    /// </summary>
    void LoadEvent(IEvent domainEvent);

    /// <summary>
    /// Applies an event to the saga.
    /// NOTE(review): presumably also records it as uncommitted — confirm.
    /// </summary>
    void ApplyEvent(IEvent domainEvent);

    /// <summary>Returns the events that SaveAsync still has to write to the store.</summary>
    IEnumerable<IEvent> GetUncommittedEvents();

    /// <summary>Discards the uncommitted events (called by SaveAsync after saving).</summary>
    void ClearUncommittedEvents();

    // --- Commands ---------------------------------------------------------

    /// <summary>Returns the commands that SaveAsync still has to send over the command bus.</summary>
    IEnumerable<ICommand> GetUndispatchedCommands();

    /// <summary>
    /// Discards the undispatched commands (called after dispatching, and after
    /// loading a saga from its stream).
    /// </summary>
    void ClearUndispatchedCommands();
}
/// <summary>
/// Appends the saga's uncommitted events to its stream and then sends the
/// saga's undispatched commands over the command bus.
/// </summary>
/// <param name="saga">The saga to persist.</param>
/// <returns>A task that completes once the events are written and all commands have been sent.</returns>
/// <exception cref="ArgumentNullException"><paramref name="saga"/> is null.</exception>
/// <remarks>
/// Events are written with optimistic concurrency: the append uses the saga's
/// LoadedVersion (or ExpectedVersion.NoStream for a saga at the initial
/// version) as the expected stream version.
/// </remarks>
public async Task SaveAsync(ISaga saga) {
    if (saga == null)
        throw new ArgumentNullException("saga");

    var streamName = this.sagaIdToStreamName(saga.GetType(), saga.Id);
    var eventsToSave =
        saga.GetUncommittedEvents()
            .Select(e => this.eventSerializer.ToEventData(Guid.NewGuid(), e))
            .ToList();
    var expectedVersion = saga.LoadedVersion == AggregateRoot.InitialVersion
        ? ExpectedVersion.NoStream
        : saga.LoadedVersion;

    if (eventsToSave.Count < WritePageSize) {
        // Small batch: a single append is enough.
        await
            this.eventStoreConnection.AppendToStreamAsync(streamName, expectedVersion, eventsToSave)
                .ConfigureAwait(false);
    }
    else {
        // Large batch: write page by page inside one transaction so the
        // whole batch is committed together. The transaction is disposed
        // even when a write or the commit throws.
        using (var transaction =
            await
                this.eventStoreConnection.StartTransactionAsync(streamName, expectedVersion)
                    .ConfigureAwait(false)) {
            for (var position = 0; position < eventsToSave.Count; position += WritePageSize) {
                var pageEvents = eventsToSave.Skip(position).Take(WritePageSize);
                await transaction.WriteAsync(pageEvents).ConfigureAwait(false);
            }
            await transaction.CommitAsync().ConfigureAwait(false);
        }
    }

    // Snapshot the commands first, then mark the events as committed right
    // away: they are durably stored at this point, so a failure while sending
    // commands must not leave them looking unsaved (a retry would otherwise
    // try to append the same events a second time).
    var commandsToSend = saga.GetUndispatchedCommands().ToList();
    saga.ClearUncommittedEvents();

    //TODO: Do we need some way to handle command sends failing for non-domain reasons?
    // If a send throws, the undispatched commands are intentionally left on
    // the saga so the caller can still see what was not delivered.
    foreach (var command in commandsToSend) {
        await this.commandBus.SendAsync(command).ConfigureAwait(false);
    }
    saga.ClearUndispatchedCommands();
}
/// <summary>
/// Rebuilds a saga of type <typeparamref name="TSaga"/> by reading its event
/// stream from the start and replaying events up to <paramref name="atVersion"/>.
/// </summary>
/// <typeparam name="TSaga">
/// Saga type; it is created through a constructor that takes the saga id.
/// </typeparam>
/// <param name="id">Identifier of the saga, used to derive the stream name.</param>
/// <param name="atVersion">
/// Last event number to load. Pass <see cref="int.MaxValue"/> to load whatever
/// the stream contains without a version check.
/// </param>
/// <returns>The saga with the stored events loaded and no pending commands.</returns>
/// <exception cref="InvalidOperationException"><paramref name="atVersion"/> is zero or negative.</exception>
/// <exception cref="SagaNotFoundException">The stream does not exist.</exception>
/// <exception cref="SagaDeletedException">The stream has been deleted.</exception>
/// <exception cref="SagaVersionException">
/// The loaded version differs from <paramref name="atVersion"/> (and
/// <paramref name="atVersion"/> is not <see cref="int.MaxValue"/>).
/// </exception>
public async Task<TSaga> GetByIdAsync<TSaga>(Guid id, int atVersion) where TSaga : ISaga {
    // NOTE(review): this also rejects version 0; if LoadedVersion is a
    // 0-based event number that excludes single-event sagas — confirm intent.
    if (atVersion <= 0)
        throw new InvalidOperationException("Cannot get version <= 0");

    var streamName = this.sagaIdToStreamName(typeof (TSaga), id);
    var saga = (TSaga) Activator.CreateInstance(typeof (TSaga), id);

    var sliceStart = 0;
    StreamEventsSlice currentSlice;
    do {
        // Read a full page unless that would run past atVersion; in that case
        // read only the events up to and including atVersion.
        var sliceCount = sliceStart + ReadPageSize <= atVersion
            ? ReadPageSize
            : atVersion - sliceStart + 1;

        currentSlice =
            await this.eventStoreConnection.ReadStreamEventsForwardAsync(streamName, sliceStart, sliceCount,
                false).ConfigureAwait(false);

        if (currentSlice.Status == SliceReadStatus.StreamNotFound)
            throw new SagaNotFoundException(id, typeof (TSaga));
        if (currentSlice.Status == SliceReadStatus.StreamDeleted)
            throw new SagaDeletedException(id, typeof (TSaga));

        sliceStart = currentSlice.NextEventNumber;

        foreach (var evnt in currentSlice.Events) {
            saga.LoadEvent(this.eventSerializer.ToEvent(evnt.OriginalEvent));
        }
    } while (atVersion >= currentSlice.NextEventNumber && !currentSlice.IsEndOfStream);

    if (saga.LoadedVersion != atVersion && atVersion < int.MaxValue)
        throw new SagaVersionException(id, typeof (TSaga), saga.LoadedVersion, atVersion);

    // Drop any commands still queued on the saga after the replay so the
    // returned instance starts with none pending.
    saga.ClearUndispatchedCommands();
    return saga;
}