Skip to content

Commit

Permalink
Merge pull request #756 from openziti/dual_pathed
Browse files Browse the repository at this point in the history
Dual-pathed CLI Implementation (#751)
  • Loading branch information
michaelquigley authored Sep 25, 2024
2 parents f07d181 + e429556 commit 73b3203
Show file tree
Hide file tree
Showing 22 changed files with 515 additions and 578 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

MAJOR RELEASE: zrok reaches version 1.0.0!

FEATURE: New "zrok Agent", a background manager process for your zrok environments, which allows you to easily manage and work with multiple `zrok share` and `zrok access` processes (https://github.com/openziti/zrok/issues/463)
FEATURE: New "zrok Agent", a background manager process for your zrok environments, which allows you to easily manage and work with multiple `zrok share` and `zrok access` processes. New `--subordinate` flag added to `zrok share [public|private|reserved]` and `zrok access private` to operate in a mode that allows an Agent to manage shares and accesses (https://github.com/openziti/zrok/issues/463)

FEATURE: `zrok share [public|private|reserved]` and `zrok access private` now auto-detect if the zrok Agent is running in an environment and will automatically service share and access requests through the Agent, rather than in-process if the Agent is running. If the Agent is not running, operation remains as it was in `v0.4.x` and the share or access is handled in-process. New `--force-agent` and `--force-local` flags exist to skip Agent detection and manually select an operating mode (https://github.com/openziti/zrok/issues/751)

## v0.4.41

Expand Down
4 changes: 2 additions & 2 deletions agent/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ type access struct {
bootComplete chan struct{}
bootErr error

a *Agent
agent *Agent
}

func (a *access) monitor() {
if err := proctree.WaitChild(a.process); err != nil {
pfxlog.ChannelLogger(a.token).Error(err)
}
a.a.outAccesses <- a
a.agent.rmAccess <- a
}

func (a *access) tail(data []byte) {
Expand Down
6 changes: 3 additions & 3 deletions agent/accessPrivate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ func (i *agentGrpcImpl) AccessPrivate(_ context.Context, req *agentGrpc.AccessPr
return nil, errors.New("unable to load environment; did you 'zrok enable'?")
}

accCmd := []string{os.Args[0], "access", "private", "--agent", "-b", req.BindAddress, req.Token}
accCmd := []string{os.Args[0], "access", "private", "--subordinate", "-b", req.BindAddress, req.Token}
acc := &access{
token: req.Token,
bindAddress: req.BindAddress,
responseHeaders: req.ResponseHeaders,
bootComplete: make(chan struct{}),
a: i.a,
agent: i.agent,
}

logrus.Infof("executing '%v'", accCmd)
Expand All @@ -40,7 +40,7 @@ func (i *agentGrpcImpl) AccessPrivate(_ context.Context, req *agentGrpc.AccessPr
<-acc.bootComplete

if acc.bootErr == nil {
i.a.inAccesses <- acc
i.agent.addAccess <- acc
return &agentGrpc.AccessPrivateResponse{FrontendToken: acc.frontendToken}, nil
}

Expand Down
91 changes: 58 additions & 33 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree"
"github.com/openziti/zrok/environment/env_core"
"github.com/openziti/zrok/sdk/golang/sdk"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand All @@ -15,25 +16,25 @@ type Agent struct {
root env_core.Root
agentSocket string
shares map[string]*share
inShares chan *share
outShares chan *share
addShare chan *share
rmShare chan *share
accesses map[string]*access
inAccesses chan *access
outAccesses chan *access
addAccess chan *access
rmAccess chan *access
}

func NewAgent(root env_core.Root) (*Agent, error) {
if !root.IsEnabled() {
return nil, errors.Errorf("unable to load environment; did you 'zrok enable'?")
}
return &Agent{
root: root,
shares: make(map[string]*share),
inShares: make(chan *share),
outShares: make(chan *share),
accesses: make(map[string]*access),
inAccesses: make(chan *access),
outAccesses: make(chan *access),
root: root,
shares: make(map[string]*share),
addShare: make(chan *share),
rmShare: make(chan *share),
accesses: make(map[string]*access),
addAccess: make(chan *access),
rmAccess: make(chan *access),
}, nil
}

Expand All @@ -56,7 +57,7 @@ func (a *Agent) Run() error {
a.agentSocket = agentSocket

srv := grpc.NewServer()
agentGrpc.RegisterAgentServer(srv, &agentGrpcImpl{a: a})
agentGrpc.RegisterAgentServer(srv, &agentGrpcImpl{agent: a})
if err := srv.Serve(l); err != nil {
return err
}
Expand All @@ -72,11 +73,11 @@ func (a *Agent) Shutdown() {
}
for _, shr := range a.shares {
logrus.Debugf("stopping share '%v'", shr.token)
a.outShares <- shr
a.rmShare <- shr
}
for _, acc := range a.accesses {
logrus.Debugf("stopping access '%v'", acc.token)
a.outAccesses <- acc
a.rmAccess <- acc
}
}

Expand All @@ -86,46 +87,70 @@ func (a *Agent) manager() {

for {
select {
case inShare := <-a.inShares:
case inShare := <-a.addShare:
logrus.Infof("adding new share '%v'", inShare.token)
a.shares[inShare.token] = inShare

case outShare := <-a.outShares:
if outShare.token != "" {
logrus.Infof("removing share '%v'", outShare.token)
if err := proctree.StopChild(outShare.process); err != nil {
logrus.Errorf("error stopping share '%v': %v", outShare.token, err)
case outShare := <-a.rmShare:
if shr, found := a.shares[outShare.token]; found {
logrus.Infof("removing share '%v'", shr.token)
if err := proctree.StopChild(shr.process); err != nil {
logrus.Errorf("error stopping share '%v': %v", shr.token, err)
}
if err := proctree.WaitChild(outShare.process); err != nil {
logrus.Errorf("error joining share '%v': %v", outShare.token, err)
if err := proctree.WaitChild(shr.process); err != nil {
logrus.Errorf("error joining share '%v': %v", shr.token, err)
}
delete(a.shares, outShare.token)
if !shr.reserved {
if err := a.deleteShare(shr.token); err != nil {
logrus.Errorf("error deleting share '%v': %v", shr.token, err)
}
}
delete(a.shares, shr.token)
} else {
logrus.Debug("skipping unidentified (orphaned) share removal")
}

case inAccess := <-a.inAccesses:
case inAccess := <-a.addAccess:
logrus.Infof("adding new access '%v'", inAccess.frontendToken)
a.accesses[inAccess.frontendToken] = inAccess

case outAccess := <-a.outAccesses:
if outAccess.frontendToken != "" {
logrus.Infof("removing access '%v'", outAccess.frontendToken)
if err := proctree.StopChild(outAccess.process); err != nil {
logrus.Errorf("error stopping access '%v': %v", outAccess.frontendToken, err)
case outAccess := <-a.rmAccess:
if acc, found := a.accesses[outAccess.frontendToken]; found {
logrus.Infof("removing access '%v'", acc.frontendToken)
if err := proctree.StopChild(acc.process); err != nil {
logrus.Errorf("error stopping access '%v': %v", acc.frontendToken, err)
}
if err := proctree.WaitChild(acc.process); err != nil {
logrus.Errorf("error joining access '%v': %v", acc.frontendToken, err)
}
if err := proctree.WaitChild(outAccess.process); err != nil {
logrus.Errorf("error joining access '%v': %v", outAccess.frontendToken, err)
if err := a.deleteAccess(acc.token, acc.frontendToken); err != nil {
logrus.Errorf("error deleting access '%v': %v", acc.frontendToken, err)
}
delete(a.accesses, outAccess.frontendToken)
delete(a.accesses, acc.frontendToken)
} else {
logrus.Debug("skipping unidentified (orphaned) access removal")
}
}
}
}

func (a *Agent) deleteShare(token string) error {
logrus.Debugf("deleting share '%v'", token)
if err := sdk.DeleteShare(a.root, &sdk.Share{Token: token}); err != nil {
return err
}
return nil
}

func (a *Agent) deleteAccess(token, frontendToken string) error {
logrus.Debugf("deleting access '%v'", frontendToken)
if err := sdk.DeleteAccess(a.root, &sdk.Access{Token: frontendToken, ShareToken: token}); err != nil {
return err
}
return nil
}

type agentGrpcImpl struct {
agentGrpc.UnimplementedAgentServer
a *Agent
agent *Agent
}
24 changes: 21 additions & 3 deletions agent/agentClient/agentClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package agentClient
import (
"context"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/build"
"github.com/openziti/zrok/environment/env_core"
"github.com/openziti/zrok/tui"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"net"
"strings"
)

func NewClient(root env_core.Root) (client agentGrpc.AgentClient, conn *grpc.ClientConn, err error) {
agentSocket, err := root.AgentSocket()
if err != nil {
tui.Error("error getting agent socket", err)
return nil, nil, err
}

opts := []grpc.DialOption{
Expand All @@ -26,8 +28,24 @@ func NewClient(root env_core.Root) (client agentGrpc.AgentClient, conn *grpc.Cli
resolver.SetDefaultScheme("passthrough")
conn, err = grpc.NewClient(agentSocket, opts...)
if err != nil {
tui.Error("error connecting to agent socket", err)
return nil, nil, err
}

return agentGrpc.NewAgentClient(conn), conn, nil
}

func IsAgentRunning(root env_core.Root) (bool, error) {
client, conn, err := NewClient(root)
if err != nil {
return false, err
}
defer func() { _ = conn.Close() }()
resp, err := client.Version(context.Background(), &agentGrpc.VersionRequest{})
if err != nil {
return false, nil
}
if !strings.HasPrefix(resp.GetV(), build.Series) {
return false, errors.Errorf("agent reported version '%v'; we expected version '%v'", resp.GetV(), build.Series)
}
return true, nil
}
1 change: 0 additions & 1 deletion agent/proctree/impl_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package proctree

import (
"github.com/kolesnikovae/go-winjob"
"golang.org/x/sys/windows"
"os/exec"
"sync"
Expand Down
16 changes: 3 additions & 13 deletions agent/releaseAccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,15 @@ package agent
import (
"context"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func (i *agentGrpcImpl) ReleaseAccess(_ context.Context, req *agentGrpc.ReleaseAccessRequest) (*agentGrpc.ReleaseAccessResponse, error) {
if acc, found := i.a.accesses[req.FrontendToken]; found {
logrus.Infof("stopping access '%v'", acc.frontendToken)

if err := proctree.StopChild(acc.process); err != nil {
logrus.Error(err)
}

if err := proctree.WaitChild(acc.process); err != nil {
logrus.Error(err)
}

delete(i.a.accesses, acc.frontendToken)
if acc, found := i.agent.accesses[req.FrontendToken]; found {
i.agent.rmAccess <- acc
logrus.Infof("released access '%v'", acc.frontendToken)

} else {
return nil, errors.Errorf("agent has no access with frontend token '%v'", req.FrontendToken)
}
Expand Down
16 changes: 3 additions & 13 deletions agent/releaseShare.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,15 @@ package agent
import (
"context"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func (i *agentGrpcImpl) ReleaseShare(_ context.Context, req *agentGrpc.ReleaseShareRequest) (*agentGrpc.ReleaseShareResponse, error) {
if shr, found := i.a.shares[req.Token]; found {
logrus.Infof("stopping share '%v'", shr.token)

if err := proctree.StopChild(shr.process); err != nil {
logrus.Error(err)
}

if err := proctree.WaitChild(shr.process); err != nil {
logrus.Error(err)
}

delete(i.a.shares, shr.token)
if shr, found := i.agent.shares[req.Token]; found {
i.agent.rmShare <- shr
logrus.Infof("released share '%v'", shr.token)

} else {
return nil, errors.Errorf("agent has no share with token '%v'", req.Token)
}
Expand Down
4 changes: 2 additions & 2 deletions agent/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ type share struct {
bootComplete chan struct{}
bootErr error

a *Agent
agent *Agent
}

func (s *share) monitor() {
if err := proctree.WaitChild(s.process); err != nil {
pfxlog.ChannelLogger(s.token).Error(err)
}
s.a.outShares <- s
s.agent.rmShare <- s
}

func (s *share) tail(data []byte) {
Expand Down
6 changes: 3 additions & 3 deletions agent/sharePrivate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ func (i *agentGrpcImpl) SharePrivate(_ context.Context, req *agentGrpc.SharePriv
return nil, errors.New("unable to load environment; did you 'zrok enable'?")
}

shrCmd := []string{os.Args[0], "share", "private", "--agent", "-b", req.BackendMode}
shrCmd := []string{os.Args[0], "share", "private", "--subordinate", "-b", req.BackendMode}
shr := &share{
shareMode: sdk.PrivateShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
bootComplete: make(chan struct{}),
a: i.a,
agent: i.agent,
}

if req.Insecure {
Expand Down Expand Up @@ -58,7 +58,7 @@ func (i *agentGrpcImpl) SharePrivate(_ context.Context, req *agentGrpc.SharePriv
<-shr.bootComplete

if shr.bootErr == nil {
i.a.inShares <- shr
i.agent.addShare <- shr
return &agentGrpc.SharePrivateResponse{Token: shr.token}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions agent/sharePublic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ func (i *agentGrpcImpl) SharePublic(_ context.Context, req *agentGrpc.SharePubli
return nil, errors.New("unable to load environment; did you 'zrok enable'?")
}

shrCmd := []string{os.Args[0], "share", "public", "--agent", "-b", req.BackendMode}
shrCmd := []string{os.Args[0], "share", "public", "--subordinate", "-b", req.BackendMode}
shr := &share{
shareMode: sdk.PublicShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
bootComplete: make(chan struct{}),
a: i.a,
agent: i.agent,
}

for _, basicAuth := range req.BasicAuth {
Expand Down Expand Up @@ -82,7 +82,7 @@ func (i *agentGrpcImpl) SharePublic(_ context.Context, req *agentGrpc.SharePubli
<-shr.bootComplete

if shr.bootErr == nil {
i.a.inShares <- shr
i.agent.addShare <- shr
return &agentGrpc.SharePublicResponse{
Token: shr.token,
FrontendEndpoints: shr.frontendEndpoints,
Expand Down
Loading

0 comments on commit 73b3203

Please sign in to comment.