Throttling Projection

Hi!

Is it possible to configure EventStore to run a projection at a limited speed? My projection runs too fast and floods EventStore with snapshots, which then fail to save, either because a snapshot is too large or because EventStore can’t keep up with the number of writes per second.

Perhaps you can provide more information on what you are doing?

Hi Greg! Thanks for the swift response.

I am using the process manager implementation from https://github.com/pawelkaczor/akka-ddd.

It works by creating a projection and tying that projection to the process manager. The process manager tries to ensure no duplication is happening by saving the state in a snapshot.

Given that I already have hundreds of thousands of events tied to the projection, the projection causes the process manager to run and send commands to our aggregate roots (which is expected). But because there are so many events, it continually sends snapshots to EventStore. This is where my app fails: too many events are being sent to EventStore, and I suspect EventStore can’t keep up, so everything fails and I get an OOM.

Can you send over a log? Also, what kind of setup is this (mem/disk/etc.)?

Also, from looking at it, it seems to be using the $by_category projection.
Is there some other projection you are using?

The OOM is in my backend application; EventStore itself is actually doing OK.

This is one of the projections that I have:

fromStreams(['$ce-Product', '$ce-User', '$ce-Invoice', '$ce-Shipment']).
when({
    'co.styletheory.context.userAndAccount.domain.contracts.user.AccountCreated' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.UserRegistered' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.SubscriptionCreated' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.UserCharged' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.invoice.domain.contracts.invoice.ChargeSucceeded' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.invoice.domain.contracts.invoice.InvoiceCreatedAndChargeSucceeded' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductBooked': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductBookedForSwap': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductUnbooked': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductUnbookedWithStatusRemains': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductBookedV2': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductBookedForSwapV2': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductUnbookedV2': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductUnbookedWithStatusRemainsV2': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.shipping.domain.contracts.shipping.ShipmentStatusUpdated': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.shipping.domain.contracts.shipping.ShippingCreated': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.shipping.domain.contracts.shipping.ShippingCreatedV2': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.BoxStatusUpdated': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.BoxStatusUpdatedV2': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.BoxStatusUpdatedV3': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductReviewed': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductReviewedV2': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductReviewedV3': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductIsAlreadyUnbooked': function(s,e) {
        linkTo('users', e);
    }
});

It runs in continuous mode with emit enabled. I also have all the projections running (all 4 of them: $by_category, $streams, $stream_by_category, $by_event_type).
The setup is just an AWS EC2 t2.medium with an Elastic Block Store volume using magnetic provisioning set to 5000 IOPS.

It sounds like your app is not getting the throughput it wants and
is just buffering in memory, then crashing.

But again, what is it you are trying to achieve here? This projection,
as an example, could be written far more simply (and likely the others
don't need to be running).
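
One possible reading of "far more simply", as a sketch only: because every handler in the projection does the same linkTo, the handler map can be generated from a list of event types. The helper name buildHandlers is hypothetical, the list is abbreviated to three of the event types from the projection, and the sketch assumes the projection runtime accepts a handler map built when the projection is defined. The guard at the end is there only so the snippet also runs outside EventStore, where fromStreams and linkTo do not exist.

```javascript
// Hypothetical helper: builds a handler map in which every listed event type
// is linked into the same target stream via the supplied link function.
function buildHandlers(eventTypes, target, link) {
    var handlers = {};
    eventTypes.forEach(function (type) {
        handlers[type] = function (s, e) {
            link(target, e);
        };
    });
    return handlers;
}

// Abbreviated list; the real projection would list all of its event types here.
var userEventTypes = [
    'co.styletheory.context.userAndAccount.domain.contracts.user.AccountCreated',
    'co.styletheory.context.userAndAccount.domain.contracts.user.UserRegistered',
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductUnbooked'
];

// fromStreams and linkTo are provided by the projection runtime inside EventStore.
if (typeof fromStreams === 'function' && typeof linkTo === 'function') {
    fromStreams(['$ce-Product', '$ce-User', '$ce-Invoice', '$ce-Shipment'])
        .when(buildHandlers(userEventTypes, 'users', linkTo));
}
```

This keeps the behaviour of the original projection (one linkTo into 'users' per matching event) and only removes the repetition; it does not reduce the number of writes the projection produces.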

Yes, and I assumed that was because the EventStore reads were too fast.

I want to throttle it so that reading is not too fast and snapshots are saved at a more acceptable interval.
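
A minimal sketch of spacing out snapshots on the application side, assuming the framework lets the application decide when a snapshot is taken. SnapshotPolicy and its everyN parameter are hypothetical names for illustration; they are not part of akka-ddd or EventStore.

```javascript
// Hypothetical policy: signal a snapshot only once every `everyN` processed events.
function SnapshotPolicy(everyN) {
    if (!(everyN >= 1)) {
        throw new Error('everyN must be at least 1');
    }
    this.everyN = everyN;
    this.sinceLast = 0; // events processed since the last snapshot
}

// Call once per processed event; returns true when a snapshot is due.
SnapshotPolicy.prototype.onEvent = function () {
    this.sinceLast += 1;
    if (this.sinceLast >= this.everyN) {
        this.sinceLast = 0;
        return true;
    }
    return false;
};
```

With everyN set to 1000, replaying 100,000 events would request 100 snapshots instead of one per event.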

So, a bit of background.

My company does fashion rental, and I am trying to create a feature where, when a certain product is unbooked, I can notify other people who have that product in their wishlist.

The process manager coordinates sending the events to either the Product AR or the User AR.

First I would fix the buffering issue.

Second, which snapshots are you referring to: the framework's or the
projections'? There is no state in any of the projections I see here, so
the projections would only be checkpointing, not snapshotting state. If it
is the framework's, I would imagine it is configurable? In general, what you
are seeing here is that each projection is writing (the reads should
be cheap or free), as they are all writing linkTos. $streams is very
cheap, but bytype/bycategory/your projection are quite expensive
projections (producing linkTos).

What are your current writes/second, measured from your app and measured
by looking at the storage writer queue messages/second in the admin UI
or by reading statistics over HTTP (where they come from)?

Hi,
as you are using akka-ddd (and the underlying akka-persistence EventStore journal), there is no problem with event producers (projections) that are too fast. The akka-ddd Receptor will stop fetching subsequent events from EventStore until the number of events buffered by the Receptor goes below its capacity, which is configurable. Simply put, the akka-ddd Receptor supports back-pressure.

Pawel
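
The back-pressure behaviour described above can be illustrated with a small sketch. This is not the akka-ddd Receptor or its API: BoundedReceptor, capacity and fetchNext are hypothetical names, and the sketch only shows the general idea of fetching no further events while the buffer is full.

```javascript
// Hypothetical stand-in for a receptor with a bounded buffer; not the akka-ddd API.
function BoundedReceptor(capacity, fetchNext) {
    this.capacity = capacity;   // maximum number of buffered events
    this.fetchNext = fetchNext; // returns the next event, or undefined if none is available
    this.buffer = [];
}

// Fetch events only while there is room in the buffer (this is the back-pressure).
BoundedReceptor.prototype.fill = function () {
    while (this.buffer.length < this.capacity) {
        var event = this.fetchNext();
        if (event === undefined) {
            break;
        }
        this.buffer.push(event);
    }
};

// Hand the oldest buffered event to the consumer, then refill the freed slot.
BoundedReceptor.prototype.take = function () {
    var event = this.buffer.shift();
    this.fill();
    return event;
};
```

However fast the source can produce events, the number held in memory never exceeds capacity, so a slow consumer slows down fetching rather than growing the buffer.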

It is the snapshots inside the framework that I was referring to.

But according to Pawel’s reply, my issue might be something other than what I thought, namely the producer being too fast.

I thank you guys very much for the responses!