Projection emit problem

Hey

I have a problem with a projection. I created a continuous projection with emit enabled:

fromStream(‘comp2_intermediate_out’)

.when({

$any: function (s, e) {

if (e.metadata.destinationStream !== “undefined”) {

emit(e.metadata.destinationStream, e.eventType, e.data, e.matadata);

}

}

})

there are two streams in the system:

  • comp1

  • 'comp2_intermediate_out

The code first writes 5 events to comp1 (ExpectedVersion=Any)

and then writes some events to comp2_intermediate_out. These events have metadata property DestinationStream set to “comp1”. So essentially the projection is forwarding the events to their destination queue.

Now, if run this, I get exception from StreamPositionTagger::AdjustTag (line 114) and the projection reports it processed the events but nothing got emitted.

If I comment out the first part that emits directly to comp1 stream, the projection works smooth and no exception occur.

What can be wrong?

Szymon

fromStream(‘comp2_intermediate_out’)

.when({

$any: function (s, e) {

if (e.metadata.destinationStream !== “undefined”) {

If you want to check for undefined in Javascript I think you should do: if (typeof e.metadata.destinationStream !== “undefined”) -or simply- if (e.metadata.destinationStream)

emit(e.metadata.destinationStream, e.eventType, e.data, e.matadata);

You’ve written e.matadata. That should be e.metadata, right?

}

}

})

/Daniel

Thanks for pointing it out Daniel. Fixed the projection but the problem remains the same. For some reason if there are any events in the stream before the projection is applied, it crashes when appending events.

In addition, as a result of the exception the whole projection infrastructure dies e.g. I cannot even reset the faulty projection

I’ll check what blocks resetting a projection.

What is metadata of the last event in the stream your projection attempts/should emit to?

-yuriy

Will check. On the other hand, I would like to send you a failing test but I have problems with creating a projection in MiniNode. I’ve added the projection subsystem but keep getting 401 when trying to create new continuous projection.

Szymon

are you passing credentials? actually im not sure if mininode even sets up credentials off the top of my head

Yeah, passing default admin creds but does not help. Tried to debug is but ProjectionController handler methods don’t get hit so I guess the 401 is returned by somewhere upstream

Szymon

Update: when using anonymous ProjectionManager::Handle(ProjectionManagementMessage.Post message) returns (causes 401) because of lack of creds. In case of basic, the InternalAuthenticationProvider retuns 401. Do I need to somehow create an admin account when using MiniNode? Replacing InternalAuthenticationProvider with a stub in SingleVNode would be quite tedious.

Szymon

My guess is yes the same way single node and cluster node do it. I have not had a chance to go looking through mininode yet today.

Managed to get the auth working (which turned out to be as easy as setting one bool flag). Anyway, I attached a failing test. It takes ~10 seconds to run because of few Thread.Sleeps but it shows where the problem is:

[31120,16,19:37:07.364,ERROR] Error while processing message EventStore.Core.Messages.ClientMessage+ReadStreamEventsBackwardCompleted in queued handler ‘Projection Core #0’.

EXCEPTION(S) OCCURRED:

System.NullReferenceException: Object reference not set to an instance of an object.

at EventStore.Projections.Core.Services.Processing.StreamPositionTagger.AdjustTag(CheckpointTag tag) in c:\Projects\EventStore\src\EventStore\EventStore.Projections.Core\Services\Processing\StreamPositionTagger.cs:line 89

at EventStore.Projections.Core.Services.Processing.CheckpointTagVersion.AdjustBy(PositionTagger tagger, ProjectionVersion version) in c:\Projects\EventStore\src\EventStore\EventStore.Projections.Core\Services\Processing\CheckpointTagExtensions.cs:line 52

at EventStore.Projections.Core.Services.Processing.EmittedStream.ReadStreamEventsBackwardCompleted(ReadStreamEventsBackwardCompleted message, CheckpointTag upTo) in c:\Projects\EventStore\src\EventStore\EventStore.Projections.Core\Services\Processing\EmittedStream.cs:line 334

at EventStore.Projections.Core.Services.Processing.EmittedStream.<>c__DisplayClass1.b__0(ReadStreamEventsBackwardCompleted completed) in c:\Projects\EventStore\src\EventStore\EventStore.Projections.Core\Services\Processing\EmittedStream.cs:line 416

at EventStore.Core.Messaging.RequestResponseDispatcher`2.Handle(TResponse message) in c:\Projects\EventStore\src\EventStore\EventStore.Core\Messaging\RequestResponseDispatcher.cs:line 85

at EventStore.Core.Bus.MessageHandler`1.TryHandle(Message message) in c:\Projects\EventStore\src\EventStore\EventStore.Core\Bus\MessageHandler.cs:line 59

at EventStore.Core.Bus.InMemoryBus.Publish(Message message) in c:\Projects\EventStore\src\EventStore\EventStore.Core\Bus\InMemoryBus.cs:line 349

at EventStore.Core.Bus.QueuedHandlerMRES.ReadFromQueue(Object o) in c:\Projects\EventStore\src\EventStore\EventStore.Core\Bus\QueuedHandlerMRES.cs:line 143

If the code that emits the events directly to dest stream is commented out, everything works perfectly fine.

Cheers,

Szymon

EmitBugRepro.cs (5.23 KB)

Did you have a chance to look at it? This issue is starting to block me. Looking at the EmittedStream, it assumes that if a stream has already been created then it must contain a tag on last event which suggests that emitting to a stream that somebody else it also writing to is not supported. Is it the case? I understand there can’t be stream that two projections are writing to but is it OK to have one projection and external sources?

Szymon

This is the case you cannot have multiple things emiting to the stream a projection emits to.

Yeah. That’s what I found out eventually this morning. I’d like to fix the overall behavior though because it kills projection subsystem altogether. I guess it should mark projection as faulted, right?

Btw, is this exclusive behavior intentional or is it just implementation detail of EmittedStream class? I.e. If I allow EmittedStream to be smarter, would something else break?

Last, I managed to work around it by emitting to separate streams in one category and then using fromCategory projection to join them but this is one additional layer of indirection.

It’s 100% intentional otherwise how would you recover etc in failure cases/restart/etc

You can have one more projection joining two or more streams

I would just ignore any events that are not tagged as mine? Just thinking out loud.

Szymon,

EmittedStream reads last events in the stream only once. Then expected version is used when writing events. Any external writes break this condition. Attempting to ignore this would require reading output streams every time someone writes to this stream and under the load we get infinite number of retries.

-yuriy

Makes sense Yuriy, not I undestand. Thank you both for a response. I will proceed with using fromCategory as a join.