Subscribe to stream via Windows service, project events and persist data

Hi,

Is anyone aware of some .NET (C#) code that uses a Windows service to subscribe to a stream, project certain events and ultimately persist them in a database? I guess the Windows service would call ReadAllEventsBackwardAsync according to some heartbeat. It would also catch up if it has been (re)started. Any feedback would be very much appreciated. Thanks.

Christian

Hey, are you looking for some code that implements all of that? As far as I know there is no full ES infrastructure package. I think many people have come up with their own event dispatchers (listen to events and hand them off to the appropriate handlers). I have, and it took some thinking. I intend to either make mine available or ask for a code review on it sometime soon.
R

Christian, I wrote a C# starter project that projects to SQL (and remembers the last position), which I copy and modify as needed for whatever particular scenario. It’s by no means production ready. To make it work out of the box, it uses ES-Embedded, but you would obviously change that. Other dependencies are Topshelf (for the Windows service) and Json.NET for serialization.

As an aside, all of the plumbing surrounding storing the position in SQL is moot once Competing-Consumers (now in dev) makes it to production.

Hope it helps:

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Embedded;
using EventStore.Core;
using Newtonsoft.Json;
using Topshelf;

namespace EsSvcProjection
{
    class Program
    {
        static readonly bool RESTART_SQL = true; // true = completely recreate database tables, false = resume from last position

        // Placeholder: replace with a real SQL Server connection string.
        static readonly string CONNECTION_STRING = "<<INSERT_CONNECTION_STRING_HERE>>";

        static void Main(string[] args)
        {
            // Topshelf hosts EsSvcProjectionApp as a Windows service (or as a console app while debugging).
            HostFactory.Run(x =>
            {
                x.Service<EsSvcProjectionApp>(s =>
                {
                    s.ConstructUsing(name => new EsSvcProjectionApp(RESTART_SQL, CONNECTION_STRING));
                    s.WhenStarted(tc => tc.Start());
                    s.WhenStopped(tc => tc.Stop());
                });
                x.RunAsLocalSystem();
                x.SetDescription("EsSvcProjectionApp Description");
                x.SetDisplayName("EsSvcProjectionApp Display Name");
                x.SetServiceName("EsSvcProjectionApp");
            });
        }
    }

    public class EsSvcProjectionApp
    {
        readonly ClusterVNode _embeddedEventStore;
        readonly IEventStoreConnection _connection;
        readonly bool _restartSql;
        readonly string _connectionString;
        readonly Random _rnd = new Random();
        readonly Timer _timer;

        public EsSvcProjectionApp(bool restartSql, string connectionString)
        {
            _restartSql = restartSql;
            _connectionString = connectionString;

            // In-memory embedded node so the sample runs out of the box; swap for a real connection.
            _embeddedEventStore = EmbeddedVNodeBuilder.AsSingleNode().OnDefaultEndpoints().RunInMemory().Build();
            _connection = EmbeddedEventStoreConnection.Create(_embeddedEventStore);

            // The timer only generates demo events every 1.5 seconds.
            _timer = new Timer(1500);
            _timer.Elapsed += timerElapsed;
        }

        public void Start()
        {
            _embeddedEventStore.Start();
            _connection.ConnectAsync().Wait();

            // Seed the stream and wait for the write to complete before subscribing.
            _connection.AppendToStreamAsync("bankaccount", ExpectedVersion.Any, getEvents(3)).Wait();

            if (_restartSql)
                rebuildSql();

            Console.WriteLine("Total before subscription: {0}", sqlScalar("select amount from bankaccount"));

            // Catch-up subscription: replays everything after the stored position, then goes live.
            _connection.SubscribeToStreamFrom("bankaccount", getLastPos(), false, eventAppeared, liveProcessingStarted);
            _timer.Start();
        }

        public void Stop()
        {
            // Stop producing demo events before tearing down the connection.
            _timer.Stop();
            _connection.Dispose();
        }

        private void timerElapsed(object sender, ElapsedEventArgs e)
        {
            // Fire-and-forget append; good enough for a demo event source.
            _connection.AppendToStreamAsync("bankaccount", ExpectedVersion.Any, getEvents(1));
        }

        private void liveProcessingStarted(EventStoreCatchUpSubscription obj)
        {
            Console.WriteLine("live!");
        }

        private void eventAppeared(EventStoreCatchUpSubscription sub, ResolvedEvent evt)
        {
            // Dispatch on event type; unknown types are ignored.
            var type = evt.Event.EventType;
            switch (type)
            {
                case "MoneyDeposited": moneyDeposited(evt.Event); break;
                case "MoneyWithdrawn": moneyWithdrawn(evt.Event); break;
            }
        }

        private void moneyDeposited(RecordedEvent recordedEvent)
        {
            var evt = JsonConvert.DeserializeObject<MoneyDeposited>(Encoding.UTF8.GetString(recordedEvent.Data));
            var total = sqlAddToBankAccount(evt.Amount, recordedEvent.EventNumber);
            Console.WriteLine("Money Deposited: {0}. Total: {1}", evt.Amount, total);
        }

        private void moneyWithdrawn(RecordedEvent recordedEvent)
        {
            var evt = JsonConvert.DeserializeObject<MoneyWithdrawn>(Encoding.UTF8.GetString(recordedEvent.Data));
            var total = sqlAddToBankAccount(-evt.Amount, recordedEvent.EventNumber);
            Console.WriteLine("Money Withdrawn: {0}. Total: {1}", evt.Amount, total);
        }

        private void rebuildSql()
        {
            // The drops may fail on a fresh database, so their errors are suppressed.
            sqlScalar("drop table position", true);
            sqlScalar("drop table bankaccount", true);
            sqlScalar("create table position (pos int not null default(0))");
            sqlScalar("create table bankaccount (amount int not null)");
            sqlScalar("insert into bankaccount (amount) values (0)");
        }

        private IEnumerable<EventData> getEvents(int count)
        {
            // Random demo events: even amounts become deposits, odd amounts withdrawals.
            var retVal = new List<EventData>();
            for (int i = 0; i < count; i++)
            {
                var amount = _rnd.Next(1, 100);
                if (amount % 2 == 0)
                    retVal.Add(new EventData(Guid.NewGuid(), "MoneyDeposited", true, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new MoneyDeposited { Amount = amount })), null));
                else
                    retVal.Add(new EventData(Guid.NewGuid(), "MoneyWithdrawn", true, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new MoneyWithdrawn { Amount = amount })), null));
            }
            return retVal;
        }

        private int? getLastPos()
        {
            // Null (empty position table) makes the subscription start from the beginning of the stream.
            return (int?)sqlScalar("select pos from position");
        }

        private object sqlScalar(string stmt, bool suppressError = false)
        {
            using (var conn = new SqlConnection(_connectionString))
            {
                conn.Open();
                try
                {
                    using (var cmd = conn.CreateCommand())
                    {
                        cmd.CommandText = stmt;
                        cmd.CommandType = System.Data.CommandType.Text;
                        return cmd.ExecuteScalar();
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    if (!suppressError)
                        throw;
                    else
                        return ex.Message;
                }
                finally
                {
                    conn.Close();
                }
            }
        }

        // Updates the position and the balance in one transaction, so the stored
        // position always matches the events already applied to the read model.
        private int sqlAddToBankAccount(int amount, int pos, bool suppressError = false)
        {
            using (var conn = new SqlConnection(_connectionString))
            {
                conn.Open();
                var trans = conn.BeginTransaction();
                try
                {
                    using (var cmd = conn.CreateCommand())
                    {
                        cmd.Transaction = trans;
                        cmd.CommandText = "delete position";
                        cmd.CommandType = System.Data.CommandType.Text;
                        cmd.ExecuteNonQuery();
                    }
                    using (var cmd = conn.CreateCommand())
                    {
                        cmd.Transaction = trans;
                        cmd.CommandText = "insert into position (pos) values (@pos)";
                        cmd.Parameters.AddWithValue("@pos", pos);
                        cmd.CommandType = System.Data.CommandType.Text;
                        cmd.ExecuteNonQuery();
                    }
                    using (var cmd = conn.CreateCommand())
                    {
                        cmd.Transaction = trans;
                        cmd.CommandText = "update bankaccount set amount = amount + @amount";
                        cmd.Parameters.AddWithValue("@amount", amount);
                        cmd.CommandType = System.Data.CommandType.Text;
                        cmd.ExecuteNonQuery();
                    }
                    trans.Commit();
                }
                catch (Exception ex)
                {
                    trans.Rollback();
                    Console.WriteLine(ex.Message);
                    if (!suppressError)
                        throw;
                }
                finally
                {
                    conn.Close();
                }
            }
            return (int)sqlScalar("select amount from bankaccount");
        }
    }

    public class MoneyDeposited
    {
        public int Amount { get; set; }
    }

    public class MoneyWithdrawn
    {
        public int Amount { get; set; }
    }
}


Chris

This exists; it's called a catch-up subscription in the client API. Just
host it in your Windows service. You would store the position of your last
persisted event and pass it back on startup (the subscription will continue
from that point).
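
To illustrate the idea of storing the last position and passing it back on startup, here is a minimal sketch with no Event Store dependency. Everything in it is hypothetical and for illustration only: `InMemoryReadModel` stands in for the SQL tables, and `CatchUp` only simulates what a catch-up subscription does (deliver, in order, every event after the given checkpoint); it is not the client API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a projection's durable state; in the starter
// project posted earlier this would be the "bankaccount" and "position" tables.
public class InMemoryReadModel
{
    public int Balance { get; private set; }
    public int? LastPosition { get; private set; }   // null = nothing processed yet

    // Applies an event and records its position in one step, the same way
    // the SQL version updates both tables inside a single transaction.
    public void Apply(int eventNumber, int delta)
    {
        // Ignore anything at or before the checkpoint so a replay is harmless.
        if (LastPosition.HasValue && eventNumber <= LastPosition.Value) return;
        Balance += delta;
        LastPosition = eventNumber;
    }
}

public static class CatchUpDemo
{
    // Simulated catch-up: deliver, in order, every event after the checkpoint.
    // A null checkpoint means "start from the beginning of the stream".
    public static void CatchUp(IReadOnlyList<int> stream, int? lastCheckpoint, Action<int, int> eventAppeared)
    {
        int start = lastCheckpoint.HasValue ? lastCheckpoint.Value + 1 : 0;
        for (int n = start; n < stream.Count; n++)
            eventAppeared(n, stream[n]);
    }

    public static void Main()
    {
        var stream = new List<int> { 50, -20, 30 };   // deltas; the index is the event number
        var model = new InMemoryReadModel();

        CatchUp(stream, model.LastPosition, model.Apply);   // first start
        Console.WriteLine("Balance: {0}, position: {1}", model.Balance, model.LastPosition);

        stream.Add(-10);                                    // appended while the service was down
        CatchUp(stream, model.LastPosition, model.Apply);   // restart resumes after position 2
        Console.WriteLine("Balance: {0}, position: {1}", model.Balance, model.LastPosition);
    }
}
```

The point of keeping the balance and the position in the same unit of work is that a crash can never leave the checkpoint ahead of, or behind, the data it describes.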

You probably don't want to use competing consumers for projections.

@greg, is this because of datastore cluster replication? It’s difficult, if not impossible, to be sure that a write to the datastore has been replicated to other nodes before a competing consumer reads the same record from the datastore and updates it. That’s what I was running into last week.

Because of our replication? I'm confused by the response. You should never
get a message that has not yet been replicated.

Competing consumers can quite easily deliver messages out of order, whereas
a catch-up subscription delivers them in order.

Cheers,

Greg

Rather, when you make an update to a clustered instance of, say, Elastic, which has a 1 sec replication time, you could be reading and acting on the next message in the queue before the replication is complete. If that message updates the same record as the last message, this could result in querying the db, getting the stale data, updating it and persisting it. There are ways, I discovered last week, of mitigating this; however, they are not possible when you have competing consumers on the projection side.
I guess that is ultimately the same issue as out-of-order messages, just with a different cause.
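
A toy sketch of that stale-read problem, under loud assumptions: `LaggingStore` is a hypothetical in-memory stand-in for a replicated datastore whose reads lag behind writes until `Replicate()` is called. It does not model any real product's API or timing.

```csharp
using System;

// Hypothetical datastore: writes go to the primary, reads come from a
// replica that only catches up when Replicate() is called.
public class LaggingStore
{
    private int _primary;
    private int _replica;

    public int Read() { return _replica; }             // may return stale data
    public void Write(int value) { _primary = value; }
    public void Replicate() { _replica = _primary; }   // stands in for the replication delay elapsing
}

public static class StaleReadDemo
{
    // Read-modify-write handler: read the current amount, add the delta, write it back.
    public static void Handle(LaggingStore store, int delta)
    {
        store.Write(store.Read() + delta);
    }

    public static void Main()
    {
        var tooFast = new LaggingStore();
        Handle(tooFast, 10);
        Handle(tooFast, 10);      // runs before replication, so it reads 0 again
        tooFast.Replicate();
        Console.WriteLine(tooFast.Read());   // 10: the first deposit was overwritten

        var patient = new LaggingStore();
        Handle(patient, 10);
        patient.Replicate();      // replication completes between the two messages
        Handle(patient, 10);
        patient.Replicate();
        Console.WriteLine(patient.Read());   // 20
    }
}
```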

R

Ah, OK, so you are referring to the store you are writing to. That makes sense.

Competing consumers will do their best to deliver in order, but a single
slow message or zombied TCP socket could cause out-of-order messages (the
message waits until it is retried).
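
A small sketch of why a retried message matters to a projection. The event names and the `ProjectLatestStatus` helper are made up for illustration; the second sequence simply assumes that event 1 was retried after event 2 had already been handled.

```csharp
using System;
using System.Collections.Generic;

public static class OrderingDemo
{
    // Last-write-wins projection: whichever event is handled last decides the
    // stored value, so the result depends on delivery order.
    public static string ProjectLatestStatus(IEnumerable<KeyValuePair<int, string>> deliveries)
    {
        string status = null;
        foreach (var delivery in deliveries)
            status = delivery.Value;
        return status;
    }

    public static void Main()
    {
        // Key = event number in the stream, value = hypothetical event type.
        var inOrder = new[]
        {
            new KeyValuePair<int, string>(0, "Opened"),
            new KeyValuePair<int, string>(1, "Frozen"),
            new KeyValuePair<int, string>(2, "Closed"),
        };

        // Event 1 was slow and got retried after event 2 had been handled.
        var retried = new[] { inOrder[0], inOrder[2], inOrder[1] };

        Console.WriteLine(ProjectLatestStatus(inOrder));   // Closed
        Console.WriteLine(ProjectLatestStatus(retried));   // Frozen
    }
}
```

With in-order delivery the read model ends on "Closed"; with the retried delivery it ends on "Frozen", even though the stream itself never changed.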

Yes, sorry if that wasn’t clear.

That’s a good point, I’ll stick to catchup subscriptions for projections.