Skip to content

Commit

Permalink
stub: support re-start after stub stopped
Browse files Browse the repository at this point in the history
In some scenarios such as runtime restart or the occurrence of
nri request timeout, the ttrpc connections between the plugin
and the runtime will be actively closed by the runtime, even the
underlying network connection will be closed together. After this,
the plugin must need to re-register to the adaptation side, but now
the stub object cannot be reused for this; if the running plugin
wants to reconnect to the runtime, the only way is to create a new
stub for the plugin.

This commit has split the lifecycle of the stub and the ttrpc
connection to better support the development of the external type
of plugins. The plugin developer can build stub once and use it to
connect to adaptation side many times, just need re-call Start()
function.

Signed-off-by: Lei Liu <[email protected]>
  • Loading branch information
zhaodiaoer committed Jul 25, 2024
1 parent 5a4c86a commit 316daf5
Showing 1 changed file with 46 additions and 28 deletions.
74 changes: 46 additions & 28 deletions pkg/stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ type PostUpdateContainerInterface interface {

// Stub is the interface the stub provides for the plugin implementation.
type Stub interface {
// Run the plugin. Starts the plugin then waits for an error or the plugin to stop
// Run starts the plugin then waits for the plugin service to exit, either due to a
// critical error or an explicit call to Stop(). Once Run() returns, the plugin can be
// restarted by calling Run() or Start() again.
Run(context.Context) error
// Start the plugin.
Start(context.Context) error
Expand Down Expand Up @@ -255,7 +257,6 @@ type stub struct {
rpcs *ttrpc.Server
rpcc *ttrpc.Client
runtime api.RuntimeService
closeOnce sync.Once
started bool
doneC chan struct{}
srvErrC chan error
Expand Down Expand Up @@ -288,7 +289,6 @@ func New(p interface{}, opts ...Option) (Stub, error) {
idx: os.Getenv(api.PluginIdxEnvVar),
socketPath: api.DefaultSocketPath,
dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) },
doneC: make(chan struct{}),
}

for _, o := range opts {
Expand Down Expand Up @@ -316,10 +316,10 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
stub.Lock()
defer stub.Unlock()

if stub.started {
if stub.isStarted() {
return fmt.Errorf("stub already started")
}
stub.started = true
stub.doneC = make(chan struct{})

err := stub.connect()
if err != nil {
Expand Down Expand Up @@ -401,6 +401,7 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {

log.Infof(ctx, "Started plugin %s...", stub.Name())

stub.started = true
return nil
}

Expand All @@ -413,24 +414,42 @@ func (stub *stub) Stop() {
stub.close()
}

// IsStarted returns true if the plugin has been started either by Start() or by Run().
func (stub *stub) IsStarted() bool {
stub.Lock()
defer stub.Unlock()
return stub.isStarted()
}

func (stub *stub) isStarted() bool {
return stub.started
}

// reset stub to the status that can initiate a new
// NRI connection, the caller must hold lock.
func (stub *stub) close() {
stub.closeOnce.Do(func() {
if stub.rpcl != nil {
stub.rpcl.Close()
}
if stub.rpcs != nil {
stub.rpcs.Close()
}
if stub.rpcc != nil {
stub.rpcc.Close()
}
if stub.rpcm != nil {
stub.rpcm.Close()
}
if stub.srvErrC != nil {
<-stub.doneC
}
})
if !stub.isStarted() {
return
}

if stub.rpcl != nil {
stub.rpcl.Close()
}
if stub.rpcs != nil {
stub.rpcs.Close()
}
if stub.rpcc != nil {
stub.rpcc.Close()
}
if stub.rpcm != nil {
stub.rpcm.Close()
}
if stub.srvErrC != nil {
<-stub.doneC
}

stub.started = false
stub.conn = nil
}

// Run the plugin. Start event processing then wait for an error or getting stopped.
Expand All @@ -449,14 +468,11 @@ func (stub *stub) Run(ctx context.Context) error {
return err
}

// Wait for the plugin to stop.
// Wait for the plugin to stop, should be called after Start() or Run().
func (stub *stub) Wait() {
stub.Lock()
if stub.srvErrC == nil {
return
if stub.IsStarted() {
<-stub.doneC
}
stub.Unlock()
<-stub.doneC
}

// Name returns the full indexed name of the plugin.
Expand Down Expand Up @@ -518,7 +534,9 @@ func (stub *stub) register(ctx context.Context) error {

// Handle a lost connection.
func (stub *stub) connClosed() {
stub.Lock()
stub.close()
stub.Unlock()
if stub.onClose != nil {
stub.onClose()
return
Expand Down

0 comments on commit 316daf5

Please sign in to comment.