Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jose dale branch #17

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2254dfc
Add files via upload
quantdale Feb 17, 2025
f4a48d9
Created server and CRUD-topic initially with test-client
d4rkpunk69 Feb 19, 2025
9eae374
go.mod?
d4rkpunk69 Feb 19, 2025
8d7dd20
added a CRUD subscription api, but is not yet configured to spanner DB
quantdale Feb 20, 2025
3c632a7
initial subscription beta
quantdale Feb 24, 2025
1c9061a
updated CRUD-topic and added testing
d4rkpunk69 Feb 21, 2025
dbcb4f2
added test-client
d4rkpunk69 Feb 21, 2025
b702853
deleted image and clean readme
d4rkpunk69 Feb 21, 2025
9d5e168
Update crud-topic
d4rkpunk69 Feb 24, 2025
4c15a46
updated topic crud
d4rkpunk69 Feb 24, 2025
8511224
updated crud topic
d4rkpunk69 Feb 24, 2025
3fcf161
Done CRUD-topic
d4rkpunk69 Feb 24, 2025
cc2b5ea
updated crud topic
d4rkpunk69 Feb 24, 2025
cde31ab
unifinished spanner implementation
quantdale Feb 24, 2025
d86ede0
crud api for sub
quantdale Feb 24, 2025
822569a
crud api for subscription
quantdale Feb 24, 2025
c54a3d5
implemented crud-topic ins server
d4rkpunk69 Feb 25, 2025
d52f655
updated-crud-topic by jose
d4rkpunk69 Feb 25, 2025
549eaf5
Merge branches 'main' and 'jose-dale_branch' of https://github.com/al…
quantdale Feb 25, 2025
0f04393
updated flag to leader
d4rkpunk69 Feb 25, 2025
e03c257
removed redundant topic and merged to server.go
d4rkpunk69 Feb 25, 2025
726f27c
Merge remote-tracking branch 'origin/main' into jose-dale_branch
d4rkpunk69 Feb 25, 2025
7ea7833
updated mod
d4rkpunk69 Feb 25, 2025
623f2cd
modified crud api subscription
quantdale Feb 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
*.out
*.o
*.a
*.out
temp.txt
*.so
*.dll
*.dylib

/pubsub
/pubsub
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
- `testclient/testclient.go`: gRPC test client.
- `main.go`: Entry point of the application.
- `go.mod`: Go module file.
- `go.sum`: Go dependencies file.
- `go.sum`: Go dependencies file.
182 changes: 182 additions & 0 deletions crud-subscription/reject/subscription_beta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package topic

import (
"context"
"fmt"

spanner "cloud.google.com/go/spanner"
pb "github.com/alphauslabs/pubsub-proto/v1"
"github.com/flowerinthenight/hedge/v2"
"github.com/google/uuid"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

type SubscriptionService struct {
client pb.PubSubServiceClient
SpannerClient *spanner.Client
}

func NewSubscriptionService(conn *grpc.ClientConn, db *spanner.Client) *SubscriptionService {
client := pb.NewPubSubServiceClient(conn)
return &SubscriptionService{
client: client,
SpannerClient: db,
}


}


//update the CreateSubscription function to include subscription id
func (s *SubscriptionService) CreateSubscription(ctx context.Context, req *pb.CreateSubscriptionRequest) (*pb.Subscription, error) {
if req.id == "" || req.TopicId == "" {
return nil, fmt.Errorf("invalid request: Topic ID and Subscription ID are required")
}

//check if subscription exists
exist, err := s.SubscriptionExists(ctx, req.id)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to check subscription existence: %v", err)
}
if exist {
return nil, status.Errorf(codes.AlreadyExists, "subscription name %q already exists", req. id)
}

stmt := spanner.Statement{
SQL: "INSERT INTO Subscriptions (subscriptionID, TopciID) VALUES (@id, @topic_id)",
Params: map[string]interface{}{"subscriptionID": req.id, "TopicID": *&req.TopicId},
}

_, err = s.SpannerClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
_, err = txn.Update(ctx, stmt)
return err
})
if err != nil {
return nil, fmt.Errorf("failed to create subscription: %v", err)
}

return &pb.Subscription{Id: req.id, TopicId: *&req.TopicId}, nil
}

//modify the GetSubscription function to include subscription id and done
func (s *SubscriptionService) GetSubscription(ctx context.Context, req *pb.GetSubscriptionRequest) (*pb.Subscription, error) {
if req.Id == "" {
return nil, fmt.Errorf("invalid request: exisitng subscription required")
}

stmt := spanner.Statement{
SQL: "SELECT subscriptionID FROM Subscriptions WHERE subscriptionID = @id LIMIT 1",
Params: map[string]interface{}{"id": req.Id},
}

iter := s.SpannerClient.Single().Query(ctx, stmt)
defer iter.Stop()

row, err := iter.Next()
if err == iterator.Done {
return nil, status.Errorf(codes.NotFound, "subscription with ID %q not found", req.Id)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to query subscription: %v", err)
}

var (
TopciID, SubscriptionID string
createdAt, updatedAt spanner.NullTime
)
if err := row.Columns(&TopciID, &SubscriptionID, &createdAt, &updatedAt); err != nil {
return nil, status.Errorf(codes.Internal, "failed to parse topic data: %v", err)
}

return &pb.Subscription{
Id: SubscriptionID,
TopicId: TopciID,
}, nil
}

//modify the GetSubscription function to include topic id and done
func (s *SubscriptionService) UpdateSubscription(ctx context.Context, req *pb.UpdateSubscriptionRequest) (*pb.Subscription, error) {
if req.Id == "" {
return nil, status.Error(codes.InvalidArgument, "subscription ID is required")
}
if req.TopicId == "" {
return nil, status.Error(codes.InvalidArgument, "attached topicId is required")
}

// Check if new Subscription already exists
exist, err := s.SubscriptionExists(ctx, req.Id)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to check ID availability: %v", err)
}
if exist {
return nil, status.Errorf(codes.AlreadyExists, "Subscription ID %q already exists", req.Id)
}

// Get existing subscription to verify existence
current, err := s.GetSubscription(ctx, &pb.GetSubscriptionRequest{Id: req.Id})
if err != nil {
return nil, err
}

m := spanner.Update("Subsscriptions",
[]string{"subscriptionID", "TopicID", "updatedAt"},
[]interface{}{current.Id, req.TopicID, spanner.CommitTimestamp},
)

_, err = s.SpannerClient.Apply(ctx, []*spanner.Mutation{m})
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to update topic: %v", err)
}

updatedTopic := &pb.Topic{
Id: current.Id,
SubscriptionID: req.Id,
}

if err := s.notifyLeader(ctx, 1); err != nil {
log.Printf("Failed to notify leader: %v", err)
}

return updatedTopic, nil
}

func (s *SubscriptionService) DeleteSubscription(ctx context.Context, req *pb.DeleteSubscriptionRequest) (*pb.DeleteSubscriptionResponse, error) {
response, err := s.client.DeleteSubscription(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to delete subscription: %v", err)
}
// to be configured x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0xx0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x
return response, nil
}

func (s *SubscriptionService) ListSubscriptions(ctx context.Context, req *pb.Empty) (*pb.ListSubscriptionsResponse, error) {
subscriptions, err := s.client.ListSubscriptions(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to list subscriptions: %v", err)
}
// to be configured x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0xx0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x0x
return subscriptions, nil
}

//Additional functions
func (s *TopicService) SubscriptionExists(ctx context.Context, name string) (bool, error) {
stmt := spanner.Statement{
SQL: "SELECT 1 FROM Subscription WHERE subscriptionID = @subscriptionID LIMIT 1",
Params: map[string]interface{}{"subscriptionID": subscriptionID},
}

iter := s.SpannerClient.Single().Query(ctx, stmt)
defer iter.Stop()

_, err := iter.Next()
if err == iterator.Done {
return false, nil
}
if err != nil {
return false, fmt.Errorf("existence check failed: %w", err)
}
return true, nil
}
Loading