Implementing CommonDomain Repository using Event Store - Sample Code

Hi All,

I’ve updated the sample implementation of the CommonDomain IRepository interface to reflect the latest Client API changes. It’s available at

Thanks,

James

Awesome, very useful.

Hey James,

Thanks for posting the code. It has been extremely helpful. I have created a gist based on your code that provides an IAggregateRoot, AggregateRoot, and IRepository. Hope someone finds these useful. Please note that they don’t match James’s exactly (just one or two method name differences). Any feedback is welcome.

https://gist.github.com/4658759

Thanks

Phil

Hi Phil,

They look reasonable, although I personally prefer not having an “IEvent” interface. The original code the sample was written against is CommonDomain (https://github.com/joliver/CommonDomain), if you’re interested!

Cheers,

James

The StreamEventsSlice is used to split the read process into several slices, so as not to strain either the Event Store or the client from a memory/network-utilization perspective. Great!
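(For illustration, a minimal sketch of such a sliced read, assuming the synchronous ReadStreamEventsForward call of the .NET client of that era; method names and signatures may differ between client versions.)

```csharp
using EventStore.ClientAPI;

static class SlicedReadExample
{
    public static void ReadAllSlices(EventStoreConnection connection, string stream)
    {
        const int sliceSize = 500; // events per network round trip
        var start = 0;
        StreamEventsSlice slice;
        do
        {
            // Each call is an independent read; the stream may be appended to
            // between calls, so LastEventNumber can differ from slice to slice.
            slice = connection.ReadStreamEventsForward(stream, start, sliceSize, false);
            foreach (var evnt in slice.Events)
            {
                // deserialize / process each event here
            }
            start = slice.NextEventNumber;
        } while (!slice.IsEndOfStream);
    }
}
```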

Here’s my question: is the LastEventNumber on the first slice stable for the entire read operation? If I read the first slice and then continue reading slices, there is a window in which the LastEventNumber on a more recently read slice may differ from the one that came back on the first slice, right? Presuming we allow writing to the same stream while it’s being read. I’m fully aware that as soon as you’re done reading a stream, somebody/something else might update it.

Yes, of course; LastEventNumber is just the number of the last event in the stream AT THE BEGINNING of the read operation. If someone is simultaneously writing to the same stream, that number could actually change even during the same read operation, as the read and write sides are completely independent. It won’t ever decrease, of course.

I would be interested to know why you ask about this. Do you somehow depend on LastEventNumber not changing, or changing in some particular way?

Well, the scenario I had in mind was wrapping the sliced read process in an enumerable that got injected into a little structure I created: EventStream { Id: Guid, LastEventNumber: Int32, Events: IEnumerable<Object> }. Events would point to a custom enumerable that does the sliced reading (starting with the first slice, then using the connection and NextEventNumber to read the next slice, and so on), and LastEventNumber would contain, well, the last event number from the first slice (which could differ from that of subsequently read slices). The reason for doing so is that this custom enumerable handles the upcasting and deserialization of events as well as the sliced reads, making batched reading and full reading seamless. It also makes it easy to stub the event stream (the structure above).
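(Roughly this hypothetical shape; EventStream and ReadSlicedEvents are illustrative names of mine, not part of any API.)

```csharp
using System;
using System.Collections.Generic;
using EventStore.ClientAPI;

// Hypothetical structure: the stream id, the last event number taken from
// the FIRST slice, and a lazy, sliced, deserializing enumeration of events.
public class EventStream
{
    public readonly Guid Id;
    public readonly int LastEventNumber;
    public readonly IEnumerable<object> Events;

    public EventStream(Guid id, int lastEventNumber, IEnumerable<object> events)
    {
        Id = id;
        LastEventNumber = lastEventNumber;
        Events = events;
    }
}

public static class SlicedReading
{
    // The custom enumerable doing the sliced reads; upcasting and
    // deserialization of each event would happen at the yield.
    public static IEnumerable<object> ReadSlicedEvents(EventStoreConnection connection, string stream, int sliceSize)
    {
        var start = 0;
        StreamEventsSlice slice;
        do
        {
            slice = connection.ReadStreamEventsForward(stream, start, sliceSize, false);
            foreach (var evnt in slice.Events)
                yield return evnt; // upcast + deserialize here in the real thing
            start = slice.NextEventNumber;
        } while (!slice.IsEndOfStream);
    }
}
```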

I have some options:
1. Only use the LastEventNumber from the last slice read (the one that says we're at the end of the stream). It's still a snapshot of a slice of the stream, and as soon as we get it (or even while getting it) the stream might be appended to. But such is life, so let's embrace it (or ensure only a single writer, i.e. a processor, in front of the stream-append actions ... we're digressing). This makes my design a bit awkward, since LastEventNumber would only be known after the enumerable has been consumed (all slices read).
2. Adopt a "who cares" attitude:
2.a. Pick the number from the first slice and only keep reading until the last event number mentioned in that first slice, willingly accepting failure from an optimistic-concurrency perspective and hoping it doesn't happen too often.
2.b. Pick the number from the first slice but keep reading until the end of the stream, potentially reading beyond that last event number because the stream changed after the first slice was read. This still suffers the optimistic-concurrency woes, because I cannot provide the proper expected version (see the append sketch after this list).
3. Provide a termination event in the enumerable that contains the last event number (from the perspective of the sliced read, at least). Not clear yet how I would make this work.
4. Drop the abstraction :wink: My intent is not to abstract the event store away; I just want to compose a stable tuple of LastEventNumber and deserialized events.
5. Don't stream (in my code). Read all events into a big array, set the last event number, and be done with it. Let's hope we won't hog memory this way for big streams :slight_smile: It would be a shame to have to drop the streaming nature of things.
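(For context on why the expected version matters in 2.a/2.b, a sketch of an optimistic-concurrency append, assuming the AppendToStream call and EventData type of the .NET client of the time; a concurrent append by someone else makes this fail with a wrong-expected-version error.)

```csharp
using System;
using System.Text;
using EventStore.ClientAPI;

static class AppendExample
{
    // Appends one event using the LastEventNumber captured while reading as
    // the expected version. If another writer appended to the stream in the
    // meantime, the expected version no longer matches and the write fails.
    public static void AppendWithExpectedVersion(EventStoreConnection connection, string stream, int lastEventNumber)
    {
        var payload = Encoding.UTF8.GetBytes("{}"); // illustrative event body
        var evnt = new EventData(Guid.NewGuid(), "SomethingHappened", true, payload, new byte[0]);
        connection.AppendToStream(stream, lastEventNumber, evnt);
    }
}
```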

What's the probability of this 'LastEventNumber not right' scenario happening? Well, it depends a lot on whether I allow concurrent writes, doesn't it? As an aside, what happens to my reads when a stream is deleted mid-sliced-read? I assume the next slice read will reflect this in its status, right? Highly unlikely to happen, but worth contemplating nonetheless.

Regards, Yves.

Quick update: I’ll probably walk away from this design due to the problems mentioned. Joining the hydration with access to the slice that has IsEndOfStream set to true (and thus also the final LastEventNumber) makes all these problems go away :slight_smile:
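(A sketch of that joined approach, under the same client API assumptions as above: because hydration happens inside the sliced read, the LastEventNumber of the end-of-stream slice is available when the loop finishes and can serve as the expected version for subsequent appends.)

```csharp
using System;
using EventStore.ClientAPI;

static class JoinedHydrationExample
{
    // Hydrate while reading: apply each event to the aggregate as it is read,
    // and return the LastEventNumber of the slice with IsEndOfStream == true.
    // 'apply' stands in for the upcast/deserialize/apply pipeline.
    public static int HydrateFromStream(EventStoreConnection connection, string stream, Action<object> apply)
    {
        const int sliceSize = 500;
        var start = 0;
        StreamEventsSlice slice;
        do
        {
            slice = connection.ReadStreamEventsForward(stream, start, sliceSize, false);
            foreach (var evnt in slice.Events)
                apply(evnt);
            start = slice.NextEventNumber;
        } while (!slice.IsEndOfStream);

        // Stable by construction: this is the final slice's LastEventNumber.
        return slice.LastEventNumber;
    }
}
```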

Yeah, making LastEventNumber static is not the best abstraction :slight_smile:

As for LastEventNumber changing during the read operation: the probability is quite low, as read operations are very fast, though if you write very intensively while also reading from the same stream, that can certainly happen.

As for deleting the stream mid-read: before a read operation on a stream happens, I check whether the stream exists, whether it was deleted, etc. But if that check passes and I start actually reading events, and in the middle of that operation a delete happens, nothing bad occurs; I’ll return the same events as if the stream had not been deleted. On the next read operation you will certainly receive a StreamDeletedException, of course.

Next read operation as in reading the next slice, I presume?

yes, that’s what I meant :slight_smile:

Reopening this conversation to validate my “assumptions”.

Continuing where we left off: it’s not exactly an exception, is it now? It’s more like the Status of the StreamEventsSlice would be Deleted, right?! Can I keep reading nonetheless (i.e. despite the stream being deleted), or is that not an option? I presume not, right?!

1. Read a slice => Status == Success
2. Read next slice => Status == Success
3. Read next slice => Status == Deleted (I presume this slice won't even contain any events, right?)
4. No use reading the next slice, as everything about that stream is gone from the perspective of the client.

Ergo, if I were to use this to rehydrate an aggregate root entity (in DDD parlance), I would rather throw an AggregateNotFound/DeletedException than hand back a partially hydrated aggregate root entity. Make sense?

A couple of side questions:

a) StreamEventsSlice.LastEventNumber: is it the last event number in the stream, or in the slice I just read?

b) As I read each slice, and assuming the stream I’m reading is being written to as I read, is it safe to assume that LastEventNumber might change (to a higher number) as I read subsequent slices? I understand the chance of this happening is pretty slim, but I want to understand what is going on nonetheless.

Answers inline:

Continuing where we left off: it's not exactly an exception, is it now? It's more like the Status of the StreamEventsSlice would be Deleted, right?! Can I keep reading nonetheless (i.e. despite the stream being deleted), or is that not an option? I presume not, right?!

Read operations are independent of each other, so if a stream is deleted, the next read operation on it will give you a status of StreamDeleted on the slice. You can't continue to read it at that point (over TCP at least). Bear in mind that stream deletion is supposed to be exceptionally uncommon (development only, really), as it breaks HTTP caching: we can't guarantee that intermediaries don't still have the data and won't respond with it.

1. Read a slice => Status == Success
2. Read next slice => Status == Success
3. Read next slice => Status == Deleted (I presume this slice won't even contain any events, right?)
4. No use reading the next slice, as everything about that stream is gone from the perspective of the client.

Ergo, if I were to use this to rehydrate an aggregate root entity (in DDD parlance), I would rather throw an AggregateNotFound/DeletedException than hand back a partially hydrated aggregate root entity. Make sense?

Yes, if you encounter a deleted stream halfway through reading it in this context, you'd probably want to throw an aggregate-deleted or aggregate-not-found exception.
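(A sketch of such a guard, assuming the SliceReadStatus values of the .NET client; the exception types are illustrative, not part of the client API.)

```csharp
using System;
using EventStore.ClientAPI;

// Illustrative exception types, not part of the client API.
class AggregateNotFoundException : Exception { public AggregateNotFoundException(string s) : base(s) { } }
class AggregateDeletedException : Exception { public AggregateDeletedException(string s) : base(s) { } }

static class SliceGuard
{
    public static void GuardSlice(StreamEventsSlice slice, string stream)
    {
        // A missing or deleted stream surfaces as a status on the slice, not
        // as events; fail hydration rather than return a partial aggregate.
        if (slice.Status == SliceReadStatus.StreamNotFound)
            throw new AggregateNotFoundException(stream);
        if (slice.Status == SliceReadStatus.StreamDeleted)
            throw new AggregateDeletedException(stream);
        // Status == Success: safe to apply slice.Events.
    }
}
```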

A couple of side questions:
a) StreamEventsSlice.LastEventNumber: is it the last event number in the stream, or in the slice I just read?

LastEventNumber is the number of the last event in the stream, not in the slice you just read.

b) As I read each slice, and assuming the stream I'm reading is being written to as I read, is it safe to assume that LastEventNumber might change (to a higher number) as I read subsequent slices? I understand the chance of this happening is pretty slim, but I want to understand what is going on nonetheless.

Yes, it can increase. You can actually see this happen relatively easily: change --stats-period-sec down to around 2 seconds, and attach a debugger to a program reading (say) 10 events at a time, breaking after each slice. If you wait more than the stats period between reads, you can see LastEventNumber increase on the next slice.
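(A small sketch of that experiment, under the same client API assumptions as above: read a stream that is concurrently being appended to, pausing between slices, and watch LastEventNumber grow.)

```csharp
using System;
using System.Threading;
using EventStore.ClientAPI;

static class WatchExample
{
    // Reads 10 events at a time, pausing longer than the stats period between
    // slices; LastEventNumber can be observed increasing from slice to slice.
    public static void WatchLastEventNumber(EventStoreConnection connection, string stream)
    {
        var start = 0;
        StreamEventsSlice slice;
        do
        {
            slice = connection.ReadStreamEventsForward(stream, start, 10, false);
            Console.WriteLine("read from {0}, LastEventNumber = {1}", start, slice.LastEventNumber);
            start = slice.NextEventNumber;
            Thread.Sleep(5000); // longer than --stats-period-sec
        } while (!slice.IsEndOfStream);
    }
}
```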

Cheers,

James

Hi, James,

I’m in the process of creating a SlicedEventReader in AggregateSource to reduce the duplication in the various repository implementations and to allow for a degree of composability (read: you don’t have to rewrite event-stream-reading behavior from scratch) when you want to do custom object inheritance using repositories. The current Event Store integration implementation does not take into account the answers you’ve just given me. I’ll get right on it.

Again, thanks for clearing up the ambiguity, or should I say, for validating my assumptions. I owe you a lot lately :slight_smile:

Cheers,

Yves.

The NEventStore repository implementation has an aggregate snapshot concept: to get an aggregate with a snapshot, you read the snapshot, deserialize it, then apply the events from the snapshot onwards. It is extremely helpful for decreasing the loading time of a fat aggregate.
Why was this concept not implemented in GetEventStoreRepository? How can I improve loading performance?

It can be done quite easily. Just put your snapshot into a named stream. Say I have an aggregate

/streams/foo

I could make a snapshot in

/streams/foo-domain-snapshot

Then just read the last event out of the snapshot stream and play events forward from there. It's not there by default because there are many ways of doing it, and it's less than 30 minutes of work to implement. It is also quite common, if running on a single server, to just use an identity map as opposed to snapshots.
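(A hedged sketch of that approach: ReadStreamEventsBackward with a count of 1 reads the newest event, with -1 meaning "from the end". The Snapshot type and its deserialization are illustrative placeholders, not part of any API.)

```csharp
using EventStore.ClientAPI;

// Illustrative snapshot payload: the aggregate state plus the version
// (event number) at which the snapshot was taken.
class Snapshot { public int Version; /* serialized aggregate state */ }

static class SnapshotExample
{
    public static void LoadWithSnapshot(EventStoreConnection connection, string stream)
    {
        var snapshotStream = stream + "-domain-snapshot";
        var from = 0;

        // Read only the last event of the snapshot stream (the newest snapshot).
        var snap = connection.ReadStreamEventsBackward(snapshotStream, -1, 1, false);
        if (snap.Status == SliceReadStatus.Success && snap.Events.Length == 1)
        {
            var snapshot = DeserializeSnapshot(snap.Events[0]); // illustrative
            // ... restore aggregate state from the snapshot here ...
            from = snapshot.Version + 1; // play forward from just after the snapshot
        }

        // Play the remaining events forward, sliced as usual.
        StreamEventsSlice slice;
        do
        {
            slice = connection.ReadStreamEventsForward(stream, from, 500, false);
            // ... apply slice.Events to the aggregate ...
            from = slice.NextEventNumber;
        } while (!slice.IsEndOfStream);
    }

    static Snapshot DeserializeSnapshot(object recordedEvent)
    {
        // Placeholder: deserialize the event data into a Snapshot.
        return new Snapshot();
    }
}
```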

The snapshotting in NEventStore, however, is horrifically broken: it only supports a single snapshot, yet you may have many views of a given aggregate (multiple versions running side by side, for example, or completely different views of the same event streams).

In general, though, I would recommend staying away from snapshots if possible, as they cause all the same versioning problems that you have with storing structure. When you release a new version that changes your snapshot structure, all the snapshots must be discarded, and running side by side is made more difficult. Most aggregates have no need for snapshotting; a snapshot as opposed to loading, say, 80 events is not worth it in most cases compared to the versioning problems you take on in production. My rule of thumb is normally 500-1000 events on an aggregate before I would consider using a snapshot (this of course depends on latency requirements, etc., as well).

Cheers,

Greg

+1 to having this feature by default in GetEventStoreRepository, so the transition from NEventStore to GetEventStore will be easier.

It is also quite common, if running on a single server, to just use an identity map as opposed to snapshots
Good idea, but what about concurrency? I guess in this case we should queue changes per aggregate to prevent parallel changes to the same aggregate.
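(A minimal sketch of that combination, entirely hypothetical: an in-memory identity map that is only safe if commands for a given aggregate are processed by a single writer, e.g. queued per aggregate.)

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical per-process identity map: repeat loads of the same aggregate
// reuse the in-memory instance instead of replaying the stream. This does
// nothing about concurrency itself; it assumes a single writer per
// aggregate (e.g. commands for one aggregate are queued to one handler).
class IdentityMap<TAggregate> where TAggregate : class
{
    private readonly ConcurrentDictionary<Guid, TAggregate> _map =
        new ConcurrentDictionary<Guid, TAggregate>();

    public TAggregate GetOrLoad(Guid id, Func<Guid, TAggregate> load)
    {
        // First call hydrates from the event store; later calls hit memory.
        return _map.GetOrAdd(id, load);
    }
}
```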
When you release a new version that changes your snapshot structure, all the snapshots must be discarded, and running side by side is made more difficult
For us it is not a problem to rebuild all fat-aggregate snapshots after each release.
Thanks Greg, your 30 minutes = my 3 days :slight_smile:

Put your snapshot in a stream with a naming convention. Change the logic to check the last event in that stream and play forward from the snapshot if you get one. We are discussing ±15-20 lines of code here :slight_smile:

How many events are you normally talking about needing to replay?

A slightly old statistic; the fattest aggregates:

  • 1st = 14 054 events
  • 2nd = 7 778 events
  • 3rd = 7 449 events

Normal aggregates have ~100 events to replay.

Without a snapshot, a command was handled in 3 s (NEventStore 3, MSSQL); with a snapshot, in 100 ms.

For those few (greater than a few thousand) I would consider using a snapshot. For normal aggregates I wouldn’t even consider it. In fact you will likely be SLOWER with snapshots for 100 events :slight_smile: The reason for this is that you need two round trips with snapshots (one to get the snapshot the second to get the events after). Are you saying that for 100events it was taking 3 seconds?