Projections/client api

Hi,

I’m trying to rework my denormalizers so that they subscribe to the appropriate projections in the EventStore directly without there being a queue in the middle, which is how I was providing event notifications before trying out the EventStore. I have three issues/questions though:

  1. Is this even a correct usage scenario for projections? Not entirely sure on this.
  2. The RecordedEvent passed to my subscription handler doesn’t seem to contain the event data. The Data property seems to just contain the bytes of the stream id.
  3. Say my denormalizers go down and miss x events. When they come back up what is the recommended approach to recovering the missed events?
    Thanks a lot

Thomas

I’m a bit confused by the post could you go into more detail about how you are using projections for your denormalizers? Projections themselves can be denormalizers.

If I had existing denormalizers I would use the stream $all to read events into them. You can remember the last event you processed if you go down and start reading from there forward (after subscribing) once you get an event read paging through ones you missed that is in your queue from subscribe just process the subscribed events.

Cheers,

Greg

So at the moment I have a denormalizer in a separate process to the core (‘write side’) domain, which is what is pushing events into the store. The denormalizer is listening to events in the store by subscribing to a projection (a basic projection that is only linking to a named stream…I’m just starting to experiment with projections in the EventStore). When it receives events it routes them through to appropriate handlers which write them to a database.

Maybe I don’t need to use projections here and I could just subscribe to $all from the denormalizer.

So before I was using the EventStore, the write-side was pushing events out onto an ActiveMQ broker, and the denormalizer was picking up events from there. I’m trying to cut that part out and get the denormalizer to receive events directly from the store.

Cheers

Hi Thomas,

Are you using the latest version? It sounds like the problem you describe in point 2 is that you’re getting link events returned, whose data consist of the event number and stream ID of the original event.

The version currently on the dev branch instead uses EventLinkPair types for most of the messages

Cheers,

James

Hi James,

Thanks for the reply. I built the latest dev branch yesterday, but I think I was just confusing events like ‘created’ for my actual events. I forgot that for a stream create I would receive a create event.

Regarding a denormalizer recovering missed messages when it comes back online, I understand that I can catch up using ReadAllEventsForward and passing the last known position. How do I get that position when a new event is received though? I can’t see any property on RecordedEvent that tells me the position, unless I’m missing something about the workflow…

Thanks,

Thomas

I’ve been working around this issue today as well.

As far as i can see you are right that events received over a subscription don’t contain the necessary data to create a position for use with ReadAllEventsForward, so it isn’t very useful for catching up after downtime.

Additionally you cant treat $all like a normal stream, (you get a streamnotfound exception from ReadEventStreamForward)

What i ended up doing was creating a projection that is essentially the same as $all -

fromAll().whenAny(
 function(state, event) {
 linkTo('temp-all', event);
 return state;
 }
);

My subscription code then subscribes to just this stream, rather than using SubscribeToAll... For recording progress I just increment a counter as events are dealt with.

The catch up logic just uses ReadEventStreamForward on temp-all, using the counter from the database as the starting point.

The final issue you will notice if you try this approach is that subscriptions to temp-all returns link events, rather than your actual events, which is no use for passing to denormalizers. 

At the moment i am resolving this client side by performing another round trip to the event store to get the real event, something like:

if (recordedEvent.EventType.Equals("$>"))

recordedEvent = ResolveLinkEvent(recordedEvent);

private RecordedEvent ResolveLinkEvent(RecordedEvent recordedEvent)

{

   var decodedData = Encoding.UTF8.GetString(recordedEvent.Data);

   var elements = decodedData.Split(new[] {'@'});

   int eventNumber = elements[0];
    string streamId = elements[1];

   var slice = _eventStoreConnection.ReadEventStreamForward(streamId, eventNumber, 1);

   return slice.Events[0];

}

This is a bit crap and inefficient but the actual denormalizers are probably the bottleneck anyway.

Will probably have something on github when this is actually in a state that might be useful for others, but if you are curious my event processor code so far in this gist: 

[https://gist.github.com/3990336](https://gist.github.com/3990336)

Cheers,

Rich

PS.

Hope this is readable now!

Google keeps ruining my formatting and running text off the side of the screen!

Yes this is a way to get your own $all but its very innefficient
(though one nice thing is that it would work in a clustered
environment)

We are adding the positions onto subscription and looking at pushing
events for links now.

Cheers,

Greg

Thanks for the reply, Rich. I’ll take a look at your code.

Greg, do you think that work will be done in the next month or so?

Cheers,

Thomas

Oh some of it is already pushed to dev. :slight_smile: I am thinking more by
monday or tuesday.

Thanks Greg,

I realise that recreating $all isn’t the best idea…
However, we probably know in advance the subset of event types that a particular denormalizer will be interested in, so my thinking was to have a projection create a stream of just those, then run the denormalizer pointed at that rather than using $all and filtering client-side.
I guess the efficiency of each approach depends on the ratio of those event types to overall events in the system… or is $all just so much more efficient that it would win in most cases?

@Rich the middle road is

fromAll().whenAny(function(s,e) { linkTo(e.Type, e); });

then you have a stream per event type. You would just have to
implement in your projection client a join operation similar to
fromStream(x,y,z).

Cheers,

Greg

Rich,

and if you need a subset of event types with events appearing in the same relative order as they appear in original streams, you can create a projection like:

function link(s, e) { linkTo(“your-stream”, e); }

fromAll().when({

eventType1: link,

eventType2: link,

eventType3: link,

});

then you can read “your-stream” with links to all filtered events in the order as they appeared to the projection.

-yuriy

Indexing Projections are fun ... and powerful :slight_smile:

@yuriy
Yes, this is the approach I was imagining using.

@Greg

Perhaps I am being braindead and missing something (no coffee yet this
morning!)... But with stream per event type we have no way of ordering
events correctly in the client if you did that?

As I said the ordering would be equivalent to what is being done in
fromStream(x,y,z). On a single node you can give perfect ordering.
When distributed you cannot.

Creating a stream per projection is one way of doing things but can
cause more work to need to be done in terms of releasing software.

Cheers,

Greg