Query to read from multiple streams

I am relatively new to EventStore and would like to read the events from multiple streams to rebuild a read model. I am struggling with how to do this and I can’t seem to get the examples I have found to work.

For example, the following wiki page shows how to use the esquery command line tool to append four events into multiple streams and then read them back. However, when I run through the example I do not get the events returned, I only see the last event:

es:> append foo1 MyType {‘foo1’ : ‘data1’}

Succeeded.
es:> append foo MyType {‘foo’ : ‘data2’}

Succeeded.
es:> append foo1 MyType {‘foo1’ : ‘data3’}

Succeeded.
es:> append foo MyType {‘foo’ : ‘data4’}

Succeeded.
es:> q fromStreams([‘foo’, ‘foo1’])

Query started. Press esc to cancel.
Query Completed in: 00:00:01.1369890
Result:

{
“foo”: “data4”
}

Query Completed

I have all the standard projections enabled, and will ultimately be needing to combine $et- streams together.

So, my questions are:

  • How do I create a query that combines multiple, known, streams (I am assuming this is possible and ordering by time will occur automatically)?
  • Do I even need a query or can this be done via some other mechanism? (I would rather not have a long running projection combining streams if possible as the projection does not need to be running all the time)
  • How to create the query and read the results via the .Net API?

Many thanks!
Cameron

What do you get in the UI when you try fromStreams()?

Hi Greg,

I see the same output:

Query:
Completed/Stopped/Writing results
Source:
fromStreams(['foo', 'foo1'])

State:
{
  "foo": "data4"
}
    
Cheers
Cameron

Add .when(
          $any : .... and count or something
       )

Thank Greg for the rapid response!

I'll give that a try but just to confirm in the meantime - I want to replay the combined stream of events, I don't want to project new state or emit new events.

Thanks
Cameron

I was just showing your how to test it (I dont think fromstreams
returns the combined stream now).

a quick example:

fromStreams(['foo', 'bar', 'baz').
     when(
           $any : function(s,e) { linkTo('shitbird', e);
     )

After you should have a stream shitbird with your events in it.

There is also some query support for doing joined streams in the API
but its currently not documented and likely to change in the next few
weeks

Thanks Greg,

I tried the example you provided:

fromStreams(['foo', 'foo1']).
     when({
           $any : function(s,e) { linkTo('shitbird', e); }
     });

It would not work from the query UI as emit is not enabled.

If I turn it into a transient projection I do indeed see the shitbird stream.

Any advice on where to go from here?

Hi again,

Can you please provide some suggestions on how this can (hopefully?) be solved without creating a projection and emitting all the events into a new stream?

I can do it programmatically by reading each stream and then ordering the events by time but that seems like overkill if Event Store can do it for me

Thanks
Cameron

Is there such a way? It would be a nice feature so that filtering/aggregation could be done server side.

Not sure yet – would like an official response from Greg.

Right now I am loading all the streams into memory and then sorting. This is ok for the moment as the streams are short but will not
be acceptable for long.

On Behalf Of [email protected]

Agreed. I understand the first implementation using javascript like the general projections do now.

However the nicer experience (from a .net perspective) would be a IQbservable provider and use of LINQ like StreamInsight does (https://blogs.msdn.microsoft.com/streaminsight/2012/07/09/streaminsight-2-1-meet-linq/ ).

I am not sure why this would be in the client API (eg observables) you
can wrap any subscription as an observable trivially (it has a call
back function just call the observable.

The second part that takes N subscriptions, orders them and outputs a
single subscription (where N is small). This including handling
persistence of checkpoints is < 100loc. Where N is large you would
prefer an All subscription and ignore messages. I again struggle why
we would want to build this in client api as opposed to on top of it.

Greg

I’ve got some code that does much of what your looking for as a layer on top of the event store client.

You’ll need to make it your own but this will get you started.

I’ll put it up on github and share a link here.

-Chris

Thanks Chris, I appreciate it although I think the discussion may have drifted from my existing quer.

Greg – my question concerns the ability to replay/read a number of existing, known, streams ordered by time in a query. I don’t want
observables, or subscriptions, and it seems wasteful to create a transient projection to process many (potentially very many!) events into a new stream just to get to the event data.

This seems like a fairly straightforward request and that is what I would like some direction on.

Thanks

Cameron

On Behalf Of Chris Condron

you misunderstand. observable is not the desired bit. It’s the bit about being to pass a server side query that does the aggregation/filtering/etc. IQbservable as in QUERY (not IObservable) feature.

But you have that already, it’s just in javascript. Rx is not something you want to take a dependency on internally. For starters it does not ilmerge very well.

To read say 3 streams [foo, bar, baz] and get one stream out:

Read from foo,bar,baz keep buffer for each. take the event with the
lowest position from the 3 buffers, repeat.

btw ^^^ is exactly what projections does.

That leads me to think.. As a variation to the one stream per ar; Would it be a good idea to design an aggregate that builds up it's state from a projection stream, but saves to the various streams the projection is based on?

Like an account, saving settings to settings-[id], transactions to transactions-[period]-[id] and so on, but the account ar is built up from acproj-[id] that reads from settings, transactions-[period] and so on.