Skip to content

Commit

Permalink
Attempt to restart plugin and kill runtime on failure
Browse files Browse the repository at this point in the history
  • Loading branch information
Dennor committed Sep 19, 2022
1 parent 1cc8486 commit dca92cd
Showing 1 changed file with 63 additions and 33 deletions.
96 changes: 63 additions & 33 deletions pkg/driver/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type Plugin struct {
client Client
runnersCount uint8
lock sync.RWMutex
clilock sync.RWMutex
secrets driver.Secrets
cmdRef *exec.Cmd
done chan struct{}
Expand Down Expand Up @@ -154,8 +155,14 @@ func (p *Plugin) createRunners() {
}
}

func (p *Plugin) getClient() (plugin.ClientProtocol, error) {
p.clilock.RLock()
defer p.clilock.RUnlock()
return p.client.Client()
}

func (p *Plugin) getClientShim() (driverShim, error) {
rpcClient, err := p.client.Client()
rpcClient, err := p.getClient()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -188,44 +195,64 @@ func (p *Plugin) getRunners() []pluginRunner {
return runners
}

func (p *Plugin) setClient(cmd *exec.Cmd) {
p.clilock.Lock()
defer p.clilock.Unlock()
p.client = NewPluginClient(&plugin.ClientConfig{
HandshakeConfig: p.handshake(),
Plugins: map[string]plugin.Plugin{
"driver_grpc": &GRPC{},
},
Cmd: cmd,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Logger: NewLogger("plugin"),
})
}

func (p *Plugin) setCmdRef(cmd *exec.Cmd) {
p.clilock.Lock()
defer p.clilock.Unlock()
p.cmdRef = cmd
}

func (p *Plugin) createClient() error {
cmd := ExecCommand(p.cmd)
for k, v := range p.secrets {
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
}
createProcGroup(cmd)
p.setClient(cmd)
d, err := p.getClientShim()
if err != nil {
if cmd.Process != nil {
cmd.Process.Kill()
}
return err
}
p.setCmdRef(cmd)
ctx := context.Background()
go func() {
if err := d.Stdout(ctx, "plugin."+filepath.Base(cmd.Path)); err != nil {
klog.Error(err)
}
}()
go func() {
if err := d.Stderr(ctx, "plugin."+filepath.Base(cmd.Path)); err != nil {
klog.Error(err)
}
}()
return err
}

func (p *Plugin) start() error {
if p.getRunners() == nil {
p.lock.Lock()
defer p.lock.Unlock()
if p.runners == nil {
cmd := ExecCommand(p.cmd)
for k, v := range p.secrets {
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
}
createProcGroup(cmd)
p.client = NewPluginClient(&plugin.ClientConfig{
HandshakeConfig: p.handshake(),
Plugins: map[string]plugin.Plugin{
"driver_grpc": &GRPC{},
},
Cmd: cmd,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Logger: NewLogger("plugin"),
})
d, err := p.getClientShim()
err := p.createClient()
if err != nil {
if cmd.Process != nil {
cmd.Process.Kill()
}
return err
}
p.cmdRef = cmd
ctx := context.Background()
go func() {
if err := d.Stdout(ctx, "plugin."+filepath.Base(cmd.Path)); err != nil {
klog.Error(err)
}
}()
go func() {
if err := d.Stderr(ctx, "plugin."+filepath.Base(cmd.Path)); err != nil {
klog.Error(err)
}
}()
done := make(chan struct{})
go func() {
ticker := time.NewTicker(time.Second * 5)
Expand All @@ -238,8 +265,11 @@ func (p *Plugin) start() error {
err = rpcClient.Ping()
}
if err != nil {
klog.Error(errors.Wrap(err, "plugin error, quitting: "))
return
err = p.createClient()
if err != nil {
klog.Fatal(errors.Wrap(err, "plugin error, quitting: "))
return
}
}
case <-done:
return
Expand Down

0 comments on commit dca92cd

Please sign in to comment.