Hi all,
I have a very naive question. I am used to handling events from a message bus like RabbitMQ. With a bus, I usually do the following:
try
    f myEvent
    bus.ack()
catch Exception
    log Exception
This way, when the handler fails, the event is not acknowledged and remains in the subscription to be consumed again later. With the EventStore client I do not see any ack, and I fail to understand what the correct way to handle this business use case is:
I have two subscriptions :
Subscription 1 onMyEvt1
Subscription 2 onMyEvt2
In both, I react by creating commands for an aggregate. If both events arrive at the same time, I intend to rely on idempotency so that one of the two fails. I would like the failed one to retry consuming its event, but it is not clear to me how to do so.
Thanks for your help,
Here is an example of my code:
let subscribeReactor<'event> (log:Logging.MycorrhizaLogger) (client:EventStoreClient) who streamName (f:StreamRevision -> EventStore.Metadata -> 'event -> SubscriptionAction) =
    let onEvent = fun (streamSubscription:StreamSubscription) (resolvedEvent:ResolvedEvent) (cancellationToken:CancellationToken) ->
        async {
            log.info <| sprintf "%s reading action to %s" who streamName
            let messageMetadata = Encoding.UTF8.GetString(resolvedEvent.Event.Metadata.ToArray())
            let metadata = JsonConvert.DeserializeObject<EventStore.Metadata>(messageMetadata, FableJsonConverter())
            let message = Encoding.UTF8.GetString(resolvedEvent.Event.Data.ToArray())
            let event = JsonConvert.DeserializeObject<'event>(message, FableJsonConverter())
            let revision = EventStore.Client.StreamRevision(resolvedEvent.OriginalEvent.EventNumber.ToUInt64())
            f revision metadata event |> ignore
        }
        |> Async.startAsPlainTask
    let streamNameCe = sprintf "$ce-%s" streamName
    log.info <| sprintf "%s subscribing sync action to %s" who streamName
    client.SubscribeToStreamAsync(streamNameCe, onEvent, true)
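
To make the goal concrete, here is a minimal sketch of the retry behaviour I am after, written independently of the EventStore client. The names HandlerResult and handleWithRetry, the attempt limit, and the fixed delay are my own assumptions for illustration only; they are not part of any library, and I do not know whether retrying in-process like this is the recommended approach.

```fsharp
// Hypothetical result type for illustration; not part of the EventStore client.
type HandlerResult =
    | Handled
    | Failed of exn

/// Runs `handler` on `event`, making at most `maxAttempts` attempts in total
/// and waiting `delayMs` milliseconds between attempts.
/// Returns Handled on the first success, or Failed with the last exception.
let rec handleWithRetry (maxAttempts: int) (delayMs: int) (handler: 'event -> unit) (event: 'event) : Async<HandlerResult> =
    async {
        // Run the handler once and capture the outcome instead of letting it throw.
        let outcome =
            try
                handler event
                Handled
            with ex -> Failed ex
        match outcome with
        | Handled -> return Handled
        | Failed _ when maxAttempts > 1 ->
            // Attempts remain: wait, then process the same event again.
            do! Async.Sleep(delayMs)
            return! handleWithRetry (maxAttempts - 1) delayMs handler event
        | Failed ex -> return Failed ex
    }

// Usage: a handler that fails twice (e.g. on a conflict) and then succeeds.
let attempts = ref 0
let flaky (_: string) =
    attempts.Value <- attempts.Value + 1
    if attempts.Value < 3 then failwith "conflict"

let result = handleWithRetry 5 10 flaky "myEvt1" |> Async.RunSynchronously
```

In my subscription code above, the value returned by f is currently discarded with ignore, so something like this wrapper would have to sit around the call to f inside onEvent.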