Enriching events: stream-table join

My service needs to publish external events to downstream systems. These external events are often fat, containing data from several aggregates. Building them is essentially a stream-table join.

For example, I have a BusTripExecution aggregate publishing the ArrivedToCityStop(busTripExecutionId, busTripId, cityStopId) event.
I also have the following aggregates:

  1. CityStop aggregate containing information about the city stop - (cityStopId, street, city, country, number of awaiting passengers).
  2. BusTrip (busTripId, startingCityStopId, destinationStopId)

For external systems, I need to publish an enriched ArrivedToCityStopEvent carrying the bus trip execution data together with the latest city stop and bus trip info.

ArrivedToCityStopEvent
{
  BusTripExecutionId,
  BusTripId,
  StartCityStop // taken from BusTrip joined with CityStop
  {
    Street,
    City,
    Country
  },
  DestinationCityStop // taken from BusTrip joined with CityStop
  {
    Street,
    City,
    Country
  },
  ArrivedToCityStop // taken from the event's cityStopId joined with CityStop
  {
    Street,
    City,
    Country,
    AwaitingPassengers
  }
}

Basically, we need to join the stream of BusTripExecution events with the latest state of CityStop and BusTrip: a stream-table join.

To implement this in EventStore, I imagine a projection running in its own catch-up subscription that subscribes to BusTrip and CityStop events, maintains a cache of each in read-side storage, and performs the enrichment when handling BusTripExecution events.
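The join could be sketched roughly like this (all event type names, field names, and the `handleEvent` entry point are illustrative assumptions, not a specific EventStore API): two in-memory "tables" are kept up to date from CityStop and BusTrip events, and each BusTripExecution event is enriched by lookup.

```javascript
// Hypothetical table-side state, updated from the CityStop and BusTrip streams.
const cityStops = new Map(); // cityStopId -> latest CityStop state
const busTrips = new Map();  // busTripId  -> latest BusTrip state

function handleEvent(event) {
  switch (event.type) {
    case "CityStopUpdated":          // table side: upsert latest state
      cityStops.set(event.data.cityStopId, event.data);
      return null;
    case "BusTripScheduled":         // table side: upsert latest state
      busTrips.set(event.data.busTripId, event.data);
      return null;
    case "ArrivedToCityStop": {      // stream side: enrich by lookup
      const trip = busTrips.get(event.data.busTripId);
      const pick = (stop) => stop && {
        street: stop.street, city: stop.city, country: stop.country,
      };
      const arrived = cityStops.get(event.data.cityStopId);
      return {
        type: "ArrivedToCityStopEnriched",
        busTripExecutionId: event.data.busTripExecutionId,
        busTripId: event.data.busTripId,
        startCityStop: trip && pick(cityStops.get(trip.startingCityStopId)),
        destinationCityStop: trip && pick(cityStops.get(trip.destinationStopId)),
        arrivedToCityStop: arrived && {
          ...pick(arrived),
          awaitingPassengers: arrived.awaitingPassengers,
        },
      };
    }
    default:
      return null;
  }
}
```

In a real subscription handler the two maps would live in read-side storage rather than memory, so the enricher can resume after a restart.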

Are there better solutions to implement this?

I have a few concerns with this solution:

  1. Performance: the same thread will be in charge of processing events for all bus trip executions.
  2. Reusability: events from other aggregates might need to be enriched with the same data, or this data might already be built for query purposes.

When we build enriched events (similar to what you have here), we have a single “enricher” projection running that subscribes to the streams we are interested in and simply spits the enriched event out (emit to a single stream). The single stream gets stamped with a max lifetime (930 days), so it’s self-cleaning as well.
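The shape of that pattern might look like the following sketch (the handler signature, stream name, and `enrich` callback are assumptions for illustration; `$maxAge` is EventStore's stream-metadata key for a time-to-live in seconds, which is what makes the output stream self-cleaning on scavenge):

```javascript
// 930-day lifetime, expressed in seconds for the stream's $maxAge metadata.
const MAX_AGE_SECONDS = 930 * 24 * 60 * 60;

// Metadata to set once on the single output stream.
function outputStreamMetadata() {
  return { $maxAge: MAX_AGE_SECONDS };
}

// Hypothetical enricher: every subscribed event goes through one handler,
// and every enriched result is emitted to the same output stream, so the
// relative order of enriched events matches the order they were processed.
function enricherHandler(event, enrich, emit) {
  const enriched = enrich(event); // the join/lookup logic lives here
  if (enriched) emit("enriched-events", enriched.type, enriched);
}
```

Emitting everything to a single stream is what preserves ordering for downstream consumers; the trade-off is that the one stream becomes the throughput bottleneck.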

Steven, if we go with a separate subscription producing and publishing this enriched event, the event might be published out of order with respect to other events published by this service. This implies that, if I want that order preserved, I need a single subscription enriching and publishing all events for which the order must be respected.
This might be a performance concern.

Maxim,

Just so I understand the process (and to check my understanding of projections):

  1. The enricher projection receives events in the date order they were created (one event at a time, irrespective of the number of streams subscribed to, i.e. fromStreams('stream1', 'stream2')).
  2. You construct the enriched event and emit it to a result stream (again, we are in date order, so everything is still "in order").
  3. It's the route from the result stream to your read model that could become a source of ordering problems. If you use a persistent subscription, it could be processing events concurrently, which would give you the ordering problem.
  4. Using a catch-up subscription removes the ordering issue, but means processing one event at a time (no concurrency).

In our systems we use a persistent subscription, but ordering is not a problem, because our read model checks the date, and if the event being processed is older, we simply ignore it.
The enriched event holds all the information, so we never lose anything by skipping older events.
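That "skip anything older" check can be sketched in a few lines (the function name, key, and timestamp field are illustrative assumptions): the read model remembers the timestamp of the last applied event per key and ignores stale arrivals, which is what makes concurrent persistent-subscription processing safe when each enriched event carries the full state.

```javascript
// Hypothetical last-write-wins guard for a read model fed by a
// persistent subscription that may deliver events out of order.
const lastApplied = new Map(); // key -> ISO-8601 timestamp of last applied event

function applyIfNewer(key, event, apply) {
  const seen = lastApplied.get(key);
  // ISO-8601 UTC timestamps compare correctly as strings.
  if (seen && event.activityDateTime <= seen) {
    return false; // older (or duplicate) event: skip it, nothing is lost
  }
  lastApplied.set(key, event.activityDateTime);
  apply(event);
  return true;
}
```

This only works because the enriched event is self-contained; if events carried deltas instead of full state, skipping an older one would lose data.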

Here is an example of an enriched event. From the JSON it might not be obvious what triggered it (it was a sale), but the previous things that have happened to the product (last delivery etc.) remain in place:

{
  "$type": "Vme.Eposity.QueryModelWriter.BusinessLogic.Events.StoreProductHistoryUpdatedEvent, Vme.Eposity.QueryModelWriter.BusinessLogic",
  "storeProductId": "5bdaa9a8-69f0-871d-1a1e-e15042193629",
  "eventId": "6b060978-f013-e8fb-eb4c-571d1053cf87",
  "ActivityDateTime": "2021-01-27T08:41:53.063Z",
  "storeId": "525d796c-aa73-43bf-93fb-1205e162d7bc",
  "organisationId": "6e7eadde-b84f-4a64-b287-2b98d805a958",
  "CurrentStockLevel": 5,
  "organisationProductId": "097d303c-bf7d-445e-8723-cf4b61b75f6d",
  "IsSale": true,
  "salesTransactionId": "592edc00-b0bd-9d77-078e-67a8d0e97123",
  "salesTransactionLineId": "6b060978-f013-e8fb-eb4c-571d1053cf87",
  "NumberOfItemsSold": 1,
  "state": {
    "lastSale": "2021-01-27T08:41:53.063Z",
    "lastDelivery": "2021-01-22T08:06:00Z",
    "lastOrder": "2021-01-20T21:00:57.31Z",
    "currentMpl": 2,
    "previousMpl": 2,
    "lastMplChangedDate": "2020-08-18T22:37:09.143Z"
  }
}

Btw, performance concerns can be addressed by batching and by keeping some of the static information in memory (bus stops never change; the number of passengers does). That makes me think the CityStop might need to be split in two, as one part of it is strictly static and the other is highly dynamic.
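The static/dynamic split could be sketched like this (the two maps and the `describeStop` helper are hypothetical names for illustration): the immutable stop details are cached in memory indefinitely, while the volatile passenger count is read from a separately updated table.

```javascript
// Static part: safe to cache in memory for the process lifetime.
const staticStops = new Map();     // cityStopId -> { street, city, country }
// Dynamic part: updated frequently, kept in its own table.
const passengerCounts = new Map(); // cityStopId -> awaitingPassengers

function describeStop(cityStopId) {
  const stop = staticStops.get(cityStopId);
  if (!stop) return null;
  return {
    ...stop,                                                // static fields
    awaitingPassengers: passengerCounts.get(cityStopId) ?? 0, // dynamic field
  };
}
```

With this split, only the passenger-count table needs to track updates at event speed; the static cache can be loaded once at startup.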

Ordering issues could be resolved by comparing timestamps. Also, buses don’t teleport, so domain knowledge like that could ease the ordering requirement.