When I get all events using:
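    // Arguments: maxLiveQueueSize, readBatchSize, verboseLogging, resolveLinkTos
    var settings = new CatchUpSubscriptionSettings(200, 1, true, false);
    var catchup = _eventStoreClient.Connection.SubscribeToAllFrom(
        null,       // lastCheckpoint: null starts from the beginning of $all
        settings,
        (sub, evt) => EventStoreOnEventCatchup(sub, evt),
        null,       // liveProcessingStarted: not used
        (sub, reason, ex) => EventStoreOnSubscriptionDroppedCatchup(sub, reason, ex),
        _eventStoreClient.Credentials);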
… I receive all events, of every expected event type, just fine.
However, when I instead read slices, so that I can process them asynchronously and in batches against an external service, and then start the subscription from the last slice's `NextPosition`, like so:
    var indexPosition = new Position(0, 0); // getIndexPosition();
    ServiceTrace.Trace.Verbose(EventIds.ProcessingStart, () => "BeginIndexing: Subscribe to event store notifications");
    AllEventsSlice currentSlice;
    int slice = 1;
    do
    {
        // Arguments: position, maxCount, resolveLinkTos, userCredentials
        currentSlice = _eventStoreClient.Connection.ReadAllEventsForwardAsync(
            indexPosition,
            _eventStoreConfig.EventStoreReadBatchSize,
            false,
            _eventStoreClient.Credentials).Result;
        ServiceTrace.Trace.Verbose(EventIds.ProcessingInfo, () => string.Format("Slice #: {0}", slice));
        indexPosition = currentSlice.NextPosition;
        try
        {
            var eventsToIndex = currentSlice.Events
                .Where(e => null != e.OriginalEvent &&
                            null != e.OriginalEvent.EventType &&
                            !_elasticClient.TypesToFilter.Contains(e.OriginalEvent.EventType));
            // Fire-and-forget: exceptions thrown inside IndexEvents are not
            // observed by the catch blocks below, because the task is never awaited.
            Task.Run(() => _elasticClient.IndexEvents(eventsToIndex.ToList()));
        }
        catch (AggregateException aex)
        {
            ServiceTrace.Trace.Error(EventIds.ProcessingError, () => string.Format("Aggregate exception while processing slice #{0}.", slice), aex);
        }
        catch (Exception ex)
        {
            ServiceTrace.Trace.Error(EventIds.ProcessingError, () => string.Format("Exception while processing slice #{0}.", slice), ex);
        }
        slice++;
    } while (!currentSlice.IsEndOfStream);

    ServiceTrace.Trace.Information(EventIds.ProcessingInfo, () => "Catchup complete. Subscribing to new events.");

    // Arguments: maxLiveQueueSize, readBatchSize, verboseLogging, resolveLinkTos
    var settings = new CatchUpSubscriptionSettings(200, 1, true, false);
    var catchup = _eventStoreClient.Connection.SubscribeToAllFrom(
        indexPosition,  // lastCheckpoint: NextPosition of the last slice read above
        settings,
        (sub, evt) => EventStoreOnEventCatchup(sub, evt),
        null,           // liveProcessingStarted: not used
        (sub, reason, ex) => EventStoreOnSubscriptionDroppedCatchup(sub, reason, ex),
        _eventStoreClient.Credentials);
… I only receive a subset of event types. All the event types show up in the slices, but once all slices have been read, the subscription that follows surfaces only a limited set of event types. When I restart the service, the new events that the subscription missed show up in the catch-up slices.
Why am I seeing this behavior? What am I missing?