Skip to content

Commit

Permalink
Handle larger manifests exceeding the default grpc limit
Browse files Browse the repository at this point in the history
Signed-off-by: Jilks Smith <[email protected]>
  • Loading branch information
smithjilks committed Jul 7, 2024
1 parent 4c4161c commit 2c6374f
Show file tree
Hide file tree
Showing 8 changed files with 653 additions and 120 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/prometheus/client_golang v1.18.0
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0
Expand All @@ -30,6 +31,7 @@ require (
cloud.google.com/go/compute v1.23.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand All @@ -44,9 +46,11 @@ require (
github.com/mdlayher/socket v0.4.1 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/stretchr/objx v0.5.1 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand All @@ -55,4 +59,5 @@ require (
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240108191215-35c7eff3a6b1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U=
Expand All @@ -79,6 +83,8 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
Expand All @@ -87,9 +93,12 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0=
github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdrifcy0=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
Expand Down Expand Up @@ -162,6 +171,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
39 changes: 30 additions & 9 deletions manager/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@
package grpc

import (
"bytes"
"context"
"errors"

"github.com/ultravioletrs/cocos/manager"
pkgmanager "github.com/ultravioletrs/cocos/pkg/manager"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)

var errTerminationFromServer = errors.New("server requested client termination")
var (
errTerminationFromServer = errors.New("server requested client termination")
errCorruptedManifest = errors.New("received manifest may be corrupted")
)

type ManagerClient struct {
stream pkgmanager.ManagerService_ProcessClient
Expand All @@ -32,21 +37,37 @@ func (client ManagerClient) Process(ctx context.Context, cancel context.CancelFu
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() error {
var data bytes.Buffer
for {
req, err := client.stream.Recv()
if err != nil {
return err
}

switch mes := req.Message.(type) {
case *pkgmanager.ServerStreamMessage_RunReq:
port, err := client.svc.Run(ctx, mes.RunReq)
if err != nil {
return err
}
runRes := &pkgmanager.ClientStreamMessage_RunRes{RunRes: &pkgmanager.RunResponse{AgentPort: port, ComputationId: mes.RunReq.Id}}
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: runRes}); err != nil {
return err
case *pkgmanager.ServerStreamMessage_RunReqChunks:
if len(mes.RunReqChunks.Data) == 0 {
var runReq pkgmanager.ComputationRunReq
if err = proto.Unmarshal(data.Bytes(), &runReq); err != nil {
return errCorruptedManifest
}
port, err := client.svc.Run(ctx, &runReq)
if err != nil {
return err
}
runRes := &pkgmanager.ClientStreamMessage_RunRes{
RunRes: &pkgmanager.RunResponse{
AgentPort: port,
ComputationId: runReq.Id,
},
}
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: runRes}); err != nil {
return err
}
return nil
}
data.Write(mes.RunReqChunks.Data)

case *pkgmanager.ServerStreamMessage_TerminateReq:
cancel()
return errors.Join(errTerminationFromServer, errors.New(mes.TerminateReq.Message))
Expand Down
53 changes: 50 additions & 3 deletions manager/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,23 @@
package grpc

import (
"bytes"
"errors"
"io"

"github.com/ultravioletrs/cocos/pkg/manager"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/proto"
)

var (
_ manager.ManagerServiceServer = (*grpcServer)(nil)
ErrUnexpectedMsg = errors.New("unknown message type")
)

var _ manager.ManagerServiceServer = (*grpcServer)(nil)
const bufferSize = 1024 * 1024 // 1 MB

type grpcServer struct {
manager.UnimplementedManagerServiceServer
Expand Down Expand Up @@ -54,8 +64,45 @@ func (s *grpcServer) Process(stream manager.ManagerService_ProcessServer) error
case <-ctx.Done():
return nil
case req := <-runReqChan:
if err := stream.Send(req); err != nil {
return err
switch msg := req.Message.(type) {
case *manager.ServerStreamMessage_RunReq:
data, err := proto.Marshal(msg.RunReq)
if err != nil {
return err
}
dataBuffer := bytes.NewBuffer(data)
buf := make([]byte, bufferSize)
for {
n, err := dataBuffer.Read(buf)
chunk := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_RunReqChunks{
RunReqChunks: &manager.RunReqChunks{
Data: buf[:n],
},
},
}

if err := stream.Send(chunk); err != nil {
return err
}

if err == io.EOF {
break
}
}

case *manager.ServerStreamMessage_TerminateReq:
terminate := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_TerminateReq{
TerminateReq: msg.TerminateReq,
},
}
if err := stream.Send(terminate); err != nil {
return err
}

default:
return ErrUnexpectedMsg
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions manager/manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,16 @@ message ClientStreamMessage {

message ServerStreamMessage {
oneof message {
ComputationRunReq runReq = 1;
Terminate terminateReq = 2;
RunReqChunks runReqChunks = 1;
ComputationRunReq runReq = 2;
Terminate terminateReq = 3;
}
}

message RunReqChunks {
bytes data = 1;
}

message ComputationRunReq {
string id = 1;
string name = 2;
Expand Down
Loading

0 comments on commit 2c6374f

Please sign in to comment.