Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Handle larger manifests exceeding the default grpc limit #161

Merged
merged 8 commits into from
Aug 6, 2024

Conversation

smithjilks
Copy link
Contributor

What type of PR is this?

This is a feature to allow streaming manifests larger than the default grpc limit

What does this do?

Adds ability to stream large computation manifests in chunks.

Which issue(s) does this PR fix/relate to?

  • Related PR: Prism #303

Have you included tests for your changes?

No, I have not included tests.

Did you document any new/modified feature?

Notes

@smithjilks smithjilks requested a review from SammyOina June 27, 2024 08:42
Copy link
Contributor

@SammyOina SammyOina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add tests

return err
}
for start := 0; start < len(data); start += chunkSize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use a byte buffer to read chunks, see cocos sdk for an example

@smithjilks smithjilks force-pushed the cocos-249 branch 2 times, most recently from 2c6374f to 46e469e Compare July 7, 2024 13:33
)

func init() {
logger, err := mglog.New(os.Stdout, "debug")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use mock logger for tests

func (s *svc) Run(ipAddress string, runReqChan chan *manager.ServerStreamMessage, authInfo credentials.AuthInfo) {
privKey, err := rsa.GenerateKey(rand.Reader, keyBitSize)
if err != nil {
s.logger.Error(fmt.Sprintf("Error generating public key: %v", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fail test rather than logging errors, pass testting.T on svc struct

attestedTLS = false
)

func init() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use testmain for setup rather than init for test setup

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add test cases for different senarios

@drasko
Copy link
Contributor

drasko commented Jul 10, 2024

@smithjilks please resolve these remarks

@smithjilks smithjilks force-pushed the cocos-249 branch 5 times, most recently from 93e1e2b to 546e2b4 Compare July 11, 2024 15:06
@@ -32,21 +37,37 @@ func (client ManagerClient) Process(ctx context.Context, cancel context.CancelFu
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() error {
var data bytes.Buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name appropriately

if len(mes.RunReqChunks.Data) == 0 {
var runReq pkgmanager.ComputationRunReq
if err = proto.Unmarshal(runReqBuffer.Bytes(), &runReq); err != nil {
return errCorruptedManifest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap the error

}
runReqBuffer.Write(mes.RunReqChunks.Data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle error

Comment on lines 68 to 106
case *manager.ServerStreamMessage_RunReq:
data, err := proto.Marshal(msg.RunReq)
if err != nil {
return err
}
dataBuffer := bytes.NewBuffer(data)
buf := make([]byte, bufferSize)
for {
n, err := dataBuffer.Read(buf)
chunk := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_RunReqChunks{
RunReqChunks: &manager.RunReqChunks{
Data: buf[:n],
},
},
}

if err := stream.Send(chunk); err != nil {
return err
}

if err == io.EOF {
break
}
}

case *manager.ServerStreamMessage_TerminateReq:
terminate := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_TerminateReq{
TerminateReq: msg.TerminateReq,
},
}
if err := stream.Send(terminate); err != nil {
return err
}

default:
return ErrUnexpectedMsg
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle all message types as in main

Comment on lines 94 to 124
case *manager.ServerStreamMessage_TerminateReq:
terminate := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_TerminateReq{
TerminateReq: msg.TerminateReq,
},
}
if err := stream.Send(terminate); err != nil {
return err
}

case *manager.ServerStreamMessage_StopComputation:
stopComp := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_StopComputation{
StopComputation: msg.StopComputation,
},
}
if err := stream.Send(stopComp); err != nil {
return err
}

case *manager.ServerStreamMessage_BackendInfoReq:
backendInfo := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_BackendInfoReq{
BackendInfoReq: msg.BackendInfoReq,
},
}
if err := stream.Send(backendInfo); err != nil {
return err
}
default:
return ErrUnexpectedMsg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the fault case to send all other messages, and no need to create new variables.

if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: runRes}); err != nil {
return err
}
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return nil

is a goroutine on client and should not return here

@drasko drasko merged commit 3c855e3 into ultravioletrs:main Aug 6, 2024
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants