IQbservableProvider

It seems that,

fromCategories(‘category1’, ‘category2’)

.foreachStream()

.when({

$init: { count: 0 },

$any: (s, e) => { s.count++; },

});

``

Is about the same as the following in Rx,

category1.Merge(category2)

.GroupBy(e => e.Id)

.SelectMany(x => x.Scan(new { Count = 0 }, (s, e) => new { Count = s.Count + 1 }))

.Subscribe(s => Console.WriteLine(s.Count));

``

and I imagine pattern matching and the other operations wouldn’t be too much of a stretch.

If so, it seems like it would make sense to write an IQbservable provider for ES. That sounds guru to me but there seem to be quite a few libraries written already that gets pretty close.

Thoughts?

Attaching to a subscription/catchupsubscription would in fact be quite
easy... they already raise as an event...

Interesting… I’ve got the feeling it’s quite doable, I’ll keep tinkering.

The penny just dropped for me how powerful ES is - particularly with foreachStream(). Instead of a projection taking 4 hours from C# with significant code & ceremony, four ES projections now do it in 2 mins … and in less that 20 lines of JS. It’s exciting stuff.

Rx wont help until it’s IQbservable though, given the memory requirements.

Observable.FromEventPattern

Given an IQbservable expression could be serialised easily, sent over the wire to ES which deserialises it into an IObservable expression, and with the source already hooked up to the events you mention, how hard could it be?

As I send it I’m already starting to see devils in the detail, but where there’s a will…

I guess my intuition is that since there are already C# projections on the ES server then there’s no need to translate linq-to-somequerylanguage. So the observable (& therefore qbservable provider) would be trivial.

Just to chip in, I know it isn’t exactly the same thing but is related - I recently set up an AsyncEnumerable which allowed me to connect to ES using the Atom pub / sub but treat the data as a continuous stream like you would with an Observable, piping it through functions.

It wraps up the paging in a nice way and makes it invisible to the consumer.

I was using F# but C# 8 has them out of the box, no need for RX or anything.

let getEventPagesFrom

(restClient : IRestClient)

address =

let headers = Some[

Header(“ES-TrustedAuth”, “admin; $admins”)

Header(AcceptHeader, JsonType)]

let param = Some ([Parameter (EmbedProperty,BodyValue)]) // include event data, not just links. Only available with JSON response

let initialRequest = Get (address, param, headers)

let maybeGetNextRequest slice =

maybe {

let! link = slice.Links |> Seq.tryFind (fun l -> l.Relation = PreviousLink)

let address = Address link.Uri.AbsoluteUri

return Get (address, param, headers)

}

let rec loop request =

asyncSeq {

let! result = restClient.execute request // await response

match result with

Ok slice ->

yield Ok slice.Entries // return events

match maybeGetNextRequest slice with // look for next page link

Some nextRequest -> yield! loop nextRequest // recurse through remaining pages

None -> () // end of seq, break

Error e -> yield Error e // page request failed, break

}

loop initialRequest

This is the IAsyncEnumerable C# version of F#'s AsyncSeq

https://stu.dev/iasyncenumerable-introduction/

Ah interesting, nice work.

This IQbservable thing can totally work. I’ll be up to my ears in expression trees for a bit, but it wasn’t too painful to get a proof of concept going.

Any pointers to where I’d start to look in EventStore’s code to hook this up? ie. where to use Observable.FromEventPattern to get an iobservable around $all?

It’s a little bit exciting but I have a (full) Qbservable provider running now so you can run queries like,

// Simple

new EventStreamDbContext(“https://localhost:5001”)
.FromAll()
.Where(e => e.Category == “Category1” || e.Category == “Category2”)
.Skip(1)
.Take(15)
.Subscribe(e => Console.WriteLine(e.Category + “-” + e.Id));

``

and

// Map-reduce: Count events by category

new EventStreamDbContext(“https://localhost:5001”)
.FromAll()
.GroupBy(e => e.Category)
.SelectMany(g =>
g.Scan(
$"{g.Key}:0",
// Will be simpler when there’s support for anonymous types
(s, e) => $"{e.Category}:{int.Parse(s.Split(’:’)[1]) + 1}"
)
)
.Subscribe(s => Console.WriteLine(s));

``

over the wire.

It turned out to be relatively straight forward. There are limitations to work through like support for anonymous types but I’m keen to wire it up to EventStore next.

Out of interest I’m using grpc in .Net Core 3 which fits like a glove for this.

Nice. We would be interested in using this. Just out of interest, can you show what the ES JavaScript projection looks like underneath an example IQbservables?

– You received this message because you are subscribed to the Google Groups “Event Store” group.
To unsubscribe from this group and stop receiving emails from it, send an email to
.
To view this discussion on the web, visit .

There is no JavaScript. The Rx query runs in C# as it’s written - but on the server so that only events or results are sent back over the wire.

Oh I see! Makes sense now, for some reason assumed no server side changes. Lovely stuff, do you have/plan to have a repo people could contribute to?

– You received this message because you are subscribed to the Google Groups “Event Store” group.
To unsubscribe from this group and stop receiving emails from it, send an email to
.
To view this discussion on the web, visit .

Yeah, I’ll push the code to github when I get a moment and let you know - hopefully this coming weekend.

Note that it’s not hooked up to anything at the moment. I want to see it run from EvenStore so I’ll spend some time on that but in the meantime I can get this bit out.

The code for this is here:
https://github.com/JasonKStevens/QbservableProvider

It’s being fleshed out at the moment including support for different providers (an in-memory provider, grpc and soon EventStore). So to see the simpler proof-of-concept version it’s probably best to look at the initial commit.

I just got a fork of EventStore working with an initial IQbservableProvider.

The following Rx query is on the client but gets executed by the EventStore server, and just the results are streamed back.

var options = new StreamDbContextOptionsBuilder()

.UseEventStore("127.0.0.1:5001")  // TODO: support ES connection string format

.Options;

new EventStoreContext(options)

.FromAll()

.Where(e => e.EventType == "CustomerCreatedEvent")

.Where(e => new DateTime(2018, 10, 1) <= e.Created && e.Created < new DateTime(2018, 11, 1))

.Where(e => e.Data.Contains("[email protected]"))

.Subscribe(

    onNext: s =>

    {

        var @event = JsonConvert.DeserializeObject<CustomerCreatedEvent>(s.Data);

        Console.WriteLine($"{@event.CustomerId}: {@event.Email}");

            },

    onError: e => Console.WriteLine("ERROR: " + e),

    onCompleted: () => Console.WriteLine("DONE")

);

``

For this initial work the outstanding piece is to allow different types to be subscribed to. This will need a bit of reflection magic but it should be reasonable.

The ES fork is up to date and I’m happy with how it’s implemented in broad-strokes, without knowing too much about the code-base. It might be possible to move the Rx query closer to the metal but it’ll do for now.

I’d like to tidy the client code before I push it since it’s had a spike through it, so what’s there is before the ES provider changes. I’ll post an update at the next milestone, probably in a week.

Jason,

Just stumbled across this now.

This looks very interesting!

Cheers Steven, if you’re interested there’s a fork of ES with it working and an example client.