I’m developing a set of TypeScript libraries for Node.js that are heavily inspired by the philosophy and the implementation of Eventuous.
I’d like to support resubscribing to the AllStreamSubscription as Eventuous does. So, here are my questions for you.
How do I handle subscription drops with the Node.js client? The client doesn’t provide the subscriptionDropped callback that’s mentioned in the docs.
Maybe that’s because you had a pattern like this in mind:
const subscription = client.subscribeToAll();
try {
  for await (const resolvedEvent of subscription) {
    // handle event
  }
} catch (error) {
  // handle subscription drop
}
If so, how do I get the drop reason? Looking at CommandError, I couldn’t find any mention of drop reasons. Also, there is no explicit catch-up counterpart of PERSISTENT_SUBSCRIPTION_DROPPED. Why is that? Does everybody always use persistent subscriptions?
Last question: how do I drop subscriptions on purpose for different reasons?
I recently implemented this in a project and can share how I did it. I’m not sure it’s the best-practice way, as the docs don’t really cover it, but it seems to work. It uses the subscription’s stream events to handle regular events and errors separately.
import {
  EventStoreDBClient,
  AllStreamResolvedEvent,
  AllStreamSubscription,
} from '@eventstore/db-client'

class EventStoreDbSubscriber {
  // Assigned in startSubscription(), hence optional
  private subscription?: AllStreamSubscription

  // The client is passed in so the class does not depend on a global
  constructor(private readonly client: EventStoreDBClient) {
    this.startSubscription()
  }

  startSubscription() {
    // No position is passed here, so a resubscription does not resume
    // where the previous one stopped
    this.subscription = this.client.subscribeToAll()
    this.registerSubscriptionHandlers(this.subscription)
  }

  registerSubscriptionHandlers(subscription: AllStreamSubscription) {
    subscription
      .on('data', (resolvedEvent: AllStreamResolvedEvent) => {
        const event = resolvedEvent.event
        // Handle the event
      })
      .on('error', (err) => {
        // 'err' should carry the reason the subscription was dropped
        // Destroying this subscription emits a 'close' event, which triggers the reconnect below
        subscription.destroy()
      })
      .on('confirmation', () => {
        console.log('EventStoreDb connection established')
      })
      .on('close', () => {
        console.log('EventStoreDb connection closed. Attempting to reconnect in 5 seconds')
        setTimeout(() => {
          this.startSubscription()
        }, 5000)
      })
  }
}
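
One thing the class above leaves open is where a resubscription should start from, and how the caller learns why the previous subscription stopped. Below is a minimal sketch of one way to handle both with the for await pattern from the question. It is not part of @eventstore/db-client: runWithResubscribe, PositionedEvent, DropReason and every option name are hypothetical, the position is simplified to a number, and subscribe is assumed to deliver only events after the position it is given.

```typescript
// Sketch only: all names are hypothetical, nothing here comes from
// @eventstore/db-client. `subscribe` stands in for whatever opens the real
// subscription from a given position.

interface PositionedEvent {
  position: number; // simplified stand-in for a position in $all
  type: string;
}

// Why one subscription attempt stopped delivering events.
type DropReason =
  | { kind: 'error'; error: unknown } // the source or the handler threw
  | { kind: 'stopped' } // the caller aborted on purpose
  | { kind: 'ended' }; // the source completed without an error

interface ResubscribeOptions {
  // Must deliver only events after `from` (or everything when undefined).
  subscribe: (from: number | undefined) => AsyncIterable<PositionedEvent>;
  handle: (event: PositionedEvent) => void | Promise<void>;
  onDropped?: (reason: DropReason) => void;
  retryDelayMs?: number;
  signal?: AbortSignal;
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Consumes the source until it ends or the caller aborts. After an error it
// waits, then resubscribes from the last successfully handled position.
// Resolves with that position.
async function runWithResubscribe(
  options: ResubscribeOptions,
): Promise<number | undefined> {
  const { subscribe, handle, onDropped, signal, retryDelayMs = 5000 } = options;
  let checkpoint: number | undefined;

  while (true) {
    if (signal?.aborted) {
      onDropped?.({ kind: 'stopped' });
      return checkpoint;
    }
    try {
      for await (const event of subscribe(checkpoint)) {
        await handle(event);
        checkpoint = event.position; // advance only after a successful handle
        if (signal?.aborted) break; // leaving the loop closes the iterator
      }
    } catch (error) {
      // A throwing handler lands here too, so the failing event is retried.
      onDropped?.({ kind: 'error', error });
      await sleep(retryDelayMs);
      continue;
    }
    onDropped?.({ kind: signal?.aborted ? 'stopped' : 'ended' });
    return checkpoint;
  }
}
```

Dropping on purpose is done here by aborting the signal, which is only noticed once the next event has been handled. On a quiet stream you would probably also have to end the underlying subscription yourself, for example with destroy() as in the class above. Since the checkpoint lives in memory, it is lost when the process restarts; persisting it is left out of the sketch.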