I did a spike to push this Rx provider to where it can do something practical.
A good strategy for creating an event sourcing projection seems to be to first get the events filtered and partitioned into the desired streams in EventStore (the heavy lifting done server-side), and only then load each stream (in parallel) from a C# client to do the final reduction, either into document storage or on the fly, as you would load an aggregate. This example is an Rx version of three chained EventStore projections we have, leaving out the final reduction step:
- Combine CategoryA and CategoryB, and linkTo a new stream by a shared Id.
- Enrich events in this stream with a CategoryC Id that’s missing from most, and linkTo a new stream by this CategoryC Id.
- Join in CategoryC’s events and linkTo the final stream by CategoryC’s Id.
I should probably post the original JS projections too as a comparison, but in any case, here’s the Rx query that does all three steps without any linkTo calls. Posting it as-is would probably be fine, but I changed the names of things in case anything is sensitive.
```csharp
var es = new EventStoreContext(options);
var categoryAAndBEvents = es.FromAll(); // TODO: Filter by category
var categoryCEvents = es.FromAll(); // TODO: Filter by category

var stream = categoryAAndBEvents
    // 1. Group by the shared Id
    .GroupBy(e =>
        (e is SomeEvent) ? ((SomeEvent)e).Id :
        (e is SomeOtherEvent) ? ((SomeOtherEvent)e).VerboseNameId :
        (e is AndAgainEvent) ? ((AndAgainEvent)e).IdId :
        // etc…
        Guid.Empty
    )
    .Where(g => g.Key != Guid.Empty)
    // 2. Attach CatCId (tricky when limited to expression trees)
    .Select(g => g
        .Aggregate(
            Guid.Empty,
            (s, e) => (Guid)(e.GetType().GetProperty("CatCId") == null ? s : e.GetType().GetProperty("CatCId").GetValue(e, null))
        )
        .Zip(g, (c, e) => g.Where(x => !x.MetaData.ContainsKey("CatCId")).Do(x => x.MetaData.Add("CatCId", c)))
        .SelectMany(x => x)
    )
    .SelectMany(g => g)
    // 3. Merge in categoryC
    .Merge(categoryCEvents)
    .GroupBy(e =>
        (e is SomeEvent) ? ((SomeEvent)e).CustomerId :
        (e is AndAgainEvent) ? (Guid)(e.MetaData.ContainsKey("CatCId") ? e.MetaData["CatCId"] : Guid.Empty) :
        (e is SomeCatCEvent) ? ((SomeCatCEvent)e).CatCId :
        // etc…
        Guid.Empty
    )
    // 4. At this point the groups (streams) are ready for the final reduction into a document. This is just for logging.
    .Where(g => g.Key == new Guid("879c5d49-fd72-4392-bf88-8bba363e590e"))
    .SelectMany(g => g)
    .Subscribe(
        onNext: s => Console.WriteLine(JsonConvert.SerializeObject(s)),
        onError: e => Console.WriteLine("ERROR: " + e),
        onCompleted: () => Console.WriteLine("DONE")
    );
```
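For a rough idea of the final reduction step that the query above leaves out, here is a minimal sketch. It assumes the events have already been loaded on the client, and it uses plain LINQ over an in-memory sequence rather than the Rx provider. `Event`, `ItemAdded`, `ItemRemoved`, `CatCDocument` and `Reduction` are hypothetical names invented for illustration; they are not part of our code or of EventStore.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event types, invented for this sketch.
public abstract class Event { public Guid CatCId; }
public sealed class ItemAdded : Event { public string Name; }
public sealed class ItemRemoved : Event { public string Name; }

// Hypothetical document built from one partitioned stream.
public sealed class CatCDocument
{
    public Guid Id;
    public List<string> Items = new List<string>();
    public int EventCount;
}

public static class Reduction
{
    // Apply a single event to the document (one step of a left fold).
    public static CatCDocument Apply(CatCDocument doc, Event e)
    {
        doc.EventCount++;
        if (e is ItemAdded) doc.Items.Add(((ItemAdded)e).Name);
        else if (e is ItemRemoved) doc.Items.Remove(((ItemRemoved)e).Name);
        return doc;
    }

    // Partition by CatCId, skip events without one, and fold each group into a document.
    public static Dictionary<Guid, CatCDocument> Reduce(IEnumerable<Event> events)
    {
        return events
            .GroupBy(e => e.CatCId)
            .Where(g => g.Key != Guid.Empty)
            .ToDictionary(
                g => g.Key,
                g => g.Aggregate(new CatCDocument { Id = g.Key }, (doc, e) => Apply(doc, e)));
    }
}
```

Calling `Reduction.Reduce(events)` gives one document per CatCId. The same `Apply` function could just as well be used to fold a single stream on the fly, the way you would load an aggregate.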
The JS API is limited by comparison to Rx, but a lot can be done in JS and done easily. Working within the constraints of expression trees is no fun coming from that environment, but as with IQueryable, you kind of get used to it and learn a few recipes. Also, anonymous type support would help a lot. Rx is pretty capable and fully testable, and my feeling is that it would be a preferred API for EventStore with a few utility methods added.
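One such recipe, sketched in isolation: a lambda with a statement body (if/else blocks, local variables) or with pattern matching will not compile as an `Expression<...>`, so branching has to be written as one chained conditional expression. The sketch below uses only `System.Linq.Expressions`; the event classes and `KeySelectors` are hypothetical stand-ins, not the real types. Compiling the tree to a delegate is also one way to unit test a selector locally, without a server.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical event types, invented for this sketch.
public class SomeEvent { public Guid Id; }
public class SomeOtherEvent { public Guid VerboseNameId; }

public static class KeySelectors
{
    // The branching is expressed as a single conditional chain so that the
    // compiler can turn the lambda into an expression tree.
    public static readonly Expression<Func<object, Guid>> SharedId = e =>
        (e is SomeEvent) ? ((SomeEvent)e).Id :
        (e is SomeOtherEvent) ? ((SomeOtherEvent)e).VerboseNameId :
        Guid.Empty;

    // Compile the tree into an ordinary delegate so it can be exercised in a test.
    public static readonly Func<object, Guid> SharedIdCompiled = SharedId.Compile();
}
```

The same `SharedId` expression could then be handed to a `GroupBy` on a queryable source, while `SharedIdCompiled` runs it in-process.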
I learned that more work needs to be done on strong-typing support and on how the Rx query is wired up on the ES side. I’ll get things tidied up and ironed out, and push another update to the code when I get time.