Event Store Replicator

We’ve been looking at using the Event Store Replicator (https://replicator.eventstore.org/) to migrate to ES Cloud. To understand the tool better, we’re doing a PoC with two locally running EventStore instances, and we’re running the Replicator in a Minikube Kubernetes cluster. So far we have been unable to get it to replicate any events.

We’ve tried various combinations of config, e.g.:

replicator:
  reader:
    protocol: grpc
    connectionString: "esdb://admin:[email protected]:2113?tls=false"
  sink:
    protocol: grpc
    connectionString: "esdb://admin:[email protected]:2114?tls=false"
    partitionCount: 1
  filters: []
prometheus:
  metrics: false
  operator: false

Error in K8 logs:
{“m”:“Error occured in the “ReaderContext” pipe: “Specified method is not supported.””,“i”:“c9acd0a5”,“l”:“Error”,“x”:“System.NotSupportedException: Specified method is not supported.\n at EventStore.Client.EventStoreClient.ReadInternal(ReadReq request, EventStoreClientOperationOptions operationOptions, UserCredentials userCredentials, CancellationToken cancellationToken)+System.IAsyncDisposable.DisposeAsync()\n at EventStore.Client.EventStoreClient.ReadAllAsync(Direction direction, Position position, Int64 maxCount, EventStoreClientOperationOptions operationOptions, Boolean resolveLinkTos, UserCredentials userCredentials, CancellationToken cancellationToken)+MoveNext()\n at EventStore.Client.EventStoreClient.ReadAllAsync(Direction direction, Position position, Int64 maxCount, EventStoreClientOperationOptions operationOptions, Boolean resolveLinkTos, UserCredentials userCredentials, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()\n at Ubiquitous.Metrics.Metrics.Measure[T](Func1 action, IHistogramMetric metric, ICountMetric errorCount, String[] labels, Int32 count)\n at EventStore.Replicator.Esdb.Grpc.GrpcEventReader.ReadEvents(Position fromPosition, Func2 next, CancellationToken cancellationToken) in /app/src/EventStore.Replicator.Esdb.Grpc/GrpcEventReader.cs:line 66\n at EventStore.Replicator.Read.ReaderPipe.<>c__DisplayClass1_0.<<-ctor>g__Reader|1>d.MoveNext() in /app/src/EventStore.Replicator/Read/ReaderPipe.cs:line 47\n— End of stack trace from previous location —\n at GreenPipes.Filters.AsyncDelegateFilter1.<>c__DisplayClass3_0.<g__SendAsync|0>d.MoveNext()\n--- End of stack trace from previous location ---\n at EventStore.Replicator.LoggingFilter1.Send(T context, IPipe1 next) in /app/src/EventStore.Replicator/Logging.cs:line 13","Type":"ReaderContext","Message":"Specified method is not supported.","SourceContext":"EventStore.Replicator.LoggingFilter1[T]”}
{“m”:“Error: “Specified method is not supported.”, will retry”,“i”:“c337094d”,“l”:“Error”,“x”:“System.NotSupportedException: Specified method is not supported.\n at EventStore.Client.EventStoreClient.ReadInternal(ReadReq request, EventStoreClientOperationOptions operationOptions, UserCredentials userCredentials, CancellationToken cancellationToken)+System.IAsyncDisposable.DisposeAsync()\n at EventStore.Client.EventStoreClient.ReadAllAsync(Direction direction, Position position, Int64 maxCount, EventStoreClientOperationOptions operationOptions, Boolean resolveLinkTos, UserCredentials userCredentials, CancellationToken cancellationToken)+MoveNext()\n at EventStore.Client.EventStoreClient.ReadAllAsync(Direction direction, Position position, Int64 maxCount, EventStoreClientOperationOptions operationOptions, Boolean resolveLinkTos, UserCredentials userCredentials, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()\n at Ubiquitous.Metrics.Metrics.Measure[T](Func1 action, IHistogramMetric metric, ICountMetric errorCount, String[] labels, Int32 count)\n at EventStore.Replicator.Esdb.Grpc.GrpcEventReader.ReadEvents(Position fromPosition, Func2 next, CancellationToken cancellationToken) in /app/src/EventStore.Replicator.Esdb.Grpc/GrpcEventReader.cs:line 66\n at EventStore.Replicator.Read.ReaderPipe.<>c__DisplayClass1_0.<<-ctor>g__Reader|1>d.MoveNext() in /app/src/EventStore.Replicator/Read/ReaderPipe.cs:line 47\n— End of stack trace from previous location —\n at GreenPipes.Filters.AsyncDelegateFilter1.<>c__DisplayClass3_0.<g__SendAsync|0>d.MoveNext()\n--- End of stack trace from previous location ---\n at EventStore.Replicator.LoggingFilter1.Send(T context, IPipe1 next) in /app/src/EventStore.Replicator/Logging.cs:line 13\n at GreenPipes.Filters.RetryFilter1.GreenPipes.IFilter.Send(TContext context, IPipe`1 next)”,“Error”:“Specified method is not supported.”,“SourceContext”:“EventStore.Replicator.Observers.LoggingRetryObserver”}

Any help appreciated!

Hi James, we have not advertised Replicator to the public; it has only been shared with ES Cloud customers looking to migrate their data. Could you please follow up with us (me and Margareth) by email? Thanks.


Btw, I am facing the same issue in Eventuous right now, which didn’t happen before. It throws in Serilog, and I am trying to find the root cause.

I have worked around the above issue for now by switching to TCP.
I now face an issue using the in-proc transform:

“One or more errors occurred. (The best overloaded method match for ‘System.Text.Encoding.GetBytes(char[])’ has some invalid arguments)”, will fail"
“System.AggregateException: One or more errors occurred. (The best overloaded method match for ‘System.Text.Encoding.GetBytes(char[])’ has some invalid arguments)
—> Microsoft.CSharp.RuntimeBinder.RuntimeBinderException: The best overloaded method match for ‘System.Text.Encoding.GetBytes(char[])’ has some invalid arguments
at CallSite.Target(Closure , CallSite , Encoding , Object )
at System.Dynamic.UpdateDelegates.UpdateAndExecute2[T0,T1,TRet](CallSite site, T0 arg0, T1 arg1)\n at EventStore.Replicator.JavaScript.JavaScriptTransform.Transform(OriginalEvent originalEvent, CancellationToken cancellationToken) in /app/src/EventStore.Replicator.JavaScript/JavaScriptTransform.cs:line 26
at EventStore.Replicator.Prepare.TransformFilter.<>c__DisplayClass2_0.<g__Transform|2>d.MoveNext() in /app/src/EventStore.Replicator/Prepare/TransformFilter.cs:line 37
— End of inner exception stack trace —
at System.Threading.Tasks.Task1.GetResultCore(Boolean waitCompletionNotification) at System.Threading.Tasks.Task1.get_Result()\n at Ubiquitous.Metrics.Metrics.Measure[T](Func1 action, IHistogramMetric metric, ICountMetric errorCount, String[] labels, Int32 count) at EventStore.Replicator.Prepare.TransformFilter.Send(PrepareContext context, IPipe1 next) in /app/src/EventStore.Replicator/Prepare/TransformFilter.cs:line 18
at EventStore.Replicator.Prepare.EventFilterFilter.Send(PrepareContext context, IPipe1 next) in /app/src/EventStore.Replicator/Prepare/EventFilterFilter.cs:line 23 at GreenPipes.Filters.ConcurrencyLimitFilter1.Send(TContext context, IPipe1 next) at EventStore.Replicator.LoggingFilter1.Send(T context, IPipe1 next) in /app/src/EventStore.Replicator/Logging.cs:line 13 at GreenPipes.Filters.RetryFilter1.GreenPipes.IFilter.Send(TContext context, IPipe`1 next)”,“Error”:“One or more errors occurred. (The best overloaded method match for ‘System.Text.Encoding.GetBytes(char[])’ has some invalid arguments)”,“SourceContext”:“EventStore.Replicator.Observers.LoggingRetryObserver”}

It looks like we are both hitting a weird event that (maybe) has no body.
I will try to improve the diagnostics, so you’ll be able to see where it fails. It might be the cause of the original issue as well.

Thanks - have dropped you an email.

There are three issues we have encountered so far:

  1. gRPC client - we were unable to get this working, so we are using TCP for now.
  2. In-proc transforms (https://replicator.eventstore.org/docs/features/transforms/js/) missing file - We ran into an exception because the js file was not deployed in the container. To fix this we had to download the Helm charts and create a ConfigMap to deploy the local js file. It would be good to understand whether we were doing something wrong or whether there is an issue with the Helm charts.
  3. In-proc transforms dynamic binding issue - We tried the example (https://replicator.eventstore.org/docs/features/transforms/js/), which fails with the error in the post above. I’ve validated that the correct parameters are being passed to the transform:

function transform(stream, eventType, data, meta) {
    // Build a diagnostic string from the arguments and throw it so that it shows up in the logs
    var log = `stream:${stream};eventType:${eventType};data:${data};meta:${meta};`;
    throw `log:${log}`;
}

This gives output in the K8s logs, and the source event etc. looks well formed. I believe the issue is related to dynamic binding on the return value from the call.
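One caveat with the check above: interpolating a plain object into a template string renders it as "[object Object]", so it can't show whether data actually has content. Below is a rough sketch of a more explicit diagnostic transform. It assumes the same four-argument signature as above; the describe helper is made up for illustration, and whether the Replicator's script engine reports a thrown Error the same way as a thrown string is an assumption.

```javascript
// Summarise a value's runtime type and content without relying on plain
// string interpolation, which renders objects as "[object Object]".
// (describe is a hypothetical helper, not part of the Replicator.)
function describe(value) {
    if (value === undefined) return "undefined";
    if (value === null) return "null";
    if (typeof value === "string") return `string(${value.length}):${value}`;
    try {
        return `${typeof value}:${JSON.stringify(value)}`;
    } catch (e) {
        // e.g. circular structures cannot be serialised
        return `${typeof value}:<not serialisable>`;
    }
}

// Same signature as the transform quoted in this post. It never returns;
// it throws so that the description ends up in the logs.
function transform(stream, eventType, data, meta) {
    throw new Error(
        `stream=${describe(stream)};eventType=${describe(eventType)};` +
        `data=${describe(data)};meta=${describe(meta)}`
    );
}
```

If data or meta comes through as undefined, null or an empty string for some events, that would point at the input rather than at the return value.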

Any thoughts?

It makes sense to create a volume and put the files there. I will try to add that to the chart.

Concerning the transform GetBytes issue, it looks like an empty event body.
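If that is the cause, a guard at the top of the transform script would at least name the offending event. This is only a sketch: isEmptyBody and requireBody are hypothetical helpers, and treating an empty object or a whitespace-only string as "no body" is my assumption about what an empty event looks like on the JavaScript side.

```javascript
// Decide whether an event body carries no content.
// Assumption: undefined, null, blank strings and objects without keys
// all count as "empty"; adjust to what the events really look like.
function isEmptyBody(data) {
    if (data === undefined || data === null) return true;
    if (typeof data === "string") return data.trim().length === 0;
    if (typeof data === "object") return Object.keys(data).length === 0;
    return false;
}

// Fail fast with a message that identifies the event; otherwise hand the
// body back unchanged so the rest of the transform can use it.
function requireBody(stream, eventType, data) {
    if (isEmptyBody(data)) {
        throw new Error(`Empty event body: stream=${stream};eventType=${eventType}`);
    }
    return data;
}
```

Calling requireBody(stream, eventType, data) as the first line of transform should turn the GetBytes binder error into a message naming the stream and event type, provided the empty body is visible to the script at all.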
