Skip to content

Commit

Permalink
NOISSUE - Handle larger manifests exceeding the default grpc limit (u…
Browse files Browse the repository at this point in the history
…ltravioletrs#161)

* Handle larger manifests exceeding the default grpc limit

Signed-off-by: Jilks Smith <[email protected]>

* Update manager tests

Signed-off-by: Jilks Smith <[email protected]>

* Update manager tests

* Update manager client.go

* Update manager client.go

* Update manager client.go

* Update manager grpc server.go

* Update manager grpc server and client

---------

Signed-off-by: Jilks Smith <[email protected]>
  • Loading branch information
smithjilks authored Aug 6, 2024
1 parent 9161d30 commit 3c855e3
Show file tree
Hide file tree
Showing 7 changed files with 698 additions and 356 deletions.
40 changes: 31 additions & 9 deletions manager/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@
package grpc

import (
"bytes"
"context"
"errors"

"github.com/absmach/magistrala/pkg/errors"
"github.com/ultravioletrs/cocos/manager"
pkgmanager "github.com/ultravioletrs/cocos/pkg/manager"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)

var errTerminationFromServer = errors.New("server requested client termination")
var (
errTerminationFromServer = errors.New("server requested client termination")
errCorruptedManifest = errors.New("received manifest may be corrupted")
)

type ManagerClient struct {
stream pkgmanager.ManagerService_ProcessClient
Expand All @@ -32,24 +37,41 @@ func (client ManagerClient) Process(ctx context.Context, cancel context.CancelFu
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() error {
var runReqBuffer bytes.Buffer
for {
req, err := client.stream.Recv()
if err != nil {
return err
}

switch mes := req.Message.(type) {
case *pkgmanager.ServerStreamMessage_RunReq:
port, err := client.svc.Run(ctx, mes.RunReq)
if err != nil {
return err
case *pkgmanager.ServerStreamMessage_RunReqChunks:
if len(mes.RunReqChunks.Data) == 0 {
var runReq pkgmanager.ComputationRunReq
if err = proto.Unmarshal(runReqBuffer.Bytes(), &runReq); err != nil {
return errors.Wrap(err, errCorruptedManifest)
}
port, err := client.svc.Run(ctx, &runReq)
if err != nil {
return err
}
runRes := &pkgmanager.ClientStreamMessage_RunRes{
RunRes: &pkgmanager.RunResponse{
AgentPort: port,
ComputationId: runReq.Id,
},
}
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: runRes}); err != nil {
return err
}
}
runRes := &pkgmanager.ClientStreamMessage_RunRes{RunRes: &pkgmanager.RunResponse{AgentPort: port, ComputationId: mes.RunReq.Id}}
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: runRes}); err != nil {
if _, err := runReqBuffer.Write(mes.RunReqChunks.Data); err != nil {
return err
}

case *pkgmanager.ServerStreamMessage_TerminateReq:
cancel()
return errors.Join(errTerminationFromServer, errors.New(mes.TerminateReq.Message))
return errors.Wrap(errTerminationFromServer, errors.New(mes.TerminateReq.Message))
case *pkgmanager.ServerStreamMessage_StopComputation:
if err := client.svc.Stop(ctx, mes.StopComputation.ComputationId); err != nil {
return err
Expand Down
45 changes: 42 additions & 3 deletions manager/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,23 @@
package grpc

import (
"bytes"
"errors"
"io"

"github.com/ultravioletrs/cocos/pkg/manager"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/proto"
)

var (
_ manager.ManagerServiceServer = (*grpcServer)(nil)
ErrUnexpectedMsg = errors.New("unknown message type")
)

var _ manager.ManagerServiceServer = (*grpcServer)(nil)
const bufferSize = 1024 * 1024 // 1 MB

type grpcServer struct {
manager.UnimplementedManagerServiceServer
Expand Down Expand Up @@ -54,8 +64,37 @@ func (s *grpcServer) Process(stream manager.ManagerService_ProcessServer) error
case <-ctx.Done():
return nil
case req := <-runReqChan:
if err := stream.Send(req); err != nil {
return err
switch msg := req.Message.(type) {
case *manager.ServerStreamMessage_RunReq:
data, err := proto.Marshal(msg.RunReq)
if err != nil {
return err
}
dataBuffer := bytes.NewBuffer(data)
buf := make([]byte, bufferSize)
for {
n, err := dataBuffer.Read(buf)
chunk := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_RunReqChunks{
RunReqChunks: &manager.RunReqChunks{
Data: buf[:n],
},
},
}

if err := stream.Send(chunk); err != nil {
return err
}

if err == io.EOF {
break
}
}

default:
if err := stream.Send(req); err != nil {
return err
}
}
}
}
Expand Down
13 changes: 9 additions & 4 deletions manager/manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@ message ClientStreamMessage {

message ServerStreamMessage {
oneof message {
ComputationRunReq runReq = 1;
Terminate terminateReq = 2;
StopComputation stopComputation = 3;
BackendInfoReq backendInfoReq = 4;
RunReqChunks runReqChunks = 1;
ComputationRunReq runReq = 2;
Terminate terminateReq = 3;
StopComputation stopComputation = 4;
BackendInfoReq backendInfoReq = 5;
}
}

message RunReqChunks {
bytes data = 1;
}

message ComputationRunReq {
string id = 1;
string name = 2;
Expand Down
Loading

0 comments on commit 3c855e3

Please sign in to comment.