Christian, I wrote a starter project for a C# project to SQL (with remembering last position) that I copy and modify as needed for whatever particular scenario. It’s by no means production ready. To make it a working out-of-box solution, it uses ES-Embedded, but you would obviously change that. Other dependencies are Topshelf (for win-service) and Json.NET for serialization.
As an aside, all of the plumbing surrounding storing the position in SQL is moot once Competing-Consumers (now in dev) makes it to production.
Hope it helps:
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Embedded;
using EventStore.Core;
using Newtonsoft.Json;
using Topshelf;
namespace EsSvcProjection
{
class Program
{
static readonly bool RESTART_SQL = true;//true = completely recreate database tables, false = from last position
static readonly string CONNECTION_STRING = <<INSERT_CONNECTION_STRING_HERE>>;
static void Main(string[] args)
{
HostFactory.Run(x =>
{
x.Service<EsSvcProjectionApp>(s =>
{
s.ConstructUsing(name => new EsSvcProjectionApp(RESTART_SQL, CONNECTION_STRING));
s.WhenStarted(tc => tc.Start());
s.WhenStopped(tc => tc.Stop());
});
x.RunAsLocalSystem();
x.SetDescription("EsSvcProjectionApp Description");
x.SetDisplayName("EsSvcProjectionApp Display Name");
x.SetServiceName("EsSvcProjectionApp");
});
}
}
public class EsSvcProjectionApp
{
readonly ClusterVNode _embeddedEventStore;
readonly IEventStoreConnection _connection;
readonly bool _restartSql;
readonly string _connectionString;
readonly Random _rnd = new Random();
readonly Timer _timer;
public EsSvcProjectionApp(bool restartSql, string connectionString)
{
_restartSql = restartSql;
_connectionString = connectionString;
_embeddedEventStore = EmbeddedVNodeBuilder.AsSingleNode().OnDefaultEndpoints().RunInMemory().Build();
_connection = EmbeddedEventStoreConnection.Create(_embeddedEventStore);
_timer = new Timer(1500);
_timer.Elapsed += timerElapsed;
}
public void Start()
{
_embeddedEventStore.Start();
_connection.ConnectAsync().Wait();
_connection.AppendToStreamAsync("bankaccount", ExpectedVersion.Any, getEvents(3));
if (_restartSql)
rebuildSql();
Console.WriteLine("Total before subscription: {0}", sqlScalar("select amount from bankaccount"));
_connection.SubscribeToStreamFrom("bankaccount", getLastPos(), false, eventAppeared, liveProcessingStarted);
_timer.Start();
}
public void Stop()
{
_connection.Dispose();
}
private void timerElapsed(object sender, ElapsedEventArgs e)
{
_connection.AppendToStreamAsync("bankaccount", ExpectedVersion.Any, getEvents(1));
}
private void liveProcessingStarted(EventStoreCatchUpSubscription obj)
{
Console.WriteLine("live!");
}
private void eventAppeared(EventStoreCatchUpSubscription sub, ResolvedEvent evt)
{
var type = evt.Event.EventType;
switch (type)
{
case "MoneyDeposited": moneyDeposited(evt.Event); break;
case "MoneyWithdrawn": moneyWithdrawn(evt.Event); break;
}
}
private void moneyDeposited(RecordedEvent recordedEvent)
{
var evt = JsonConvert.DeserializeObject<MoneyDeposited>(Encoding.UTF8.GetString(recordedEvent.Data));
var total = sqlAddToBankAccount(evt.Amount, recordedEvent.EventNumber);
Console.WriteLine("Money Deposited: {0}. Total: {1}", evt.Amount, total);
}
private void moneyWithdrawn(RecordedEvent recordedEvent)
{
var evt = JsonConvert.DeserializeObject<MoneyWithdrawn>(Encoding.UTF8.GetString(recordedEvent.Data));
var total = sqlAddToBankAccount(-evt.Amount, recordedEvent.EventNumber);
Console.WriteLine("Money Withdrawn: {0}. Total: {1}", evt.Amount, total);
}
private void rebuildSql()
{
sqlScalar("drop table position", true);
sqlScalar("drop table bankaccount", true);
sqlScalar("create table position (pos int not null default(0))");
sqlScalar("create table bankaccount (amount int not null)");
sqlScalar("insert into bankaccount (amount) values (0)");
}
private IEnumerable<EventData> getEvents(int count)
{
var retVal = new List<EventData>();
for (int i = 0; i < count; i++)
{
var amount = _rnd.Next(1, 100);
if (amount % 2 == 0)
retVal.Add(new EventData(Guid.NewGuid(), "MoneyDeposited", true, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new MoneyDeposited { Amount = amount })), null));
else
retVal.Add(new EventData(Guid.NewGuid(), "MoneyWithdrawn", true, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new MoneyWithdrawn { Amount = amount })), null));
}
return retVal;
}
private int? getLastPos()
{
return (int?)sqlScalar("select pos from position");
}
private object sqlScalar(string stmt, bool suppressError = false)
{
using (var conn = new SqlConnection(_connectionString))
{
conn.Open();
try
{
using (var cmd = conn.CreateCommand())
{
cmd.CommandText = stmt;
cmd.CommandType = System.Data.CommandType.Text;
return cmd.ExecuteScalar();
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
if (!suppressError)
throw;
else
return ex.Message;
}
finally
{
conn.Close();
}
}
}
private int sqlAddToBankAccount(int amount, int pos, bool suppressError = false)
{
using (var conn = new SqlConnection(_connectionString))
{
conn.Open();
var trans = conn.BeginTransaction();
try
{
using (var cmd = conn.CreateCommand())
{
cmd.Transaction = trans;
cmd.CommandText = "delete position";
cmd.CommandType = System.Data.CommandType.Text;
cmd.ExecuteNonQuery();
}
using (var cmd = conn.CreateCommand())
{
cmd.Transaction = trans;
cmd.CommandText = "insert into position (pos) values (@pos)";
cmd.Parameters.AddWithValue("@pos", pos);
cmd.CommandType = System.Data.CommandType.Text;
cmd.ExecuteNonQuery();
}
using (var cmd = conn.CreateCommand())
{
cmd.Transaction = trans;
cmd.CommandText = "update bankaccount set amount = amount + @amount";
cmd.Parameters.AddWithValue("@amount", amount);
cmd.CommandType = System.Data.CommandType.Text;
cmd.ExecuteNonQuery();
}
trans.Commit();
}
catch (Exception ex)
{
trans.Rollback();
Console.WriteLine(ex.Message);
if (!suppressError)
throw;
}
finally
{
conn.Close();
}
}
return (int)sqlScalar("select amount from bankaccount");
}
}
public class MoneyDeposited
{
public int Amount { get; set; }
}
public class MoneyWithdrawn
{
public int Amount { get; set; }
}
}
``
Chris