Hello All,
Mine is an event sourced with EventStore(version 3.9.3) as the journal used in conjunction with akka-persistence(version 2.4.16). To query event store for persisted entities, i am using the ReadJournal plugin of akka and initialize it to use EventStoreReadJournal.class as the journal class and eventstore.persistence.query, as the query identifier in the application.conf file.
In order to get all the persisted ids i use the readJournal.allPersistenceIds() with the readJournal initialized to use EventStore as the underlying read journal.
The situation is that there are a lot of ids which have already been persisted in the event store.
Now if i query for events of a particular persistenceId, i am able to see it’s events.
public CompletionStage runForEachEvent(String id, long sequenceNr, Procedure function) {
*** ActorMaterializer materializer = ActorMaterializer.create(system);***
*** Source<EventEnvelope, NotUsed> eventsForId = ((CurrentEventsByPersistenceIdQuery) journal(system))***
*** .currentEventsByPersistenceId(id, sequenceNr, Long.MAX_VALUE);***
*** return eventsForId.runForeach(function, materializer);***
*** }***
But if i try to get all the events of all the persistenceIds, the akka source seems to break somewhere and i am unable to see some persistenceIds.
public final Source<String, NotUsed> allPersistenceIds(Predicate… filters) {
Source<String, NotUsed> allPersistenceIds = ((AllPersistenceIdsQuery) journal(system))
.allPersistenceIds()
.buffer(500, backpressure());
for (Predicate filter : filters)
allPersistenceIds = allPersistenceIds.filter(filter);
return allPersistenceIds;
}
public final Source<EventEnvelope, NotUsed> allEventsSourceForPersistenceId(String id, long from, long to) {
return ((EventsByPersistenceIdQuery) journal(system)).eventsByPersistenceId(id, from, to)
.buffer(100, OverflowStrategy.backpressure());
}
The above is wired together in the code below.
Source<String, NotUsed> filteredPersistenceIds = journalProvider.allPersistenceIds(id -> id.contains(“identifier”)));
*** Source<EventEnvelope, NotUsed> sourceOfEvents = filteredPersistenceIds.flatMapConcat(id -> journalProvider.allEventsSourceForPersistenceId(id, startSeqNumber(id), Long.MAX_VALUE));***
*** sourceOfEvents.runForeach(event -> System.out.println("Encountered event is " + event.event()), materializer);***
When the above source is materialized, i am unable to see all the events containing the aforementioned identifier in their persistenceId.
Any help in the matter would be highly appreciated.
Regards,
Ankit