Just to chip in: I know it isn’t exactly the same thing, but it is related. I recently set up an AsyncEnumerable which allowed me to connect to ES using the Atom pub/sub but treat the data as a continuous stream, like you would with an Observable, piping it through functions.
It wraps up the paging in a nice way and makes it invisible to the consumer.
I was using F#, but C# 8 has async enumerables out of the box, so there is no need for Rx or anything.
/// Builds an asynchronous sequence of event pages, starting with the page at
/// `address` and continuing through each subsequent page for as long as the
/// returned slice carries a link whose relation is `PreviousLink`.
///
/// Parameters:
///   restClient - client whose `execute` runs each page request.
///   address    - address of the first page to request.
///
/// Returns: an async sequence that yields `Ok slice.Entries` for every page
/// fetched successfully. If a request comes back as `Error e`, that error is
/// yielded and the sequence ends. Paging is therefore invisible to the consumer.
let getEventPagesFrom
    (restClient : IRestClient)
    address =
    // Same headers are sent with every page request.
    let headers =
        Some [
            Header("ES-TrustedAuth", "admin; $admins")
            Header(AcceptHeader, JsonType)
        ]
    // Include event data, not just links. Only available with JSON response.
    let param = Some [ Parameter (EmbedProperty, BodyValue) ]
    let initialRequest = Get (address, param, headers)

    // Produces the request for the following page, or None when the slice
    // has no link with relation PreviousLink.
    let maybeGetNextRequest slice =
        maybe {
            let! link = slice.Links |> Seq.tryFind (fun l -> l.Relation = PreviousLink)
            let address = Address link.Uri.AbsoluteUri
            return Get (address, param, headers)
        }

    let rec loop request =
        asyncSeq {
            let! result = restClient.execute request // await response
            match result with
            | Ok slice ->
                yield Ok slice.Entries // return events
                match maybeGetNextRequest slice with // look for next page link
                | Some nextRequest -> yield! loop nextRequest // recurse through remaining pages
                | None -> () // end of seq, break
            | Error e -> yield Error e // page request failed, break
        }

    loop initialRequest