Skip to content

Commit

Permalink
Merge pull request #739 from openziti/daemon_mode
Browse files Browse the repository at this point in the history
zrok Agent (#463)
  • Loading branch information
michaelquigley authored Sep 18, 2024
2 parents e97620d + 37ed76e commit 5af4aa6
Show file tree
Hide file tree
Showing 42 changed files with 3,944 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

MAJOR RELEASE: zrok reaches version 1.0.0!

FEATURE: New "zrok Agent", a background manager process for your zrok environments, which allows you to easily manage and work with multiple `zrok share` and `zrok access` processes (https://github.com/openziti/zrok/issues/463)

## v0.4.41

FIX: Fixed crash when invoking `zrok share reserved` with no arguments (https://github.com/openziti/zrok/issues/740)
Expand Down
76 changes: 76 additions & 0 deletions agent/access.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package agent

import (
"bytes"
"encoding/json"
"errors"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/zrok/agent/proctree"
"github.com/sirupsen/logrus"
"strings"
)

type access struct {
frontendToken string
token string
bindAddress string
responseHeaders []string

process *proctree.Child
readBuffer bytes.Buffer
booted bool
bootComplete chan struct{}
bootErr error

a *Agent
}

func (a *access) monitor() {
if err := proctree.WaitChild(a.process); err != nil {
pfxlog.ChannelLogger(a.token).Error(err)
}
a.a.outAccesses <- a
}

func (a *access) tail(data []byte) {
defer func() {
if r := recover(); r != nil {
logrus.Errorf("recovering: %v", r)
}
}()
a.readBuffer.Write(data)
if line, err := a.readBuffer.ReadString('\n'); err == nil {
line = strings.Trim(line, "\n")
if !a.booted {
in := make(map[string]interface{})
if err := json.Unmarshal([]byte(line), &in); err == nil {
if v, found := in["frontend_token"]; found {
if str, ok := v.(string); ok {
a.frontendToken = str
}
}
if v, found := in["bind_address"]; found {
if str, ok := v.(string); ok {
a.bindAddress = str
}
}
a.booted = true
} else {
a.bootErr = errors.New(line)
}
close(a.bootComplete)

} else {
if strings.HasPrefix(line, "{") {
in := make(map[string]interface{})
if err := json.Unmarshal([]byte(line), &in); err == nil {
pfxlog.ChannelLogger(a.token).Info(in)
}
} else {
pfxlog.ChannelLogger(a.token).Info(strings.Trim(line, "\n"))
}
}
} else {
a.readBuffer.WriteString(line)
}
}
48 changes: 48 additions & 0 deletions agent/accessPrivate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package agent

import (
"context"
"errors"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree"
"github.com/openziti/zrok/environment"
"github.com/sirupsen/logrus"
"os"
)

func (i *agentGrpcImpl) AccessPrivate(_ context.Context, req *agentGrpc.AccessPrivateRequest) (*agentGrpc.AccessPrivateResponse, error) {
root, err := environment.LoadRoot()
if err != nil {
return nil, err
}

if !root.IsEnabled() {
return nil, errors.New("unable to load environment; did you 'zrok enable'?")
}

accCmd := []string{os.Args[0], "access", "private", "--agent", "-b", req.BindAddress, req.Token}
acc := &access{
token: req.Token,
bindAddress: req.BindAddress,
responseHeaders: req.ResponseHeaders,
bootComplete: make(chan struct{}),
a: i.a,
}

logrus.Infof("executing '%v'", accCmd)

acc.process, err = proctree.StartChild(acc.tail, accCmd...)
if err != nil {
return nil, err
}

go acc.monitor()
<-acc.bootComplete

if acc.bootErr == nil {
i.a.inAccesses <- acc
return &agentGrpc.AccessPrivateResponse{FrontendToken: acc.frontendToken}, nil
}

return nil, acc.bootErr
}
131 changes: 131 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package agent

import (
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree"
"github.com/openziti/zrok/environment/env_core"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"net"
"os"
)

type Agent struct {
root env_core.Root
agentSocket string
shares map[string]*share
inShares chan *share
outShares chan *share
accesses map[string]*access
inAccesses chan *access
outAccesses chan *access
}

func NewAgent(root env_core.Root) (*Agent, error) {
if !root.IsEnabled() {
return nil, errors.Errorf("unable to load environment; did you 'zrok enable'?")
}
return &Agent{
root: root,
shares: make(map[string]*share),
inShares: make(chan *share),
outShares: make(chan *share),
accesses: make(map[string]*access),
inAccesses: make(chan *access),
outAccesses: make(chan *access),
}, nil
}

func (a *Agent) Run() error {
logrus.Infof("started")

if err := proctree.Init("zrok Agent"); err != nil {
return err
}
go a.manager()

agentSocket, err := a.root.AgentSocket()
if err != nil {
return err
}
l, err := net.Listen("unix", agentSocket)
if err != nil {
return err
}
a.agentSocket = agentSocket

srv := grpc.NewServer()
agentGrpc.RegisterAgentServer(srv, &agentGrpcImpl{a: a})
if err := srv.Serve(l); err != nil {
return err
}

return nil
}

func (a *Agent) Shutdown() {
logrus.Infof("stopping")

if err := os.Remove(a.agentSocket); err != nil {
logrus.Warnf("unable to remove agent socket: %v", err)
}
for _, shr := range a.shares {
logrus.Debugf("stopping share '%v'", shr.token)
a.outShares <- shr
}
for _, acc := range a.accesses {
logrus.Debugf("stopping access '%v'", acc.token)
a.outAccesses <- acc
}
}

func (a *Agent) manager() {
logrus.Info("started")
defer logrus.Warn("exited")

for {
select {
case inShare := <-a.inShares:
logrus.Infof("adding new share '%v'", inShare.token)
a.shares[inShare.token] = inShare

case outShare := <-a.outShares:
if outShare.token != "" {
logrus.Infof("removing share '%v'", outShare.token)
if err := proctree.StopChild(outShare.process); err != nil {
logrus.Errorf("error stopping share '%v': %v", outShare.token, err)
}
if err := proctree.WaitChild(outShare.process); err != nil {
logrus.Errorf("error joining share '%v': %v", outShare.token, err)
}
delete(a.shares, outShare.token)
} else {
logrus.Debug("skipping unidentified (orphaned) share removal")
}

case inAccess := <-a.inAccesses:
logrus.Infof("adding new access '%v'", inAccess.frontendToken)
a.accesses[inAccess.frontendToken] = inAccess

case outAccess := <-a.outAccesses:
if outAccess.frontendToken != "" {
logrus.Infof("removing access '%v'", outAccess.frontendToken)
if err := proctree.StopChild(outAccess.process); err != nil {
logrus.Errorf("error stopping access '%v': %v", outAccess.frontendToken, err)
}
if err := proctree.WaitChild(outAccess.process); err != nil {
logrus.Errorf("error joining access '%v': %v", outAccess.frontendToken, err)
}
delete(a.accesses, outAccess.frontendToken)
} else {
logrus.Debug("skipping unidentified (orphaned) access removal")
}
}
}
}

type agentGrpcImpl struct {
agentGrpc.UnimplementedAgentServer
a *Agent
}
33 changes: 33 additions & 0 deletions agent/agentClient/agentClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package agentClient

import (
"context"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/environment/env_core"
"github.com/openziti/zrok/tui"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"net"
)

func NewClient(root env_core.Root) (client agentGrpc.AgentClient, conn *grpc.ClientConn, err error) {
agentSocket, err := root.AgentSocket()
if err != nil {
tui.Error("error getting agent socket", err)
}

opts := []grpc.DialOption{
grpc.WithContextDialer(func(_ context.Context, addr string) (net.Conn, error) {
return net.Dial("unix", addr)
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
resolver.SetDefaultScheme("passthrough")
conn, err = grpc.NewClient(agentSocket, opts...)
if err != nil {
tui.Error("error connecting to agent socket", err)
}

return agentGrpc.NewAgentClient(conn), conn, nil
}
Loading

0 comments on commit 5af4aa6

Please sign in to comment.