Cluster pub-sub resubscription and event sourcing streaming #2112

AqlaSolutions opened this issue Apr 8, 2024 · 8 comments

AqlaSolutions commented Apr 8, 2024

I have two nodes: one subscribes and the other publishes. If the publisher node restarts, will the first node still receive published messages afterwards? What are the guarantees here? Is it possible that some messages published by the restarted node will be skipped before resubscription happens?

How should we implement a snapshot + events subscription given all of the above? We request a snapshot when the subscription starts, but after a restart we may need a fresh snapshot, so we would have to somehow detect that events are now coming from a new node, right?
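
For reference, the setup is roughly this. This is only a minimal sketch: I'm assuming the cluster.Subscribe / cluster.Publisher() extensions from Proto.Cluster.PubSub, "system" is an already-started cluster member's ActorSystem, and ChatMessage is just a placeholder message type.

using System;
using System.Threading.Tasks;
using Proto;
using Proto.Cluster;
using Proto.Cluster.PubSub;

// Node A: subscribe a local actor to the topic.
var subscriber = system.Root.Spawn(Props.FromFunc(ctx =>
{
    if (ctx.Message is ChatMessage msg)        // ChatMessage is a placeholder type
        Console.WriteLine($"got: {msg.Text}");
    return Task.CompletedTask;
}));
await system.Cluster().Subscribe("chat", subscriber);

// Node B: publish to the topic. The question is whether the subscription above keeps
// receiving messages after this node restarts, and whether anything published before
// resubscription completes can be lost.
await system.Cluster().Publisher().Publish("chat", new ChatMessage { Text = "hello" });

// Placeholder; real pub-sub messages must be serializable (e.g. protobuf-generated).
public record ChatMessage { public string Text { get; init; } = ""; }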

AqlaSolutions changed the title from "Does cluster pub-sub continue working after publisher node restart?" to "Cluster pub-sub resubscription and event sourcing streaming" on Apr 8, 2024
AqlaSolutions commented Apr 8, 2024

I discovered that the subscription doesn't survive a publisher restart when I set RunAsClient = true on the subscriber side. Setting it to false fixes the resubscription. Is this expected behavior?

AqlaSolutions commented:

I looked through the source code, and it seems subscribers don't keep any information about which topics they are subscribed to, so we have to add KV storage on the publisher side. What is the reason for this design? Wouldn't it be easier for subscribers to keep that state themselves? Then TopicActor would broadcast on start that the topic is available, and the DeliveryActors would send resubscribe requests. Or subscribers could simply send their currently subscribed topics to each newly connected node when a connection is established. That way there would be no need to keep the state externally.
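
As a rough illustration of the "subscribers keep their state themselves" variant (all names here are made up, and I'm assuming ClusterTopology updates are published on the EventStream):

using System.Collections.Generic;
using System.Threading.Tasks;
using Proto;
using Proto.Cluster;
using Proto.Cluster.PubSub;

public class ResubscribingSubscriber : IActor
{
    private readonly HashSet<string> _topics = new() { "chat" };   // state kept by the subscriber itself
    private record Resubscribe;                                    // local trigger message

    public async Task ReceiveAsync(IContext context)
    {
        switch (context.Message)
        {
            case Started:
                // re-issue subscriptions whenever the cluster topology changes,
                // e.g. when the publisher node comes back after a restart
                context.System.EventStream.Subscribe<ClusterTopology>(
                    _ => context.Send(context.Self, new Resubscribe()));
                await SubscribeAll(context);
                break;

            case Resubscribe:
                await SubscribeAll(context);
                break;
        }
    }

    private async Task SubscribeAll(IContext context)
    {
        foreach (var topic in _topics)
            await context.System.Cluster().Subscribe(topic, context.Self);
    }
}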

rogeralsing commented Apr 9, 2024

But in addition to this, I'd really like to have the topic types separated, so that you can have more configuration per topic.

e.g. something like:

clusterConfig.WithTopicKind("ChatRoom", Props.FromProducer(() => new TopicActor(kvStore)))

Then when you publish:

await _publisher.Publish("myChatRoom123", "ChatRoom", new ChatMessage(...))

That would allow you to keep specific topics bound to specific nodes, instead of the current behavior where all nodes host topic actors.
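
Just as a sketch of what I mean, mirroring how regular kinds are registered today. WithTopicKind is hypothetical; WithClusterKind exists, while ChatUserActor, kvStore, clusterProvider and identityLookup are only illustrative names.

var clusterConfig = ClusterConfig
    .Setup("my-cluster", clusterProvider, identityLookup)
    // existing API: an ordinary virtual actor kind
    .WithClusterKind("ChatUser", Props.FromProducer(() => new ChatUserActor()))
    // proposed API: a dedicated topic kind with its own TopicActor props and kv-store
    .WithTopicKind("ChatRoom", Props.FromProducer(() => new TopicActor(kvStore)));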

rogeralsing commented:

> How should we implement a snapshot + events subscription given all of the above? We request a snapshot when the subscription starts, but after a restart we may need a fresh snapshot, so we would have to somehow detect that events are now coming from a new node, right?

First, I assume you are using virtual actors here, as they are location-transparent within the cluster.

I'm thinking that it shouldn't matter whether the subscribing virtual actor is awake right now or not. If you publish to the pub-sub and that virtual actor is already subscribed, the message will either go directly to the already-awake actor, or the actor will be woken up and then get the message.

In the actor's OnStart handler, you could just load your snapshot and any events persisted after that specific snapshot.
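
Roughly like this; the store, state, and event type names below are made up just to show the idea:

public class MySubscriberActor : IActor
{
    private readonly IMyEventStore _store;       // hypothetical persistence abstraction
    private readonly string _id;
    private State _state = new();

    public MySubscriberActor(IMyEventStore store, string id) => (_store, _id) = (store, id);

    public async Task ReceiveAsync(IContext context)
    {
        switch (context.Message)
        {
            case Started:
                // load the latest snapshot, then replay any events persisted after it
                var (snapshot, version) = await _store.LoadLatestSnapshotAsync(_id);
                _state = snapshot ?? new State();
                foreach (var e in await _store.LoadEventsAfterAsync(_id, version))
                    _state = _state.Apply(e);
                break;

            case SomeDomainEvent e:
                // events delivered via pub-sub after the actor is up to date
                _state = _state.Apply(e);
                break;
        }
    }
}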

Why would you have to identify where the events are coming from? If they go to the topic actor, and the topic actor forwards them to the subscribed actor, it will get them.

AqlaSolutions commented Apr 9, 2024

You misunderstood what I mean by event sourcing here. Our subscriber actor doesn't store anything: it requests a snapshot of the current state from the publisher grain and then keeps that state up to date by receiving incremental pub-sub events and applying them on top of the snapshot. If the event publisher restarts, we need to detect that pub-sub events are now coming from another grain instance and re-request the snapshot. What I asked is whether we really need this re-requesting step, or whether we can rely on no events being skipped on the subscriber side when the publisher node restarts.
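
To make it concrete, this is roughly what our subscriber looks like. The GetSnapshot / StateSnapshot / StateChanged messages and the version field are our own, "publisher-123" / "publisherKind" are placeholders, and the version-gap check is exactly the part I'm unsure we actually need. A sketch, assuming cluster RequestAsync is used to talk to the publisher grain:

using System.Threading;
using System.Threading.Tasks;
using Proto;
using Proto.Cluster;

// Our own message contracts (illustrative only):
public record GetSnapshot;
public record StateSnapshot(State State, long Version);
public record StateChanged(long Version /*, payload */);
public record State { public State Apply(StateChanged e) => this; /* apply the change */ }

public class ProjectionActor : IActor
{
    private State _state = new();
    private long _version = -1;

    public async Task ReceiveAsync(IContext context)
    {
        switch (context.Message)
        {
            case Started:
                await LoadSnapshot(context);
                break;

            case StateChanged e when e.Version <= _version:
                break;                                   // duplicate / already applied: ignore

            case StateChanged e when e.Version == _version + 1:
                _state = _state.Apply(e);                // contiguous event: apply incrementally
                _version = e.Version;
                break;

            case StateChanged:
                // gap detected: events were skipped (e.g. the publisher node restarted
                // before we were resubscribed), so fall back to a fresh snapshot
                await LoadSnapshot(context);
                break;
        }
    }

    private async Task LoadSnapshot(IContext context)
    {
        var snapshot = await context.System.Cluster().RequestAsync<StateSnapshot>(
            "publisher-123", "publisherKind", new GetSnapshot(), CancellationToken.None);
        _state = snapshot.State;
        _version = snapshot.Version;
    }
}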

AqlaSolutions commented Apr 10, 2024

@rogeralsing

See: https://github.com/asynkron/realtimemap-dotnet/blob/3c2609e26777007805281725a7696ed8338d48a0/Backend/ProtoActorExtensions.cs#L74

https://github.com/asynkron/realtimemap-dotnet/blob/3c2609e26777007805281725a7696ed8338d48a0/Backend/ProtoActorExtensions.cs#L61-L64

This is how you can have persistent subscriptions for the topic actor.

I implemented IKeyValueStore, but TopicActor removes subscribers from the list after loading it, because those subscribers use RunAsClient = true and are therefore not present in the Consul member list. I can't set it to false for now, because those nodes have no access to the KeyValueStore, so TopicActor should not spawn on them. Is there any solution for this?

AqlaSolutions commented Apr 10, 2024

@rogeralsing, another problem here is that when I subscribe to a topic from an unnamed actor, it gets a numeric PID like $1, and this PID is stored in the key-value storage. When the subscribers are later loaded in TopicActor, the stored PIDs may now point to a different actor that never subscribed to the topic. Of course, this can only happen if the subscriber also restarts during TopicActor downtime. Is there a mechanism to prevent this?

Maybe actors should use random GUID names instead of incremental numbers? Or better, add a unique node GUID to the PID, so that a restarted node on the same host is treated as a different node? Because right now even named actors may receive pub-sub events after their node restarts, if the publisher was down during that restart; I don't think that's intended.
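
For now, the only workaround I can think of is giving subscribers explicit unique names ourselves, something like the sketch below ("props" and "system" are assumed to exist; this avoids accidental PID reuse but doesn't fix the underlying issue):

// Spawn the subscriber under a unique name instead of the default $1-style id,
// so a stale PID loaded back by TopicActor cannot match an unrelated actor
// after the node restarts.
var name = $"projection-{Guid.NewGuid():N}";
var pid = system.Root.SpawnNamed(props, name);
await system.Cluster().Subscribe("chat", pid);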
