Subscription Ack

Hi all,

I have a very naive question. I am used to handling events from a message bus such as RabbitMQ. With a bus, I usually do the following:

    try
        f myEvent
        bus.ack()
    catch Exception
        log Exception

This way, when the handler throws, the event is not acked and stays in the subscription to be consumed later on. I do not see any ack here, and I fail to understand the correct way to handle this business use case:

I have two subscriptions :
Subscription 1 onMyEvt1
Subscription 2 onMyEvt2

I react to both by creating commands for an aggregate. If both events arrive at the same time, I intend to rely on idempotency to make one of them fail. I would like the failed one to retry consuming the event, but it is not clear how to do so.

Thanks for your help,

Here is an example of my code :

    let subscribeReactor<'event> (log:Logging.MycorrhizaLogger) (client:EventStoreClient) who streamName (f:StreamRevision -> EventStore.Metadata -> 'event -> SubscriptionAction) =

      // Called for every event delivered on the subscription.
      let onEvent = fun (streamSubscription:StreamSubscription) (resolvedEvent:ResolvedEvent) (cancellationToken:CancellationToken) ->
        async {
            log.info <| sprintf "%s reading action to %s" who streamName
            // Decode the metadata and the payload from their UTF-8 JSON bytes.
            let messageMetadata = Encoding.UTF8.GetString(resolvedEvent.Event.Metadata.ToArray())
            let metadata = JsonConvert.DeserializeObject<EventStore.Metadata>(messageMetadata, FableJsonConverter())

            let message = Encoding.UTF8.GetString(resolvedEvent.Event.Data.ToArray())
            let event = JsonConvert.DeserializeObject<'event>(message, FableJsonConverter())
            let revision = EventStore.Client.StreamRevision(resolvedEvent.OriginalEvent.EventNumber.ToUInt64())

            // The SubscriptionAction returned by the handler is discarded here.
            f revision metadata event |> ignore
        }
        |> Async.startAsPlainTask

      // Subscribe to the $ce category stream for streamName.
      let streamNameCe = sprintf "$ce-%s" streamName
      log.info <| sprintf "%s subscribing sync action to %s" who streamName
      client.SubscribeToStreamAsync(streamNameCe, onEvent, true)

Catch-up subscriptions do not maintain any information about consumers on the server. That’s why one of the SubscribeToStreamAsync overloads accepts the stream position to subscribe from. You need to maintain the position on the client side yourself. I wrote about it here.
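To make the client-side position idea concrete, here is a minimal in-memory sketch. It does not use the EventStore client at all: StoredEvent, Checkpoint, catchUp and handleWithConflict are hypothetical names made up for this illustration, and a real checkpoint would be persisted somewhere durable. The point is only that the position moves forward after the handler succeeds, so a failed event is delivered again the next time you subscribe from the stored position.

```fsharp
// Hypothetical stand-ins; none of these types come from the EventStore client.
type StoredEvent = { Position: uint64; Data: string }

// In-memory checkpoint. A real one would be stored in a database or a file.
type Checkpoint() =
    let mutable last : uint64 option = None
    member this.Load() = last
    member this.Store(position: uint64) = last <- Some position

// Deliver every event after the checkpoint, in order. The checkpoint only
// advances after the handler succeeds, so a failure stops the run and the
// same event is delivered again on the next call.
let catchUp (events: StoredEvent list) (checkpoint: Checkpoint) (handle: StoredEvent -> Result<unit, string>) =
    let pending =
        match checkpoint.Load() with
        | None -> events
        | Some p -> events |> List.filter (fun e -> e.Position > p)
    let rec loop remaining =
        match remaining with
        | [] -> Ok ()
        | e :: rest ->
            match handle e with
            | Ok () ->
                checkpoint.Store e.Position
                loop rest
            | Error reason -> Error (e.Position, reason)
    loop pending

// Usage: the handler fails once on position 1, then succeeds on the retry.
let events = [ for i in 0UL .. 2UL -> { Position = i; Data = sprintf "evt-%d" i } ]
let checkpoint = Checkpoint()
let attempts = ref 0

let handleWithConflict (e: StoredEvent) =
    if e.Position = 1UL && attempts.Value = 0 then
        attempts.Value <- 1
        Error "conflict"
    else
        Ok ()

let firstRun = catchUp events checkpoint handleWithConflict
let positionAfterFirstRun = checkpoint.Load()
let secondRun = catchUp events checkpoint handleWithConflict
let positionAfterSecondRun = checkpoint.Load()
```

After the first run the checkpoint still points at position 0, so the second run starts again at the event that failed.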

If you want to use ESDB like a message broker, you should look at persistent subscriptions. They allow you to implement competing consumers, just like you’d do with a broker. You can then choose to set AutoAck to true (default), or ack explicitly. You also have two more options: nack and park messages. If you nack a message, it will remain in the stream as an unprocessed message. If you park it, it will be almost the same, but you can later replay parked messages.
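As a rough picture of how ack, nack and park relate, here is a toy in-memory model. It is not the persistent subscription API: Outcome, Delivery, run and handleDelivery are hypothetical names, and the maxAttempts limit with parking after the last attempt is an assumption made for this sketch, not a statement about how the server is configured.

```fsharp
// Toy in-memory model; none of these types come from the EventStore client.
type Outcome =
    | Ack    // processed, do not deliver again
    | Nack   // not processed, deliver again later
    | Park   // set aside so it can be replayed later

type Delivery = { Id: int; Attempt: int }

// Drain the queue. A nacked message goes to the back of the queue until it
// has been tried maxAttempts times (a hypothetical limit), then it is parked.
let run (maxAttempts: int) (handle: Delivery -> Outcome) (ids: int list) =
    let rec loop (queue: Delivery list) (acked: int list) (parked: int list) =
        match queue with
        | [] -> List.rev acked, List.rev parked
        | d :: rest ->
            match handle d with
            | Ack -> loop rest (d.Id :: acked) parked
            | Park -> loop rest acked (d.Id :: parked)
            | Nack when d.Attempt < maxAttempts ->
                loop (rest @ [ { d with Attempt = d.Attempt + 1 } ]) acked parked
            | Nack -> loop rest acked (d.Id :: parked)
    loop (ids |> List.map (fun i -> { Id = i; Attempt = 1 })) [] []

// Usage: message 2 fails on its first attempt only, message 3 always fails.
let handleDelivery (d: Delivery) =
    match d.Id with
    | 2 when d.Attempt = 1 -> Nack
    | 3 -> Nack
    | _ -> Ack

let acked, parked = run 3 handleDelivery [ 1; 2; 3 ]
```

In this model the message that lost the race (2) is handled on its retry, which is the behaviour asked about in the question, while the one that keeps failing (3) ends up parked instead of blocking the others.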

Thanks a lot Alexey!