Hi!
I want to write a timer service that can receive messages with an arbitrary timeout from other services. If a message times out, the timer service should return it to the original sender. In the meantime, the messages should be persisted in Event Store. Does anybody have an idea how I can efficiently determine which messages have timed out? If it were only a couple of thousand messages, I would just iterate over every message and compare timestamps, but in our system there could be a couple of million messages over the course of time.
The second part of my question is whether it is possible to delete specific messages from a stream (I guess not, because of the implications for caching?). That would be useful in my scenario because it would let me clean up messages that have already timed out. I cannot use the $maxAge metadata setting of the stream here, because every message could have a different timeout.
Thanks in advance!
Sebastian
Hey
I was trying to solve exactly the same problem in order to implement an ES-backed NServiceBus timeout store. The only assumptions are that each timeout message belongs to exactly one saga and that timeouts can be cancelled (deleted) only in bulk (all for a particular saga instance). Here's a draft of the solution (not fully implemented yet):
- Stream-per-saga-instance
- Indexed using a projection that groups timeouts by their due time with configurable resolution (defaults to 1 minute):

    // Zero-pad date parts so stream names are unambiguous (e.g. 1/11 vs 11/1).
    function pad(n) {
        return (n < 10 ? '0' : '') + n;
    }

    fromCategory('Timeout')
        .when({
            $init: function () {
                return { lockedTo: null }; // initial state: no period locked yet
            },
            Timeout: function (s, e) {
                var date = new Date(Date.parse(e.data.time));
                // A timeout that falls before the lock boundary is moved to the boundary so it is not missed.
                if (s.lockedTo !== null && date < s.lockedTo) {
                    date = s.lockedTo;
                }
                // getMonth() is zero-based, hence the + 1.
                var stream = 'TimeoutIndex-' + date.getFullYear() + pad(date.getMonth() + 1) + pad(date.getDate()) + pad(date.getHours()) + pad(date.getMinutes());
                linkTo(stream, e);
                return s;
            },
            TimeoutStop: function (s, e) {
                // Link the stop marker into the period it closes and remember the lock boundary.
                var lockedStream = e.data.lockedStream;
                linkTo(lockedStream, e);
                s.lockedTo = new Date(e.data.lockedTo);
                return s;
            }
        });

- The client reads the whole TimeoutIndex- stream for each time quantum (default 1 minute) and queues its timeouts for execution during that quantum.
- The client emits a special TimeoutStop event that gets appended to the corresponding TimeoutIndex stream and marks this period as locked.
- The client keeps reading the current TimeoutIndex stream until it gets the TimeoutStop event, ensuring that all timeouts from this period have been loaded.
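The client side of the steps above could look roughly like the sketch below, in plain JavaScript. It is only an illustration under assumptions: the helper names `pad2`, `indexStreamName` and `collectDueTimeouts` are hypothetical, the stream name format has to match whatever the projection actually emits (zero-padded parts and a one-based month are assumed here), and the events are assumed to have already been read into an array by whatever client library is in use. No Event Store client API is shown.

```javascript
// Hypothetical helper: zero-pad a date part to two digits.
function pad2(n) {
  return (n < 10 ? "0" : "") + n;
}

// Build the index stream name for the quantum (minute) containing `date`.
// Assumption: this format must match the names the projection links to.
function indexStreamName(date) {
  return "TimeoutIndex-" +
    date.getFullYear() +
    pad2(date.getMonth() + 1) + // getMonth() is zero-based
    pad2(date.getDate()) +
    pad2(date.getHours()) +
    pad2(date.getMinutes());
}

// Walk the events read so far from one index stream.
// Collects Timeout events until the TimeoutStop marker is seen.
// `complete` is false when the marker has not arrived yet,
// which means the caller should keep reading.
function collectDueTimeouts(events) {
  var timeouts = [];
  for (var i = 0; i < events.length; i++) {
    if (events[i].eventType === "TimeoutStop") {
      return { timeouts: timeouts, complete: true };
    }
    if (events[i].eventType === "Timeout") {
      timeouts.push(events[i]);
    }
  }
  return { timeouts: timeouts, complete: false };
}
```

The `complete` flag is what tells the client whether the period is locked; until it is true, more timeouts may still be linked into the stream.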
Hope that helps,
Szymon
We do something similar with persistent producer-managed subscriptions. There is a stream metadata property called $tb, which stands for truncate before. It allows the truncation of all events prior to x. You would then keep track of the oldest pending message in your stream and allow deletion of everything before it. The possible drawback is non-uniform timings: e.g. I could set a timeout of two years and your stream would get stuck. This can be handled relatively easily by partitioning into more than one stream.
There are some other options worth looking at as well including using many streams as opposed to one.
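As a rough illustration of the truncate-before idea in plain JavaScript: the sketch below assumes each pending timeout is tracked together with its event number in the stream. The names `truncateBeforeFor` and `partitionFor`, the `pending` array and the partition stream names are all hypothetical. The returned number is the value you would then write to the stream's truncate-before metadata ($tb) through your client library, which is not shown here.

```javascript
// Events with a number lower than the returned value are no longer needed.
// `pending` holds the timeouts that have not fired yet;
// `nextEventNumber` is the number the next appended event would get.
function truncateBeforeFor(pending, nextEventNumber) {
  if (pending.length === 0) {
    return nextEventNumber; // nothing pending: everything can go
  }
  var oldest = pending[0].eventNumber;
  for (var i = 1; i < pending.length; i++) {
    if (pending[i].eventNumber < oldest) {
      oldest = pending[i].eventNumber;
    }
  }
  return oldest;
}

// Hypothetical partitioning by timeout length, so that one very long
// timeout does not block truncation of many short ones.
function partitionFor(timeoutMs) {
  var minute = 60 * 1000;
  if (timeoutMs <= minute) return "Timeouts-upToOneMinute";
  if (timeoutMs <= 60 * minute) return "Timeouts-upToOneHour";
  if (timeoutMs <= 24 * 60 * minute) return "Timeouts-upToOneDay";
  return "Timeouts-long";
}
```

With partitions like these, a two-year timeout only holds back truncation in the long-running stream.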
That solution looks very elegant to me, thank you!
I already considered that option, but as you said, due to non-uniform timings the worst-case scenario could be 5 active timeout messages and 5 million "zombie" timeout messages.
There is another trick that can be used here to lower the cost. Put every timeout into its own stream. Use a linkTo to aggregate them into a single large stream. Then, when you are handling the timer, delete the original stream. It will magically "disappear" from the stream with the linkTo (though the link will still be there). This will not work well if partitioned, though it can still be a useful trick. The scavenger in the single-node version will even try to resolve linkTos and, if they don't resolve, automatically remove the link.
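The effect of deleting the original stream can be pictured with a small in-memory model in plain JavaScript. This is not Event Store code: `streams`, `append`, `appendLink`, `deleteStream` and `readResolved` are hypothetical stand-ins that only mimic how a link stops resolving once its target stream is gone.

```javascript
// In-memory stand-in for a set of streams (stream name -> array of events).
var streams = {};

function append(stream, event) {
  if (!streams[stream]) {
    streams[stream] = [];
  }
  streams[stream].push(event);
}

// A link only stores a pointer to the target event, not a copy of it.
function appendLink(indexStream, targetStream, targetNumber) {
  append(indexStream, { isLink: true, stream: targetStream, number: targetNumber });
}

function deleteStream(stream) {
  delete streams[stream];
}

// Read the aggregate stream, skipping links whose target no longer exists.
function readResolved(indexStream) {
  var resolved = [];
  var links = streams[indexStream] || [];
  for (var i = 0; i < links.length; i++) {
    var target = streams[links[i].stream];
    if (target && target[links[i].number] !== undefined) {
      resolved.push(target[links[i].number]);
    }
  }
  return resolved;
}

// One stream per timeout, aggregated through links.
append("Timeout-a", { id: "a" });
append("Timeout-b", { id: "b" });
appendLink("AllTimeouts", "Timeout-a", 0);
appendLink("AllTimeouts", "Timeout-b", 0);

deleteStream("Timeout-a"); // timeout "a" has been handled
var remaining = readResolved("AllTimeouts");
```

After the delete, `remaining` holds only the event for timeout "b", while the aggregate stream still contains both links.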
Hi Greg!
By partitioned, do you mean clustered, or do you mean when there is a network partition? What will happen in this scenario? Are there other differences between the two versions?
Thanks!
When clustered (partitioned across many replica sets), for obvious reasons we can't resolve linkTos in scavenge, as in this case it's a network hit per link as opposed to an internal index hit.
I thought Event Store replicates all the data to each member of the cluster, so that every lookup is a local lookup?
Not for blaze (coming). In blaze there will be many replica sets (think 50 TB+ data sets). On the current version you will be fine with the discussed trick, but it won't work on blaze.
I see, thanks for your help and the explanations!