Dispatching events

Store the checksum transactionally with the handler that wrote it. Problem solved.

Beyond that, you can relatively easily do deduping (the process you mentioned, though it's normally not perfect)

Greg.

Can you elaborate on “store transactionally with the handler”?

If a dispatcher dispatches the messages from the ES into RabbitMQ, how does being transactional with my checkpoint help me, given that RabbitMQ is not enlisted in the tx? I can still die after I publish a message but before the tx is committed (or the other way around).

Or let's take a simple process manager as a more common example: I dequeue a message from RabbitMQ and publish a number of commands back to RabbitMQ as a result of my "decision making" process. I can use transactional storage (SQL, Raven) to store my internal state, but RabbitMQ does not support transactions anyway, so between acking the message and publishing my N commands I can die at any time; the tx doesn't help me there…

P.S. Can't wait for your Process Managers class here in Australia, but I have a feeling it's not going to happen anytime soon :wink:

Cheers,

Alexey.

Actually it may happen sooner than you think. Remember we are heading to Thailand after your wedding. Not a bad trip from Thailand to Aus.

Don't think RabbitMQ; think simpler (less abstracted). Rabbit is forced to be at-least-once.

Take an event store. I have events with a monotonically incrementing sequence. So I process, write checkpoint, repeat, yes?

If the checkpoint and write are transactional my handler is transactional.
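The loop above (process, write checkpoint, repeat) can be sketched with a single local database transaction. This is an illustrative sketch only, not Event Store code; the table names, schema, and functions are made up:

```python
import sqlite3

def setup_schema(conn):
    # hypothetical read-model table plus a single-row checkpoint table
    conn.execute("CREATE TABLE some_table (id TEXT PRIMARY KEY, value TEXT)")
    conn.execute("CREATE TABLE checkpoint (pos INTEGER)")
    conn.execute("INSERT INTO checkpoint VALUES (0)")
    conn.commit()

def handle(conn, event_seq, aggregate_id, value):
    last = conn.execute("SELECT pos FROM checkpoint").fetchone()[0]
    if event_seq <= last:
        return False  # already seen; a replay after a crash is a no-op
    with conn:  # ONE transaction: projection write and checkpoint move together
        conn.execute(
            "INSERT OR REPLACE INTO some_table (id, value) VALUES (?, ?)",
            (aggregate_id, value),
        )
        conn.execute("UPDATE checkpoint SET pos = ?", (event_seq,))
    return True
```

Because the denormalized write and the checkpoint commit atomically, a crash loses either both or neither, which is what makes the handler effectively transactional.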

The question here is: in the Event Store, how far behind the AllEvents stream is the projection in time? I mean, how probable is it that I publish event #16, then read the projection which says the last event seen was #15, so I go to the AllEvents stream and process #16 once more?

Another concern is that every subscription would then have to do exactly the same thing: publish an EventProcessedBySubscription(subscriptionId, eventId) event so the projection can easily be built.

Here is a time when the guidance is needed: do you think it would be a valid approach to take?

There is no projection for doing this in the Event Store; it's just basic checkpointing.

There are no special events being generated for projections either.

Every event in the event store has two numbers associated with it. The first is based off its position (and the position of its commit) in the log; these are based off logical, not physical, locations. The second is the stream's incrementing sequence. To read all events in stream foo, you as a caller just remember the last one you saw (think polling). If you remember the last one you saw transactionally with what you do, you are transactional. If you want to read all, you use the first number for checkpointing.
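The polling model described above can be sketched as follows. `InMemoryLog`, `read_from`, and `poll_once` are invented stand-ins for an event store client, not its real API:

```python
# Sketch of checkpoint-based polling: the subscriber remembers the last
# position it saw and asks for everything after it on each poll.
class InMemoryLog:
    def __init__(self):
        self.events = []  # position in the log == index + 1 (monotonic)

    def append(self, event):
        self.events.append(event)

    def read_from(self, after, count=100):
        # return (position, event) pairs strictly after the given checkpoint
        return [(i + 1, e) for i, e in enumerate(self.events)][after:after + count]

def poll_once(log, checkpoint, handler):
    for pos, event in log.read_from(checkpoint):
        handler(event)
        checkpoint = pos  # remember the last position you saw
    return checkpoint
```

If the returned checkpoint were stored transactionally with whatever `handler` writes (as in the earlier exchange), the consumer as a whole is transactional.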

Greg

So another question I had along these lines is: how do you identify the handler for saving its stream position?

I was thinking that it would have to have a permanent identifier, either as an attribute or a constant member.

Perhaps you can name your endpoint somehow and use this name as a subscription identifier. In most cases it would be enough, I believe.

The name can be a BC name, an exchange/queue name, or something else meaningful. So when you want to move your “billing” BC from one machine to another, you still use the “billing” checkpoint.

I figured at first I’d just be able to keep one stream position for the whole bounded context since messages are idempotent, but then I thought of the case where I bring a new report denormalizer online. It would be nice to automatically catch up the new denormalizer on startup. I suppose the alternative is fine too: since it’s maintenance anyway, run a manual replay.

Why not a position per denormalizer? Let them float on their own.

Indeed. You’re likely to want each denormalizer to have its own position. You can consider tracking yet another position across ALL streams for event handlers that have side effects, so that events are not played back to those handlers again.

That’s what I was originally asking about. How do you identify them to save their position?

Depends how you create them. A class name + version would work fairly well.

Rather than having each denormalizer or BC object (e.g. a saga) listen to the event store directly, I was thinking more along the lines of a piece of infrastructure in between that would manage positions. That way handlers are just simple classes that mark themselves as message handlers, instead of putting the listening duties on a base class from which the handlers would inherit. So along those lines, I have to have a way to track which handlers have seen which message. Since not all of the handlers are denormalizers, and renaming within the BC is likely, I’m thinking I need a separate ID, like setting a GuidAttribute on the class. But I would love any alternative suggestions.

We have just such a thing and call it a “projectionist”. It is given a list of projectors (denormalizers) that it does just this work on behalf of. Right now it just makes up a name for them, but the writer of the projector can supply a function implementation that returns a more specific name.
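A minimal sketch of such a "projectionist", under the assumptions that projectors expose an `apply(event)` method and may optionally supply a `name()`; all of these names are invented for illustration, not the implementation discussed here:

```python
class Projectionist:
    """Owns the read loop and a per-projector position, so projectors
    stay plain classes with no listening duties."""

    def __init__(self, projectors):
        self.projectors = {self._name(p): p for p in projectors}
        self.positions = {n: 0 for n in self.projectors}

    @staticmethod
    def _name(projector):
        # default to the class name; a projector may supply its own
        supplied = getattr(projector, "name", None)
        return supplied() if callable(supplied) else type(projector).__name__

    def dispatch(self, numbered_events):
        # numbered_events: iterable of (position, event) pairs
        for name, projector in self.projectors.items():
            for pos, event in numbered_events:
                if pos > self.positions[name]:  # each position floats on its own
                    projector.apply(event)
                    self.positions[name] = pos

class CountProjector:
    """Example projector; a real one would write to a read model."""
    def __init__(self):
        self.count = 0

    def apply(self, event):
        self.count += 1
```

Because each projector's position floats independently, a newly registered projector starts at 0 and is caught up automatically on the next dispatch pass.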

You may want to be careful with a single thing responsible for managing positions.

To do things reliably you need to be storing your position in the same transaction as the work your denormalizer is doing.

The position doesn’t have to be stored transactionally if the messages are idempotent and the handlers assume at least once delivery.

I thought about an interface method which supplies an ID:

    public class SomeHandler : IMessageHandler
    {
        // IMessageHandler implementation
        public string Id { get { return "5C876942-798B-42F5-9A76-CF58273C3B32"; } }
    }

The other alternative I was looking at was:

    [MessageHandler(Id = "5C876942-798B-42F5-9A76-CF58273C3B32")]
    public class SomeHandler
    {
    }

I like the semantics of interfaces better, but it seems like it adds more noise inside the class.

In a lot of cases (but admittedly not all), that means your denormalizer is writing something extra anyway to achieve idempotency… in which case you may as well just have it write the position instead?

I'm having trouble thinking of an example where the denormalizers need something extra for idempotency. (Not saying such an example doesn't exist.) The usual denormalizers I deal with will always have the same result, no matter how many times a message is processed. E.g.:

    UPDATE SomeTable SET SomeValue = @MessageValue WHERE SomeId = @MessageAggregateId;

I can see your point for workflows (aka sagas, process managers). In that case, yes, you would have to save state transactionally with issuing a command, or else the command could get issued again after a failure. That might not be a big deal, depending on the workflow. (E.g. CancelOrder issued twice: the second one will likely fail, but the first one has the intended effect.) But as you said, transactionally storing the stream position with the operation would be harder for a stream-position manager than for the workflow itself.

Is there something else I’m missing?
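For the workflow case above (saving state transactionally with the commands to issue), one common shape is an outbox: the process manager's new state and its pending commands commit in one transaction, and a separate dispatcher publishes them at-least-once afterwards. A sketch with invented table and function names, not anyone's production code:

```python
import sqlite3

def setup_outbox(conn):
    conn.execute("CREATE TABLE state (id TEXT PRIMARY KEY, value TEXT)")
    conn.execute(
        "CREATE TABLE outbox "
        "(id INTEGER PRIMARY KEY, command TEXT, sent INTEGER DEFAULT 0)")
    conn.commit()

def decide(conn, pm_id, new_state, commands):
    # state change and pending commands commit atomically
    with conn:
        conn.execute("INSERT OR REPLACE INTO state VALUES (?, ?)",
                     (pm_id, new_state))
        conn.executemany("INSERT INTO outbox (command) VALUES (?)",
                         [(c,) for c in commands])

def dispatch_pending(conn, publish):
    # at-least-once: a crash between publish() and the UPDATE re-sends,
    # so downstream handlers must tolerate duplicates (e.g. CancelOrder twice)
    rows = conn.execute(
        "SELECT id, command FROM outbox WHERE sent = 0").fetchall()
    for row_id, command in rows:
        publish(command)
        conn.execute("UPDATE outbox SET sent = 1 WHERE id = ?", (row_id,))
    conn.commit()
```

This doesn't make RabbitMQ transactional; it just moves the failure mode to duplicate publishes, which matches Rabbit's at-least-once delivery anyway.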