Skip to content

Commit

Permalink
better share lifecycle monitoring and error handling (#463)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelquigley committed Sep 13, 2024
1 parent 8525348 commit 2cf484e
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 18 deletions.
33 changes: 30 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,33 @@ type Agent struct {
root env_core.Root
agentSocket string
shares map[string]*share
inShares chan *share
outShares chan *share
accesses map[string]*access
inAccesses chan *access
outAccesses chan *access
}

func NewAgent(root env_core.Root) (*Agent, error) {
if !root.IsEnabled() {
return nil, errors.Errorf("unable to load environment; did you 'zrok enable'?")
}
return &Agent{
root: root,
shares: make(map[string]*share),
accesses: make(map[string]*access),
root: root,
shares: make(map[string]*share),
inShares: make(chan *share),
outShares: make(chan *share),
accesses: make(map[string]*access),
inAccesses: make(chan *access),
outAccesses: make(chan *access),
}, nil
}

func (a *Agent) Run() error {
logrus.Infof("started")

go a.manager()

agentSocket, err := a.root.AgentSocket()
if err != nil {
return err
Expand All @@ -55,3 +65,20 @@ func (a *Agent) Shutdown() {
logrus.Warnf("unable to remove agent socket: %v", err)
}
}

func (a *Agent) manager() {
logrus.Info("started")
defer logrus.Warn("exited")

for {
select {
case inShare := <-a.inShares:
logrus.Infof("adding new share '%v'", inShare.token)
a.shares[inShare.token] = inShare

case outShare := <-a.outShares:
logrus.Infof("removing share '%v'", outShare.token)
delete(a.shares, outShare.token)
}
}
}
27 changes: 22 additions & 5 deletions agent/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree"
"github.com/openziti/zrok/sdk/golang/sdk"
"github.com/pkg/errors"
"strings"
"time"
)
Expand All @@ -27,15 +28,27 @@ type share struct {
closed bool
accessGrants []string

process *proctree.Child
readBuffer bytes.Buffer
ready chan struct{}
process *proctree.Child
readBuffer bytes.Buffer
booted bool
bootComplete chan struct{}
bootErr error

a *Agent
}

func (s *share) monitor() {
if err := proctree.WaitChild(s.process); err != nil {
pfxlog.ChannelLogger(s.token).Error(err)
}
s.a.outShares <- s
}

func (s *share) tail(data []byte) {
s.readBuffer.Write(data)
if line, err := s.readBuffer.ReadString('\n'); err == nil {
if s.token == "" {
line = strings.Trim(line, "\n")
if !s.booted {
in := make(map[string]interface{})
if err := json.Unmarshal([]byte(line), &in); err == nil {
if v, found := in["token"]; found {
Expand All @@ -52,8 +65,12 @@ func (s *share) tail(data []byte) {
}
}
}
s.booted = true
} else {
s.bootErr = errors.New(line)
}
close(s.ready)
close(s.bootComplete)

} else {
if strings.HasPrefix(line, "{") {
in := make(map[string]interface{})
Expand Down
25 changes: 15 additions & 10 deletions agent/publicShare.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ func (i *agentGrpcImpl) PublicShare(_ context.Context, req *agentGrpc.PublicShar

shrCmd := []string{os.Args[0], "share", "public", "--agent", "-b", req.BackendMode}
shr := &share{
shareMode: sdk.PublicShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
ready: make(chan struct{}),
shareMode: sdk.PublicShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
bootComplete: make(chan struct{}),
a: i.a,
}

for _, basicAuth := range req.BasicAuth {
Expand Down Expand Up @@ -74,14 +75,18 @@ func (i *agentGrpcImpl) PublicShare(_ context.Context, req *agentGrpc.PublicShar

shr.process, err = proctree.StartChild(shr.tail, shrCmd...)
if err != nil {
return nil, err
return &agentGrpc.PublicShareReply{}, err
}

<-shr.ready
i.a.shares[shr.token] = shr
go shr.monitor()
<-shr.bootComplete

return &agentGrpc.PublicShareReply{
Token: shr.token,
FrontendEndpoints: shr.frontendEndpoints,
}, nil
if shr.bootErr == nil {
i.a.inShares <- shr
return &agentGrpc.PublicShareReply{
Token: shr.token,
FrontendEndpoints: shr.frontendEndpoints,
}, nil
}
return &agentGrpc.PublicShareReply{}, shr.bootErr
}

0 comments on commit 2cf484e

Please sign in to comment.