Rx for EventStore

I got Rx working with EventStore via an IQbservableProvider!

The query below is written on the client but runs inside EventStore, so it’s nice and quick, and only the results are streamed back (via gRPC).

```csharp
var options = new StreamDbContextOptionsBuilder()
    .UseEventStore("127.0.0.1:5001")
    .Options;

new EventStoreContext(options)
    .FromAll()
    .Where(e => e.EventType == "CustomerCreatedEvent")
    .Where(e => new DateTime(2018, 3, 1) <= e.Created)
    .TakeWhile(e => e.Created < new DateTime(2018, 4, 1))
    .Select(e => e.Data)
    .Subscribe(
        onNext: s =>
        {
            var @event = JsonConvert.DeserializeObject<CustomerCreatedEvent>(s);
            Console.WriteLine($"{@event.CustomerId}: {@event.Email}");
        },
        onError: e => Console.WriteLine("ERROR: " + e),
        onCompleted: () => Console.WriteLine("DONE")
    );
```

If you want to play around, see the fork of EventStore, which includes an example Rx client (the Qube.EventStore.Client project). There’s also an in-memory provider and a vanilla gRPC provider kicking around in there.

This is an experimental project to see what’s possible and what doesn’t work so well, so lots to explore - and feedback is most welcome.

Cheers

Great work! Really interesting for cases when creating a ’native’ projection is overkill. I haven’t had time to look closer, so these questions might be obvious:

Any possibility of making it work as a catch-up subscription, passing a checkpoint?

Backpressure?

/Peter

Tx and good questions.

Catch-up subscriptions and checkpoints could be supported by passing something into the .From*() methods. This can then be passed through to the gRPC query for ES to deal with. As results come back, details can be added to the response envelope (badly named EventEnvelope atm), so either side can track progress. Also, once all events are loaded from storage, ES doesn’t need to call OnCompleted - that would just be for queries. It could just switch to live mode instead.
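
To make the "replay, then switch to live" idea concrete, here is a minimal local sketch using plain System.Reactive. It is not the fork's code: `Envelope`, `Position`, and `FromCheckpoint` are hypothetical names, and a real implementation would also have to deal with events that arrive while the replay is still running.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CatchUpSketch
{
    // Hypothetical envelope carrying a position that either side can use as a checkpoint.
    public sealed record Envelope(long Position, string Data);

    // Replay stored events after the checkpoint, then continue with live events.
    // Concat only subscribes to `live` once the replay has completed, so the
    // subscriber sees no OnCompleted between the two phases.
    public static IObservable<Envelope> FromCheckpoint(
        IEnumerable<Envelope> stored, IObservable<Envelope> live, long checkpoint) =>
        stored.ToObservable()
              .Where(e => e.Position > checkpoint)
              .Concat(live);

    public static List<long> Demo()
    {
        var stored = new[] { new Envelope(1, "a"), new Envelope(2, "b"), new Envelope(3, "c") };
        var live = new Subject<Envelope>();
        var seen = new List<long>();

        using var subscription = FromCheckpoint(stored, live, checkpoint: 1)
            .Subscribe(e => seen.Add(e.Position));

        live.OnNext(new Envelope(4, "d")); // published after the replay has finished
        return seen;                       // positions 2, 3 and 4
    }
}
```

The client would remember the last `Position` it handled and pass it back as the checkpoint when it reconnects.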

I don’t think there should be any backpressure other than the client just needing to keep up. It looks like the client could use something like ObserveLatestOn, but I’d need to dig into it. On the ES side, events are loaded one batch at a time and results are streamed back to the client one item at a time, so as I understand it, gRPC won’t send faster than the client has acknowledged.

Cheers

This looks great, I’m going to go take a look at the fork.

-Chris

:+1:

Hey Mat, good to hear from you :slight_smile:

There are a few things to iron out yet before it’s as useful as the current projection API - mainly support for strongly typed events and anonymous types. But it seems like a glove that fits so I’m excited by what’s possible.

I look forward to your thoughts.

Cheers,

Jas

Is this lifting the matches and pushing them over the wire as expressions?

This is pretty cool overall!

Yeah, the entire LINQ expression is extracted by the provider on the client, serialized and sent to ES, where it’s compiled and run against a local observable. Since IQbservable implements IObservable there’s no need to change the expression tree, so straight away you get a fully-featured LINQ provider with the goodness of Rx’s schedulers.
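
As a rough illustration of the compile-and-run step (not the fork's actual code), the sketch below captures a query as an expression tree and then compiles it against a local observable. `Envelope`, `BuildQuery`, and `RunOnServer` are hypothetical names, the lambda is typed directly on IObservable to keep things short, and the serialization hop is left out entirely.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reactive.Linq;

public static class ExpressionQuerySketch
{
    // Hypothetical stand-in for an event envelope; not the fork's actual type.
    public sealed record Envelope(string EventType, string Data);

    // "Client" side: the query is captured as data (an expression tree), not as a delegate.
    public static Expression<Func<IObservable<Envelope>, IObservable<string>>> BuildQuery() =>
        source => source
            .Where(e => e.EventType == "CustomerCreatedEvent")
            .Select(e => e.Data);

    // "Server" side: compile the tree and apply it to a local observable.
    // In the setup described above the tree would first be serialized and sent over the wire.
    public static IList<string> RunOnServer(
        Expression<Func<IObservable<Envelope>, IObservable<string>>> query,
        IEnumerable<Envelope> storedEvents)
    {
        var compiled = query.Compile();
        return compiled(storedEvents.ToObservable()).ToList().Wait();
    }

    public static IList<string> Demo() =>
        RunOnServer(BuildQuery(), new[]
        {
            new Envelope("CustomerCreatedEvent", "a"),
            new Envelope("SomethingElse", "b"),
            new Envelope("CustomerCreatedEvent", "c"),
        });
}
```

Only the filtered and projected results leave `RunOnServer`, which mirrors the point that just the results are streamed back to the client.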

All the hard work’s done by System.Reactive.Linq and Serialize.Linq, so it’s super simple. The hard part was reflecting over the expression tree to wire things up, but in the end that was only about a dozen lines of code.

For it to be genuinely useful though, it’ll need to support anonymous types and strongly-typed events (since expressions don’t support dynamic types). I’m hopeful this won’t be too difficult - my plan is to look at that next.

Support for strongly-typed events is in:

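```csharp
// Client code
// Type argument assumed from the first example; the generic parameter is missing from the post as extracted.
es.When<CustomerCreatedEvent>()
    .Where(e => e.Email.Contains(".test@"))
    .Subscribe(
        onNext: e => Console.WriteLine(e.CustomerId),
        onError: ex => Console.WriteLine("ERROR: " + ex),
        onCompleted: () => Console.WriteLine("COMPLETED")
    );
```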

It’s returning the event itself now rather than the envelope, which I think is nicer.

Having .FromAll<T>() parameterised with a client-defined base class seems to make sense, along with an overload without the generic parameter that returns object.
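
A sketch of what that pair of overloads could look like, using a purely in-memory stand-in. The names `InMemoryContextSketch`, `CustomerEvent`, and `CustomerDeletedEvent` are hypothetical, and this version returns plain IObservable rather than going through the provider.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical client-defined base class and events.
public abstract record CustomerEvent(Guid CustomerId);
public sealed record CustomerCreatedEvent(Guid CustomerId, string Email) : CustomerEvent(CustomerId);
public sealed record CustomerDeletedEvent(Guid CustomerId) : CustomerEvent(CustomerId);

public sealed class InMemoryContextSketch
{
    private readonly IObservable<object> _events;

    public InMemoryContextSketch(IEnumerable<object> events) => _events = events.ToObservable();

    // Untyped overload: every event, as object.
    public IObservable<object> FromAll() => _events;

    // Typed overload: only events assignable to T, e.g. a client-defined base class.
    public IObservable<T> FromAll<T>() => _events.OfType<T>();

    public static (int All, int Customer) Demo()
    {
        var ctx = new InMemoryContextSketch(new object[]
        {
            new CustomerCreatedEvent(Guid.NewGuid(), "a.test@example.com"),
            "some unrelated event",
            new CustomerDeletedEvent(Guid.NewGuid()),
        });
        return (ctx.FromAll().Count().Wait(), ctx.FromAll<CustomerEvent>().Count().Wait());
    }
}
```

With the typed overload a query such as `ctx.FromAll<CustomerEvent>().Where(e => e.CustomerId == id)` needs no casts.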

I did a spike to push this Rx provider to where it can do something practical.

A good strategy for creating an event sourcing projection seems to be to first get the events filtered and partitioned into the desired streams in EventStore (the heavy-lifting done server-side), and only then load each stream (in parallel) from a C# client to do the final reduction into document storage or simply on the fly as you would load an aggregate. This example is an Rx version of three chained EventStore projections we have, leaving out the final reduction step:

  1. Combine CategoryA and CategoryB, and linkTo a new stream by a shared Id.

  2. Enrich events in this stream with a CategoryC Id that’s missing from most and linkTo a new stream by this CategoryC Id.

  3. Join in CategoryC’s events and linkTo the final stream by CategoryC’s Id.

I should probably post the original JS projections too as a comparison but in any case, here’s the Rx query that does all three steps without linkTo’s. It’s probably fine, but I changed the names of things in case it’s sensitive.

```csharp
var es = new EventStoreContext(options);

var categoryAAndBEvents = es.FromAll(); // TODO: Filter by category
var categoryCEvents = es.FromAll(); // TODO: Filter by category

var stream = categoryAAndBEvents
    // 1. Combine CategoryA and CategoryB, grouped by a shared Id
    .GroupBy(e =>
        (e is SomeEvent) ? ((SomeEvent)e).Id :
        (e is SomeOtherEvent) ? ((SomeOtherEvent)e).VerboseNameId :
        (e is AndAgainEvent) ? ((AndAgainEvent)e).IdId :
        // etc…
        Guid.Empty
    )
    .Where(g => g.Key != Guid.Empty)
    // 2. Attach CatCId (tricky when limited to expression trees)
    .Select(g => g
        .Aggregate(
            Guid.Empty,
            (s, e) => (Guid)(e.GetType().GetProperty("CatCId") == null ? s : e.GetType().GetProperty("CatCId").GetValue(e, null))
        )
        .Zip(g, (c, e) => g.Where(x => !x.MetaData.ContainsKey("CatCId")).Do(x => x.MetaData.Add("CatCId", c)))
        .SelectMany(x => x)
    )
    .SelectMany(g => g)
    // 3. Merge in categoryC
    .Merge(categoryCEvents)
    .GroupBy(e =>
        (e is SomeEvent) ? ((SomeEvent)e).CustomerId :
        (e is AndAgainEvent) ? (Guid)(e.MetaData.ContainsKey("CatCId") ? e.MetaData["CatCId"] : Guid.Empty) :
        (e is SomeCatCEvent) ? ((SomeCatCEvent)e).CatCId :
        // etc…
        Guid.Empty
    )
    // 4. At this point the groups (streams) are ready for the final reduction into a document. This is just for logging.
    .Where(g => g.Key == new Guid("879c5d49-fd72-4392-bf88-8bba363e590e"))
    .SelectMany(g => g)
    .Subscribe(
        onNext: s => Console.WriteLine(JsonConvert.SerializeObject(s)),
        onError: e => Console.WriteLine("ERROR: " + e),
        onCompleted: () => Console.WriteLine("DONE")
    );
```

The JS API is limited by comparison to Rx, but a lot can be done in JS and done easily. Working within the constraints of expression trees is no fun coming from that environment, but like IQueryable you kind of get used to it and learn a few recipes. Also, anonymous type support would help a lot. Rx is pretty capable and fully testable, and my feeling is that it would be a preferred API for EventStore with a few utility methods added.

I learned that more work needs to be done on strong-typing support and on how the Rx query is wired up on the ES side. I’ll get things tidied up, ironed out, etc. and push another update to the code when I get time.