I am using the EventStore.Client.Embedded Version=20.6.0
and it’s all good for appending events to stream and reading, but when I try to subscribe it complains about access denied Unhandled exception. EventStore.ClientAPI.Exceptions.AccessDeniedException: Subscription to '<all>' failed due to access denied.
and I’m not sure why because I am using it in-memory.
This is my code, it uses the IEventStoreConnection to subscribe to all streams and then it ignores all the events that don’t belong in an expected stream
public class EventStoreSubscriber
: IEventStoreSubscriber
{
private readonly IEventStoreConnection _eventStoreConnection;
private readonly SerializationService _serializationService;
public EventStoreSubscriber(
IEventStoreConnection eventStoreConnection,
SerializationService serializationService)
{
_eventStoreConnection = eventStoreConnection;
_serializationService = serializationService;
}
public Task Subscribe(string streamNamePrefix, Func<object, CancellationToken, Task> handler, CancellationToken cancellationToken)
{
return _eventStoreConnection.SubscribeToAllAsync(
false,
async (streamSubscription, resolvedEvent) =>
{
var streamName = resolvedEvent.OriginalStreamId;
if (streamName.StartsWith(streamNamePrefix, StringComparison.InvariantCultureIgnoreCase))
{
var recognizedEvent = _serializationService.GetAsEvent(resolvedEvent);
await handler(recognizedEvent, cancellationToken);
}
});
}
}
This approach works well with the “real” event store. What should I do to have the embedded ES also work with this approach?
This is how I use it
var nodeBuilder =
EmbeddedVNodeBuilder
.AsSingleNode()
.OnDefaultEndpoints()
.RunInMemory();
var node = nodeBuilder.Build();
await node.StartAsync(true);
using var embeddedConnection = EmbeddedEventStoreConnection.Create(node);
var supportedTypes = new SupportedTypes();
supportedTypes.AddType(typeof(Foo));
supportedTypes.AddType(typeof(Bar));
var serializationService = new SerializationService(supportedTypes);
var eventStoreSubscriber = new EventStoreSubscriber(embeddedConnection, serializationService);
await eventStoreSubscriber.Subscribe(
streamNamePrefix, async (retrievedEvent, cancellationToken) =>
{
await Console.Out.WriteLineAsync($"Subscriber {subscriberId} received: {retrievedEvent}");
},
CancellationToken.None
);