Skip to content

Commit

Permalink
rename cvm to cvms plural
Browse files Browse the repository at this point in the history
Signed-off-by: Sammy Oina <[email protected]>
  • Loading branch information
SammyOina committed Jan 15, 2025
1 parent fa6816b commit 41fd64e
Show file tree
Hide file tree
Showing 18 changed files with 385 additions and 383 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protoc:
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative agent/agent.proto
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative manager/manager.proto
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative agent/events/events.proto
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative agent/cvm/cvm.proto
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative agent/cvms/cvms.proto

mocks:
mockery --config ./mockery.yml
Expand Down
40 changes: 20 additions & 20 deletions agent/cvm/api/grpc/client.go → agent/cvms/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

"github.com/absmach/magistrala/pkg/errors"
"github.com/ultravioletrs/cocos/agent"
"github.com/ultravioletrs/cocos/agent/cvm"
"github.com/ultravioletrs/cocos/agent/cvm/server"
"github.com/ultravioletrs/cocos/agent/cvms"
"github.com/ultravioletrs/cocos/agent/cvms/server"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
Expand All @@ -23,16 +23,16 @@ var (

type CVMClient struct {
mu sync.Mutex
stream cvm.CVMService_ProcessClient
stream cvms.CVMsService_ProcessClient
svc agent.Service
messageQueue chan *cvm.ClientStreamMessage
messageQueue chan *cvms.ClientStreamMessage
logger *slog.Logger
runReqManager *runRequestManager
sp server.AgentServer
}

// NewClient returns new gRPC client instance.
func NewClient(stream cvm.CVMService_ProcessClient, svc agent.Service, messageQueue chan *cvm.ClientStreamMessage, logger *slog.Logger, sp server.AgentServer) CVMClient {
func NewClient(stream cvms.CVMsService_ProcessClient, svc agent.Service, messageQueue chan *cvms.ClientStreamMessage, logger *slog.Logger, sp server.AgentServer) CVMClient {
return CVMClient{
stream: stream,
svc: svc,
Expand Down Expand Up @@ -74,23 +74,23 @@ func (client *CVMClient) handleIncomingMessages(ctx context.Context) error {
}
}

func (client *CVMClient) processIncomingMessage(ctx context.Context, req *cvm.ServerStreamMessage) error {
func (client *CVMClient) processIncomingMessage(ctx context.Context, req *cvms.ServerStreamMessage) error {
switch mes := req.Message.(type) {
case *cvm.ServerStreamMessage_RunReqChunks:
case *cvms.ServerStreamMessage_RunReqChunks:
return client.handleRunReqChunks(ctx, mes)
case *cvm.ServerStreamMessage_StopComputation:
case *cvms.ServerStreamMessage_StopComputation:
go client.handleStopComputation(ctx, mes)
default:
return errors.New("unknown message type")
}
return nil
}

func (client *CVMClient) handleRunReqChunks(ctx context.Context, mes *cvm.ServerStreamMessage_RunReqChunks) error {
func (client *CVMClient) handleRunReqChunks(ctx context.Context, mes *cvms.ServerStreamMessage_RunReqChunks) error {
buffer, complete := client.runReqManager.addChunk(mes.RunReqChunks.Id, mes.RunReqChunks.Data, mes.RunReqChunks.IsLast)

if complete {
var runReq cvm.ComputationRunReq
var runReq cvms.ComputationRunReq
if err := proto.Unmarshal(buffer, &runReq); err != nil {
return errors.Wrap(err, errCorruptedManifest)
}
Expand All @@ -101,7 +101,7 @@ func (client *CVMClient) handleRunReqChunks(ctx context.Context, mes *cvm.Server
return nil
}

func (client *CVMClient) executeRun(ctx context.Context, runReq *cvm.ComputationRunReq) {
func (client *CVMClient) executeRun(ctx context.Context, runReq *cvms.ComputationRunReq) {
ac := agent.Computation{
ID: runReq.Id,
Name: runReq.Name,
Expand Down Expand Up @@ -137,11 +137,11 @@ func (client *CVMClient) executeRun(ctx context.Context, runReq *cvm.Computation
defer client.mu.Unlock()

if runReq.AgentConfig == nil {
runReq.AgentConfig = &cvm.AgentConfig{}
runReq.AgentConfig = &cvms.AgentConfig{}
}

runRes := &cvm.ClientStreamMessage_RunRes{
RunRes: &cvm.RunResponse{
runRes := &cvms.ClientStreamMessage_RunRes{
RunRes: &cvms.RunResponse{
ComputationId: runReq.Id,
},
}
Expand All @@ -160,12 +160,12 @@ func (client *CVMClient) executeRun(ctx context.Context, runReq *cvm.Computation
runRes.RunRes.Error = err.Error()
}

client.sendMessage(&cvm.ClientStreamMessage{Message: runRes})
client.sendMessage(&cvms.ClientStreamMessage{Message: runRes})
}

func (client *CVMClient) handleStopComputation(ctx context.Context, mes *cvm.ServerStreamMessage_StopComputation) {
msg := &cvm.ClientStreamMessage_StopComputationRes{
StopComputationRes: &cvm.StopComputationResponse{
func (client *CVMClient) handleStopComputation(ctx context.Context, mes *cvms.ServerStreamMessage_StopComputation) {
msg := &cvms.ClientStreamMessage_StopComputationRes{
StopComputationRes: &cvms.StopComputationResponse{
ComputationId: mes.StopComputation.ComputationId,
},
}
Expand All @@ -180,7 +180,7 @@ func (client *CVMClient) handleStopComputation(ctx context.Context, mes *cvm.Ser
msg.StopComputationRes.Message = err.Error()
}

client.sendMessage(&cvm.ClientStreamMessage{Message: msg})
client.sendMessage(&cvms.ClientStreamMessage{Message: msg})
}

func (client *CVMClient) handleOutgoingMessages(ctx context.Context) error {
Expand All @@ -196,7 +196,7 @@ func (client *CVMClient) handleOutgoingMessages(ctx context.Context) error {
}
}

func (client *CVMClient) sendMessage(mes *cvm.ClientStreamMessage) {
func (client *CVMClient) sendMessage(mes *cvms.ClientStreamMessage) {
ctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
defer cancel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
mglog "github.com/absmach/magistrala/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/ultravioletrs/cocos/agent/cvm"
servermocks "github.com/ultravioletrs/cocos/agent/cvm/server/mocks"
"github.com/ultravioletrs/cocos/agent/cvms"
servermocks "github.com/ultravioletrs/cocos/agent/cvms/server/mocks"
"github.com/ultravioletrs/cocos/agent/mocks"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
Expand All @@ -22,12 +22,12 @@ type mockStream struct {
grpc.ClientStream
}

func (m *mockStream) Recv() (*cvm.ServerStreamMessage, error) {
func (m *mockStream) Recv() (*cvms.ServerStreamMessage, error) {
args := m.Called()
return args.Get(0).(*cvm.ServerStreamMessage), args.Error(1)
return args.Get(0).(*cvms.ServerStreamMessage), args.Error(1)
}

func (m *mockStream) Send(msg *cvm.ClientStreamMessage) error {
func (m *mockStream) Send(msg *cvms.ClientStreamMessage) error {
args := m.Called(msg)
return args.Error(0)
}
Expand All @@ -42,9 +42,9 @@ func TestManagerClient_Process1(t *testing.T) {
{
name: "Stop computation",
setupMocks: func(mockStream *mockStream, mockSvc *mocks.Service, mockServerSvc *servermocks.AgentServerProvider) {
mockStream.On("Recv").Return(&cvm.ServerStreamMessage{
Message: &cvm.ServerStreamMessage_StopComputation{
StopComputation: &cvm.StopComputation{},
mockStream.On("Recv").Return(&cvms.ServerStreamMessage{
Message: &cvms.ServerStreamMessage_StopComputation{
StopComputation: &cvms.StopComputation{},
},
}, nil)
mockStream.On("Send", mock.Anything).Return(nil)
Expand All @@ -57,9 +57,9 @@ func TestManagerClient_Process1(t *testing.T) {
{
name: "Run request chunks",
setupMocks: func(mockStream *mockStream, mockSvc *mocks.Service, mockServerSvc *servermocks.AgentServerProvider) {
mockStream.On("Recv").Return(&cvm.ServerStreamMessage{
Message: &cvm.ServerStreamMessage_RunReqChunks{
RunReqChunks: &cvm.RunReqChunks{},
mockStream.On("Recv").Return(&cvms.ServerStreamMessage{
Message: &cvms.ServerStreamMessage_RunReqChunks{
RunReqChunks: &cvms.RunReqChunks{},
},
}, nil)
mockStream.On("Send", mock.Anything).Return(nil).Once()
Expand All @@ -70,7 +70,7 @@ func TestManagerClient_Process1(t *testing.T) {
{
name: "Receive error",
setupMocks: func(mockStream *mockStream, mockSvc *mocks.Service, mockServerSvc *servermocks.AgentServerProvider) {
mockStream.On("Recv").Return(&cvm.ServerStreamMessage{}, assert.AnError)
mockStream.On("Recv").Return(&cvms.ServerStreamMessage{}, assert.AnError)
},
expectError: true,
},
Expand All @@ -81,7 +81,7 @@ func TestManagerClient_Process1(t *testing.T) {
mockStream := new(mockStream)
mockSvc := new(mocks.Service)
mockServerSvc := new(servermocks.AgentServerProvider)
messageQueue := make(chan *cvm.ClientStreamMessage, 10)
messageQueue := make(chan *cvms.ClientStreamMessage, 10)
logger := mglog.NewMock()

client := NewClient(mockStream, mockSvc, messageQueue, logger, mockServerSvc)
Expand Down Expand Up @@ -109,25 +109,25 @@ func TestManagerClient_handleRunReqChunks(t *testing.T) {
mockStream := new(mockStream)
mockSvc := new(mocks.Service)
mockServerSvc := new(servermocks.AgentServerProvider)
messageQueue := make(chan *cvm.ClientStreamMessage, 10)
messageQueue := make(chan *cvms.ClientStreamMessage, 10)
logger := mglog.NewMock()

client := NewClient(mockStream, mockSvc, messageQueue, logger, mockServerSvc)

runReq := &cvm.ComputationRunReq{
runReq := &cvms.ComputationRunReq{
Id: "test-id",
}
runReqBytes, _ := proto.Marshal(runReq)

chunk1 := &cvm.ServerStreamMessage_RunReqChunks{
RunReqChunks: &cvm.RunReqChunks{
chunk1 := &cvms.ServerStreamMessage_RunReqChunks{
RunReqChunks: &cvms.RunReqChunks{
Id: "chunk-1",
Data: runReqBytes[:len(runReqBytes)/2],
IsLast: false,
},
}
chunk2 := &cvm.ServerStreamMessage_RunReqChunks{
RunReqChunks: &cvm.RunReqChunks{
chunk2 := &cvms.ServerStreamMessage_RunReqChunks{
RunReqChunks: &cvms.RunReqChunks{
Id: "chunk-1",
Data: runReqBytes[len(runReqBytes)/2:],
IsLast: true,
Expand All @@ -150,7 +150,7 @@ func TestManagerClient_handleRunReqChunks(t *testing.T) {
assert.Len(t, messageQueue, 1)

msg := <-messageQueue
runRes, ok := msg.Message.(*cvm.ClientStreamMessage_RunRes)
runRes, ok := msg.Message.(*cvms.ClientStreamMessage_RunRes)
assert.True(t, ok)
assert.Equal(t, "test-id", runRes.RunRes.ComputationId)
}
Expand All @@ -159,13 +159,13 @@ func TestManagerClient_handleStopComputation(t *testing.T) {
mockStream := new(mockStream)
mockSvc := new(mocks.Service)
mockServerSvc := new(servermocks.AgentServerProvider)
messageQueue := make(chan *cvm.ClientStreamMessage, 10)
messageQueue := make(chan *cvms.ClientStreamMessage, 10)
logger := mglog.NewMock()

client := NewClient(mockStream, mockSvc, messageQueue, logger, mockServerSvc)

stopReq := &cvm.ServerStreamMessage_StopComputation{
StopComputation: &cvm.StopComputation{
stopReq := &cvms.ServerStreamMessage_StopComputation{
StopComputation: &cvms.StopComputation{
ComputationId: "test-comp-id",
},
}
Expand All @@ -182,7 +182,7 @@ func TestManagerClient_handleStopComputation(t *testing.T) {
assert.Len(t, messageQueue, 1)

msg := <-messageQueue
stopRes, ok := msg.Message.(*cvm.ClientStreamMessage_StopComputationRes)
stopRes, ok := msg.Message.(*cvms.ClientStreamMessage_StopComputationRes)
assert.True(t, ok)
assert.Equal(t, "test-comp-id", stopRes.StopComputationRes.ComputationId)
assert.Empty(t, stopRes.StopComputationRes.Message)
Expand Down
File renamed without changes.
28 changes: 14 additions & 14 deletions agent/cvm/api/grpc/server.go → agent/cvms/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,28 @@ import (
"io"
"time"

"github.com/ultravioletrs/cocos/agent/cvm"
"github.com/ultravioletrs/cocos/agent/cvms"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/proto"
)

var (
_ cvm.CVMServiceServer = (*grpcServer)(nil)
ErrUnexpectedMsg = errors.New("unknown message type")
_ cvms.CVMsServiceServer = (*grpcServer)(nil)
ErrUnexpectedMsg = errors.New("unknown message type")
)

const (
bufferSize = 1024 * 1024 // 1 MB
runReqTimeout = 30 * time.Second
)

type SendFunc func(*cvm.ServerStreamMessage) error
type SendFunc func(*cvms.ServerStreamMessage) error

type grpcServer struct {
cvm.UnimplementedCVMServiceServer
incoming chan *cvm.ClientStreamMessage
cvms.UnimplementedCVMsServiceServer
incoming chan *cvms.ClientStreamMessage
svc Service
}

Expand All @@ -39,14 +39,14 @@ type Service interface {
}

// NewServer returns new AuthServiceServer instance.
func NewServer(incoming chan *cvm.ClientStreamMessage, svc Service) cvm.CVMServiceServer {
func NewServer(incoming chan *cvms.ClientStreamMessage, svc Service) cvms.CVMsServiceServer {
return &grpcServer{
incoming: incoming,
svc: svc,
}
}

func (s *grpcServer) Process(stream cvm.CVMService_ProcessServer) error {
func (s *grpcServer) Process(stream cvms.CVMsService_ProcessServer) error {
client, ok := peer.FromContext(stream.Context())
if !ok {
return errors.New("failed to get peer info")
Expand All @@ -70,13 +70,13 @@ func (s *grpcServer) Process(stream cvm.CVMService_ProcessServer) error {
})

eg.Go(func() error {
sendMessage := func(msg *cvm.ServerStreamMessage) error {
sendMessage := func(msg *cvms.ServerStreamMessage) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
switch m := msg.Message.(type) {
case *cvm.ServerStreamMessage_RunReq:
case *cvms.ServerStreamMessage_RunReq:
return s.sendRunReqInChunks(stream, m.RunReq)
default:
return stream.Send(msg)
Expand All @@ -91,7 +91,7 @@ func (s *grpcServer) Process(stream cvm.CVMService_ProcessServer) error {
return eg.Wait()
}

func (s *grpcServer) sendRunReqInChunks(stream cvm.CVMService_ProcessServer, runReq *cvm.ComputationRunReq) error {
func (s *grpcServer) sendRunReqInChunks(stream cvms.CVMsService_ProcessServer, runReq *cvms.ComputationRunReq) error {
data, err := proto.Marshal(runReq)
if err != nil {
return err
Expand All @@ -110,9 +110,9 @@ func (s *grpcServer) sendRunReqInChunks(stream cvm.CVMService_ProcessServer, run
return err
}

chunk := &cvm.ServerStreamMessage{
Message: &cvm.ServerStreamMessage_RunReqChunks{
RunReqChunks: &cvm.RunReqChunks{
chunk := &cvms.ServerStreamMessage{
Message: &cvms.ServerStreamMessage_RunReqChunks{
RunReqChunks: &cvms.RunReqChunks{
Id: runReq.Id,
Data: buf[:n],
IsLast: isLast,
Expand Down
Loading

0 comments on commit 41fd64e

Please sign in to comment.