This is how I am initializing my EventStore catchup subscription. I’ve put a debugger in there to verify that prefix contains the correct value. However, if I use the formatted string, I don’t get anything from EventStore. However, if I replace with prefix = "^<Some Aggregate>.*", it works. There seems to be fundamentally no difference between prefix = rf'^{aggregate_type.__name__}.*' and prefix = rf'^Election.*' for aggregate_type == Election. But clearly there is, because I don’t receive any events when using the former. Can anyone suggest what I am missing?
I see, this is not related to EventStore at all. I am having trouble with the fact that the subscription iterator blocks. I wanted to put all of my MongoDB workers into a single script like this:
Unfortunately, because the iterator blocks, it will not get a chance to execute the other workers. Is there any chance we can get an AsyncIterator for CatchupSubscription?
I think perhaps I being a bit unreasonable. This is more appropriately handled with multi-threading, which I have done. I think we can leave this one as closed.
Update: you are using the EventStoreDBClient which is the non-asyncio client. Try your async code with the AsyncioEventStoreDBClient? There’s some documentation here about it:
I just tried doing what you wrote, with the AsyncioEventStoreDBClient the subscribe_to_all() and asyncio.gather() and it works.
You don’t need the anchor assertion ^ here. It’s added when the sequence of patterns is composed into a single regular expression.
You don’t need to use raw strings here because you aren’t doing any escapes, I think.
There’s no need to use filter_exclude if you are using filter_include because the value will be ignored.
I suppose if 'Election.*' != f'{aggregate_type.__name__}.*' then 'Election' != aggregate_type.__name__ so perhaps aggregate_type is not Election? Honestly I have no other idea why you would see any difference. Does it actually work if you change the __init__() to have argument aggregate_type_name: str and mention the types by string name?
It’s probably better to use just one instance of the client in a single Python interpreter, and share that in your different threads/coroutines. You could construct it in your workers() function and pass it as an argument to each MongoDbWorker.
Without some more details about your run() method, I don’t really know why “the iterator blocks”. But I will look into this.
Update: you are using the EventStoreDBClient which is the non-asyncio client. Try your async code with the AsyncioEventStoreDBClient?
Brian (same person as @richardsonb?) The non-async client is more well developed than the async client, but what you tried perhaps should work. Good to hear you got it working though.
Update: I added a test in the repo that shows this idiom of using asyncio.gather with subscribe_to_all() does actually work: