Understanding How a Projection Partitions State per Entity

I am completely new to ES, so apologise in advance for the stupid question. I’m trying to use projections in JS to process different event types from a single stream to build up state per policy_id and write to a new stream when a certain event is encountered (insurance_policy.bound).

The source stream has the following event types per policy_id:

  • insurance_policy.created
  • Many insurance_policy.rate_added
  • insurance_policy.bound

My code is below and I am wondering about the following:

  • How does the projection know to partition the state per policy_id
  • I am wondering do I need to do “Reset state after flushing to new stream” part? If I remove s.rates = [] I get rates being mixed up in the emitted event from different policy rate_added events.
fromStream('policies')
.when({
    $init:function(){
        return {
            transaction_type: null,
            policy_id: null,
            agency_data: {},
            rates: []
        }  
    },
    "insurance_policy.created": function(s, ev){
        s.policy_id = ev.data.insurance_policy_id
    },
    "insurance_policy.rate_added": function(s, ev) {
        s.rates.push({
            policy_rate_id : ev.data.insurance_policy_rate_id,
            key : ev.data.key
        })
    },
    "insurance_policy.bound": function(s, ev) {
        s.transaction_type = 'bound'
        emit(
            'analytics.policy_transactions',
            'policy_bound',
            s
        )

        // Reset state after flushing to new stream
        s.transaction_type = null
        s.policy_id = null
        s.agency_data = {}
        s.rates = []        
    }
})

Thank you for your help.
Jag

Jag,

If you want to partitions per Policy, I think you would need to do this:

fromCategory(policies)
.foreachStream()

That would (concurrently) process each of your policies.
This means you would have state per policy (assuming that’s what you need)

You can then emit you new event.

Currently the single stream I am reading from has events for all policies in it. Is there a way keep this and still have state per policy ID in the projection, or do I need to split the source stream into different ones like so?
policies-1
policies-2
policies-3

Disclaimer: I maybe thinking about this the wrong way :slight_smile:

Thanks!

Jag,

You could create a new projection which would manage building the polciy streams.
So something like:

fromStreams(‘policies’)
.when({
$any: function (s, e) {

        if (e === null || e.data === null || e.data.IsJson === false)
            return;

	var streamname = "policy-" + e.data.policyId.replace(/-/gi, "");
	
        //linkTo or emit to using e and streamname
    }
});

You could then use another proejction (using the foreach example) for partitions.

Just a side note though, if you are putting all policies into one stream, is there not a danger this stream will become too big?
Do you have specific domain rule which is forcing you to have 1 stream?

Without knowing anything about your domain, I would have thought 1 stream per policy, and if u need to check across all policies for a domain rule, then handle that in a different manner.

That worked Steven! Thanks!

1 Like