Skip to content

Commit

Permalink
Agent/beats grpc comms over domain socket/named pipe (#4249)
Browse files Browse the repository at this point in the history
* Agent/beats grpc comms over domain socket/named pipe

* Add changelog fragment

* Fix log message typo

Co-authored-by: Leszek Kubik <[email protected]>

* Implement domain socket/named pipe support for connection info endpoint

* Remove fmt.Print from test

* Remove leftover commented line from utz

* Fix windows related utz

* format imports in one test file

* Address code review

* Set Agent RPC to use domain sockets/named pipes by default. Update the configuration struct based on code review feedback

* Adjust grpc port from int16 to int32. Adjust unit tests

* Make local rpc socket name configurable

* Fail on empty local socket address

* Remove default socket name

* Use TCP gRPC for comms and local (domain socket/named pipe) for connection info server.

* Rollback the unit test changes, because the local gRPC configuration is disabled

---------

Co-authored-by: Pierre HILBERT <[email protected]>
Co-authored-by: Leszek Kubik <[email protected]>
  • Loading branch information
3 people authored Jun 10, 2024
1 parent 452afdf commit efd9ee6
Show file tree
Hide file tree
Showing 20 changed files with 683 additions and 193 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; an 80ish-character description of the change.
summary: Agent/beats grpc comms over domain socket/named pipe

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/4249

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/4248
1 change: 0 additions & 1 deletion internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ func New(
runtime, err := runtime.NewManager(
log,
baseLogger,
cfg.Settings.GRPC.String(),
agentInfo,
tracer,
monitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt
require.NoError(t, err)

monitoringMgr := newTestMonitoringMgr()
rm, err := runtime.NewManager(l, l, "localhost:0", ai, apmtest.DiscardTracer, monitoringMgr, configuration.DefaultGRPCConfig())
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, configuration.DefaultGRPCConfig())
require.NoError(t, err)

caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
Expand Down
19 changes: 15 additions & 4 deletions internal/pkg/agent/configuration/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@

package configuration

import "fmt"
import (
"fmt"
)

// GRPCConfig is a configuration of GRPC server.
type GRPCConfig struct {
Address string `config:"address"`
Port uint16 `config:"port"`
Port uint16 `config:"port"` // [gRPC:8.15] Change to int32 instead of uint16, when Endpoint is ready for local gRPC
MaxMsgSize int `config:"max_message_size"`
CheckinChunkingDisabled bool `config:"checkin_chunking_disabled"`
}

// DefaultGRPCConfig creates a default server configuration.
func DefaultGRPCConfig() *GRPCConfig {
return &GRPCConfig{
Address: "localhost",
Port: 6789,
Address: "localhost",
// [gRPC:8.15] The line below is commented out for 8.14 and should replace the current port default once Endpoint is ready for domain socket gRPC
// Port: -1, // -1 (negative) port value by default enabled "local" rpc utilizing domain sockets and named pipes
Port: 6789, // Set TCP gRPC by default
MaxMsgSize: 1024 * 1024 * 100, // grpc default 4MB is unsufficient for diagnostics
CheckinChunkingDisabled: false, // on by default
}
Expand All @@ -28,3 +32,10 @@ func DefaultGRPCConfig() *GRPCConfig {
// String renders the configured gRPC endpoint as "<Address>:<Port>".
func (cfg *GRPCConfig) String() string {
	host, port := cfg.Address, cfg.Port
	return fmt.Sprintf("%s:%d", host, port)
}

// IsLocal reports whether the gRPC server should use the "local" transport
// (domain socket / named pipe) rather than TCP.
//
// It currently always returns false: the port-based check (a negative port
// selects local transport) is commented out below, and Port is still a uint16,
// so it cannot hold a negative value yet.
func (cfg *GRPCConfig) IsLocal() bool {
// [gRPC:8.15] Use the commented implementation once Endpoint is ready for local gRPC
// return cfg.Port < 0
return false
}
15 changes: 13 additions & 2 deletions pkg/component/runtime/conn_info_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/ipc"
)

const (
Expand All @@ -27,8 +28,18 @@ type connInfoServer struct {
stopTimeout time.Duration
}

func newConnInfoServer(log *logger.Logger, comm Communicator, port int) (*connInfoServer, error) {
listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
func newConnInfoServer(log *logger.Logger, comm Communicator, address string) (*connInfoServer, error) {
var (
listener net.Listener
err error
)

if ipc.IsLocal(address) {
listener, err = ipc.CreateListener(log, address)
} else {
listener, err = net.Listen("tcp", address)
}

if err != nil {
return nil, fmt.Errorf("failed to start connection credentials listener: %w", err)
}
Expand Down
135 changes: 113 additions & 22 deletions pkg/component/runtime/conn_info_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"fmt"
"io"
"net"
"net/url"
"os"
"runtime"
"syscall"
"testing"
Expand All @@ -22,18 +24,19 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent/internal/pkg/testutils"
"github.com/elastic/elastic-agent/pkg/ipc"
)

// mockCommunicator is the test double passed to newConnInfoServer in place of
// a real Communicator.
type mockCommunicator struct {
// ch carries observed check-ins; newMockCommunicator creates it with a buffer of one.
ch chan *proto.CheckinObserved
// startupInfo is the canned start-up/connection info filled in by newMockCommunicator.
startupInfo *proto.StartUpInfo
}

func newMockCommunicator() *mockCommunicator {
func newMockCommunicator(address string) *mockCommunicator {
return &mockCommunicator{
ch: make(chan *proto.CheckinObserved, 1),
startupInfo: &proto.StartUpInfo{
Addr: getAddress(),
Addr: address,
ServerName: "endpoint",
Token: "some token",
CaCert: []byte("some CA cert"),
Expand Down Expand Up @@ -69,22 +72,82 @@ func (c *mockCommunicator) CheckinObserved() <-chan *proto.CheckinObserved {

const testPort = 6788

func getAddress() string {
// Test Elastic Agent Connection Info sock
const testSock = ".teaci.sock"

func getAddress(dir string, isLocal bool) string {
if isLocal {
u := url.URL{}
u.Path = "/"

if runtime.GOOS == "windows" {
u.Scheme = "npipe"
return u.JoinPath("/", testSock).String()
}

u.Scheme = "unix"
return u.JoinPath(dir, testSock).String()
}
return fmt.Sprintf("127.0.0.1:%d", testPort)
}

// runTests invokes fn once per transport, each inside its own subtest:
// "port" receives the TCP address from getAddress, and "local" receives the
// local socket/pipe address built from a freshly created temporary directory.
// The temporary directory is removed when runTests returns.
func runTests(t *testing.T, fn func(*testing.T, string)) {
	tmpDir, err := os.MkdirTemp("", "")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpDir)

	type transportCase struct {
		name    string
		address string
	}
	cases := []transportCase{
		{name: "port", address: getAddress("", false)},
		{name: "local", address: getAddress(tmpDir, true)},
	}

	for i := range cases {
		c := cases[i]
		t.Run(c.name, func(t *testing.T) {
			fn(t, c.address)
		})
	}
}

// TestConnInfoNormal runs testConnInfoNormal through runTests, i.e. once for
// the TCP address and once for the local socket/pipe address.
func TestConnInfoNormal(t *testing.T) {
runTests(t, testConnInfoNormal)
}

// dialAddress opens a client connection to the test server at address.
// Addresses that ipc.IsLocal recognises are dialed with dialLocal; anything
// else is treated as a TCP address.
func dialAddress(address string) (net.Conn, error) {
	if !ipc.IsLocal(address) {
		return net.Dial("tcp", address)
	}
	return dialLocal(address)
}

func testConnInfoNormal(t *testing.T, address string) {
log := testutils.NewErrorLogger(t)

comm := newMockCommunicator()
comm := newMockCommunicator(address)

// Start server
srv, err := newConnInfoServer(log, comm, testPort)
srv, err := newConnInfoServer(log, comm, address)
if err != nil {
t.Fatal(err)
}
defer func() {

err := srv.stop()
if ipc.IsLocal(address) {
ipc.CleanupListener(log, address)
}
if err != nil {
t.Fatal(err)
}
Expand All @@ -93,8 +156,7 @@ func TestConnInfoNormal(t *testing.T) {
const count = 2 // read connection info a couple of times to make sure the server keeps working for multiple calls

for i := 0; i < count; i++ {
// Connect to the server
conn, err := net.Dial("tcp", getAddress())
conn, err := dialAddress(address)
if err != nil {
t.Fatal(err)
}
Expand All @@ -119,12 +181,16 @@ func TestConnInfoNormal(t *testing.T) {
}

// TestConnInfoConnCloseThenAnotherConn runs
// testConnInfoConnCloseThenAnotherConn through runTests, once per address kind.
func TestConnInfoConnCloseThenAnotherConn(t *testing.T) {
runTests(t, testConnInfoConnCloseThenAnotherConn)
}

func testConnInfoConnCloseThenAnotherConn(t *testing.T, address string) {
log := testutils.NewErrorLogger(t)

comm := newMockCommunicator()
comm := newMockCommunicator("")

// Start server
srv, err := newConnInfoServer(log, comm, testPort)
srv, err := newConnInfoServer(log, comm, address)
if err != nil {
t.Fatal(err)
}
Expand All @@ -136,7 +202,7 @@ func TestConnInfoConnCloseThenAnotherConn(t *testing.T) {
}()

// Connect to the server
conn, err := net.Dial("tcp", getAddress())
conn, err := dialAddress(address)
if err != nil {
t.Fatal(err)
}
Expand All @@ -148,7 +214,7 @@ func TestConnInfoConnCloseThenAnotherConn(t *testing.T) {
}

// Connect again after closed
conn, err = net.Dial("tcp", getAddress())
conn, err = dialAddress(address)
if err != nil {
t.Fatal(err)
}
Expand All @@ -172,12 +238,16 @@ func TestConnInfoConnCloseThenAnotherConn(t *testing.T) {
}

// TestConnInfoClosed runs testConnInfoClosed through runTests, once per
// address kind.
func TestConnInfoClosed(t *testing.T) {
runTests(t, testConnInfoClosed)
}

func testConnInfoClosed(t *testing.T, address string) {
log := testutils.NewErrorLogger(t)

comm := newMockCommunicator()
comm := newMockCommunicator("")

// Start server
srv, err := newConnInfoServer(log, comm, testPort)
srv, err := newConnInfoServer(log, comm, address)
if err != nil {
t.Fatal(err)
}
Expand All @@ -187,7 +257,7 @@ func TestConnInfoClosed(t *testing.T) {
t.Fatal(err)
}

_, err = net.Dial("tcp", getAddress())
_, err = dialAddress(address)
if err == nil {
t.Fatal("want non-nil err")
}
Expand All @@ -198,9 +268,19 @@ func TestConnInfoClosed(t *testing.T) {
// causes issue for *nix builds: "imports golang.org/x/sys/windows: build constraints exclude all Go files".
// In order to avoid creating extra platform-specific files, compare just errno for this test.
wantErrNo := int(syscall.ECONNREFUSED)
if runtime.GOOS == windows {
wantErrNo = 10061 // windows.WSAECONNREFUSED
if ipc.IsLocal(address) {
if runtime.GOOS == windows {
wantErrNo = 2 // windows.ERROR_FILE_NOT_FOUND
} else {
// For local IPC on *nix the syscall.ENOENT is expected
wantErrNo = int(syscall.ENOENT)
}
} else {
if runtime.GOOS == windows {
wantErrNo = 10061 // windows.WSAECONNREFUSED
}
}

var (
syserr syscall.Errno
errno int
Expand All @@ -216,12 +296,16 @@ func TestConnInfoClosed(t *testing.T) {
}

// TestConnInfoDoubleStop runs testConnInfoDoubleStop through runTests, once
// per address kind.
func TestConnInfoDoubleStop(t *testing.T) {
runTests(t, testConnInfoDoubleStop)
}

func testConnInfoDoubleStop(t *testing.T, address string) {
log := testutils.NewErrorLogger(t)

comm := newMockCommunicator()
comm := newMockCommunicator("")

// Start server
srv, err := newConnInfoServer(log, comm, testPort)
srv, err := newConnInfoServer(log, comm, address)
if err != nil {
t.Fatal(err)
}
Expand All @@ -232,18 +316,25 @@ func TestConnInfoDoubleStop(t *testing.T) {
}

err = srv.stop()
if err == nil {
t.Fatal("want err, got nil ")
// Double close on named pipe doesn't cause the error
if !(ipc.IsLocal(address) && runtime.GOOS == "windows") {
if err == nil {
t.Fatal("want err, got nil ")
}
}
}

// TestConnInfoStopTimeout runs testConnInfoStopTimeout through runTests, once
// per address kind.
func TestConnInfoStopTimeout(t *testing.T) {
runTests(t, testConnInfoStopTimeout)
}

func testConnInfoStopTimeout(t *testing.T, address string) {
log := testutils.NewErrorLogger(t)

comm := newMockCommunicator()
comm := newMockCommunicator("")

// Start server
srv, err := newConnInfoServer(log, comm, testPort)
srv, err := newConnInfoServer(log, comm, address)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit efd9ee6

Please sign in to comment.