Hello community!
I stumbled upon one error that I can’t understand why is happening, despite reading other similar topics and docs [Introduction to projections | EventStoreDB Documentation]
There is a common topic for account points(reward points) – accounts_points_changes
.
Projection 1
listens to this topic and links events to a specific stream for account(account_%id%_points_changes
).
There is a Service that allows you to check and deduct points from an account balance. It writes to a specific account stream.
Projection 2
listens to these events types and put subtraction events to the common stream accounts_points_changes
.
This workflow looks a bit strange, but it allows us not to have catch-up subscriptions for account points at the moment(avoiding state management, partitions management for workers who will be processing streams, and easy concurrency checks for deductions).
So after I ran my test, in the Admin UI, I see this error:
The 'accounts_points_changes' stream managed by projection 7 has been written to from the outside.
.
This error is related to Projection 2
And I don’t understand why. I don’t link events to the projection stream; I link events to the stream that the projection listens to. Moreover, I even created(appended one init event) the stream before creating any projections and am still getting this error. What am I doing wrong here?
EventStore DB version: 24.6 OSS
Streams I created
accounts_points_changes
account_123_subtraction
account_123_points_changes
Projection 1:
options({
processingLag: 0,
$includeLinks: true
})
fromStream('accounts_points_changes')
.when({
$any: function (state, event) {
linkTo("account_" + event.body.account_id + "_points_change", event);
}
})
Projection 2:
options({
processingLag: 0,
$includeLinks: true
})
fromAll()
.when({
AccountPointsSubtract: function (state, event) {
if (event.streamId.endsWith("subtraction")){
linkTo("accounts_points_changes", event)
}
}
})
Test code:
import json
from esdbclient import EventStoreDBClient, NewEvent, StreamState
event_store = EventStoreDBClient(
uri='esdb://admin:[email protected]:2113?tls=false&tlsVerifyCert=false'
)
for stream_name in ["account_123_points_changes", "accounts_points_changes", "account_123_subtraction"]:
current_version = event_store.get_current_version(stream_name)
if isinstance(current_version, StreamState):
continue
event_store.delete_stream(stream_name=stream_name, current_version=current_version)
stream = 'accounts_points_changes'
add_event = NewEvent(
type='AccountPointsAdd',
data=json.dumps({"account_id": 123, "value": 1}).encode('utf-8')
)
add_event2 = NewEvent(
type='AccountPointsAdd',
data=json.dumps({"account_id": 123, "value": 2}).encode('utf-8')
)
subtract_event = NewEvent(
type='AccountPointsSubtract',
data=json.dumps({"account_id": 123, "value": 2}).encode('utf-8')
)
event_store.append_to_stream(
stream_name=stream,
current_version=StreamState.ANY,
events=[add_event],
)
event_store.append_to_stream(
stream_name="account_123_subtraction",
current_version=StreamState.ANY,
events=[subtract_event],
)
event_store.append_to_stream(
stream_name=stream,
current_version=StreamState.ANY,
events=[add_event2],
)
event_stream = event_store.get_stream(
stream_name="account_123_points_changes",
stream_position=0,
resolve_links=True
)
for event in event_stream:
print(event.stream_position, event.type, event.data)