Skip to content

Commit

Permalink
stub: support re-start after stub stopped
Browse files Browse the repository at this point in the history
In some scenarios such as runtime restart or the occurrence of
nri request timeout, the ttrpc connections between the plugin
and the runtime will be actively closed by the runtime, even the
underlying network connection will be closed together. After this,
the plugin must need to re-register to the adaptation side, but now
the stub object cannot be reused for this; if the running plugin
wants to reconnect to the runtime, the only way is to create a new
stub for the plugin.

This commit has split the lifecycle of the stub and the ttrpc
connection to better support the development of the external type
of plugins. The plugin developer can build stub once and use it to
connect to adaptation side many times, just need re-call Start()
function.

Signed-off-by: Lei Liu <[email protected]>
  • Loading branch information
zhaodiaoer committed Jul 16, 2024
1 parent 5a4c86a commit 2d6ea6f
Showing 1 changed file with 55 additions and 31 deletions.
86 changes: 55 additions & 31 deletions pkg/stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ type PostUpdateContainerInterface interface {

// Stub is the interface the stub provides for the plugin implementation.
type Stub interface {
// Run the plugin. Starts the plugin then waits for an error or the plugin to stop
// Run the plugin in blocked way and wait for critical error return from plugin service.
// before this function return, stopped plugin can be restart.
Run(context.Context) error
// Start the plugin.
Start(context.Context) error
Expand Down Expand Up @@ -255,7 +256,6 @@ type stub struct {
rpcs *ttrpc.Server
rpcc *ttrpc.Client
runtime api.RuntimeService
closeOnce sync.Once
started bool
doneC chan struct{}
srvErrC chan error
Expand Down Expand Up @@ -288,7 +288,6 @@ func New(p interface{}, opts ...Option) (Stub, error) {
idx: os.Getenv(api.PluginIdxEnvVar),
socketPath: api.DefaultSocketPath,
dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) },
doneC: make(chan struct{}),
}

for _, o := range opts {
Expand Down Expand Up @@ -319,7 +318,12 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
if stub.started {
return fmt.Errorf("stub already started")
}
stub.started = true
defer func() {
if retErr == nil {
stub.started = true
}
}()
stub.doneC = make(chan struct{})

err := stub.connect()
if err != nil {
Expand Down Expand Up @@ -413,24 +417,31 @@ func (stub *stub) Stop() {
stub.close()
}

// reset stub to the status that can initiate a new
// NRI connection, the caller must hold lock.
func (stub *stub) close() {
stub.closeOnce.Do(func() {
if stub.rpcl != nil {
stub.rpcl.Close()
}
if stub.rpcs != nil {
stub.rpcs.Close()
}
if stub.rpcc != nil {
stub.rpcc.Close()
}
if stub.rpcm != nil {
stub.rpcm.Close()
}
if stub.srvErrC != nil {
<-stub.doneC
}
})
if !stub.started {
return
}

if stub.rpcl != nil {
stub.rpcl.Close()
}
if stub.rpcs != nil {
stub.rpcs.Close()
}
if stub.rpcc != nil {
stub.rpcc.Close()
}
if stub.rpcm != nil {
stub.rpcm.Close()
}
if stub.srvErrC != nil {
<-stub.doneC
}

stub.started = false
stub.conn = nil
}

// Run the plugin. Start event processing then wait for an error or getting stopped.
Expand All @@ -441,22 +452,33 @@ func (stub *stub) Run(ctx context.Context) error {
return err
}

err = <-stub.srvErrC
if err == ttrpc.ErrServerClosed {
return nil
for {
select {
case <-ctx.Done():
return nil
case err = <-stub.srvErrC:
if isRecoverableErr(err) {
log.Warnf(ctx, "Plugin service stopped", "error", err)
continue
}
return err
}
}
}

return err
func isRecoverableErr(err error) bool {
// For now, the error reports from the ttrpc level are regarded as tolerable errors.
return errors.Is(err, ttrpc.ErrProtocol) ||
errors.Is(err, ttrpc.ErrClosed) ||
errors.Is(err, ttrpc.ErrServerClosed) ||
errors.Is(err, ttrpc.ErrStreamClosed)
}

// Wait for the plugin to stop.
// Wait for the plugin to stop, should be called after Start() or Run().
func (stub *stub) Wait() {
stub.Lock()
if stub.srvErrC == nil {
return
if stub.started && stub.doneC != nil {
<-stub.doneC
}
stub.Unlock()
<-stub.doneC
}

// Name returns the full indexed name of the plugin.
Expand Down Expand Up @@ -518,7 +540,9 @@ func (stub *stub) register(ctx context.Context) error {

// Handle a lost connection.
func (stub *stub) connClosed() {
stub.Lock()
stub.close()
stub.Unlock()
if stub.onClose != nil {
stub.onClose()
return
Expand Down

0 comments on commit 2d6ea6f

Please sign in to comment.