added visibility_timeout.go #11

Merged: 27 commits, Feb 27, 2025. Changes shown from 4 commits.

Commits:
2977e16
Implement message distribution, acknowledgment, visibility timeout, a…
kitkatchoco2002 Feb 21, 2025
eec4f66
Merge remote-tracking branch 'origin/main' into kate_branch
kitkatchoco2002 Feb 22, 2025
871e019
had some conflict in go.mod so i manually fix it
kitkatchoco2002 Feb 22, 2025
3644a6f
Remove visibility_timeout.go
kitkatchoco2002 Feb 22, 2025
2c14324
Delete msg_distribution directory
kitkatchoco2002 Feb 22, 2025
ee9713e
implements a message handling system in handleBroadcastedMsg
kitkatchoco2002 Feb 24, 2025
b7bb7f1
added some sync.map
kitkatchoco2002 Feb 24, 2025
f490a08
Implement distributed message locking and visibility timeout handling
kitkatchoco2002 Feb 24, 2025
bc1190c
Merge remote-tracking branch 'origin/main' into kate_branch
kitkatchoco2002 Feb 25, 2025
1523810
Merged main into kate_branch
kitkatchoco2002 Feb 25, 2025
37c89b0
Create helpers.go
kitkatchoco2002 Feb 25, 2025
ba02771
removed the helper functions
kitkatchoco2002 Feb 25, 2025
8ffd918
removed the helper functions
kitkatchoco2002 Feb 25, 2025
39f1944
Merge branch 'main' into kate_branch
kitkatchoco2002 Feb 25, 2025
b7f0537
updated broadcast.go
kitkatchoco2002 Feb 25, 2025
0450426
updated broadcast.go
kitkatchoco2002 Feb 25, 2025
c3382b7
had some adjustments with the server, broadcast and helpers
kitkatchoco2002 Feb 25, 2025
2020f23
change logic,Unlocking and extending timeouts →from Handled only by …
kitkatchoco2002 Feb 26, 2025
d3a4f01
Merge remote-tracking branch 'origin/main' into kate_branch
kitkatchoco2002 Feb 26, 2025
1821064
using the storage.go?
kitkatchoco2002 Feb 26, 2025
14d86b1
debugged helpers.go
kitkatchoco2002 Feb 26, 2025
2e8255e
Debugged errors. Modified :Acknowledge in server.go, broadcast.go (ha…
Horichii Feb 26, 2025
33129d2
reverted the original main.go
kitkatchoco2002 Feb 26, 2025
03a4f94
fix conflicts
tituscarl Feb 26, 2025
90fd323
fix conflicts
tituscarl Feb 26, 2025
c1151e2
add comments
tituscarl Feb 27, 2025
815bf95
remove comments
tituscarl Feb 27, 2025
go.mod (2 changes: 1 addition & 1 deletion)

@@ -84,4 +84,4 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20250219182151-9fdb1cabc7b2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250219182151-9fdb1cabc7b2 // indirect
google.golang.org/protobuf v1.36.5 // indirect
)
)
server.go (260 changes: 250 additions & 10 deletions)

@@ -17,12 +17,42 @@ type server struct {
client *spanner.Client
op *hedge.Op
pb.UnimplementedPubSubServiceServer

visibilityTimeouts sync.Map // messageID -> VisibilityInfo
lockMu sync.RWMutex

messageQueue map[string][]*pb.Message // topic -> messages
messageQueueMu sync.RWMutex
[Reviewer] Please move this to PubSub object.

[Author, kitkatchoco2002, Feb 23, 2025] Move it from the server struct into a separate PubSub struct, is this what you mean sir? Maybe like this?

type PubSub struct {
	visibilityTimeouts sync.Map // messageID -> VisibilityInfo
	lockMu             sync.RWMutex
	messageQueue       map[string][]*pb.Message // topic -> messages
	messageQueueMu     sync.RWMutex
}
type server struct {
	client *spanner.Client
	op     *hedge.Op
	pb.UnimplementedPubSubServiceServer
	pubSub *PubSub
}

[Reviewer] Please check app.go
}

[Reviewer, on broadCastInput] Is it possible to use the one in broadcast.go?

type broadCastInput struct {
	Type string      `json:"type"`
	Msg  interface{} `json:"msg"`
}

type VisibilityInfo struct {
	MessageID    string    `json:"messageId"`
	SubscriberID string    `json:"subscriberId"`
	ExpiresAt    time.Time `json:"expiresAt"`
	NodeID       string    `json:"nodeId"`
}

[Reviewer] May I know what NodeID is for? It seems it's a UUID generated on every locking of a message, so I assume it's different per message?

[Author] NodeID is generated as a UUID when locking a message. It is different per message: each message lock instance gets a unique identifier.

[Reviewer] I see, when can NodeID be used?

[Author] It can be useful in distributed environments where multiple nodes manage message processing. If multiple nodes handle subscriptions, NodeID can help track which node locked a message; for example, if NodeID is stored with VisibilityInfo, a node can check whether it owns a lock before attempting an update. Before allowing a visibility timeout extension, we could verify that the request comes from the correct node using NodeID, so that only the original locking node can extend the timeout. If messages are ever incorrectly locked or unlocked, NodeID can also help trace which node last held the message.

[Author] Should I remove it sir?

[Reviewer] Let's discuss this further.
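The ownership check the author describes could be sketched as follows. Note this is only an illustration of the idea under discussion, not code from the PR: the canExtend helper is hypothetical, and VisibilityInfo is simplified here to be self-contained.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified mirror of the VisibilityInfo struct in this diff.
type VisibilityInfo struct {
	MessageID    string
	SubscriberID string
	ExpiresAt    time.Time
	NodeID       string
}

// canExtend is a hypothetical guard: only the node that created the
// lock (the one holding the matching NodeID) may extend the timeout.
func canExtend(info VisibilityInfo, requestingNodeID string) bool {
	return info.NodeID == requestingNodeID
}

func main() {
	info := VisibilityInfo{
		MessageID: "m-1",
		NodeID:    "node-a",
		ExpiresAt: time.Now().Add(5 * time.Minute),
	}
	fmt.Println(canExtend(info, "node-a")) // the locking node may extend
	fmt.Println(canExtend(info, "node-b")) // any other node may not
}
```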

const (
MessagesTable = "Messages"
visibilityTimeout = 5 * time.Minute
cleanupInterval = 30 * time.Second
)

[Reviewer] No need to initialize server.

[Reviewer] You can try to do it in main.go.

func NewServer(client *spanner.Client, op *hedge.Op) *server {
s := &server{
client: client,
op: op,
messageQueue: make(map[string][]*pb.Message),
}
go s.startVisibilityCleanup()
return s
}

func (s *server) Publish(ctx context.Context, in *pb.PublishRequest) (*pb.PublishResponse, error) {
if in.Topic == "" {
return nil, status.Error(codes.InvalidArgument, "topic must not be empty")
Expand All @@ -38,16 +68,18 @@ func (s *server) Publish(ctx context.Context, in *pb.PublishRequest) (*pb.Publis

messageID := uuid.New().String()
mutation := spanner.InsertOrUpdate(
-		MessagesTable,
-		[]string{"id", "topic", "payload", "createdAt", "updatedAt"},
-		[]interface{}{
-			messageID,
-			in.Topic,
-			in.Payload,
-			spanner.CommitTimestamp,
-			spanner.CommitTimestamp,
-		},
-	)
+		MessagesTable,
+		[]string{"id", "topic", "payload", "createdAt", "updatedAt", "visibilityTimeout", "processed"},
+		[]interface{}{
+			messageID,
+			in.Topic,
+			in.Payload,
+			spanner.CommitTimestamp,
+			spanner.CommitTimestamp,
+			nil,   // Explicitly set visibilityTimeout as NULL
+			false, // Default to unprocessed
+		},
+	)

_, err := s.client.Apply(ctx, []*spanner.Mutation{mutation})
if err != nil {
@@ -72,3 +104,211 @@ func (s *server) Publish(ctx context.Context, in *pb.PublishRequest) (*pb.Publis
log.Printf("[Publish] Message successfully broadcasted and wrote to spanner with ID: %s", messageID)
return &pb.PublishResponse{MessageId: messageID}, nil
}

func (s *server) Subscribe(req *pb.SubscribeRequest, stream pb.PubSubService_SubscribeServer) error {
subscriberID := uuid.New().String()
ctx := stream.Context()

log.Printf("[Subscribe] New subscriber: %s for topic: %s", subscriberID, req.Topic)
go s.keepAliveSubscriber(ctx, stream)

for {
select {
case <-ctx.Done():
s.cleanupSubscriberLocks(subscriberID)
return nil
default:
s.messageQueueMu.RLock()
msgs, exists := s.messageQueue[req.Topic]
s.messageQueueMu.RUnlock()

			if !exists || len(msgs) == 0 {
				time.Sleep(100 * time.Millisecond)
				continue
			}

			s.messageQueueMu.Lock()
			msg := msgs[0]
			s.messageQueue[req.Topic] = msgs[1:]
			s.messageQueueMu.Unlock()

			// Check visibility timeout before sending. The message must
			// be dequeued first; otherwise msg.Id is not yet in scope.
			info, exists := s.visibilityTimeouts.Load(msg.Id)
			if exists && time.Now().Before(info.(VisibilityInfo).ExpiresAt) {
				continue // Skip locked messages
			}

locked, err := s.tryLockMessage(msg.Id, subscriberID)
if err != nil || !locked {
continue
}

if err := stream.Send(msg); err != nil {
s.releaseMessageLock(msg.Id, subscriberID)
return err
}
}
}
}

func (s *server) tryLockMessage(messageID, subscriberID string) (bool, error) {
s.lockMu.Lock()
defer s.lockMu.Unlock()

if _, exists := s.visibilityTimeouts.Load(messageID); exists {
return false, nil
}

visInfo := VisibilityInfo{
MessageID: messageID,
SubscriberID: subscriberID,
ExpiresAt: time.Now().Add(visibilityTimeout),
NodeID: uuid.New().String(),
}

s.visibilityTimeouts.Store(messageID, visInfo)
return true, s.broadcastVisibilityUpdate("lock", visInfo)
}

func (s *server) Acknowledge(ctx context.Context, req *pb.AcknowledgeRequest) (*pb.AcknowledgeResponse, error) {
s.messageQueueMu.Lock()
defer s.messageQueueMu.Unlock()

if err := s.releaseMessageLock(req.Id, req.SubscriberId); err != nil {
log.Printf("Error releasing message lock: %v", err)
}

mutation := spanner.Update(
MessagesTable,
[]string{"id", "processed", "updatedAt"},
[]interface{}{req.Id, true, spanner.CommitTimestamp},
)

_, err := s.client.Apply(ctx, []*spanner.Mutation{mutation})
if err != nil {
return nil, err
}

s.messageQueue[req.Topic] = s.messageQueue[req.Topic][1:]

bcastin := broadCastInput{
Type: "ack",
Msg: map[string]string{
"messageId": req.Id,
"topic": req.Topic,
},
}
	if err := s.broadcastAck(bcastin); err != nil {
		log.Printf("Error broadcasting ack: %v", err)
	}

[Reviewer] It seems that there are no definitions for broadcastAck().

return &pb.AcknowledgeResponse{Success: true}, nil
}

func (s *server) releaseMessageLock(messageID, subscriberID string) error {
s.lockMu.Lock()
defer s.lockMu.Unlock()

if info, exists := s.visibilityTimeouts.Load(messageID); exists {
visInfo := info.(VisibilityInfo)
if visInfo.SubscriberID == subscriberID {
s.visibilityTimeouts.Delete(messageID)
return s.broadcastVisibilityUpdate("unlock", visInfo)
}
}
return nil
}

func (s *server) ExtendVisibilityTimeout(ctx context.Context, req *pb.ExtendTimeoutRequest) (*pb.ExtendTimeoutResponse, error) {
s.lockMu.Lock()
defer s.lockMu.Unlock()

info, exists := s.visibilityTimeouts.Load(req.MessageId)
if !exists {
return nil, status.Error(codes.NotFound, "Message lock not found")
}

visInfo := info.(VisibilityInfo)
if visInfo.SubscriberID != req.SubscriberId {
return nil, status.Error(codes.PermissionDenied, "Not allowed to extend timeout for this message")
}

newExpiry := time.Now().Add(time.Duration(req.ExtensionSeconds) * time.Second)
visInfo.ExpiresAt = newExpiry
s.visibilityTimeouts.Store(req.MessageId, visInfo)

// Update Spanner to reflect the new timeout
go func() {
mutation := spanner.Update(
MessagesTable,
[]string{"id", "visibilityTimeout", "updatedAt"},
[]interface{}{req.MessageId, newExpiry, spanner.CommitTimestamp},
)
_, err := s.client.Apply(ctx, []*spanner.Mutation{mutation})
if err != nil {
log.Printf("Spanner update error: %v", err)
}
}()

// Broadcast new timeout info
_ = s.broadcastVisibilityUpdate("extend", visInfo)

return &pb.ExtendTimeoutResponse{Success: true}, nil
}


func (s *server) broadcastVisibilityUpdate(cmdType string, info VisibilityInfo) error {
bcastin := broadCastInput{
Type: "visibility",
Msg: struct {
Command string `json:"command"`
Info VisibilityInfo `json:"info"`
}{
Command: cmdType,
Info: info,
},
}

data, err := json.Marshal(bcastin)
if err != nil {
return err
}

results := s.op.Broadcast(context.Background(), data)
for _, result := range results {
if result.Error != nil {
log.Printf("Broadcast error to node %s: %v", result.NodeID, result.Error)
}
}

return nil
}

func (s *server) startVisibilityCleanup() {
ticker := time.NewTicker(cleanupInterval)
for range ticker.C {
s.cleanupExpiredLocks()
}
}

func (s *server) cleanupExpiredLocks() {
now := time.Now()
s.lockMu.Lock()
defer s.lockMu.Unlock()

s.visibilityTimeouts.Range(func(key, value interface{}) bool {
visInfo := value.(VisibilityInfo)
if now.After(visInfo.ExpiresAt) {
// Double-check before deleting
if info, exists := s.visibilityTimeouts.Load(key); exists {
if time.Now().Before(info.(VisibilityInfo).ExpiresAt) {
return true // Another node extended it
}
}
s.visibilityTimeouts.Delete(key)
s.broadcastVisibilityUpdate("unlock", visInfo)
}
return true
})
}