Skip to content

Commit

Permalink
NOISSUE: Return Response on Computation Termination. (#211)
Browse files Browse the repository at this point in the history
* send response to manager on computation termination

Signed-off-by: WashingtonKK <[email protected]>

* fix tests

Signed-off-by: WashingtonKK <[email protected]>

* refactor: enhance stop computation

Signed-off-by: WashingtonKK <[email protected]>

* remove comment and add event

Signed-off-by: WashingtonKK <[email protected]>

---------

Signed-off-by: WashingtonKK <[email protected]>
  • Loading branch information
WashingtonKK authored Aug 23, 2024
1 parent 4c80b57 commit 7155027
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 143 deletions.
6 changes: 6 additions & 0 deletions manager/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ func (client ManagerClient) Process(ctx context.Context, cancel context.CancelFu
cancel()
return errors.Wrap(errTerminationFromServer, errors.New(mes.TerminateReq.Message))
case *pkgmanager.ServerStreamMessage_StopComputation:
msg := &pkgmanager.ClientStreamMessage_StopComputationRes{StopComputationRes: &pkgmanager.StopComputationResponse{
ComputationId: mes.StopComputation.ComputationId,
}}
if err := client.svc.Stop(ctx, mes.StopComputation.ComputationId); err != nil {
msg.StopComputationRes.Message = err.Error()
}
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: msg}); err != nil {
return err
}
case *pkgmanager.ServerStreamMessage_BackendInfoReq:
Expand Down
6 changes: 6 additions & 0 deletions manager/manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ message StopComputation {
string computation_id = 1;
}

message StopComputationResponse {
string computation_id = 1;
string message = 2;
}

message RunResponse{
string agent_port = 1;
string computation_id = 2;
Expand Down Expand Up @@ -53,6 +58,7 @@ message ClientStreamMessage {
AgentEvent agent_event = 2;
RunResponse run_res = 3;
BackendInfo backendInfo = 4;
StopComputationResponse stopComputationRes = 5;
}
}

Expand Down
3 changes: 3 additions & 0 deletions manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,15 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
func (ms *managerService) Stop(ctx context.Context, computationID string) error {
cvm, ok := ms.vms[computationID]
if !ok {
defer ms.publishEvent("stop-computation", computationID, "failed", json.RawMessage{})
return ErrNotFound
}
if err := cvm.Stop(); err != nil {
defer ms.publishEvent("stop-computation", computationID, "failed", json.RawMessage{})
return err
}
delete(ms.vms, computationID)
defer ms.publishEvent("stop-computation", computationID, "complete", json.RawMessage{})
return nil
}

Expand Down
15 changes: 13 additions & 2 deletions manager/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func TestRun(t *testing.T) {

vmf.AssertExpectations(t)

// Clear the events channel
for len(eventsChan) > 0 {
<-eventsChan
}
Expand All @@ -124,6 +123,10 @@ func TestRun(t *testing.T) {
}

func TestStop(t *testing.T) {
vmf := new(mocks.Provider)
vmMock := new(mocks.VM)
vmf.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(vmMock)

tests := []struct {
name string
computationID string
Expand Down Expand Up @@ -156,8 +159,12 @@ func TestStop(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := slog.Default()
eventsChan := make(chan *manager.ClientStreamMessage, 10)
ms := &managerService{
vms: make(map[string]vm.VM),
logger: logger,
vms: make(map[string]vm.VM),
eventsChan: eventsChan,
}
vmMock := new(mocks.VM)

Expand All @@ -180,6 +187,10 @@ func TestStop(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, ms.vms, 0)
}

for len(eventsChan) > 0 {
<-eventsChan
}
})
}
}
Expand Down
Loading

0 comments on commit 7155027

Please sign in to comment.