Hi
If a subscription is dropped after a call to SubscribeToStreamAsync, say by a closed connection, do I need to call SubscribeToStreamAsync again to get a new working subscription?
Thanks.
Hi
If a subscription is dropped after a call to SubscribeToStreamAsync, say by a closed connection, do I need to call SubscribeToStreamAsync again to get a new working subscription?
Thanks.
Yes. There is an optional argument when subscribing to handle when a subscription drops. You probably want to just reconnect here unless you manually closed the connection
Thank you for the code example. Looks very interesting.
When you say reconnect do you mean resubscribe?
I have a situation where I call SubscribeToStreamAsync again when the connection automatically reconnects (KeepReconnecting), but this second call never returns…
The other place you can do it is on the Connected event of the connection. That way if you’re running in a cluster the subscriptions will persist as you move between nodes. Ensure you don’t leak handlers etc though!
James
That’s what I’m doing at the moment. I call SubscribeToStreamAsync.Result, but it never returns…
Code sample?
using DataFoundation.ClientLibrary.Model;
using EventStore.ClientAPI;
using System;
using System.Reactive.Disposables;
namespace DataFoundation.ClientLibrary.Communication.Clients.EventStore
{
public class EventStoreObservable : IObservable, IDisposable
{
private IObserver observer;
private readonly DataKey dataEntryKey;
private readonly IEventStoreConnection eventStoreConnection;
private EventStoreSubscription subscription;
private readonly StreamNameTranslator streamNameTranslator = new StreamNameTranslator();
private SubscriptionDropReason? dropReason;
public EventStoreObservable(DataKey dataEntryKey, IEventStoreConnection eventStoreConnection)
{
this.dataEntryKey = dataEntryKey;
this.eventStoreConnection = eventStoreConnection;
this.eventStoreConnection.Connected += ConnectedToEventStore;
}
public IDisposable Subscribe(IObserver obs)
{
if (subscription != null)
{
throw new InvalidOperationException(“Already subscribed!”);
}
if (observer != null)
{
throw new InvalidOperationException(“Only one observer supported!”);
}
observer = obs;
subscription = CreateSubscription();
return this;
}
private void ConnectedToEventStore(object sender, ClientConnectionEventArgs e)
{
if (dropReason == SubscriptionDropReason.ConnectionClosed)
{
CreateSubscription();
}
}
private EventStoreSubscription CreateSubscription()
{
dropReason = null;
return eventStoreConnection.SubscribeToStreamAsync(
stream: streamNameTranslator.DataEntryKeyToStream(dataEntryKey),
resolveLinkTos: false,
eventAppeared: EventAppeared,
subscriptionDropped: SubscriptionDropped).Result;
}
private void SubscriptionDropped(EventStoreSubscription eventStoreSubscription, SubscriptionDropReason subscriptionDropReason, Exception e)
{
dropReason = subscriptionDropReason;
}
private void EventAppeared(EventStoreSubscription eventStoreSubscription, ResolvedEvent resolvedEvent)
{
observer.OnNext(DataValue.FromBytes(resolvedEvent.Event.Data));
}
public void Dispose()
{
if (subscription != null)
{
subscription.Unsubscribe();
eventStoreConnection.Connected -= ConnectedToEventStore;
}
}
}
}
Connection settings:
settings.KeepReconnecting();
settings.EnableVerboseLogging();
You’re blocking in the event handler… makes sense!
But shouldn't the call succeed and return?
I rewrote the code to use a non blocking approach and now its working. Thank you all for taking the time to help me.
If you block in the event handler waiting for a message to be processed it will never complete, so messages won’t get pumped in the meantime. If you happen to be at Build Stuff in Lithuania next week I’m doing a talk on the various gotchas with this kind
of code specifically focussing on the Event Store client. It took me a long time to find it too!
James
Well that’s comforting
Sadly no build stuff conference for me, but I’ll definitely watch it, if it becomes available online at some point…
I think I’m having similar problems. Can you link to any articles or videos explaining how to avoid these problems and how to do it properly?
/Julian
Not sure that was ever filmed. If you post your set up code and the code where you’re seeing the issue we can probably help though