/// <summary>
/// Write-throughput benchmark: opens several connections to the endpoint at
/// loopback:1113, appends the same pre-built event batches to one stream per
/// connection in parallel, then prints the elapsed time and the resulting
/// events-per-second rate. Blocks on <see cref="Console.ReadLine"/> before returning.
/// </summary>
public static void Run()
{
    int numConnections = 10;
    int numBatches = 10000;
    int numEventsPerBatch = 10;
    int numEvents = numBatches * numEventsPerBatch;
    int totalEvents = numConnections * numEvents;

    Console.WriteLine("Connections: " + numConnections + ", Events per connection: " + numEvents);
    Console.WriteLine("Event batches: " + numBatches + ", events per batch: " + numEventsPerBatch);
    Console.WriteLine("Total events: " + totalEvents);

    IEventStoreConnection[] connections = new IEventStoreConnection[numConnections];
    Stopwatch s = new Stopwatch();
    try
    {
        for (int i = 0; i < numConnections; i++)
        {
            connections[i] = EventStoreConnection.Create(/*settings,*/ new IPEndPoint(IPAddress.Loopback, 1113));
            connections[i].ConnectAsync().Wait();
        }

        // Batches are built before the stopwatch starts so that only the
        // append calls are included in the measured interval.
        List<EventData[]> eventBatches = createBatches(numBatches, numEventsPerBatch);

        s.Start();
        Parallel.For(0, numConnections,
            i =>
            {
                appendBatches(connections[i], "test-stream-" + i.ToString(), eventBatches);
            });
        s.Stop();
    }
    finally
    {
        // Release every connection that was actually created, even when
        // connecting or appending failed part-way through. Slots that were
        // never filled are still null and are skipped.
        foreach (IEventStoreConnection connection in connections)
        {
            if (connection != null)
            {
                connection.Close();
            }
        }
    }

    Console.WriteLine("Elapsed time: " + s.ElapsedMilliseconds + " ms");
    Console.WriteLine("Rate: " + (totalEvents / s.Elapsed.TotalSeconds) + " msg/sec");
    Console.ReadLine();
}
/// <summary>
/// Builds <paramref name="numBatches"/> arrays, each holding
/// <paramref name="eventsPerBatch"/> newly created events.
/// </summary>
/// <remarks>
/// Every event receives a fresh <see cref="Guid"/> and a sequence number that
/// keeps counting across batches (0, 1, 2, ...). The decimal text of that number
/// is passed as the second constructor argument and, UTF-8 encoded, as the fourth;
/// the third argument is always <c>false</c> and the last is always <c>null</c>.
/// </remarks>
/// <param name="numBatches">Number of batch arrays to create.</param>
/// <param name="eventsPerBatch">Number of events in each batch array.</param>
/// <returns>The batches, in creation order.</returns>
private static List<EventData[]> createBatches(int numBatches, int eventsPerBatch)
{
    List<EventData[]> result = new List<EventData[]>(numBatches);

    // Running sequence number; equals (batchIndex * eventsPerBatch) + slot.
    int sequence = 0;

    for (int batchIndex = 0; batchIndex < numBatches; batchIndex++)
    {
        EventData[] events = new EventData[eventsPerBatch];

        for (int slot = 0; slot < events.Length; slot++)
        {
            string label = sequence.ToString();
            byte[] payload = Encoding.UTF8.GetBytes(label);
            events[slot] = new EventData(Guid.NewGuid(), label, false, payload, null);
            sequence++;
        }

        result.Add(events);
    }

    return result;
}
/// <summary>
/// Starts one <c>AppendToStreamAsync</c> call per batch on the given connection,
/// using <c>ExpectedVersion.Any</c>, without waiting between calls, and then
/// blocks until every returned task has completed.
/// </summary>
/// <param name="connection">Connection on which the appends are issued.</param>
/// <param name="streamName">Name of the stream all batches are appended to.</param>
/// <param name="eventBatches">Batches to append, issued in list order.</param>
private static void appendBatches(IEventStoreConnection connection, string streamName, List<EventData[]> eventBatches)
{
    int batchCount = eventBatches.Count;
    Task[] pending = new Task[batchCount];

    int next = 0;
    while (next < batchCount)
    {
        EventData[] batch = eventBatches[next];
        pending[next] = connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, batch);
        next++;
    }

    // Synchronous barrier: returns only after all appends have finished.
    Task.WaitAll(pending);
}
}