making flow synchronous for bulk data import

Hi, I’ve read through the posts on this subject and looked at the source code, but async stuff is kind of hard to wrap my head around.
I am doing a bulk import of data from sql to eventstore. I’m grabbing a bunch of records, creating commands, running them through my aggregates to create the events and saving the aggregates.

The (current) problem is that I get the aggregate add some events and try to save but the previous cmd has just processed so I’m getting a concurrency (version) error.

My repository save is doing

await _eventStoreConnection.AppendToStreamAsync(streamName, expectedVersion, newEvents);

and my get is doing

currentSlice = await _eventStoreConnection.ReadStreamEventsForwardAsync(streamName, sliceStart, sliceCount, false);

I have tried making the save sync by doing

_eventStoreConnection.AppendToStreamAsync(streamName, expectedVersion, newEvents).Wait();

but this hangs.

I haven’t been able to figure out how to make the get sync.

I’m not even sure if having the save sync would address my issue. I suspect I could put a thread.sleep in there but I’m processing some 400,000 records which would push the process into next year.

it would be great if I could wrap a set of actions somehow and make sure they all complete before processing the next set, but that seems like it would be even more difficult than making the save sync.

So I guess my question is, a) how in the world do make the processes synchronous for the purposes of import, and b) is this even the right way to address my problem.

Thanks for the help,

R

Any reason you can’t hold the all the aggregates in memory and flush when you are done? Also can we see more of your import code? My guess is you are calling async code from sync code using fire and forget (e.g. Task.Run)

Actually that’s a very good idea, building them up in memory then saving them. I’m trying to keep to use the regular workflow as much as possible so I don’t introduce different results but I’ve already had too make several concessions.
as for the code I’m using, basically I create my command then pass it to

_workflows.Where(y => y.Handles.ContainsKey(cmd.GetType())).ForEachItem(x =>

{

var actionBlock = x.ReturnActionBlock();

actionBlock.Post(cmd);

});

which finds the correct command handler and gives it the command. the command then gets the agg from the repo processes the cmd ( which currently just translates to an event ) and creates the event(s). Then hand back to the repo for save. The repos is largely what has been recommended on here, a port I think of a repo from NEventStore.

The ActionBlock.Post(cmd) is the synchronous method ( as apposed to ActionBlock.Async(cmd) ). But I want these cmds to be run syncronously. It doesn’t seem to help though.

Anyway, if I’m doing something wrong, I’d rather fix that then re-write it my data import stuff to build up in memory. However, I do think I should have just done that in the beginning.

Thanks, for your help,

R

If you want sync code to call async code (without blocking and thus negating the entire point) the fire and forget is inevitable. To get around this you need some sort of queuing mechanism. See https://github.com/damianh/Cedar/blob/master/src/Cedar.GetEventStore/Handlers/ResolvedEventDispatcher.cs#L196 which was inspired by (stolen from) https://github.com/EventStore/EventStore/blob/dev/src/EventStore.ClientAPI/Core/SimpleQueuedHandler.cs

Thank you,
However, if possible, I would kinda like sync code to call async code AND block. In this particular process. a) the eventstore code is firing async, I’m not asking it to and then trying to block, I don’t have the choice, and b) In this case ( which is different than a normal application workflow ) I want stuff to block because I’m firing so fast that it’s stepping on it’s self.

Does that make sense. bare in mind that you have given me a perfectly viable solution ( build up in memory ) which I thank you for. But I would also like to understand this stuff better, if you have time.

Thanks a lot,

R

I posted a similar question in the DDD Yahoo group page. My use case is also a massive data operation, and how that is done via DDD.

I think you may have to choose a batch operation which persists all the events in a single operation.