Check if Event exists in stream

Hello

I would like to know how I can check whether an event already exists in a specific stream.

Thanks

If you try to write the same event to a stream (i.e. with the same event id), EventStore can block it (an idempotent write). That said, if your EventStore is restarted it will not block it, so I think the suggestion is to rely on this only for retry scenarios.
If that's your concern, EventStore is probably doing enough work for you.
From my experience, having your aggregate keep a history of its events is a belt-and-braces approach: before we even attempt to write an event, we know whether it is a duplicate simply by checking our "historic events" list (which is just a list of all the events we read from the stream).
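A minimal sketch of that "historic events" check, written in Python. `RecordedEvent` and `Aggregate` are hypothetical names and not EventStore client types, and the sketch assumes the events have already been read from the stream:

```python
from dataclasses import dataclass, field
from typing import Iterable, List
from uuid import UUID, uuid4


@dataclass(frozen=True)
class RecordedEvent:
    event_id: UUID
    event_type: str
    data: dict


@dataclass
class Aggregate:
    # "Historic events": every event read from the stream when the aggregate was loaded.
    history: List[RecordedEvent] = field(default_factory=list)

    def load(self, events: Iterable[RecordedEvent]) -> None:
        self.history.extend(events)

    def is_duplicate(self, event_id: UUID) -> bool:
        # Checked before attempting the write, so a known duplicate never reaches the server.
        return any(e.event_id == event_id for e in self.history)


# Usage: rehydrate from the stream, then check before appending.
existing = RecordedEvent(uuid4(), "SaleTransaction", {"amount": 10})
account = Aggregate()
account.load([existing])
print(account.is_duplicate(existing.event_id))  # True
print(account.is_duplicate(uuid4()))            # False
```

Unlike the server-side check, this one does not depend on the server having been running since the first attempt. It does require that the aggregate was loaded from the stream first.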

Hope that helps.

I agree with @steven.blair here on the tech side of things.

@ekjuanrejon why do you want to check if it already exists?
Is that event anywhere in the stream?
Is it for idempotency, some special requirement, or blocking certain actions?

Hi Ekjuanrejon,

I agree with Steven and Yves on their points.

Some broader notes on the mechanics of an efficient search for any particular event.
The key to efficiency here depends largely on how you set up your data.

Worst case: you only have the event id or some other data in the event, and you write all the data to a single stream.
In this case you’re forced to scan all of the events forward or backwards. This is not recommended and not how ES is designed to be used.

You can make this somewhat better with a filtered read of the $all stream. This reduces the number of returned events you have to evaluate, but it still enumerates every event in the DB.
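To make the cost concrete, here is a toy in-memory model of a filtered read of $all. The log is a plain Python list, and `LogEntry`, `filtered_read_all` and `find_in_all` are hypothetical names rather than the EventStoreDB client API. The real server applies the filter itself, but the idea is the same: the filter trims what you have to evaluate, while the scan still covers every event.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional


@dataclass(frozen=True)
class LogEntry:
    position: int      # position in the global log
    stream: str
    event_type: str
    data: dict


def filtered_read_all(log: Iterable[LogEntry],
                      keep: Callable[[LogEntry], bool]) -> Iterator[LogEntry]:
    # The filter reduces what the caller has to evaluate,
    # but every entry in the log is still visited.
    for entry in log:
        if keep(entry):
            yield entry


def find_in_all(log: Iterable[LogEntry], event_type: str,
                predicate: Callable[[dict], bool]) -> Optional[LogEntry]:
    for entry in filtered_read_all(log, lambda e: e.event_type == event_type):
        if predicate(entry.data):
            return entry
    return None


log = [
    LogEntry(0, "Account-1", "AccountOpened", {}),
    LogEntry(1, "Merchant-7", "SaleTransaction", {"transaction_id": "t-1"}),
    LogEntry(2, "Account-1", "SaleTransaction", {"transaction_id": "t-2"}),
]
hit = find_in_all(log, "SaleTransaction", lambda d: d.get("transaction_id") == "t-2")
print(hit.position if hit else None)  # 2
```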

If you know the full stream name, you can search that specific stream, reading forwards or backwards. Note: unlike some other systems, ES supports fine-grained streams and easily allows hundreds of millions of streams, which makes this kind of search quite efficient.
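A minimal sketch of searching one known stream backwards, stopping at the first match. Streams are modelled here as a Python dict from stream name to a list of events (list index standing in for event number). This is a toy model, and `find_in_stream_backwards` is a hypothetical helper, not a client call:

```python
from typing import Callable, Dict, List, Optional

# Toy model: stream name -> events in append order (index == event number).
Streams = Dict[str, List[dict]]


def find_in_stream_backwards(streams: Streams, stream_name: str,
                             predicate: Callable[[dict], bool]) -> Optional[int]:
    """Return the event number of the newest matching event, or None."""
    events = streams.get(stream_name, [])
    # Newest first: a recent duplicate is found without touching old history.
    for number in range(len(events) - 1, -1, -1):
        if predicate(events[number]):
            return number
    return None


streams = {"Account-42": [{"transaction_id": "a"}, {"transaction_id": "b"}]}
print(find_in_stream_backwards(streams, "Account-42",
                               lambda e: e["transaction_id"] == "a"))  # 0
```

Because only one small stream is read, the cost depends on that stream's length, not on the size of the database.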

If you turn on the standard projections and format your stream names as “[category]-[instanceId]”, a number of options become available.

If you know the category (aggregate type), you can search the “$ce-[category]” stream, which only contains events posted to streams prefixed with that category.

If you have set the “event-type” field and are running the standard projections, you can search the generated “$et-[typename]” stream, which contains only events of that type, either forwards or backwards.
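To illustrate what those generated streams contain, here is a toy Python model that builds "$ce-" and "$et-" lists from a global log. This is a sketch under stated assumptions. The real projections write link events pointing at the originals rather than copies. The category split is also an assumption: the category is taken to be everything before the first '-', and the real projection's separator and split position can be configured. `build_index_streams` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Toy global log: (stream name, event type, data) in commit order.
Log = List[Tuple[str, str, dict]]


def build_index_streams(log: Log) -> Dict[str, List[dict]]:
    """Mimic $ce-[category] and $et-[typename] as plain lists.

    The real projections write link events pointing at the originals;
    this sketch copies the data for brevity.
    """
    index: Dict[str, List[dict]] = defaultdict(list)
    for stream, event_type, data in log:
        # Assumption: the category is everything before the first '-'.
        category = stream.split("-", 1)[0]
        index[f"$ce-{category}"].append(data)
        index[f"$et-{event_type}"].append(data)
    return dict(index)


log = [
    ("Account-42AADAA4-9562-4C11-B033-C9287C5E5515", "SaleTransaction", {"transaction_id": "t-1"}),
    ("Merchant-7", "SaleTransaction", {"transaction_id": "t-1"}),
    ("Account-0001", "AccountOpened", {}),
]
index = build_index_streams(log)
print(len(index["$ce-Account"]), len(index["$et-SaleTransaction"]))  # 2 2
```

Searching "$ce-Account" or "$et-SaleTransaction" then works like searching any single stream, just over a narrower slice of the log.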

The next key question is what kind of search this is. If this is a “cold” (from scratch) search with no local caching or state, you are limited to the methods above. In most cases, though, much better options are available.

If this is a running search (i.e. a system waiting to react to an event), you can subscribe to one of the streams above and check the events as they are pushed to the consumer.
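A toy Python model of the running-search idea: each appended event is pushed to subscribers once and checked as it arrives, so nothing is re-scanned. `ToyStream` and `watch_for` are hypothetical names. A real subscription would come from the EventStoreDB client and is not modelled here.

```python
from typing import Callable, List

Handler = Callable[[int, dict], None]


class ToyStream:
    """Hypothetical in-memory stream that pushes each appended event to subscribers."""

    def __init__(self) -> None:
        self.events: List[dict] = []
        self.subscribers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self.subscribers.append(handler)

    def append(self, event: dict) -> None:
        self.events.append(event)
        number = len(self.events) - 1
        for handler in self.subscribers:
            handler(number, event)


def watch_for(predicate: Callable[[dict], bool], on_match: Handler) -> Handler:
    # Each event is checked once, as it arrives.
    def handler(number: int, event: dict) -> None:
        if predicate(event):
            on_match(number, event)
    return handler


stream = ToyStream()
matches: List[int] = []
stream.subscribe(watch_for(lambda e: e["type"] == "SaleTransaction",
                           lambda n, e: matches.append(n)))
stream.append({"type": "AccountOpened"})
stream.append({"type": "SaleTransaction"})
print(matches)  # [1]
```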

If this is a periodic search, you can scan to the current end and commit a “Checkpoint” that records the stream position. When the next search run is triggered, you can safely use the Checkpoint as the starting point for your search, leveraging the append-only, guaranteed ordering of the Event Store DB.
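A minimal Python sketch of the checkpointed periodic scan, using a plain list as the stream. In this toy the checkpoint is simply returned as an integer. In practice it would be persisted between runs, and that storage is left out. `scan_from_checkpoint` is a hypothetical helper.

```python
from typing import Callable, List, Optional, Tuple


def scan_from_checkpoint(events: List[dict], checkpoint: Optional[int],
                         predicate: Callable[[dict], bool]) -> Tuple[List[int], Optional[int]]:
    """Check only events after the checkpoint; return (matching numbers, new checkpoint).

    Safe because the stream is append-only and ordered: nothing at or before
    the checkpoint can change between runs.
    """
    start = 0 if checkpoint is None else checkpoint + 1
    matches = [n for n in range(start, len(events)) if predicate(events[n])]
    new_checkpoint = len(events) - 1 if events else checkpoint
    return matches, new_checkpoint


def is_sale(event: dict) -> bool:
    return event["type"] == "SaleTransaction"


events = [{"type": "SaleTransaction"}, {"type": "Refund"}]
first, cp = scan_from_checkpoint(events, None, is_sale)   # ([0], 1)
events.append({"type": "SaleTransaction"})
second, cp = scan_from_checkpoint(events, cp, is_sale)    # ([2], 2)
```

The second run looks at one event instead of three. The saving grows with the length of the stream.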

The last kind of check is done on write, to avoid duplicate writes. (This seems likely to be your case?)

If the network or transport is duplicating or resending writes then the built in idempotency will filter them out.

If a pool of competing services might attempt concurrent writes, optimistic concurrency is the best tool to stop that. Each service reads the stream's end position when it starts processing and, when it writes, passes that last position as the expected version. Event Store will then fail one of the duplicate writes with an expected version error.
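A toy Python model of the expected-version check, showing how two workers that read the same version end up with one write rejected. `ToyEventStore` and `WrongExpectedVersion` are hypothetical stand-ins. The real client's exception types and "no stream" constants differ by client and are not reproduced here. In this toy, -1 simply means "stream is empty".

```python
from typing import Dict, List


class WrongExpectedVersion(Exception):
    pass


class ToyEventStore:
    """Hypothetical in-memory store enforcing an expected version on append."""

    def __init__(self) -> None:
        self.streams: Dict[str, List[dict]] = {}

    def current_version(self, stream: str) -> int:
        # Toy convention: -1 means "no events yet", so the first event gets number 0.
        return len(self.streams.get(stream, [])) - 1

    def append(self, stream: str, events: List[dict], expected_version: int) -> int:
        actual = self.current_version(stream)
        if actual != expected_version:
            raise WrongExpectedVersion(f"{stream}: expected {expected_version}, actual {actual}")
        self.streams.setdefault(stream, []).extend(events)
        return self.current_version(stream)


store = ToyEventStore()
seen_by_a = store.current_version("Account-1")   # both workers read -1
seen_by_b = store.current_version("Account-1")
store.append("Account-1", [{"type": "SaleTransaction"}], seen_by_a)  # succeeds
try:
    store.append("Account-1", [{"type": "SaleTransaction"}], seen_by_b)
except WrongExpectedVersion as err:
    print("rejected:", err)  # rejected: Account-1: expected -1, actual 0
```

The rejected worker would typically re-read the stream, notice the sale is already posted, and stop.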

If you need to block a write based on the history of events in the stream (aka previous state) then the best practice is to simply read the stream history into the service before processing the new input/trigger.

This operation can be very fast when you use a large number of fine-grained streams, per the intended design patterns.

In cases where even the fine-grained streams get too long to read efficiently, a “hot aggregate” pattern can be used: the service state is retained in local memory, and the service checks only for new events on the stream before processing.
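A minimal Python sketch of a hot aggregate. State stays in memory, and each check applies only the events after the last version it has seen. Streams are a toy dict, and `HotAccount` and the `transaction_id` field are hypothetical:

```python
from typing import Dict, List, Set


class HotAccount:
    """Hypothetical hot aggregate: state stays in memory between commands."""

    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name
        self.version = -1                  # last event number already applied
        self.transaction_ids: Set[str] = set()

    def catch_up(self, streams: Dict[str, List[dict]]) -> int:
        """Apply only events newer than self.version; return how many were applied."""
        events = streams.get(self.stream_name, [])
        new_events = events[self.version + 1:]
        for event in new_events:
            self.transaction_ids.add(event["transaction_id"])
        self.version += len(new_events)
        return len(new_events)

    def already_posted(self, streams: Dict[str, List[dict]], transaction_id: str) -> bool:
        self.catch_up(streams)             # cheap: usually zero or a few new events
        return transaction_id in self.transaction_ids


streams = {"Account-1": [{"transaction_id": "t1"}]}
account = HotAccount("Account-1")
print(account.already_posted(streams, "t1"))  # True
```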

When the system scales beyond the point where a local application cache is feasible, a larger in-memory cache such as Redis can be used.

The next step as things scale is to use persisted storage and write a “Snapshot” (service state + checkpoint) to a durable store. This durable store can be as simple as local disk or S3 or could be a full secondary DB.
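A minimal Python sketch of a snapshot (service state + checkpoint) kept on local disk as JSON, the simplest durable store mentioned. The file layout and function names are hypothetical. On restart, the service would load the snapshot and then read the stream from checkpoint + 1.

```python
import json
import tempfile
from pathlib import Path
from typing import Tuple


def save_snapshot(path: Path, state: dict, checkpoint: int) -> None:
    # Write a temp file, then rename it over the old snapshot; the rename is
    # atomic on POSIX file systems, so readers never see a half-written file.
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps({"state": state, "checkpoint": checkpoint}))
    tmp.replace(path)


def load_snapshot(path: Path) -> Tuple[dict, int]:
    if not path.exists():
        return {}, -1  # no snapshot yet: empty state, replay from event 0
    snapshot = json.loads(path.read_text())
    return snapshot["state"], snapshot["checkpoint"]


with tempfile.TemporaryDirectory() as folder:
    path = Path(folder) / "Account-1.snapshot.json"
    save_snapshot(path, {"balance": 10}, checkpoint=4)
    state, checkpoint = load_snapshot(path)
    # Next: read the stream from checkpoint + 1 and apply only newer events.
    print(state, checkpoint)  # {'balance': 10} 4
```

Swapping the file for S3 or a secondary DB changes only the two functions. The state-plus-checkpoint pairing stays the same.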

Finally, if you don’t need current state and you just need to do a quick check, I’d just read the stream backwards checking for the event, as the simplest possible thing.

Thanks,
Chris

Hey guys… thanks for the feedback…

What do you mean by event id?

Probably my use of EventStore is incorrect. For the event id I call Guid.NewGuid() every time. The event I append to the stream also carries another id of its own. That's the id I have, and I want to know whether an event with that id exists in the stream.

I have an Account stream and I am appending a SaleTransaction to the stream.

I have an Account-42AADAA4-9562-4C11-B033-C9287C5E5515 stream and I want to check whether an event already exists in it.
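One common technique, which is not something suggested earlier in the thread, is to derive the event id deterministically from your own transaction id instead of generating a random one. A resend of the same sale then carries the same event id, so EventStore's idempotent-write handling has something to match on, subject to the restart caveat Steven raised. The sketch below uses Python's standard `uuid.uuid5`. `SALE_NAMESPACE`, `sale_event_id` and the "sale-1001" id are hypothetical.

```python
import uuid

# Hypothetical application namespace; any fixed UUID works, but it must never change.
SALE_NAMESPACE = uuid.UUID("6f1c1f3e-2b4a-4c55-9a55-0d3c4f6a8b21")


def sale_event_id(account_id: str, transaction_id: str) -> uuid.UUID:
    # Name-based (version 5) UUID: the same inputs always give the same id,
    # unlike uuid4() / Guid.NewGuid(), which is random on every call.
    return uuid.uuid5(SALE_NAMESPACE, f"{account_id}/{transaction_id}")


first = sale_event_id("42AADAA4-9562-4C11-B033-C9287C5E5515", "sale-1001")
resend = sale_event_id("42AADAA4-9562-4C11-B033-C9287C5E5515", "sale-1001")
print(first == resend)  # True
```

Name-based UUIDs can also be generated in .NET, but that would need a helper or library. This sketch does not assume any particular one.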

I’m guessing that you have some sort of external system sending you transactions and they might resend something and you don’t want to repost it to the account then?

Or are you worried about duplicates because you're running multiple account processors at the same time, and more than one of them might post the same sale transaction to the account stream?

Or both?

@chris.condron

You got it right. I have two microservices… Customers and Merchants.

The first time a Customer Authorizes a payment everything works fine…

Both microservices are subscribed to a stream by category, and all events are published onto Azure Service Bus… My worry is what happens when the CustomerService stream is replayed again… The MerchantService is subscribed to an event from the CustomerService…

That is my worry: what happens when events from CustomerService are replayed.

How are the events distributed to the services:
through Azure Service Bus or an EventStore subscription?

In what circumstances, and for what reasons, do you need to replay the streams?

@yves.lorphelin

So the service subscribes to the stream, and once the service receives the event it publishes the event onto the bus.

So I have two processes…
Process A: builds the state from the subscribed stream
Process B: subscribes to the stream and publishes onto the bus

Process A can be replayed as many times as needed, since it does not publish onto the bus.
Process B should rarely be replayed… but say we need to push the events onto an analytics platform… It should then use a different Azure Service Bus…

Hi,

Ah, it seems this is more of a messaging pattern question than a query question.

There are various options that can be used on their own or in combination to solve these challenges.

  1. Your services can query the event store directly to catch up to current state, rather than getting events indirectly through the AZ Bus.
  2. Any service that is processing a catch-up request can discard the messages it generates, rather than posting them on the bus, until it is caught up.
  3. The services can maintain the last position (aka event number) they have seen on each stream they consume. This helps both with switching back to live mode after catch-up and, more importantly, with knowing when they can discard messages they have already seen.
  4. For “side effects” such as triggering another system (Customer Service -> Merchant Service), you can use messages/commands that are not persisted in the event store and therefore will not be replayed. This approach is more complicated and requires ongoing monitoring to ensure sent messages are handled.
  5. The AZ Bus supports topics, and those topics can be used to isolate catch-up/replay events from live events.
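Option 3 can be sketched roughly as follows for Process B. The publisher remembers the last event number it has forwarded per stream and drops anything at or below it, so replaying the stream does not republish old events. `BusPublisher` is a hypothetical name, and `publish` is a plain callable standing in for the Azure Service Bus sender, which is not modelled here.

```python
from typing import Callable, Dict


class BusPublisher:
    """Hypothetical Process B: forwards each stream position to the bus at most once per run."""

    def __init__(self, publish: Callable[[dict], None]) -> None:
        self.publish = publish
        self.checkpoints: Dict[str, int] = {}  # stream -> last event number already published

    def handle(self, stream: str, number: int, event: dict) -> bool:
        """Publish unless this position was already handled; return True if published."""
        if number <= self.checkpoints.get(stream, -1):
            return False  # replayed event: already published once, so drop it
        self.publish(event)
        self.checkpoints[stream] = number  # in practice, persist this durably
        return True


published = []
publisher = BusPublisher(published.append)
for n, e in enumerate([{"id": "a"}, {"id": "b"}]):
    publisher.handle("Customer-1", n, e)
# A replay from the start only forwards the new event "c".
for n, e in enumerate([{"id": "a"}, {"id": "b"}, {"id": "c"}]):
    publisher.handle("Customer-1", n, e)
print([e["id"] for e in published])  # ['a', 'b', 'c']
```

Here the checkpoint is updated after publishing. A crash between the two steps could resend one message, so the MerchantService side may still want its own duplicate check.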

I would suggest looking at option 3 first (idempotency via stream position checkpoints), combined with option 1 (direct reads) if possible. Option 4 would likely be overly complicated for your scenario, and option 2 is just general best practice.

Thanks,
Chris