When reading events out of an ordinary stream, you will need to track the EventNumber property.
By “ordinary” stream, I mean one that you (not the system) created and wrote to directly.
Other streams are populated by projections, which link or emit events into them. When you’re reading from those streams, you may want to track other properties instead.
Below I’ve pasted some code I wrote that nicely extracts event position data from what EventStore gives you, converting it to what I consider a more “plain-English”, understandable representation of event data and positions.
There’s quite a lot of it because I wrote my own event sourcing abstractions library and implemented an EventStore version of it as one of several possible dependency injection options. You’ll be able to take what you learn from reading this code to get the information you need.
My own abstraction library code (just the relevant bits)
/// <summary>
/// An event expressed in the abstraction library's own terms: the stream it belongs to,
/// its identity, its sequence number, its type name and its raw payload/metadata bytes.
/// Two instances compare equal when all of those members are equal (byte arrays by content).
/// </summary>
[DebuggerDisplay("{DebuggerFormat}")]
public class EventData : IEquatable<EventData> {
    /// <summary>
    /// The name of the stream
    /// </summary>
    public string StreamId;

    /// <summary>
    /// A globally unique identifier for the event.
    /// </summary>
    public Guid EventId;

    /// <summary>
    /// The sequence number of the event within the stream.
    /// When writing events to a store, you can set it to "EventNumbers.Any" or any other member of the EventNumbers class.
    /// If you want the store to check for sequential integrity, set it to the sequence number that the store will be expecting.
    /// </summary>
    public long EventNumber;

    /// <summary>
    /// The name of the event type. A stream can contain multiple event types.
    /// </summary>
    public string EventType;

    /// <summary>
    /// The event payload as raw bytes. May be null.
    /// </summary>
    public byte[] Data;

    /// <summary>
    /// The event metadata as raw bytes. May be null.
    /// </summary>
    public byte[] Metadata;

    /// <summary>
    /// True if the Data and Metadata bytes represent a json string.
    /// </summary>
    public bool IsJson;

    #region Debugger-view properties
    string DebuggerFormat => $"{EventNumber}@{StreamId} <{DataAsString}><{MetadataAsString}>";
    string DataAsString => null == Data ? "" : Encoding.UTF8.GetString(Data);
    string MetadataAsString => null == Metadata ? "" : Encoding.UTF8.GetString(Metadata);
    #endregion

    /// <summary>
    /// Compares this event with another object; only an <see cref="EventData"/> can be equal.
    /// </summary>
    public override bool Equals(object obj) {
        return Equals(obj as EventData);
    }

    /// <summary>
    /// Member-wise equality. Byte arrays are compared by content, and a null array
    /// is only equal to another null array.
    /// </summary>
    public bool Equals(EventData other) {
        if (null == other) return false;
        return StreamId == other.StreamId
            && EventId == other.EventId
            && EventNumber == other.EventNumber
            && EventType == other.EventType
            && BytesEqual(Data, other.Data)
            && BytesEqual(Metadata, other.Metadata)
            && IsJson == other.IsJson;
    }

    /// <summary>
    /// Hash code consistent with <see cref="Equals(EventData)"/>.
    /// The byte arrays are deliberately left out: equal events still hash equally,
    /// and hashing stays cheap for large payloads.
    /// </summary>
    public override int GetHashCode() {
        unchecked {
            int hash = 17;
            hash = hash * 31 + (StreamId?.GetHashCode() ?? 0);
            hash = hash * 31 + EventId.GetHashCode();
            hash = hash * 31 + EventNumber.GetHashCode();
            hash = hash * 31 + (EventType?.GetHashCode() ?? 0);
            hash = hash * 31 + IsJson.GetHashCode();
            return hash;
        }
    }

    // Null-safe content comparison; calling SequenceEqual directly on a null array would throw.
    static bool BytesEqual(byte[] left, byte[] right) {
        if (ReferenceEquals(left, right)) return true;
        if (null == left || null == right) return false;
        return left.SequenceEqual(right);
    }
}
/// <summary>
/// An event that is retrieved from a data store.
/// Adds the store-side timestamp to the members inherited from EventData.
/// </summary>
public class RecordedEventData : EventData {
/// <summary>
/// The time that the event was stored in the event stream. (Not necessarily the time that the event was created).
/// </summary>
public TimeStamp StoredAt;
}
/// <summary>
/// An event that is retrieved from a data store. It is possibly a linked event, due to a projection.
/// </summary>
public class ResolvedEventData {
/// <summary>
/// The original event
/// </summary>
public RecordedEventData Event;
/// <summary>
/// Represents the link. Link.Metadata contains information about the link itself.
/// Null if this is not a linked event.
/// </summary>
public EventLink Link;
}
/// <summary>
/// Groups the different positions that can be associated with a single event that was read
/// from a store: its position in its original stream, its position in the stream being read
/// (when linked), and the input-stream positions recorded by a projection.
/// </summary>
public class EventPositionData {
    /// <summary>
    /// Contains the position of the event in the stream in which it was first created.
    /// If you're reading events from a stream that contains linked events, and you're interested in the
    /// event's position within the stream currently being read, you'll need to use the "EventLinkPosition"
    /// property instead.
    /// </summary>
    public EventPosition EventPosition;

    /// <summary>
    /// Null if the event was not created or linked by a projection.
    /// Contains the position of the event in the stream it's currently being read from.
    /// NB: Not the stream in which the event was first created
    /// </summary>
    public EventPosition EventLinkPosition;

    /// <summary>
    /// Null if the event was not created by a custom user projection.
    /// When created (or linked) by a custom user projection, it contains a list of the positions of all the streams
    /// used as inputs to the projection at the time the event was created or linked.
    /// </summary>
    public List<EventPosition> ProjectionPositions;
}
/// <summary>
/// Use this class to describe the position of an event within a stream,
/// or the number of events written to a stream at a particular point in time.
/// Two positions are equal when both the stream name and the event number match.
/// </summary>
public class EventPosition : IEquatable<EventPosition> {
    /// <summary>
    /// The name of the stream the position refers to.
    /// </summary>
    public string StreamId;

    /// <summary>
    /// The event number within <see cref="StreamId"/>.
    /// </summary>
    public long EventNumber;

    /// <summary>
    /// Creates an independent copy with the same stream name and event number.
    /// </summary>
    public EventPosition Clone() {
        return new EventPosition {
            StreamId = StreamId,
            EventNumber = EventNumber,
        };
    }

    /// <summary>
    /// Compares this position with another object; only an <see cref="EventPosition"/> can be equal.
    /// </summary>
    public override bool Equals(object obj) {
        return Equals(obj as EventPosition);
    }

    /// <summary>
    /// True when <paramref name="other"/> is non-null and refers to the same stream and event number.
    /// </summary>
    public bool Equals(EventPosition other) {
        if (null == other) return false;
        return StreamId == other.StreamId && EventNumber == other.EventNumber;
    }

    /// <summary>
    /// Hash code consistent with <see cref="Equals(EventPosition)"/>, so that equal positions
    /// behave correctly as dictionary keys, in hash sets and with LINQ operators such as Distinct.
    /// </summary>
    public override int GetHashCode() {
        unchecked {
            int hash = 17;
            hash = hash * 31 + (StreamId?.GetHashCode() ?? 0);
            hash = hash * 31 + EventNumber.GetHashCode();
            return hash;
        }
    }
}
Implementation Code (just the relevant bits)
/// <summary>
/// Maps an event as returned by EventStore onto the abstraction library's
/// <c>ResolvedEventData</c>: the event itself is always copied, and the link
/// is copied only when the EventStore event carries one.
/// </summary>
/// <param name="e">The EventStore resolved event to convert.</param>
/// <returns>The converted event; its <c>Link</c> is null when <paramref name="e"/> has no link.</returns>
static ResolvedEventData Convert(ESResolvedEvent e) {
    var recorded = new RecordedEventData {
        StreamId = e.Event.EventStreamId,
        EventId = e.Event.EventId,
        EventNumber = e.Event.EventNumber,
        EventType = e.Event.EventType,
        Data = e.Event.Data,
        Metadata = e.Event.Metadata,
        IsJson = e.Event.IsJson,
        StoredAt = new TimeStamp(e.Event.Created.Ticks),
    };

    // A missing link on the source means the converted event has no link either.
    EventLink link = null;
    if (null != e.Link) {
        link = new EventLink {
            EventId = e.Link.EventId,
            EventNumber = e.Link.EventNumber,
            StreamId = e.Link.EventStreamId,
            Metadata = e.Link.Metadata,
        };
    }

    return new ResolvedEventData {
        Event = recorded,
        Link = link,
    };
}
/// <summary>
/// Gets the event position data from an abstraction library event.
/// </summary>
/// <param name="data">The event (and optional link) as read from the store.</param>
/// <returns>
/// Position data whose <c>EventPosition</c> is always set, whose <c>EventLinkPosition</c> is set
/// only when <paramref name="data"/> has a link, and whose <c>ProjectionPositions</c> is taken
/// from the "$s" metadata property of the link or, failing that, of the event (null if neither has one).
/// </returns>
public EventPositionData GetPositionData(ResolvedEventData data) {
    // TODO: Not tested: Suspect this does not work properly with events from the $category-categoryname stream
    // which is created by the $stream_by_category projection which sends the first event of every category to the $category-categoryname stream.
    // Same issue probably exists with events in the $streams stream, created by the $streams projection.
    var result = new EventPositionData();
    result.EventPosition = new EventPosition {
        StreamId = data.Event.StreamId,
        EventNumber = data.Event.EventNumber,
    };
    if (null != data.Link) {
        result.EventLinkPosition = new EventPosition {
            StreamId = data.Link.StreamId,
            EventNumber = data.Link.EventNumber,
        };
    }

    // ----------------------------------------------------
    // Extract projection positions
    // ----------------------------------------------------
    // Reads the "$s" object out of JSON metadata and turns each of its properties
    // (name -> number) into an EventPosition. Returns null for null/empty metadata
    // and when "$s" is absent or is not a JSON object.
    List<EventPosition> FromMetaData(byte[] metadata) {
        if (null == metadata || 0 == metadata.Length) return null;
        var streams = JObject.Parse(Encoding.UTF8.GetString(metadata)).GetValue("$s") as JObject;
        // NB: the null-conditional operator (?.) short-circuits the whole chain to null when $s is missing
        return streams?.Properties()
            .Select(p => new EventPosition { StreamId = p.Name, EventNumber = (long)p.Value })
            .ToList();
    }

    // Handle case when event is linked by a custom user projection
    result.ProjectionPositions = FromMetaData(data.Link?.Metadata);
    if (null == result.ProjectionPositions) {
        // Handle case when event is created by a custom user projection
        result.ProjectionPositions = FromMetaData(data.Event.Metadata);
    }
    // ----------------------------------------------------
    return result;
}