Skip to content

Commit

Permalink
send response to manager on computation termination
Browse files Browse the repository at this point in the history
Signed-off-by: WashingtonKK <[email protected]>
  • Loading branch information
WashingtonKK committed Aug 21, 2024
1 parent 31391a3 commit 58139f9
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 222 deletions.
20 changes: 20 additions & 0 deletions manager/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,27 @@ func (client ManagerClient) Process(ctx context.Context, cancel context.CancelFu
cancel()
return errors.Wrap(errTerminationFromServer, errors.New(mes.TerminateReq.Message))
case *pkgmanager.ServerStreamMessage_StopComputation:
msg := &pkgmanager.ClientStreamMessage_StopComputationRes{StopComputationRes: &pkgmanager.StopComputationResponse{
ComputationId: mes.StopComputation.ComputationId,
}}
if err := client.svc.Stop(ctx, mes.StopComputation.ComputationId); err != nil {
if errors.Contains(err, manager.ErrNotFound) {
msg.StopComputationRes.Message = err.Error()
msg.StopComputationRes.Stopped = false
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: msg}); err != nil {
return err
}
return nil
}
msg.StopComputationRes.Message = err.Error()
msg.StopComputationRes.Stopped = false
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: msg}); err != nil {
return err
}
return err
}
msg.StopComputationRes.Stopped = true
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: msg}); err != nil {
return err
}
case *pkgmanager.ServerStreamMessage_BackendInfoReq:
Expand Down
7 changes: 7 additions & 0 deletions manager/manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ message StopComputation {
string computation_id = 1;
}

message StopComputationResponse {
string computation_id = 1;
bool stopped = 2;
string message = 3;
}

message RunResponse{
string agent_port = 1;
string computation_id = 2;
Expand Down Expand Up @@ -53,6 +59,7 @@ message ClientStreamMessage {
AgentEvent agent_event = 2;
RunResponse run_res = 3;
BackendInfo backendInfo = 4;
StopComputationResponse stopComputationRes = 5;
}
}

Expand Down
2 changes: 2 additions & 0 deletions manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,14 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
func (ms *managerService) Stop(ctx context.Context, computationID string) error {
cvm, ok := ms.vms[computationID]
if !ok {
defer ms.publishEvent("stop-computation", computationID, "failed", json.RawMessage{})
return ErrNotFound
}
if err := cvm.Stop(); err != nil {
return err
}
delete(ms.vms, computationID)
defer ms.publishEvent("stop-computation", computationID, "complete", json.RawMessage{})
return nil
}

Expand Down
Loading

0 comments on commit 58139f9

Please sign in to comment.