Python filter_include not working as expected

    import os

    from dotenv import load_dotenv
    from esdbclient import ESDB_SYSTEM_EVENTS_REGEX, EventStoreDBClient
    from pymongo import MongoClient


    class MongoDbWorker:
        def __init__(self, aggregate_type: type):
            load_dotenv()
            self._mongo = MongoClient(os.getenv("MONGO_URI"))
            self._aggregate_type = aggregate_type
            # e.g. '^Election.*' when aggregate_type is Election
            prefix = f'^{aggregate_type.__name__}.*'
            esdb_uri = os.getenv("EVENTSTORE_URI")
            self._client = EventStoreDBClient(uri=esdb_uri)
            self._subscription = self._client.subscribe_to_all(
                filter_by_stream_name=True,
                filter_exclude=[ESDB_SYSTEM_EVENTS_REGEX],
                filter_include=[prefix]
            )

This is how I am initializing my EventStore catchup subscription. I’ve stepped through it with a debugger to verify that prefix contains the correct value. However, when I use the formatted string, I don’t get anything back from EventStore, whereas if I replace it with a hard-coded literal such as prefix = "^<Some Aggregate>.*", it works. There should be fundamentally no difference between prefix = rf'^{aggregate_type.__name__}.*' and prefix = rf'^Election.*' when aggregate_type is Election, but clearly there is, because I don’t receive any events when using the former. Can anyone suggest what I am missing?
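
To be concrete about what I mean by “no difference”: the two spellings should produce identical pattern strings, as in this quick sanity check (just a sketch, with Election standing in for the aggregate class being passed in):

    # The f-string and the hard-coded literal should be the same string,
    # so they should behave identically as a filter_include pattern.
    aggregate_type = Election
    assert rf'^{aggregate_type.__name__}.*' == r'^Election.*'
    assert f'^{aggregate_type.__name__}.*' == '^Election.*'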

I see, this is not related to EventStore at all. I am having trouble with the fact that the subscription iterator blocks. I wanted to put all of my MongoDB workers into a single script like this:

import asyncio

# Ballot, Candidate, Competition, Election, Voter and MongoDbWorker are defined elsewhere.

async def workers():
    ballot_worker = MongoDbWorker(Ballot).run()
    candidate_worker = MongoDbWorker(Candidate).run()
    competition_worker = MongoDbWorker(Competition).run()
    election_worker = MongoDbWorker(Election).run()
    voter_worker = MongoDbWorker(Voter).run()
    await asyncio.gather(
        ballot_worker,
        candidate_worker,
        competition_worker,
        election_worker,
        voter_worker,
    )

if __name__ == "__main__":
    asyncio.run(workers())

Unfortunately, because the subscription iterator blocks, the event loop never gets a chance to run the other workers. Is there any chance we could get an AsyncIterator for CatchupSubscription?
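
To illustrate the mechanism: if a worker’s run() iterates a synchronous subscription inside an async def, roughly like the sketch below (the projection call is illustrative), the loop never awaits anything, so the event loop is blocked and nothing else gets scheduled:

    class MongoDbWorker:
        ...

        async def run(self):
            # The subscription from the synchronous EventStoreDBClient is a
            # plain iterator: this loop never awaits anything, so it blocks
            # the event loop and the other gathered workers never run.
            for event in self._subscription:
                self._project(event)  # illustrative projection step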

Richard, could you open an issue in the Python client repository? I am not sure if John is looking here, but he probably will be able to help you.

I think perhaps I am being a bit unreasonable. This is more appropriately handled with multi-threading, which I have done. I think we can leave this one as closed.

@richardsonb The async method subscribe_to_all() returns an AsyncIterator. Here’s the code:

Update: you are using the EventStoreDBClient which is the non-asyncio client. Try your async code with the AsyncioEventStoreDBClient? There’s some documentation here about it:

I just tried what you suggested, using the AsyncioEventStoreDBClient with subscribe_to_all() and asyncio.gather(), and it works.
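
For reference, each worker ends up shaped roughly like the sketch below (a simplified sketch, not exact code; the subscribe_to_all() kwargs mirror the ones from the original __init__ above, and the projection step is illustrative):

    async def run_worker(client, aggregate_type):
        # `client` is the asyncio client (AsyncioEventStoreDBClient); its
        # subscribe_to_all() is awaited and the resulting subscription is an
        # async iterator, so iterating it yields control back to the event
        # loop between events instead of blocking it.
        subscription = await client.subscribe_to_all(
            filter_by_stream_name=True,
            filter_include=[f'{aggregate_type.__name__}.*'],
        )
        async for event in subscription:
            ...  # project the event into MongoDB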

@richardsonb Just some tips:

  1. You don’t need the anchor assertion ^ here. It’s added when the sequence of patterns is composed into a single regular expression.

  2. You don’t need to use raw strings here because you aren’t doing any escapes, I think.

  3. There’s no need to use filter_exclude if you are using filter_include because the value will be ignored.

  4. I suppose if 'Election.*' != f'{aggregate_type.__name__}.*' then 'Election' != aggregate_type.__name__, so perhaps aggregate_type is not Election? Honestly, I have no other idea why you would see any difference. Does it actually work if you change __init__() to take an argument aggregate_type_name: str and refer to the types by their string names?

  5. It’s probably better to use just one instance of the client per Python interpreter and share it across your different threads/coroutines. You could construct it in your workers() function and pass it as an argument to each MongoDbWorker (see the sketch below).
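
For point 5, something along these lines (only a sketch: get_client() is a hypothetical helper, because exactly how the asyncio client is constructed and connected depends on the client version, and the MongoDbWorker constructor is assumed to accept the shared client):

    import asyncio

    async def workers():
        # One client for the whole process, shared by every worker.
        # get_client() is a hypothetical helper that builds and connects the
        # asyncio client (see the esdbclient docs for the exact steps).
        client = await get_client()
        await asyncio.gather(
            MongoDbWorker(client, Ballot).run(),
            MongoDbWorker(client, Candidate).run(),
            MongoDbWorker(client, Competition).run(),
            MongoDbWorker(client, Election).run(),
            MongoDbWorker(client, Voter).run(),
        )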

Without some more details about your run() method, I don’t really know why “the iterator blocks”. But I will look into this.

Update: you are using the EventStoreDBClient which is the non-asyncio client. Try your async code with the AsyncioEventStoreDBClient?

Thanks for the mention @alexey.zimarev! :grinning:

Brian (same person as @richardsonb?): The non-async client is better developed than the async client, but what you tried should work, I think. Good to hear you got it working, though.

Update: I added a test in the repo that shows this idiom of using asyncio.gather with subscribe_to_all() does actually work:
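
In outline, the idiom the test exercises is something like the sketch below (a sketch only, not the actual test code; the filter patterns and event counts are illustrative):

    import asyncio

    async def consume(client, pattern, n):
        # Read events from a filtered catch-up subscription until n events
        # have been seen, then stop.
        subscription = await client.subscribe_to_all(filter_include=[pattern])
        received = []
        async for event in subscription:
            received.append(event)
            if len(received) >= n:
                break
        return received

    async def check_gather(client):
        # Two subscriptions iterated concurrently on one event loop.
        results = await asyncio.gather(
            consume(client, 'OrderCreated', 1),
            consume(client, 'PaymentTaken', 1),
        )
        assert all(len(r) == 1 for r in results)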