"What I want to achieve is to get the number of events of specific
type emitted in the last 1hr (or sum/avg of their properties), and get
the same values refreshed every minute (which is good enough for my
specific purpose). I will have multiple streams (or groups) I need to
have this operation performed on, say something like sum of payments
on a given route in the last hour, and I'll have multiple routes."
I see what you want to do, but remember that we need to do this in a generic
way. What happens when it's one month of data that needs to be
maintained at 50ms granularity? We can't replay it every 50ms.
There are some known solutions to this issue. In particular, you model
both "enter" and "leave". Something like:
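.when({
    // called when a Foo event enters the window
    Foo: function(s, e) { /* add e to the running aggregate in s */ return s; },
    // called when a Foo event falls out of the window
    $FooExpiredFromWindow: function(s, e) { /* remove e from the aggregate in s */ return s; }
})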
where $FooExpiredFromWindow is called when an event falls out of the window.
Luckily this is not a new problem. You basically keep two pointers into
the stream: one for the current position and one for the back end of
the window. This also works historically, but it can run into some
issues. For small data, this mechanism is also trivial to implement in
your own code for your specific case.
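A minimal sketch of the two-pointer idea, written in plain JavaScript rather than against the projections API. The class name `SlidingWindow`, its methods, and the `{ time, amount }` event shape (time in ms) are hypothetical, and an in-memory array stands in for the stream.

```javascript
// Two pointers into a time-ordered stream: `head` is the next event to enter
// the window, `tail` is the oldest event still inside it.
// Hypothetical: the class, its methods, and the { time, amount } event shape
// (time in ms) are illustrative only, not an Event Store API.
class SlidingWindow {
  constructor(events, windowMs) {
    this.events = events; // stands in for the stream; never copied
    this.windowMs = windowMs;
    this.head = 0;
    this.tail = 0;
    this.count = 0;
    this.sum = 0;
  }

  // "Enter": an event becomes part of the window.
  enter(e) {
    this.count += 1;
    this.sum += e.amount;
  }

  // "Leave": an event falls out of the back end of the window.
  leave(e) {
    this.count -= 1;
    this.sum -= e.amount;
  }

  // Move both pointers so the window covers (now - windowMs, now].
  // `now` must never decrease between calls.
  advanceTo(now) {
    while (this.head < this.events.length && this.events[this.head].time <= now) {
      this.enter(this.events[this.head]);
      this.head += 1;
    }
    const cutoff = now - this.windowMs;
    while (this.tail < this.head && this.events[this.tail].time <= cutoff) {
      this.leave(this.events[this.tail]);
      this.tail += 1;
    }
    return { count: this.count, sum: this.sum };
  }
}
```

With a one-hour window and events at 0 and 30 minutes, `advanceTo` at 60 minutes leaves only the 30-minute event in the window. The aggregate is updated incrementally on enter and leave, so nothing is replayed; the open question is who calls `advanceTo` when no events arrive.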
If I recall correctly, Rx does something like this by holding the events
in the window in memory. That obviously doesn't work for large sets, but
it works fine for small ones with only a few events (you can, however,
do that now with projections, by the way!).
Let's say that the system is in live mode and it's not receiving any
events. When does it move up the tail pointer? Does it poll to find out
when it should move? This is possible but can be costly (especially
when you consider that a foreachStream could have 50 million separate
folds, as in your case of doing it per route). Again, in your example
this would be absolutely trivial, as you have one-minute granularity; at
a few ms it would become much more complicated. There are some ways of
dealing with this...
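One common way of handling the minute-granularity case (a swapped-in technique, not necessarily what is meant above) is a fixed ring of per-minute buckets advanced by a clock tick rather than by events. `MinuteBuckets`, `tick`, `add`, and `totals` are hypothetical names, and timestamps are assumed to be ms since epoch.

```javascript
// Hypothetical sketch: a one-hour window as a ring of 60 per-minute buckets.
// tick(now) is meant to be driven by a timer so the window moves forward
// even when no events arrive.
const MINUTE = 60 * 1000;

class MinuteBuckets {
  constructor(minutes) {
    this.minutes = minutes;               // window length in buckets
    this.counts = new Array(minutes).fill(0);
    this.sums = new Array(minutes).fill(0);
    this.currentMinute = null;            // minute index of the newest bucket
  }

  // Advance to the minute containing `now`, clearing buckets that expired.
  tick(now) {
    const minute = Math.floor(now / MINUTE);
    if (this.currentMinute === null) {
      this.currentMinute = minute;
      return;
    }
    // After a long gap, clearing `minutes` buckets empties the whole ring.
    const steps = Math.min(minute - this.currentMinute, this.minutes);
    for (let i = 1; i <= steps; i++) {
      const slot = (this.currentMinute + i) % this.minutes;
      this.counts[slot] = 0;
      this.sums[slot] = 0;
    }
    this.currentMinute = Math.max(this.currentMinute, minute);
  }

  add(time, amount) {
    this.tick(time);
    const minute = Math.floor(time / MINUTE);
    // Ignore events that are already older than the window.
    if (minute <= this.currentMinute - this.minutes) return;
    const slot = minute % this.minutes;
    this.counts[slot] += 1;
    this.sums[slot] += amount;
  }

  totals() {
    let count = 0;
    let sum = 0;
    for (let i = 0; i < this.minutes; i++) {
      count += this.counts[i];
      sum += this.sums[i];
    }
    return { count, sum };
  }
}
```

In live mode something like `setInterval(() => w.tick(Date.now()), MINUTE)` would drive the tail forward with no events arriving. With millions of partitions you would want one shared timer rather than one per fold, which is exactly where the cost concern comes in.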
This is something that is likely to get implemented at some point.
However, for many use cases it's relatively easy to implement now "on
the outside" (in your own code), or internally for very simple use
cases.
The other issue is that most systems we have seen that want this
capability want not just the current value but historical values as
well, and they tend to want dynamic roll-ups (e.g., changing the time
periods around). This kind of query is actually better handled by, say,
a first normal form (1NF) database, where the projection writes out rows
to a table (in your example with columns "payment", "route", "amount",
"time"). You would then query it with a filter plus a set operation,
e.g. select count(*), sum(amount) from payments where time > xxx and time < yyy
This again works reasonably well and is quite flexible until the number
of payments between xxx and yyy gets very large.
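As an in-memory stand-in for that table (in practice it would be a real database and a SQL query), the projection side and the query side might look like this. `table`, `writeRow`, `query`, and `rollup` are hypothetical names; the route filter in `query` is an addition beyond the SQL above, and times are assumed to be in ms.

```javascript
// Hypothetical stand-in for a flat payments table with the columns
// payment, route, amount, time (ms).
const table = [];

// What the projection would do per payment event: append one flat row.
function writeRow(e) {
  table.push({ payment: e.payment, route: e.route, amount: e.amount, time: e.time });
}

// Roughly: select count(*), sum(amount) from payments
//          where route = ? and time > ? and time < ?
function query(route, from, to) {
  let count = 0;
  let sum = 0;
  for (const r of table) {
    if (r.route === route && r.time > from && r.time < to) {
      count += 1;
      sum += r.amount;
    }
  }
  return { count, sum };
}

// Dynamic roll-up: the bucket size is chosen at query time, not write time.
function rollup(route, from, to, bucketMs) {
  const buckets = new Map(); // bucket start time -> { count, sum }
  for (const r of table) {
    if (r.route !== route || r.time <= from || r.time >= to) continue;
    const start = Math.floor(r.time / bucketMs) * bucketMs;
    const b = buckets.get(start) || { count: 0, sum: 0 };
    b.count += 1;
    b.sum += r.amount;
    buckets.set(start, b);
  }
  return buckets;
}
```

Because the rows stay flat, changing the roll-up period is just a different `bucketMs` at query time; the cost is that every query scans all matching rows, which is the "very large" limit mentioned above.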
Cheers,
Greg