Problem with ReadAllEventsForwardOperation

Hey,

I’m wondering if somebody could help me out with a problem I’m a bit stuck with.

I’m trying to replay the entire event stream for an instance of GES that I have running on Mono, on an AWS EC2 instance.

I do so using the following code. The Connection here is initialised using the admin user credentials.

public IEnumerable GetEventsFrom(DateTime from)
{
    var slice = Connection.ReadAllEventsForward(Position.Start, int.MaxValue, false);
    return slice.Events
        .Where(e => !e.OriginalStreamId.StartsWith("$"))
        .Select(e => GetEvent(e.Event))
        .Where(e => e.TimeStamp >= from);
}

I’m omitting GetEvent, as that just does a conversion from the GES event to the event type used in our domain.

Btw, I’m also wondering, is there a better way of reading the whole stream? Reading from the start brings back events that don’t apply to our actual domain (I guess they are used by GES for some internal housekeeping or something?), which is why I’m using the little dollar-based exclusion there.

Anyway, this approach works just fine when I’m running in my dev environment with the SingleNode exe running.

However, when I try it when connecting to the EC2 instance, I get a RetriesLimitReachedException:

Item Operation ReadAllEventsForwardOperation ({660595dd-7f10-4fb6-b08e-3cce8b6418e5}): Position: 0/0, MaxCount: 2147483647, ResolveLinkTos: False, RequireMaster: True, retry count: 10, created: 10:59:15.818, last updated: 10:59:55.190 reached retries limit : 10

Note that it is definitely not an issue with opening the 1113 port for the EC2 security group, as that’s open, and ReadStreamEventsForward seems to be fine.

Any ideas on this one?

Cheers,

Chris

How many events are in the system? You are trying to read them all in one operation. This may not be the best way to read.

To read from a single stream, use the stream version of the operation (it takes a stream name). Reading from all will read all events in your system.

Cheers,

Greg

Hi Greg,

This is only a test bed environment, so there are only 100 or so events.

I don’t quite understand how I’m supposed to provide a specific stream name in this context, as I’m not restoring a particular aggregate. I want to go through all the events that are in the system to use them to regenerate our read model.

Hope that makes sense.

Cheers,

Chris

If you do want to filter internal events server-side, we have used a projection like this:

fromAll().when({
    $any: function(s, e) {
        if (e.eventType[0] !== '$')
            linkTo("non-system", e);
    }
});


Then read from the non-system stream rather than all.

Turning down stats frequency can also help reduce the amount of system events.

But as Greg said, reading up to int.MaxValue in one call is probably not the best idea. Try pages of 1024?

Ah! In that case use an EventStoreCatchUpSubscription. It will handle switching from reading chunks to live subscriptions (pushes) for you. It gets returned from the subscription operations on the connection. You handle its checkpoint (the last place you have processed); it handles everything else for you, including reconnects and node switches in the HA version.

I’m on my iPad now but when I get to a regular machine I can point you to a sample of how to use it.

Cheers,

Greg

Try enabling Verbose logging on the connection. This was the only way I was able to spot that I had exceeded TCP size limits. (Not very likely with 100 events; my batch size was originally 2000, but I guess it depends on how big they are.)

What do people think about putting a limit on this of, say, 20000 events? There is not really a situation where you would actually want to read 2 billion events in one operation.

Yep, that was where I was heading. An error might help here. ResultsToBigException, maybe.

Yeah, sorry for the confusion here, I have no interest in reading 2 billion events in one read. I will change the read to page the events to 1024, as was suggested. That’s perfectly reasonable.

This code was really just to prove the concept and try to understand the error, why it was working in the dev environment and not with the remote connection.

Sorry, but in terms of the error all that was available was what I posted. The stacktrace on it was null.

The catchup subscription will handle all the paging for you (and move your external projections onto a live subscription after). I would not recommend rewriting this code as there are bunches of race conditions with connection drops/server failovers/slow projections/etc.

Cheers,

Greg

Thanks Greg. I’ve seen an entry on the blog about this.

It looks nice and easy. However, one thing (at least based on the blog entry) is that I couldn’t see a way to determine when you have reached the end of the stream. This event store interface is used in a synchronous context that needs to be able to progress once the replaying has finished.

Any tips on how I can determine that I’m at the end of the stream?

Cheers,

Chris

What does it need to do afterwards? There is a callback for when it switches from reading to subscription, if that’s any use. If you just want to read, page through until IsEndOfStream is true.
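
For what it's worth, here is a minimal sketch of that paging loop, written in JavaScript to match the projection earlier in the thread. Note that `readAllForward`, `readEverything` and the in-memory `store` are made-up stand-ins for illustration, not the Event Store client API. The only thing borrowed from the real client is the idea of a slice that reports whether the end has been reached (`IsEndOfStream`, modelled here as `isEndOfStream`).

```javascript
// Hypothetical stand-in for a paged "read all forward" call. It is NOT the
// Event Store client API; it only mimics the shape of a slice
// (events, nextPosition, isEndOfStream) over an in-memory array.
function readAllForward(store, position, maxCount) {
  const events = store.slice(position, position + maxCount);
  const nextPosition = position + events.length;
  return { events, nextPosition, isEndOfStream: nextPosition >= store.length };
}

// Read every event in fixed-size pages, stopping once a slice reports the end.
function readEverything(store, pageSize) {
  const all = [];
  let position = 0;
  let slice;
  do {
    slice = readAllForward(store, position, pageSize);
    all.push(...slice.events);
    position = slice.nextPosition;
  } while (!slice.isEndOfStream);
  return all;
}

// Usage: 2500 fake events read in pages of 1024 (three reads in total).
const store = Array.from({ length: 2500 }, (_, i) => ({ streamId: "stream-" + i, number: i }));
const replayed = readEverything(store, 1024);
console.log(replayed.length);
```

Because the loop only returns once the last slice has been consumed, the caller stays synchronous and knows the replay has finished as soon as `readEverything` returns.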

In this particular case, it’s being used when we are provisioning a new tenant for our application.

Creating a new tenant involves running a set of ‘migrations’, which are basically just commands that provision every tenant with the same default data. These commands go through normal command handlers/domain logic, to produce event streams populated with default data.

We then generate a read model and search index for that tenant by replaying the event store we just created. After that process, there’s just a simple piece of bookkeeping to provision the tenant. It’s easier to have everything be synchronous because we want to indicate any failures in the process, as this provisioning tool is going to be used by non-devs.

So, James, can I just be clear, are you saying do it the normal way? It is just reading, in the sense that no writes will occur for that tenant during this process. However, when in production, there may be writes occurring for other tenants that are already live, because we’re going to use the same store for every tenant; we’ll achieve the multi-tenancy by prefixing each stream name with the tenant name. To be honest, regarding the callback you mentioned there, I’m not quite sure how I would apply/use that to determine the end.
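
To make the prefixing idea concrete, here is a rough sketch of how events read from the whole store might be narrowed down to one tenant. It is in JavaScript only to match the projection earlier in the thread. The `eventsForTenant` helper, the `-` separator and the sample stream names are all assumptions for illustration, not our real naming scheme or anything from the Event Store API.

```javascript
// Hypothetical helper: keep only the events whose stream name starts with
// "<tenant>-". The "-" separator is an assumption for this sketch.
// System streams (names starting with "$") drop out automatically, because
// they never match a tenant prefix.
function eventsForTenant(events, tenant) {
  const prefix = tenant + "-";
  return events.filter(e => e.streamId.startsWith(prefix));
}

// Made-up sample data: two tenants plus one system stream.
const sampleEvents = [
  { streamId: "acme-order-1", type: "OrderCreated" },
  { streamId: "$stats-0", type: "$statsCollected" },
  { streamId: "globex-order-7", type: "OrderCreated" },
  { streamId: "acme-customer-3", type: "CustomerCreated" },
];

const acmeEvents = eventsForTenant(sampleEvents, "acme");
console.log(acmeEvents.map(e => e.streamId));
```

Writes for other live tenants would then simply be skipped by the filter while the new tenant is being replayed.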

Hope this all makes sense.

Cheers,

Chris