How do you handle out-of-order events that need to be processed in order?

[Typically with manual] input we can’t always guarantee that the stored stream order will be the order we want to process in, especially when recording real-world events with significant latency (use case: daily or weekly ingestion of offline documents which are records of patient procedures). Note that we record both “actual” and “record” time for all events.

The only approach I’ve come up with is creating temporary streams ordered by the internal “actual” event time, but these would have to be re-created entirely every time any new event is seen on the original stream. Is there functionality, or ideally some guides/literature, on how to use Event Store with high “record” latency?

One optimisation on the stream re-creation approach: if the new events are all newer than the last event on the stream, you can simply append rather than re-create.

you need a resequencer somewhere in the pipeline

a way to achieve that:

  • each daily / weekly ingestion contains a complete record for the period of time
  • for a specific patient, the records in the batch can be reordered in memory

(e.g. if you have an ingestion on 2023-07-20 containing data for 2023-07-19, the 2023-07-19 data is complete and the data about patient 1 can be held in memory and ordered before appending for that day)

→ the ingestion process creates a stream per patient per day
e.g. patient-1-2023-07-19 contains all relevant records for patient 1, ordered for that day.
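A rough sketch of that in-memory resequencing step, in plain Python. The record shape (`patient_id`, `actual_time`, `type`) and the helper names are made up for illustration, and nothing here talks to the event store; it only shows grouping a batch into per-patient-per-day streams and ordering each one by actual time before appending.

```python
from collections import defaultdict
from datetime import datetime

def stream_name(patient_id, actual_time):
    # Stream-per-patient-per-day naming, e.g. "patient-1-2023-07-19".
    return f"patient-{patient_id}-{actual_time.date().isoformat()}"

def resequence_batch(records):
    """Group records by target stream and sort each group by actual time."""
    streams = defaultdict(list)
    for record in records:
        name = stream_name(record["patient_id"], record["actual_time"])
        streams[name].append(record)
    for events in streams.values():
        events.sort(key=lambda r: r["actual_time"])
    return dict(streams)

# Hypothetical batch, ingested out of order.
batch = [
    {"patient_id": 1, "actual_time": datetime(2023, 7, 19, 14, 30), "type": "ProcedureCompleted"},
    {"patient_id": 1, "actual_time": datetime(2023, 7, 19, 9, 0), "type": "ProcedureStarted"},
    {"patient_id": 2, "actual_time": datetime(2023, 7, 19, 11, 15), "type": "ProcedureStarted"},
]
ordered = resequence_batch(batch)
```

Each value in `ordered` is then the list you would append, in that order, to the stream named by its key.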

→ when a new batch arrives with data for patient 1 for actual date 2023-07-25, you create a stream patient-1-2023-07-25
(in this case there is no data for the range 2023-07-20 → 2023-07-25)

you add metadata at the stream level of patient-1-2023-07-19 saying { “nextStream”: “patient-1-2023-07-25” }

your infrastructure code can then easily read / subscribe to all records of a patient by reading the stream metadata and adjusting the read / subscription when needed
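A minimal sketch of what that reading side could look like. The two dicts are in-memory stand-ins for "events in a stream" and "stream metadata" (assumptions for the example, not the client API); the point is only the loop that follows `nextStream` until there is none.

```python
def read_patient(first_stream, events_by_stream, metadata_by_stream):
    """Yield a patient's events in order by walking the nextStream links."""
    stream = first_stream
    while stream is not None:
        yield from events_by_stream.get(stream, [])
        # No "nextStream" key means this is the last stream of the chain.
        stream = metadata_by_stream.get(stream, {}).get("nextStream")

# Hypothetical stand-ins for stored events and stream metadata.
events_by_stream = {
    "patient-1-2023-07-19": ["e1", "e2"],
    "patient-1-2023-07-25": ["e3"],
}
metadata_by_stream = {
    "patient-1-2023-07-19": {"nextStream": "patient-1-2023-07-25"},
}
all_events = list(
    read_patient("patient-1-2023-07-19", events_by_stream, metadata_by_stream)
)
```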

if, in this case, some data for actual date 2023-07-20 arrives later, then what you do in the ingestion pipeline is change the stream metadata of patient-1-2023-07-19 to { “nextStream”: “patient-1-2023-07-20” } and add { “nextStream”: “patient-1-2023-07-25” } to the newly created patient-1-2023-07-20 stream

basically you are creating a linked list distributed over all the streams of a specific patient and using it to resequence the data across days
intra-day ordering is done at ingestion time
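The relinking for a late day can be sketched as a sorted linked-list insert. This is illustrative only: `metadata` is an in-memory dict standing in for stream metadata, `link_stream` is a made-up helper, and it assumes all names belong to the same patient and end in an ISO date, so comparing names as strings compares dates.

```python
def link_stream(new_stream, first_stream, metadata):
    """Insert new_stream into a patient's chain, kept in date order.

    Returns the first stream of the chain, which changes only when the
    new stream is older than everything seen so far.
    """
    if new_stream < first_stream:
        metadata[new_stream] = {"nextStream": first_stream}
        return new_stream
    # Walk to the last stream that is not newer than new_stream.
    current = first_stream
    while True:
        following = metadata.get(current, {}).get("nextStream")
        if following is None or new_stream < following:
            break
        current = following
    if current != new_stream:  # skip if the stream is already linked
        # Point the new stream forward first, then repoint its predecessor.
        metadata[new_stream] = {} if following is None else {"nextStream": following}
        metadata.setdefault(current, {})["nextStream"] = new_stream
    return first_stream

metadata = {"patient-1-2023-07-19": {"nextStream": "patient-1-2023-07-25"}}
first = link_stream("patient-1-2023-07-20", "patient-1-2023-07-19", metadata)
```

Writing the new stream's link before repointing the predecessor means a reader following the chain should never land on a stream that does not yet point onward.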

An alternative to the above (instead of using stream metadata) is to add an event at the end of each stream, e.g.
“Closed”, { “next”: “the next stream id” }
This has the advantage of being easier to react to in subscription & read operations, though that event is not really a business event.

I tend to use a combination:
a “Closed” event, plus the relevant info for the infrastructure code in the stream metadata
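For the “Closed” event variant, a reader could look roughly like this. Again a sketch with in-memory stand-ins: the event shape (`type`, `data`) and the `read_chain` helper are assumptions for the example. It filters the “Closed” marker out of what the business code sees and uses it only to hop to the next stream.

```python
def read_chain(first_stream, events_by_stream):
    """Yield business events, hopping to the next stream on each "Closed" event."""
    stream = first_stream
    while stream is not None:
        next_stream = None
        for event in events_by_stream.get(stream, []):
            if event["type"] == "Closed":
                next_stream = event["data"]["next"]
                break  # nothing is expected after the closing event
            yield event
        stream = next_stream

# Hypothetical stored events; "Closed" carries the id of the next stream.
events_by_stream = {
    "patient-1-2023-07-19": [
        {"type": "ProcedureStarted", "data": {}},
        {"type": "Closed", "data": {"next": "patient-1-2023-07-25"}},
    ],
    "patient-1-2023-07-25": [
        {"type": "ProcedureCompleted", "data": {}},
    ],
}
types = [e["type"] for e in read_chain("patient-1-2023-07-19", events_by_stream)]
```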

if the assumption about “completeness” of the data over the chosen granularity does not hold:
you’ll need to

  • read the current data from the stream into memory, add the new data, reorder and append it all.
  • truncate the stream

i.e. if you had 10 events in stream 2023-07-19 and need to add and order 5 new events
=> read the 10 events, reorder them with the 5 new ones => append the 15 events, then truncate the stream so that only those 15 reordered events remain
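The read / merge / append / truncate cycle, sketched against an in-memory stand-in. The `stream` dict, its `truncate_before` field and the integer `actual_time` values are assumptions for the example; in EventStoreDB the truncation would presumably map to the truncate-before (`$tb`) stream metadata setting.

```python
def merge_late_events(stream, late_events):
    """Re-append the visible events merged with late ones, then truncate.

    `stream` has an append-only "events" list and a "truncate_before"
    position; readers are assumed to ignore events below that position.
    """
    visible = stream["events"][stream["truncate_before"]:]
    merged = sorted(visible + late_events, key=lambda e: e["actual_time"])
    new_start = len(stream["events"])
    stream["events"].extend(merged)        # append all 15 reordered events
    stream["truncate_before"] = new_start  # hide the 10 older copies
    return stream["events"][new_start:]

# 10 existing events plus 5 late ones whose actual times fall in between.
stream = {
    "events": [{"actual_time": t} for t in range(0, 20, 2)],
    "truncate_before": 0,
}
late = [{"actual_time": t} for t in (1, 3, 5, 7, 9)]
visible = merge_late_events(stream, late)
```

After the call the stream physically holds 25 events, but only the last 15 (the reordered ones) are visible to readers.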