Projection on projected streams (by category)

Hi,

I want to solve a real problem I have at a client, as a POC, using the EventStore projections. Initially the problem was solved with scheduled SPs, and now a solution with a bunch of NServiceBus handlers is used. But after the course at Oredev, I have the impression that it could be solved more simply with the EventStore projections.

Problem:
At a client we have energy meters which register many measurement values; in practice we get a value every 15 minutes. These values are used to display graphs of averages. The averages are updated and stored when an event is received.

We have the following averages:

  • Average per hour
  • Average per day
  • Average per month
  • Average per year
  • Average per HourOfDay
  • Average per DayOfWeek

Theoretical solution with ES Projections:

Let’s assume we have a stream with measurements of the following format:

MeasurmentRead
{
DateTime: timestamp
decimal: value
}

  1. We have a stream per meter:

Eg: MeterTV

  2. The first projection I would apply filters the MeasurementRead events into a separate stream. This creates a subset that is used as the source for the following projections.

Eg: MeasurementRead

  3. Then I would apply a projection (linkTo) for each type of average, to group the measurements per meter for the time frame of the average:

Eg per hour: MeasurementPeriod-MeterTV+H20121108-12
Eg per day: MeasurementPeriod-MeterTV+D20121108
Eg per month: MeasurementPeriod-MeterTV+M201211
Eg per year: MeasurementPeriod-MeterTV+Y2012
Eg per hour of day: MeasurementPeriod-MeterTV+HD01
Eg per day of week: MeasurementPeriod-MeterTV+Monday

It would be nice to assign a category to these streams, so that the next projection is only applied to streams with that category. The category in this case is ‘MeasurementPeriod’.
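The naming scheme above could be sketched as a small helper. This is only an illustration: `periodStreamNames` and `pad2` are hypothetical names, and using UTC for the date parts is an assumption (the thread does not say which time zone the meters report in).

```javascript
// Hypothetical helper: builds the time-frame stream names for one reading.
// UTC is used so the result does not depend on the server's time zone (assumption).
var WEEKDAYS = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                'Thursday', 'Friday', 'Saturday'];

function pad2(n) {
  return (n < 10 ? '0' : '') + n;
}

function periodStreamNames(meterId, timestamp) {
  var d = new Date(timestamp);
  var year = String(d.getUTCFullYear());
  var month = pad2(d.getUTCMonth() + 1); // getUTCMonth() is zero-based
  var day = pad2(d.getUTCDate());
  var hour = pad2(d.getUTCHours());
  var prefix = 'MeasurementPeriod-' + meterId + '+';
  return {
    hour: prefix + 'H' + year + month + day + '-' + hour,
    day: prefix + 'D' + year + month + day,
    month: prefix + 'M' + year + month,
    year: prefix + 'Y' + year,
    hourOfDay: prefix + 'HD' + hour,
    dayOfWeek: prefix + WEEKDAYS[d.getUTCDay()]
  };
}
```

Inside a projection, each of these names would be a candidate target for a linkTo call; everything before the first ‘-’ is the part intended as the category.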

  4. Then I would create a projection that calculates the total and average for each stream created in the previous step. The format would look like this:

MeasurementAverage
{
decimal: total
decimal: average
int: count
}

Eg per hour: MeasurementAverage-MeterTV+H20121108-12
Eg per day: MeasurementAverage-MeterTV+D20121108
Eg per month: MeasurementAverage-MeterTV+M201211
Eg per year: MeasurementAverage-MeterTV+Y2012
Eg per hour of day: MeasurementAverage-MeterTV+HD01
Eg per day of week: MeasurementAverage-MeterTV+Monday

Question:

I’m aware that this creates a huge number of streams. Are the ES projections intended to solve this kind of problem?

My Current Implementation:

  1. Project all MeasurmentRead events into a single stream

fromAll().when({
  'MeasurmentRead': function (s, e) {

    // skip events without a body
    if (e.body == null) return s;

    linkTo('MeasurmentRead', e);

    return s;
  }
});

No problems with this one…

  2. The next projection links each event to a stream per meter and hour of day

fromStream('MeasurmentRead')
.whenAny(function (s, e) {

  // returns the hour of day (0-23) of the reading
  var FormatName = function (when) {
    var date = new Date(when);
    return date.getHours();
  };

  if (e.body == null) return s;

  var name = 'MeasurmentPeriod-' + e.streamId + '+H' + FormatName(e.body.timestamp);

  linkTo(name, e);

  return s;
});

This also works, the events are grouped per stream and hour of day.

  3. Calculate totals per time period

For this one I assume that all streams and events created by the previous projection are added to the ‘MeasurmentPeriod’ category, so that I can calculate the total for each MeasurmentPeriod stream:

fromCategory('MeasurmentPeriod')
.fromEachStream()
.whenAny(function (s, e) {

  if (e.body == null) return s;

  var total, count;
  var reading = e.body.value;
  if (s.data == null)
  {
    // first reading in this stream
    total = reading;
    count = 1;
  }
  else
  {
    var previous = s.data;
    total = previous.total + reading;
    count = previous.count + 1;
  }

  s.data = {
    total: total, // store the running total, not the event itself
    count: count,
    average: total / count
  };
  return s;

});

This is where I’m stuck for the moment. The projection does not process the events from the previous projection. Only the ‘$stream-created’ event of the projected ‘MeasurmentPeriod’ streams is processed. So what I would actually like is to process the results of a previous projection (by using the category). Is this a scenario that is supported? Is there another way to accomplish this?

Thanks in advance,

Tim

Oops, too fast, I was still working on the last sentence:
Is there a way to process the events added to the categorized stream?

If I were by chance to not get a reading from a sensor for one time unit, what should I display? (And yes, this is the exact intention of the JS projections in general.)

Missing values should not be taken into account when calculating an average, because they would make the average unrepresentative.
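As a minimal sketch of that rule (not the thread author's code): a fold step that leaves the running state untouched when a reading is missing. The function name `applyReading` is hypothetical, and treating null, undefined and non-numeric values as "missing" is an assumption.

```javascript
// Hypothetical fold step for a running average.
// A missing reading returns the state unchanged, so it affects
// neither the total nor the count.
function applyReading(state, value) {
  if (typeof value !== 'number' || isNaN(value)) return state;
  var total = (state ? state.total : 0) + value;
  var count = (state ? state.count : 0) + 1;
  return { total: total, count: count, average: total / count };
}

// Usage: the null in the middle is skipped.
var result = [2, null, 4].reduce(applyReading, null);
// result is { total: 6, count: 2, average: 3 }
```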

Tim,

let me ask some questions to clarify what you have:

  1. Do you write events from each meter into its own stream? And do these events include MeasurementRead and other event types?

  2. What is MeasurementPeriod-MeterTV+H20121108-12 in your case? Is it a stream name? What is the reason to have a separate stream to hold just one average value? Am I missing the idea?

-yuriy

  1. Yes. MeterTV (and the other meter streams) contain MeasurementRead events plus other events. The needed (MeasurementRead) events are filtered into a single stream in step 2. This prevents other event types from being processed by the time-frame projections of step 3.

  2. MeasurementPeriod-MeterTV+H20121108-12 contains the events for one single meter for a specific time frame. In this case: all MeasurementRead events for MeterTV between 08/11/2012 - 12:00 and 08/11/2012 - 12:59. I also create streams for all the other time frames (step 3), e.g. day of the week (MeasurementPeriod-MeterTV+Monday).

The average is calculated from the events in these streams (time frame per meter) by one single projection which stores state. This average is stored (projected) in a state per time-frame stream (step 4). So yes, implicitly a single stream is created to hold a single average value; this is done automatically by the projection API.

Maybe I’m taking this too far. But as not all values of the average are known at the beginning of a time frame, I need to store the intermediate total and average for each time frame anyway. I could replay all the events for each average calculation, but if I need to calculate an average per day of week over a time span of 2 years, I need to process at least 2 * 365 * 24 * 4 = 70080 events.

I created a diagram to clarify the projections, maybe this helps…

Measurement.Electricity.png

This seems overly complicated to me. Do you really need a stream/state per timeframe?

You could have your meter streams as a category, then something like this:

fromCategory('meter').foreachStream().when({
  'MeasurementRead': function (s, e) {
    // state in here is per stream
    // so you can have a single state at
    // http://:2113/projection//state?partition=meter-TV
    // which stores the averages (for all timeframes) for that meter
    return s;
  }
});

This would mean this 1 state (per meter) would grow forever and hold:

  • AveragePerHour
    • 22/11/2012 - 01:00 : 1,2kwh
    • 22/11/2012 - 02:00 : 1,3kwh
    • 22/11/2012 - 03:00 : 1,2kwh
    • 22/11/2012 - 04:00 : 1,4kwh
    • 22/11/2012 - 05:00 : 1,6kwh
    • 22/11/2012 - 06:00 : 1,2kwh
    • 22/11/2012 - 07:00 : 2.9kwh
    • 22/11/2012 - 08:00 : 3.5kwh
    • 22/11/2012 - 09:00 : 4.8kwh
    • 22/11/2012 - 10:00 : 3.5kwh
    • 22/11/2012 - 11:00 : 2.9kwh
    • 22/11/2012 - 12:00 : 3.6kwh
    • 22/11/2012 - 13:00 : 3.8kwh
    • 22/11/2012 - 14:00 : 5.0kwh
    • 21/11/2012 - 01:00 : 1,2kwh
    • 21/11/2012 - 02:00 : 1,4kwh
    • 21/11/2012 - 03:00 : 1,6kwh
  • AveragePerDay
    • 22/11/2012 : 3.2kwh
    • 21/11/2012 : 2.9kwh
    • 20/11/2012 : 3.1kwh
  • AveragePerMonth
    • 11/2012: 4.5kwh
    • 10/2012: 4,1kwh
    • 09/2012: 2,3kwh
  • AveragePerWeekday
    • Mon - Tue

Is this what you suggest?
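To make the shape of such a single per-meter state concrete, here is a rough sketch. The names `updateMeterState` and `bump`, the bucket keys (ISO date prefixes, UTC weekday number) and the field names are all assumptions made for illustration; the thread does not define them.

```javascript
// Hypothetical: one state object per meter holding every time-frame bucket.
function bump(buckets, key, value) {
  var b = buckets[key] || { total: 0, count: 0 };
  b.total += value;
  b.count += 1;
  b.average = b.total / b.count;
  buckets[key] = b;
}

function updateMeterState(state, reading) {
  state = state || { perHour: {}, perDay: {}, perMonth: {}, perWeekday: {} };
  var date = new Date(reading.timestamp);
  var iso = date.toISOString(); // e.g. '2012-11-22T01:00:00.000Z'
  bump(state.perHour, iso.slice(0, 13), reading.value);   // '2012-11-22T01'
  bump(state.perDay, iso.slice(0, 10), reading.value);    // '2012-11-22'
  bump(state.perMonth, iso.slice(0, 7), reading.value);   // '2012-11'
  bump(state.perWeekday, String(date.getUTCDay()), reading.value); // '0'..'6'
  return state;
}
```

The perHour, perDay and perMonth maps gain a new key for every new period, which is exactly the growth discussed in the following posts; perWeekday stays bounded at seven keys.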

Tim,

ES supports scavenging by MaxAge for this purpose. You can have streams that hold only the events for the last 10 days of data.

-yuriy

Yes, just seemed like a simpler solution, but maybe you are right that it would get unwieldy after a while with a significant amount of hourly averages in a single state object.

Are you able to throw away old hourly values, or do you need to keep them that granular forever?

Actually I have two kinds of time-frame projections:

  • The first kind creates new time-frames as time passes, e.g. AveragePerHour, AveragePerDay, …

  • The second kind has a fixed number of time-frames: DayOfWeek, HourOfDay

I could use the state for the second kind, but for the first kind it seems too much.
We don’t keep them that granular forever, as you suspected. For example:

  • hourly we keep 3 months: 3 * 31 * 24 = 2232

  • daily 2 years: 365 * 2 = 730

This may be possible to store in the state, but it does not look very clean in my opinion.

The reasoning behind how I use the different projections is:

  • Each projection has only one single purpose (SRP)

  • I can reuse the logic to calculate the average across different projections. (Maybe this is reuse at the wrong level)

  • I can easily add other time-frame projections without them affecting each other. If I use one projection for all time-frames I cannot replay

  • As far as I know, streams are not heavy at all in ES, so it would be no problem to create the time-frame streams. The events themselves are not duplicated anyway, and I could use the ES clean-up features like $maxCount & $maxAge on the time-frame streams…

We do have a solution in NSB with a handler per time-frame projection. The results are stored in SQL. This works quite well, but I thought I could create a more elegant solution with the ES projections, and let ES take care of the clean-up of our streams etc.

But storing all the averages in a single state does not really look cleaner than our current implementation, imho.

Actually I don’t really need the time-frame streams. I created them because I needed multiple states (time-frame eg: each day) per stream (eg meter) per projection (time-frame definition). But the only possibility is to have a state per stream per projection (time-frame definition).

If there were a way to have multiple states per stream per projection, my problem would be solved, I guess :)

Thanks for thinking with me!

I don’t really care that my streams / events are growing. I care about the fact that my state is growing. This would happen if I stored all my average values in one single state, as you suggested.
Do you think the growing state will be an issue in ES? Or is it designed to work like this?

Tim,

I am definitely missing something, as I don’t see where the growing state comes from. I assumed you would have a stream per aggregation time-frame (a stream for hours, days, weeks etc.). Then every hour (or week) you write an aggregate event to the appropriate stream.

So for instance, the projection building per-hour aggregates just accumulates the total in the projection state with each event processed and emits a new event into the per-hours stream when a new hour starts. The projection state size will be just {total, current hour, and number of events processed}.

If you run this projection in “continuous” mode it will only write checkpoint events from time to time and events you emit into the per-hour stream. (the same applies to foreachStream() projection if you have multiple meters).
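A rough sketch of the per-hour roll-up described above, written as a plain function so it can be run outside the projection engine. `handleReading`, `hourKey` and the `emitAverage` callback are hypothetical names; in a real projection the callback would be replaced by whatever writes the aggregate event to the per-hour stream, and the hour key format is an assumption.

```javascript
// Hypothetical: state is just { hour, total, count }, so it stays small.
function hourKey(timestamp) {
  return new Date(timestamp).toISOString().slice(0, 13); // e.g. '2012-11-22T01'
}

function handleReading(state, reading, emitAverage) {
  var hour = hourKey(reading.timestamp);
  if (state && state.hour !== hour) {
    // First reading of a new hour: publish the finished hour, then reset.
    emitAverage({
      hour: state.hour,
      total: state.total,
      count: state.count,
      average: state.total / state.count
    });
    state = null;
  }
  if (!state) state = { hour: hour, total: 0, count: 0 };
  state.total += reading.value;
  state.count += 1;
  return state;
}
```

One consequence of this design is that the aggregate for an hour is only emitted once the first reading of a later hour arrives, so the most recent hour is always still in the state rather than in the stream.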

-yuriy

Ok, my mistake, the growing state was not your idea…

Good idea, that brings me closer! I just ignore the events not applicable to the time-frame projection, store the intermediate values in the state, and emit the average result into an aggregation stream when a time-frame is completed (per time-frame type per meter). This avoids having to create a stream per time-frame per time-frame type per meter. I’ll try this tonight.

For now I see (maybe) some problems (or differences) with this solution:

You will be able to use emit and linkTo in continuous mode soon. Currently ‘persistent’ mode is required. The only difference is that in ‘persistent’ mode the projection state is persisted on any change.