persistent subscription fails on streams with only linked events?

Just a quick description of what I encountered, I moved on since I couldn’t get it to work right away. If necessary I could try to find time to build an example.

I build from source the dev branch on Wednesday 11/26. I turned on the by_category projection so I could get a single stream of all events prefixed ‘ABC-’ spanning multiple aggregates. I pointed a “competing consumer” / persistent subscription at it. At first it just blocked forever, but I rubbed some magic dust on it and got it to the point where it would read the [buffer] of events and then stop. No matter what I did it would never go past the buffer, as if it weren’t acking back to the server. If I pointed a persistent subscription to a single aggregate’s stream that contained all real events and not links, everything worked beautifully. I tried modifying the Resolve Links setting back and forth in the persistent subscriptions config builder to no avail.

Again, I had to move on, but if asked nicely I could probably find an hour to come back to that code and rewrite it as an example. Just thought I would share my experience.

Chris.

I will run some tests. Did you have resolve linktos on or off

Cheers,

Greg

Tried both ways, didn’t seem to affect the problem

Do you have your code for creating the subscription?

if resolvelinkto is off it should read just like any other stream (no
differences)

Since you asked nicely, here is a recreation of my bug:

using System;

using System.Collections.Generic;

using System.Linq;

using System.Net;

using System.Text;

using System.Threading.Tasks;

using EventStore.ClientAPI;

using EventStore.ClientAPI.SystemData;

namespace competing_test

{

class Program

{

    static void Main(string[] args)

    {

        var creds = new UserCredentials("admin", "changeit");

        using (var conn = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113)))

        {

            conn.ConnectAsync().Wait();

            var stream = "$ce-hello";

            var group = "hellotest";

            var settings = PersistentSubscriptionSettingsBuilder.Create().ResolveLinkTos().StartFromBeginning();

            var projectionsManager = new ProjectionsManager(new ESLogger(), new IPEndPoint(IPAddress.Loopback, 2113), TimeSpan.FromMinutes(1));

            try { projectionsManager.EnableAsync("$by_category", creds).Wait(); }

            catch { } //already enabled

            try { conn.CreatePersistentSubscriptionAsync(stream, group, settings, creds); }

            catch { } //already created

            var persistentSubscription = conn.ConnectToPersistentSubscription(group, stream, eventAppeared, subscriptionDropped, creds, 5); // 5 buffer

            while (true)

            {

                Console.WriteLine("Press enter to create new aggregate");

                Console.ReadLine();

                var g = Guid.NewGuid();

                conn.AppendToStreamAsync("hello-" + g, ExpectedVersion.Any, new EventData(Guid.NewGuid(), "hellocreated", true, Encoding.UTF8.GetBytes("{}"), null)).Wait();

            }

        }

    }

    private static void subscriptionDropped(EventStorePersistentSubscription sub, SubscriptionDropReason reason, Exception ex)

    {

        Console.WriteLine("subscription dropped");

    }

    private static void eventAppeared(EventStorePersistentSubscription sub, ResolvedEvent evt)

    {

        Console.WriteLine(evt.Link.EventId);

    }

}

class ESLogger : ILogger

{

    public void Debug(Exception ex, string format, params object[] args) { }

    public void Debug(string format, params object[] args) { }

    public void Error(Exception ex, string format, params object[] args) { }

    public void Error(string format, params object[] args) { }

    public void Info(Exception ex, string format, params object[] args) { }

    public void Info(string format, params object[] args) { }

}

}

``

The first five times you hit enter, you’ll get the corresponding guid back from the subscription. After that, nada. Run again and you’ll immediately get the first five again, but nothing after that. For me anyway.

Changing the “ResolveLinkTos” in any way (removing it, or change it to “DoNot” causes the subscription to drop immediately).

Chris

I will repro this evening. Are you getting any errors in your log?

Just “retry” messages when I kill the console app. I ran it twice, here is the log"

[PID:00712:001 2014.12.01 17:44:05.089 INFO ProgramBase`1 ]

ES VERSION: 0.0.0.0 (dev/9ec1716cd4fbfcaf6d40b2a6403764b049d8cc72, Sun, 23 Nov 2014 12:05:59 +0000)

OS: Windows (Microsoft Windows NT 6.2.9200.0)

RUNTIME: .NET 4.0.30319.34014 (64-bit)

GC: 3 GENERATIONS

LOGS: C:\Users\cray\Desktop\cctest\cctest-logs

MODIFIED OPTIONS:

RUNPROJECTIONS: all (Command Line)

DB: C:\Users\cray\Desktop\cctest\cctest (Command Line)

DEFAULT OPTIONS:

HELP: False ()

VERSION: False ()

LOG: ()

CONFIG: ()

DEFINES: ()

WHAT IF: False ()

INT IP: 127.0.0.1 ()

EXT IP: 127.0.0.1 ()

INT HTTP PORT: 2112 ()

EXT HTTP PORT: 2113 ()

INT TCP PORT: 1112 ()

INT SECURE TCP PORT: 0 ()

EXT TCP PORT: 1113 ()

EXT SECURE TCP PORT: 0 ()

INT TCP HEARTBEAT TIMEOUT: 700 ()

EXT TCP HEARTBEAT TIMEOUT: 1000 ()

INT TCP HEARTBEAT INTERVAL: 700 ()

EXT TCP HEARTBEAT INTERVAL: 2000 ()

FORCE: False ()

CLUSTER SIZE: 1 ()

NODE PRIORITY: 0 ()

MIN FLUSH DELAY MS: 2 ()

COMMIT COUNT: -1 ()

PREPARE COUNT: -1 ()

ADMIN ON EXT: True ()

STATS ON EXT: True ()

GOSSIP ON EXT: True ()

DISABLE SCAVENGE MERGING: False ()

DISCOVER VIA DNS: True ()

CLUSTER DNS: fake.dns ()

CLUSTER GOSSIP PORT: 30777 ()

GOSSIP SEED: ()

STATS PERIOD SEC: 30 ()

CACHED CHUNKS: -1 ()

CHUNKS CACHE SIZE: 536871424 ()

MAX MEM TABLE SIZE: 1000000 ()

MEM DB: False ()

SKIP DB VERIFY: False ()

PROJECTION THREADS: 3 ()

WORKER THREADS: 5 ()

HTTP PREFIXES: ()

ENABLE TRUSTED AUTH: False ()

CERTIFICATE STORE LOCATION: ()

CERTIFICATE STORE NAME: ()

CERTIFICATE SUBJECT NAME: ()

CERTIFICATE THUMBPRINT: ()

CERTIFICATE FILE: ()

CERTIFICATE PASSWORD: ()

USE INTERNAL SSL: False ()

SSL TARGET HOST: n/a ()

SSL VALIDATE SERVER: True ()

AUTHENTICATION TYPE: internal ()

PREPARE TIMEOUT MS: 2000 ()

COMMIT TIMEOUT MS: 2000 ()

UNSAFE DISABLE FLUSH TO DISK: False ()

GOSSIP INTERVAL MS: 1000 ()

GOSSIP ALLOWED DIFFERENCE MS: 60000 ()

GOSSIP TIMEOUT MS: 500 ()

[PID:00712:001 2014.12.01 17:44:05.151 INFO ProgramBase`1 ] Quorum size set to 1

[PID:00712:001 2014.12.01 17:44:05.151 INFO ProgramBase`1 ] Can’t find plugins path: c:\es\bin\clusternode\plugins

[PID:00712:001 2014.12.01 17:44:05.167 INFO ProgramBase`1 ]

INSTANCE ID: 0fefc0dc-c778-4eed-869c-b10b521ef135

DATABASE: C:\Users\cray\Desktop\cctest\cctest

WRITER CHECKPOINT: 0 (0x0)

CHASER CHECKPOINT: 0 (0x0)

EPOCH CHECKPOINT: -1 (0xFFFFFFFFFFFFFFFF)

TRUNCATE CHECKPOINT: -1 (0xFFFFFFFFFFFFFFFF)

[PID:00712:001 2014.12.01 17:44:05.276 TRACE MessageHierarchy ] MessageHierarchy initialization took 00:00:00.0632949.

[PID:00712:001 2014.12.01 17:44:05.370 TRACE TFChunk ] CACHED TFChunk #0-0 (chunk-000000.000000) in 00:00:00.0031831.

[PID:00712:001 2014.12.01 17:44:05.511 INFO MiniWeb ] Starting MiniWeb for /web/es/js/projections ==> c:\es\bin\clusternode\projections

[PID:00712:001 2014.12.01 17:44:05.511 INFO MiniWeb ] Starting MiniWeb for /web/es/js/projections/v8/Prelude ==> c:\es\bin\clusternode\Prelude

[PID:00712:001 2014.12.01 17:44:05.511 INFO MiniWeb ] Starting MiniWeb for /web/es/js/projections/resources ==> c:\es\bin\clusternode\web-resources\js

[PID:00712:001 2014.12.01 17:44:05.511 TRACE MiniWeb ] Binding MiniWeb to /web/es/js/projections/{*remaining_path}

[PID:00712:001 2014.12.01 17:44:05.511 TRACE MiniWeb ] Binding MiniWeb to /web/es/js/projections/v8/Prelude/{*remaining_path}

[PID:00712:001 2014.12.01 17:44:05.511 TRACE MiniWeb ] Binding MiniWeb to /web/es/js/projections/resources/{*remaining_path}

[PID:00712:001 2014.12.01 17:44:05.526 INFO MiniWeb ] Starting MiniWeb for /web ==> c:\es\bin\clusternode\clusternode-web

[PID:00712:001 2014.12.01 17:44:05.526 TRACE MiniWeb ] Binding MiniWeb to /web/{*remaining_path}

[PID:00712:001 2014.12.01 17:44:05.526 INFO MiniWeb ] Starting MiniWeb for /web/users ==> c:\es\EventStore\src\EventStore.Web\Users\web

[PID:00712:001 2014.12.01 17:44:05.526 TRACE MiniWeb ] Binding MiniWeb to /web/users/{*remaining_path}

[PID:00712:015 2014.12.01 17:44:05.542 INFO ClusterVNodeControll] ========== [127.0.0.1:2112] SYSTEM INIT…

[PID:00712:015 2014.12.01 17:44:05.557 INFO TcpServerListener ] Starting Normal TCP listening on TCP endpoint: 127.0.0.1:1113.

[PID:00712:020 2014.12.01 17:44:05.573 INFO IndexCommitter ] TableIndex initialization…

[PID:00712:015 2014.12.01 17:44:05.589 INFO HttpAsyncServer ] Starting HTTP server on [http://127.0.0.1:2113/]…

[PID:00712:015 2014.12.01 17:44:05.589 INFO HttpAsyncServer ] HTTP server is up and listening on [http://127.0.0.1:2113/]

[PID:00712:020 2014.12.01 17:44:05.589 INFO IndexCommitter ] ReadIndex building…

[PID:00712:020 2014.12.01 17:44:05.589 DEBUG IndexCommitter ] ReadIndex rebuilding done: total processed 0 records, time elapsed: 00:00:00.

[PID:00712:015 2014.12.01 17:44:05.604 TRACE QueuedHandlerMRES ] SLOW QUEUE MSG [MainQueue]: SystemInit - 62ms. Q: 0/4.

[PID:00712:015 2014.12.01 17:44:05.604 INFO ClusterVNodeControll] ========== [127.0.0.1:2112] Service ‘StorageReader’ initialized.

[PID:00712:015 2014.12.01 17:44:05.604 INFO ClusterVNodeControll] ========== [127.0.0.1:2112] Service ‘StorageWriter’ initialized.

[PID:00712:015 2014.12.01 17:44:05.620 INFO ClusterVNodeControll] ========== [127.0.0.1:2112] Service ‘StorageChaser’ initialized.

[PID:00712:015 2014.12.01 17:44:05.620 INFO ClusterVNodeControll] ========== [127.0.0.1:2112] SYSTEM START…

[PID:00712:015 2014.12.01 17:44:05.636 INFO ClusterVNodeControll] ========== [127.0.0.1:2112] IS UNKNOWN!!! WHOA!!!

[PID:00712:006 2014.12.01 17:44:05.636 DEBUG PersistentSubscripti] Subscriptions received state change to Unknown stopping listening.

[PID:00712:015 2014.12.01 17:44:05.667 DEBUG ElectionsService ] ELECTIONS: STARTING ELECTIONS.

[PID:00712:015 2014.12.01 17:44:05.667 DEBUG ElectionsService ] ELECTIONS: (V=0) SHIFT TO LEADER ELECTION.

[PID:00712:015 2014.12.01 17:44:05.667 DEBUG ElectionsService ] ELECTIONS: (V=0) VIEWCHANGE FROM [127.0.0.1:2112, {0fefc0dc-c778-4eed-869c-b10b521ef135}].

[PID:00712:015 2014.12.01 17:44:05.667 DEBUG ElectionsService ] ELECTIONS: (V=0) MAJORITY OF VIEWCHANGE.

[PID:00712:015 2014.12.01 17:44:05.667 DEBUG ElectionsService ] ELECTIONS: (V=0) SHIFT TO PREPARE PHASE.

[PID:00712:015 2014.12.01 17:44:05.667 DEBUG ElectionsService ] ELECTIONS: (V=0) PREPARE_OK FROM 127.0.0.1:2112,{0fefc0dc-c778-4eed-869c-b10b521ef135}.

[PID:00712:015 2014.12.01 17:44:05.667 DEBUG ElectionsService ] ELECTIONS: (V=0) SHIFT TO REG_LEADER.

[PID:00712:015 2014.12.01 17:44:05.682 DEBUG ElectionsService ] ELECTIONS: (V=0) SENDING PROPOSAL CANDIDATE: 127.0.0.1:2112,{0fefc0dc-c778-4eed-869c-b10b521ef135}, ME: 127.0.0.1:2112,{0fefc0dc-c778-4eed-869c-b10b521ef135}.

[PID:00712:015 2014.12.01 17:44:05.682 DEBUG ElectionsService ] ELECTIONS: (V=0) ACCEPT FROM [127.0.0.1:2112,{0fefc0dc-c778-4eed-869c-b10b521ef135}] M=[127.0.0.1:2112,{0fefc0dc-c778-4eed-869c-b10b521ef135}]).

[PID:00712:015 2014.12.01 17:44:05.682 INFO ElectionsService ] ELECTIONS: (V=0) DONE. ELECTED MASTER = 127.0.0.1:2112,{0fefc0dc-c778-4eed-869c-b10b521ef135}. ME=127.0.0.1:2112,{0fefc0dc-c778-4eed-869c-b10b521ef135}.

[PID:00712:015 2014.12.01 17:44:05.682 INFO ClusterVNodeControll] ========== [127.0.0.1:2112] PRE-MASTER STATE, WAITING FOR CHASER TO CATCH UP…

[PID:00712:005 2014.12.01 17:44:05.682 DEBUG PersistentSubscripti] Subscriptions received state change to PreMaster stopping listening.

[PID:00712:015 2014.12.01 17:44:05.682 INFO ClusterVNodeControll] ========== [127.0.0.1:2112] IS MASTER!!! SPARTA!!!

[PID:00712:006 2014.12.01 17:44:05.698 DEBUG PersistentSubscripti] Subscriptions Became Master so now handling subscriptions

[PID:00712:006 2014.12.01 17:44:05.698 DEBUG PersistentSubscripti] Subscriptions Became Master so now handling subscriptions

[PID:00712:018 2014.12.01 17:44:05.776 TRACE InMemoryBus ] SLOW BUS MSG [bus]: StartCore - 62ms. Handler: ProjectionCoreServiceCommandReader.

[PID:00712:019 2014.12.01 17:44:05.776 TRACE InMemoryBus ] SLOW BUS MSG [bus]: StartCore - 62ms. Handler: ProjectionCoreServiceCommandReader.

[PID:00712:018 2014.12.01 17:44:05.776 TRACE QueuedHandlerMRES ] SLOW QUEUE MSG [Projection Core #1]: StartCore - 78ms. Q: 1/5.

[PID:00712:019 2014.12.01 17:44:05.776 TRACE QueuedHandlerMRES ] SLOW QUEUE MSG [Projection Core #2]: StartCore - 78ms. Q: 1/5.

[PID:00712:017 2014.12.01 17:44:05.776 TRACE InMemoryBus ] SLOW BUS MSG [bus]: StartCore - 62ms. Handler: ProjectionCoreServiceCommandReader.

[PID:00712:017 2014.12.01 17:44:05.776 TRACE QueuedHandlerMRES ] SLOW QUEUE MSG [Projection Core #0]: StartCore - 78ms. Q: 1/5.

[PID:00712:013 2014.12.01 17:44:06.011 DEBUG EpochManager ] === Writing E0@0:{2db5d54f-0ade-4513-a8dc-270dc7162243} (previous epoch at -1).

[PID:00712:013 2014.12.01 17:44:06.011 DEBUG EpochManager ] === Update Last Epoch E0@0:{2db5d54f-0ade-4513-a8dc-270dc7162243} (previous epoch at -1).

[PID:00712:012 2014.12.01 17:44:06.026 TRACE QueuedHandlerThreadP] SLOW QUEUE MSG [MonitoringQueue]: SystemInit - 437ms. Q: 0/3.

[PID:00712:006 2014.12.01 17:44:06.229 TRACE MonitoringService ] Created stats stream ‘$stats-127.0.0.1:2113’, code = Success

[PID:00712:015 2014.12.01 17:44:06.276 INFO UserManagementServic] ‘admin’ user account has been created

[PID:00712:016 2014.12.01 17:44:06.292 INFO ProjectionManager ] Projection manager is initializing from the empty $projections-$all stream

[PID:00712:017 2014.12.01 17:44:06.432 TRACE InMemoryBus ] SLOW BUS MSG [bus]: CreateAndPrepare - 62ms. Handler: ProjectionCoreService.

[PID:00712:019 2014.12.01 17:44:06.432 TRACE InMemoryBus ] SLOW BUS MSG [bus]: CreateAndPrepare - 62ms. Handler: ProjectionCoreService.

[PID:00712:017 2014.12.01 17:44:06.432 TRACE QueuedHandlerMRES ] SLOW QUEUE MSG [Projection Core #0]: CreateAndPrepare - 62ms. Q: 5/8.

[PID:00712:018 2014.12.01 17:44:06.432 TRACE InMemoryBus ] SLOW BUS MSG [bus]: CreateAndPrepare - 62ms. Handler: ProjectionCoreService.

[PID:00712:019 2014.12.01 17:44:06.432 TRACE QueuedHandlerMRES ] SLOW QUEUE MSG [Projection Core #2]: CreateAndPrepare - 62ms. Q: 3/8.

[PID:00712:018 2014.12.01 17:44:06.432 TRACE QueuedHandlerMRES ] SLOW QUEUE MSG [Projection Core #1]: CreateAndPrepare - 62ms. Q: 3/8.

[PID:00712:016 2014.12.01 17:44:06.495 INFO ProjectionManager ] ‘$by_category’ projection source has been written

[PID:00712:016 2014.12.01 17:44:06.511 INFO ProjectionManager ] ‘$users’ projection source has been written

[PID:00712:016 2014.12.01 17:44:06.511 INFO ProjectionManager ] ‘$stream_by_category’ projection source has been written

[PID:00712:016 2014.12.01 17:44:06.511 INFO ProjectionManager ] ‘$streams’ projection source has been written

[PID:00712:016 2014.12.01 17:44:06.511 INFO ProjectionManager ] ‘$by_event_type’ projection source has been written

[PID:00712:003 2014.12.01 17:44:08.620 INFO TcpService ] External TCP connection accepted: [Normal, 127.0.0.1:8249, L127.0.0.1:1113, {495e464b-6e95-49de-9658-bef2177aff38}].

[PID:00712:016 2014.12.01 17:44:08.620 INFO ProjectionManager ] Enabling ‘$by_category’ projection

[PID:00712:016 2014.12.01 17:44:08.651 INFO ProjectionManager ] ‘$by_category’ projection source has been written

[PID:00712:021 2014.12.01 17:44:08.870 DEBUG PersistentSubscripti] create subscription $ce-hello::hellotest

[PID:00712:021 2014.12.01 17:44:08.886 DEBUG PersistentSubscripti] New persistent subscription hellotest.

[PID:00712:021 2014.12.01 17:44:08.886 DEBUG PersistentSubscripti] Saving Confiugration.

[PID:00712:021 2014.12.01 17:44:08.901 DEBUG PersistentSubscripti] New connection to persistent subscription hellotest.

[PID:00712:015 2014.12.01 17:44:08.901 DEBUG PersistentSubscripti] Subscription $ce-hello::hellotest: read no checksum.

[PID:00712:015 2014.12.01 17:44:08.901 DEBUG PersistentSubscripti] strtfrom = 0

[PID:00712:023 2014.12.01 17:44:19.321 INFO TcpConnection ] ES TcpConnection closed [17:44:19.305: N127.0.0.1:8249, L127.0.0.1:1113, {495e464b-6e95-49de-9658-bef2177aff38}]:

Received bytes: 1708, Sent bytes: 3080

Send calls: 19, callbacks: 19

Receive calls: 20, callbacks: 20

Close reason: [ConnectionReset] Socket receive error

[PID:00712:023 2014.12.01 17:44:19.321 INFO TcpConnectionManager] Connection ‘external-normal’ [127.0.0.1:8249, {495e464b-6e95-49de-9658-bef2177aff38}] closed: ConnectionReset.

[PID:00712:025 2014.12.01 17:44:19.337 DEBUG PersistentSubscripti] Lost connection from 127.0.0.1:8249

[PID:00712:025 2014.12.01 17:44:19.352 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:32020/P:32020

[PID:00712:025 2014.12.01 17:44:19.352 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:33038/P:33038

[PID:00712:025 2014.12.01 17:44:19.352 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:34056/P:34056

[PID:00712:025 2014.12.01 17:44:19.352 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:34479/P:34479

[PID:00712:025 2014.12.01 17:44:19.352 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:35525/P:35525

[PID:00712:003 2014.12.01 17:44:28.312 INFO TcpService ] External TCP connection accepted: [Normal, 127.0.0.1:8252, L127.0.0.1:1113, {44db29f2-cef3-4036-b486-3fd5c30dba37}].

[PID:00712:016 2014.12.01 17:44:28.328 INFO ProjectionManager ] Enabling ‘$by_category’ projection

[PID:00712:007 2014.12.01 17:44:28.422 DEBUG PersistentSubscripti] create subscription $ce-hello::hellotest

[PID:00712:010 2014.12.01 17:44:28.422 DEBUG PersistentSubscripti] New connection to persistent subscription hellotest.

[PID:00712:003 2014.12.01 17:44:42.678 INFO TcpConnection ] ES TcpConnection closed [17:44:42.678: N127.0.0.1:8252, L127.0.0.1:1113, {44db29f2-cef3-4036-b486-3fd5c30dba37}]:

Received bytes: 689, Sent bytes: 2868

Send calls: 13, callbacks: 13

Receive calls: 12, callbacks: 12

Close reason: [ConnectionReset] Socket receive error

[PID:00712:003 2014.12.01 17:44:42.678 INFO TcpConnectionManager] Connection ‘external-normal’ [127.0.0.1:8252, {44db29f2-cef3-4036-b486-3fd5c30dba37}] closed: ConnectionReset.

[PID:00712:012 2014.12.01 17:44:42.678 DEBUG PersistentSubscripti] Lost connection from 127.0.0.1:8252

[PID:00712:012 2014.12.01 17:44:42.678 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:32020/P:32020

[PID:00712:012 2014.12.01 17:44:42.678 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:33038/P:33038

[PID:00712:012 2014.12.01 17:44:42.678 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:34056/P:34056

[PID:00712:012 2014.12.01 17:44:42.678 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:34479/P:34479

[PID:00712:012 2014.12.01 17:44:42.678 DEBUG PersistentSubscripti] Retrying message $ce-hello::hellotest $ce-hello/C:35525/P:35525

``

Quick thing

                try { conn.CreatePersistentSubscriptionAsync(stream,
group, settings, creds); }
                catch { } //already created

This won't fail unless you take the result (just an async call). So
likely your "subscription dropped" you mention was something failing
here.

Might want to look at the reason you got back. For a $ce stream if in
a previous iteration of the code you were not passing user credentials
this would cause a failure like this.

Cheers,

Greg

Ok. I modified it to take the result. I get “Success” so long as it’s running against a fresh eventstore, an exception is thrown otherwise (500 internal server error). Regardless, it doesn’t seem to affect the bug. I’m starting eventstore fresh each time, handing in admin credentials always, no change.

Chris

Its over TCP are you sure its a 500 internal server error? Can you
copy/paste it? Would be pretty weird if you got a "500 internal server
error" over TCP

Sorry, your right, that was on the previous call to enabling the $by_category projection. The CreatePersistentSubscriptionAsync does throw but the message is:
“Subscription group hellotest on stream $ce-hello alreay exists”

Chris.

OK I am pushing a fix for when resolvelinkto is set it works fine now.

also it was working perfectly well when it was not set however except
for your handler:

        private static void
eventAppeared(EventStorePersistentSubscription sub, ResolvedEvent evt)
        {
            Console.WriteLine(evt.Link.EventId);
        }

Link was null. It was therefore dropping your subscription with the
message of an error in handler and the exception from your handler.

Whether this is a desirable behaviour vs a NAK is another story as a
default when autoack is set. What does everyone think?

Greg

Greg, that fix did in fact solve the problem, but it may have introduced another. I cannot seem to get to the original event via the link. See the updated code. It’s almost as if the link just keeps pointing back to itself instead of the actual event?

using System;

using System.Collections.Generic;

using System.Linq;

using System.Net;

using System.Text;

using System.Threading.Tasks;

using EventStore.ClientAPI;

using EventStore.ClientAPI.SystemData;

namespace competing_test

{

class Program

{

    static void Main(string[] args)

    {

        var creds = new UserCredentials("admin", "changeit");

        using (var conn = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113)))

        {

            conn.ConnectAsync().Wait();

            var stream = "$ce-hello";

            var group = "hellotest";

            var settings = PersistentSubscriptionSettingsBuilder.Create().ResolveLinkTos().StartFromBeginning();

            

            var projectionsManager = new ProjectionsManager(new ESLogger(), new IPEndPoint(IPAddress.Loopback, 2113), TimeSpan.FromMinutes(1));

            try { projectionsManager.EnableAsync("$by_category", creds).Wait(); }

            catch { }//already enabled

            try { var a = conn.CreatePersistentSubscriptionAsync(stream, group, settings, creds).Result; }

            catch {} //already created

            //update to restart subscription from beginning

            //var a = conn.UpdatePersistentSubscriptionAsync(stream, group, settings, creds).Result;

            var persistentSubscription = conn.ConnectToPersistentSubscription(group, stream, eventAppeared, subscriptionDropped, creds, 5); // 5 buffer

            while (true)

            {

                Console.WriteLine("Press enter to create new aggregate");

                Console.ReadLine();

                var g = Guid.NewGuid();

                var data = System.Text.Encoding.UTF8.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(new HelloWorld { Hello = gethello() }));

                conn.AppendToStreamAsync("hello-" + g, ExpectedVersion.Any, new EventData(Guid.NewGuid(), "hellocreated", true, data, null)).Wait();

            }

        }

    }

    static int i = 0;

    private static string gethello()

    {

        return "hello " + ++i;

    }

    private static void subscriptionDropped(EventStorePersistentSubscription sub, SubscriptionDropReason reason, Exception ex)

    {

        Console.WriteLine("subscription dropped: " + reason + " " + ex.Message);

    }

    private static void eventAppeared(EventStorePersistentSubscription sub, ResolvedEvent evt)

    {

        //how do I get the event data?

        var link = evt.Link;

        var data = link.Data;

        var obj = Newtonsoft.Json.JsonConvert.DeserializeObject<HelloWorld>(System.Text.Encoding.UTF8.GetString(data));

        Console.WriteLine("{0} {1}@{2} {3} <Hello> {4}", link.EventId, link.EventNumber, link.EventStreamId, link.EventType, data, obj.Hello);

    }

}

class HelloWorld

{

    public string Hello { get; set; }

}

class ESLogger : ILogger

{

    public void Debug(Exception ex, string format, params object[] args) { }

    public void Debug(string format, params object[] args) { }

    public void Error(Exception ex, string format, params object[] args) { }

    public void Error(string format, params object[] args) { }

    public void Info(Exception ex, string format, params object[] args) { }

    public void Info(string format, params object[] args) { }

}

}

``

Chris.

I'm flying today and will be unable to look at this at minimum until
2200 @james or others feel free to look into it. Competing does
nothing special with linktos (just uses the dispatcher/subscription
service) my guess is its a misunderstanding etc here.

Greg

and the Link represents the Link part of the resolved event if you
look there is more than one entry on resolved event such as Event

Cheers,

Greg

Oh I guarantee the misunderstanding is mine :slight_smile:

I assumed that the ResolvedEvent would contain the data of the original event that spawned the link. I guess not. The information on all three event properties (Event/OriginalEvent/Link) is the same, just metadata pointing to where I can go find the original event should I want to look it up. Unfortunately my connection handle is outside the scope here.

System.Text.Encoding.UTF8.GetString(evt.Event.Data)

“0@hello-72fcf578-58be-4c58-85fe-2ed01804a46d”

System.Text.Encoding.UTF8.GetString(evt.OriginalEvent.Data)

"0@hello-72fcf578-58be-4c58-85fe-2ed01804a46d"

System.Text.Encoding.UTF8.GetString(evt.Link.Data)

“0@hello-72fcf578-58be-4c58-85fe-2ed01804a46d”

``

Safe travels, none of this is holding up anything I’m doing. Thanks for investing any time at all!

Chris.

I'll take a look at it if I get a chance this evening otherwise it may
be a few days due to travel.

If anyone @james etc wants to follow up on this the place to look is
right here https://github.com/EventStore/EventStore/blob/dev/src/EventStore.Core/Services/PersistentSubscription/PersistentSubscriptionService.cs#L429

Cheers,

Greg

I fixed the issue, at least for me. I don’t know what it might break upstream. Please take a look. Pull request #305: https://github.com/EventStore/EventStore/pull/305

Chris.