Subscriptions over HTTP API

Hello,
Using the HTTP api, I’m trying to get my head around subscriptions.

Docs says:

"There are three types of subscription patterns, Volatile, Catch up, and Persistent.

I do understand the meaning behind each concept, but I’m not sure how this should be implemented.

The word ‘pattern’ tells me that this might not be an explicit config property, but a way of approaching this (am I right?)

I understood how to actually poll via HTTP (follow the atom feed till ‘previous’ link disappears and keep polling),

I understood how to create Persistent subscriptions and how to query them-

But I feel i’m missing something.

Persistent subscription- is it for ‘competing consumers’ only?

I’m not sure when i should create a persistent sub and when I shouldn’t.

Do Volatile and Catch up Subscriptions need to be explicitly created? (like Persistent sub?)

Or they ARE actually subset of a Persistent sub? and they just vary by a configuration property?

Please help me understand these concepts better.

Thanks

Is there actually any kind of Subscription that should not be created via
PUT /subscriptions/{stream}/{subscription}

``

?

OK so let’s break this down a little.

There are 4 distinct ways you can subscribe to data as of now (I have been considering adding a 5th for quite some time and will get to that after).

The first is over http. Streams are exposed as atom feeds. These feeds can be followed and treated as a subscription (there is also a header here worth looking at ES-LongPoll: {seconds} which when caught up will wait to see if something appears as opposed to returning immediately.

The next subscription type is a persistent subscription. These implement a normal subscription as a queue etc. You can have N subscribers and varying distribution patterns such as fan out/ hashed/or prefer early subscribers can be used. Persistent subscriptions have server state and assure at least once messaging.

There is also just a “subscription” this will push events to a client as they occur in the backend. As example subscribe {stream} will push an event when an event is written to {stream}.

The last one to discuss here is a CatchUpSubscription. This will automatically switch between reading via reads and subscribing live to allow a subscription to read the past etc and continue into live mode. As example if you have 50 events in a stream and SubscriberToStreamFrom(stream, 0) it will start at event number 0, catch up to #50 and continue as a live subscription. You remember the last event processed and on restart etc say what that event was. CatchUpSubscriptions are most often used for projections.

Volatile and Catchup have no server configuration they are on demand based on client demand.

Does this cover what you were looking for?

Cheers,

Greg

Oh I left out the 5th mechanism I have been considering. The 5th mechanism would be a “push subscription” where you could as example give a uri and tell the subscription to post all events to that uri as they occur. This is not as of this time implemented but likely will be soon.

Thanks Greg, I really appreciate that.

I understand the difference between these kinds, but i’m still feeling a little confused -

let me elaborate on my case so it may be more clear to you which part i don’t really get.

Here’s how i setup things:

  • HTTP API.
  • Stream per AR (e.g Order-bbc507d0-ecaf-40cc-8239-ba478ec63242)
  • Single stream for commands named Commands
  • ProcessorManager:
  • subscribes to event types (using system event-type projection - e.g $et-OrderCreated)
  • writes to *Commands *stream.
  • ApplicationService:
  • subscribes to Commands stream using Persistent Subscription:
  • PUT /subscriptions/Commands/ {startFrom: 0} (ensure creation of subscription each time service inits)
  • GET /subscriptions/Commands/ (poll)
  • when cmd arrives, build AR state by applying all events on it.
  • in such case events are fetched from GET /streams/- from 0 to current using ‘previous’ links.
  • writes result event(s) to AR’s stream
  • ProjectionService:
  • subscribes to event types (using system event-type projection - e.g $et-OrderCreated)
  • writes to read-side db.

As you can see above,

ProcessManagers & ProjectionServices are polling multiple streams (one per event type, each on its separate thread).

While AppServices holds a single persistent subscription each (/subscrtiptions/Commands/).

Each of the above will ACK / NACK accordingly against its own subscription.

Besides the fact that the service reads all history from 0 everytime it starts (which obviously delays its work)- everything is working as expected, but I obviously still don’t understand it as i would want to.

Now, just to validate myself- is my approach right?

Is it ok to create a (persistent) sub for each of event-type-streams the way i did?

Is it correct to rebuild AR state from GET /streams/?

Is it correct to use subscriptions the way i did? after all, (currently) i don’t have competing consumers (there’s only one per service-type)

Which subscription am i using? catchup? persistent? again, not sure how both differ - for me it seems that i’m using both at once…

I know that i’m missing something- i just hope it’s something small.

thanks again.

Gal

One odd thing I see here is:

  • ApplicationService:
  • subscribes to Commands stream using Persistent Subscription:
  • PUT /subscriptions/Commands/ {startFrom: 0} (ensure creation of subscription each time service inits)
  • GET /subscriptions/Commands/ (poll)
  • when cmd arrives, build AR state by applying all events on it.
  • in such case events are fetched from GET /streams/- from 0 to current using ‘previous’ links.
  • writes result event(s) to AR’s stream
    Can you explain a bit more what you are doing here and why? Is this to make highly available your command processing? Likely this can also be done by just putting a load balancer in front of a few instances without using persistent subscriptions etc. Usually persistent subscriptions are used for events as opposed to commands. There is a secondary issue here that by using a persistent subscription you have either given up on (or made vastly more difficult) your ability to return say an error to the originator or a command which is often important.

Greg

Actually the Command part is quite solid, i agree it’s less ‘convenient’ than direct http but it really provides availability and most importantly decouples the service (along with the domain model [ar] it’s carrying with it) from the interface (http) layer.
in other words - my ApplicationServices are not web services, and are not exposed.

another benefit is easy smoke tests with replaying commands.

I do have web services that serve as REST api, putting commands into queues, listening to events and communicates back with SocketIO accordingly.

I guess there are better ways to do this - but that’s actually not my issue…

it really boils down to-

  • How should I subscribe to events with my ProcessorManagers and ProjectionServices? as I do now?
  • How should I rebuild AR state in my AppServices? using GET /streams//0/forward/ with ‘previous’ following (as i do now)?
  • I consider GET /streams/ as a place to ‘query’ (build ar state) unlike GET /subscriptions// which i consider as a place to ‘listen’ for new stuff - is that somehow correct? this is the point i feel i’m not getting around.
    another thing that’s confusing me- i ACK results from /subscriptions// and they don’t get retried which is expected, but looking at the subscriptions tab in Eventstore’s web interface - all the numbers on the right side are static 0 and the dots are red- yelling at me that i don’t get it…

Thanks again,

Gal

Oooh this is interesting. Would it be something like pubsubhubbub or a proprietary protocol?

When I looked at it the idea was just a straight http post (the other side can do with it whatever it wants). There are quite a few circumstances where this can be useful not limited to “subscriptions” (I am sure you can imagine some integration scenarios?) and its relatively straight forward to get going. I will try to get a fire lit under this…

Cheers,

Greg

Also shouldn’t a process manager be interested in correlation id not event type?

btw @Rickard feel free to hit me up off list if you want to discuss and/or make a case for pubsubhubbub. I am not against the idea as a whole but had not considered it.

My reasoning for going with the simple post was originally that something else could easily adapt it into a bus etc. It seemed like a decent “this can be adapted into pretty much anything” type addition.

The main reason for PSH is that it provides a protocol for registering the URL that should be posted to. I just found out that it has been renamed websub, and is now a W3C recommendation. A cursory look through the doc, and it seems pretty sane: https://www.w3.org/TR/2018/REC-websub-20180123/

Basically, handles a bunch of stuff you don’t have to invent on your own. Great!

Well that’s interesting.

So, if I understand correctly what you suggest is to dynamically subscribe the ProcessManager to the Process ID (e.g /subscriptions/Order-/OrderProcessManager) when the process is initiated, and to drop the subscription once the process is completed.

Which is conceptually different from what I do now- which is to create these ‘static’ subs by event-type when PM initiates itself.

Is this is right direction? cuz it totally makes sense to me.

Thanks Greg