Skip to content

Commit

Permalink
fix: getting stream error for dependency server
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Dec 18, 2024
1 parent 9071378 commit 5bdcd29
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
21 changes: 18 additions & 3 deletions core/grpc/server/dependency_service_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package server
import (
"context"
"errors"
"fmt"
"github.com/cenkalti/backoff/v4"
"io"
"sync"
"time"
Expand Down Expand Up @@ -248,12 +250,25 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g
return nil, nil
}

func (svr DependencyServiceServer) GetStream(key string) (stream *grpc.DependencyService_ConnectServer, err error) {
func (svr DependencyServiceServer) GetStream(nodeKey string) (stream *grpc.DependencyService_ConnectServer, err error) {
b := backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 30)
err = backoff.Retry(func() error {
stream, err = svr.getStream(nodeKey)
return err
}, b)
if err != nil {
log.Errorf("get stream error: %v", err)
return nil, err
}
return stream, nil
}

func (svr DependencyServiceServer) getStream(nodeKey string) (stream *grpc.DependencyService_ConnectServer, err error) {
svr.mu.Lock()
defer svr.mu.Unlock()
stream, ok := svr.streams[key]
stream, ok := svr.streams[nodeKey]
if !ok {
return nil, errors.New("stream not found")
return nil, fmt.Errorf("stream not found for node: %s", nodeKey)
}
return stream, nil
}
Expand Down
11 changes: 0 additions & 11 deletions core/task/handler/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,17 +767,6 @@ func (r *Runner) handleIPC() {
}
}

// SendToChild sends a message to the child process through the IPC channel
// msgType: type of message being sent
// payload: data to be sent to the child process
func (r *Runner) SendToChild(msgType string, payload interface{}) {
r.ipcChan <- entity.IPCMessage{
Type: msgType,
Payload: payload,
IPC: true, // Explicitly mark as IPC message
}
}

// SetIPCHandler sets the handler for incoming IPC messages
func (r *Runner) SetIPCHandler(handler func(entity.IPCMessage)) {
r.ipcHandler = handler
Expand Down

0 comments on commit 5bdcd29

Please sign in to comment.