Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added visibility_timeout.go #11

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open

added visibility_timeout.go #11

wants to merge 25 commits into from

Conversation

kitkatchoco2002
Copy link
Collaborator

Implement message distribution, acknowledgment, visibility timeout, and time extension logic

Copy link
Collaborator

@tituscarl tituscarl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented. Please move this handlers to the server.go and maybe you can focus more on how to implement the visibility timeout logic so that all nodes can track whether that message is safe to stream to the client without depending on spanner data, maybe through broadcast?

}

// Publish writes a message to Spanner and acknowledges receipt
func (s *PubSubServer) Publish(ctx context.Context, req *pubsubpb.PublishRequest) (*pubsubpb.PublishResponse, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have already Publish API?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please edit the current Publish that we have if you need to do some changes.

return &pubsubpb.ModifyVisibilityTimeoutResponse{Success: true}, nil
}

func main() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be our main func in main.go

func (s *PubSubServer) Subscribe(req *pubsubpb.SubscribeRequest, stream pubsubpb.PubSubService_SubscribeServer) error {
ctx := stream.Context()
stmt := spanner.Statement{
SQL: `SELECT id, payload, topic, visibilityTimeout, processed FROM Messages
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not query directly to spanner, instead utilize the data received from the leader.

return err
}

msg.Processed = false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to set to false since all messages from the leader has processed = false.


// Acknowledge confirms message processing
func (s *PubSubServer) Acknowledge(ctx context.Context, req *pubsubpb.AcknowledgeRequest) (*pubsubpb.AcknowledgeResponse, error) {
_, err := s.spannerClient.Apply(ctx, []*spanner.Mutation{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After receiving Ack request, it's supposed to remove the messages from the queue right? then set the status in spanner.

@kitkatchoco2002
Copy link
Collaborator Author

kitkatchoco2002 commented Feb 22, 2025

good day sir, i have updated the server.go in my branch that primarily focuses on implementing the visibility timeout logic while ensuring that all nodes track whether a message is safe to stream to a client without solely relying on Spanner. It maintains an in-memory structure to store message visibility states, allowing nodes to coordinate through broadcast mechanisms rather than frequent database queries. When a subscriber requests a message, the system first checks if the message is locked or visible. If the message is available, it is assigned exclusively to the subscriber, and a visibility timeout is set. This timeout is synchronized across all nodes using a broadcast mechanism to ensure consistency.

If the subscriber acknowledges the message, it is removed from the in-memory queue and marked as processed in Spanner. If the subscriber fails to acknowledge before the timeout expires, the message is made available again for other subscribers. Additionally, the system allows subscribers to request visibility extensions, updating both the in-memory state and Spanner asynchronously. The subscription process does not query Spanner directly but instead retrieves messages from the leader node’s in-memory queue, ensuring efficient message distribution. This approach enforces exclusive message locks per subscriber while minimizing database dependency and reducing query load.

are my codes on the right track sir?

server.go Outdated
messageQueueMu sync.RWMutex
}

type broadCastInput struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use the one in broadcast.go?

server.go Outdated
Comment on lines 21 to 25
visibilityTimeouts sync.Map // messageID -> VisibilityInfo
lockMu sync.RWMutex

messageQueue map[string][]*pb.Message // topic -> messages
messageQueueMu sync.RWMutex
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this to PubSub object.

Copy link
Collaborator Author

@kitkatchoco2002 kitkatchoco2002 Feb 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move it from the server struct into a separate PubSub struct. is this what you mean sir?

maybe like this?

type PubSub struct {
visibilityTimeouts sync.Map // messageID -> VisibilityInfo
lockMu sync.RWMutex
messageQueue map[string][]*pb.Message // topic -> messages
messageQueueMu sync.RWMutex
}
type server struct {
client *spanner.Client
op *hedge.Op
pb.UnimplementedPubSubServiceServer
pubSub *PubSub
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check app.go

server.go Outdated
MessageID string `json:"messageId"`
SubscriberID string `json:"subscriberId"`
ExpiresAt time.Time `json:"expiresAt"`
NodeID string `json:"nodeId"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I now what is NodeID for? It seems it's an uuid and generated every locking of message. So I assume it's different per message?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NodeID is generated as a UUID when locking a message. it is different per message, it means each message lock instance gets a unique identifier.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, when can nodeID be used?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be useful in distributed environments where multiple nodes manage message processing.
If multiple nodes handle subscriptions, NodeID can help track which node locked a message. Example: If NodeID is stored with VisibilityInfo, a node can check if it owns a lock before attempting an update.
Before allowing a visibility timeout extension, we could verify if the request comes from the correct node using NodeID. Example: Only the original locking node (with the correct NodeID) can extend the timeout.
If issues arise where messages are incorrectly locked/unlocked, NodeID can help trace which node last held the message.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should i remove it sir?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's discuss this further.

server.go Outdated
)

func NewServer(client *spanner.Client, op *hedge.Op) *server {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to initialize server

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can try to do it in main.go

server.go Outdated
"topic": req.Topic,
},
}
if err := s.broadcastAck(bcastin); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that there are no definitions for broadcastAck()

broadcast.go Outdated
Comment on lines 33 to 59
parts := strings.Split(string(msg), ":")
switch parts[0] {
case "lock":
//if a node receives a "lock" request for a message it already has locked, it should reject duplicate locks.
messageID := parts[1]
if _, exists := app.messageLocks.Load(messageID); exists {
return nil, nil // Already locked, ignore duplicate
}
case "unlock":
// Handle unlock request
messageID := parts[1]
app.messageLocks.Delete(messageID)
// Clean up locks and timers
case "delete":
messageID := parts[1]
app.messageLocks.Delete(messageID)
app.messageQueue.Delete(messageID)
case "extend":
// Handle timeout extension
messageID := parts[1]
newTimeout, _ := strconv.Atoi(parts[2])
if lockInfo, ok := app.messageLocks.Load(messageID); ok {
info := lockInfo.(MessageLockInfo)
info.Timeout = time.Now().Add(time.Duration(newTimeout) * time.Second)
app.messageLocks.Store(messageID, info)
}
// Update timeout and reset timer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kitkatchoco2002 Please create function per case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the handleBroadcastedMsg is supposedly for type = message only, you can add you own type and create separate function each.

app.go Outdated
Comment on lines 12 to 17
messageLocks sync.Map // messageID -> MessageLockInfo
messageQueue sync.Map // topic -> []*pb.Message
subscriptions sync.Map // subscriptionID -> *pb.Subscription

// Timer tracking
timeoutTimers sync.Map // messageID -> *time.Timer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try to coordinate with this with Jansen, since he will be responsible for storing in-mem data.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants