Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

User-specified connections #513

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/nettop/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func getVerbosity(args *inArgs) analyzer.Verbosity {
// (or NetworkPolicies to allow only this connectivity)
func detectTopology(args *inArgs) error {
logger := analyzer.NewDefaultLoggerWithVerbosity(getVerbosity(args))
synth := analyzer.NewPoliciesSynthesizer(analyzer.WithLogger(logger), analyzer.WithDNSPort(*args.DNSPort))
opts := []analyzer.PoliciesSynthesizerOption{analyzer.WithLogger(logger), analyzer.WithDNSPort(*args.DNSPort)}
if *args.connsFile != "" {
opts = append(opts, analyzer.WithConnectionsFile(*args.connsFile))
}
synth := analyzer.NewPoliciesSynthesizer(opts...)

var content interface{}
if args.SynthNetpols != nil && *args.SynthNetpols {
Expand Down
2 changes: 2 additions & 0 deletions cmd/nettop/parse_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type inArgs struct {
OutputFile *string
OutputFormat *string
DNSPort *int
connsFile *string
SynthNetpols *bool
Quiet *bool
Verbose *bool
Expand All @@ -47,6 +48,7 @@ func parseInArgs(cmdlineArgs []string) (*inArgs, error) {
args.OutputFormat = flagset.String("format", jsonFormat, "output format; must be either \"json\" or \"yaml\"")
args.SynthNetpols = flagset.Bool("netpols", false, "whether to synthesize NetworkPolicies to allow only the discovered connections")
args.DNSPort = flagset.Int("dnsport", analyzer.DefaultDNSPort, "DNS port to be used in egress rules of synthesized NetworkPolicies")
args.connsFile = flagset.String("conns", "", "a file specifying connections to enable")
args.Quiet = flagset.Bool("q", false, "runs quietly, reports only severe errors and results")
args.Verbose = flagset.Bool("v", false, "runs with more informative messages printed to log")
err := flagset.Parse(cmdlineArgs)
Expand Down
256 changes: 246 additions & 10 deletions pkg/analyzer/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,34 @@
package analyzer

import (
"bufio"
"fmt"
"os"
"slices"
"strings"

core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

type connectionExtractor struct {
workloads []*Resource
services []*Service
logger Logger
}

// This function is at the core of the topology analysis
// For each resource, it finds other resources that may use it and compiles a list of connections holding these dependencies
func discoverConnections(resources []*Resource, links []*Service, logger Logger) []*Connections {
func (ce *connectionExtractor) discoverConnections() []*Connections {
connections := []*Connections{}
for _, destRes := range resources {
deploymentServices := findServices(destRes, links)
logger.Debugf("services matched to %v: %v", destRes.Resource.Name, deploymentServices)
for _, destRes := range ce.workloads {
deploymentServices := ce.findServices(destRes)
ce.logger.Debugf("services matched to %v: %v", destRes.Resource.Name, deploymentServices)
for _, svc := range deploymentServices {
srcRes := findSource(resources, svc)
srcRes := ce.findSource(svc)
for _, r := range srcRes {
if !r.equals(destRes) {
logger.Debugf("source: %s target: %s link: %s", r.Resource.Name, destRes.Resource.Name, svc.Resource.Name)
ce.logger.Debugf("source: %s target: %s link: %s", r.Resource.Name, destRes.Resource.Name, svc.Resource.Name)
connections = append(connections, &Connections{Source: r, Target: destRes, Link: svc})
}
}
Expand Down Expand Up @@ -62,9 +75,9 @@
}

// findServices returns a list of services that may be in front of a given workload resource
func findServices(resource *Resource, links []*Service) []*Service {
func (ce *connectionExtractor) findServices(resource *Resource) []*Service {
var matchedSvc []*Service
for _, link := range links {
for _, link := range ce.services {
if link.Resource.Namespace != resource.Resource.Namespace {
continue
}
Expand All @@ -79,9 +92,9 @@
}

// findSource returns a list of resources that are likely trying to connect to the given service
func findSource(resources []*Resource, service *Service) []*Resource {
func (ce *connectionExtractor) findSource(service *Service) []*Resource {
tRes := []*Resource{}
for _, resource := range resources {
for _, resource := range ce.workloads {
serviceAddresses := getPossibleServiceAddresses(service, resource)
foundSrc := *resource // We copy the resource so we can specify the ports used by the source found
matched := false
Expand Down Expand Up @@ -133,3 +146,226 @@
}
return false, SvcNetworkAttr{}
}

const (
srcDstDelim = "=>"
endpointsPortDelim = ":"
commentToken = "#"
wildcardToken = "_"
strongWildcardToken = "*"
endpointParts = 3
)

func (ce *connectionExtractor) connectionsFromFile(filename string) ([]*Connections, error) {
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()

conns := []*Connections{}

scanner := bufio.NewScanner(file)
lineNum := 0
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
lineNum += 1
if line == "" || strings.HasPrefix(line, commentToken) {
continue
}
lineConns, err := ce.parseConnectionLine(line, lineNum)
if err != nil {
return nil, err
}
conns = slices.Concat(conns, lineConns)
}

if err := scanner.Err(); err != nil {
return nil, err
}

return conns, nil
}

func (ce *connectionExtractor) parseConnectionLine(line string, lineNum int) ([]*Connections, error) {
// Take only the part before # starts a comment
parts := strings.Split(line, commentToken)
if len(parts) == 0 {
return nil, syntaxError("unexpected comment", lineNum)
}

line = parts[0]

parts = strings.Split(line, srcDstDelim)
if len(parts) != 2 {

Check failure on line 200 in pkg/analyzer/connections.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Magic number: 2, in <condition> detected (mnd)
return nil, syntaxError("connection line must have exactly one => separator", lineNum)
}

src := strings.TrimSpace(parts[0])
srcWorkloads, err := ce.parseEndpoints(src, lineNum)
if err != nil {
return nil, err
}

parts = strings.Split(parts[1], endpointsPortDelim)
if len(parts) == 0 {
return nil, syntaxError("missing destination", lineNum)
}
if len(parts) > 2 {

Check failure on line 214 in pkg/analyzer/connections.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Magic number: 2, in <condition> detected (mnd)
return nil, syntaxError("connection line must have at most one | separator", lineNum)
}
dst := strings.TrimSpace(parts[0])
dstWorkloads, err := ce.parseEndpoints(dst, lineNum)
if err != nil {
return nil, err
}

protAndPort := &SvcNetworkAttr{Protocol: core.ProtocolTCP}
if len(parts) == 2 {

Check failure on line 224 in pkg/analyzer/connections.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Magic number: 2, in <condition> detected (mnd)
protAndPort, err = parsePort(parts[1], lineNum)
if err != nil {
return nil, err
}
}

svc := Service{}
svc.Resource.Network = []SvcNetworkAttr{*protAndPort}

conns := []*Connections{}
for _, srcWl := range srcWorkloads {
for _, dstWl := range dstWorkloads {
if srcWl.equals(dstWl) {
continue
}
conns = append(conns, &Connections{
Source: srcWl,
Target: dstWl,
Link: &svc,
})
ce.logger.Infof("Added connection: src: %v, dst: %v, link: %v", srcWl.Resource.Name, dstWl.Resource.Name, svc)
}
}
return conns, nil
}

func (ce *connectionExtractor) parseEndpoints(endpoint string, lineNum int) ([]*Resource, error) {
parts := strings.Split(endpoint, "/")
if len(parts) != endpointParts {
return nil, syntaxError("source and destination must be of the form namespace/kind/name", lineNum)
}
ns, kind, name := parts[0], parts[1], parts[2]
kind = strings.ToUpper(kind[:1]) + kind[1:] // Capitalize kind's first letter

if ns == strongWildcardToken || kind == strongWildcardToken || name == strongWildcardToken {
return ce.parseEndpointWithStrongWildcard(ns, kind, name)
}

var res []*Resource
switch kind {
case service:
res = ce.getWorkloadsBehindMatchingServices(ns, name)
case wildcardToken:
res = slices.Concat(ce.getWorkloadsBehindMatchingServices(ns, name), ce.getMatchingWorkloads(ns, kind, name))
default:
res = ce.getMatchingWorkloads(ns, kind, name)
}
if len(res) == 0 {
return nil, fmt.Errorf("no matching endpoints for %s in the provided manifests", endpoint)
}
return res, nil
}

func (ce *connectionExtractor) parseEndpointWithStrongWildcard(ns, kind, name string) ([]*Resource, error) {
if kind != strongWildcardToken || name != strongWildcardToken {
return nil, fmt.Errorf("bad endpoint pattern %s/%s/%s. Patterns with '*' should either equal '*/*/*' "+
"or have the form '<namespace>/*/*'", ns, kind, name)
}

return nil, fmt.Errorf("endpoints containing '*' are not yet supported")

/*res := Resource{}

Check failure on line 286 in pkg/analyzer/connections.go

View workflow job for this annotation

GitHub Actions / golangci-lint

commentedOutCode: may want to remove commented-out code (gocritic)
if ns != strongWildcardToken {
if len(validation.IsDNS1123Subdomain(ns)) != 0 {
return nil, fmt.Errorf("%s is not a proper namespace name", ns)
}
res.Resource.Namespace = ns
}
return []*Resource{&res}, nil*/
}

func (ce *connectionExtractor) getWorkloadsBehindMatchingServices(ns, svcName string) []*Resource {
workloads := []*Resource{}
for _, svc := range ce.services {
if strMatch(svc.Resource.Namespace, ns) && strMatch(svc.Resource.Name, svcName) {
workloads = slices.Concat(workloads, ce.workloadsOfSvc(svc))
}
}
return workloads
}

func (ce *connectionExtractor) workloadsOfSvc(svc *Service) []*Resource {
svcWorkloads := []*Resource{}
for _, workload := range ce.workloads {
if workload.Resource.Namespace == svc.Resource.Namespace &&
areSelectorsContained(workload.Resource.Labels, svc.Resource.Selectors) {
svcWorkloads = append(svcWorkloads, workload)
}
}
return svcWorkloads
}

func (ce *connectionExtractor) getMatchingWorkloads(ns, kind, name string) []*Resource {
workloads := []*Resource{}
for _, workload := range ce.workloads {
if strMatch(workload.Resource.Namespace, ns) && strMatch(workload.Resource.Kind, kind) &&
strMatch(workload.Resource.Name, name) {
workloads = append(workloads, workload)
}
}
return workloads
}

func parsePort(spec string, lineNum int) (*SvcNetworkAttr, error) {
protocol := core.ProtocolTCP
var port *intstr.IntOrString

parts := strings.Fields(spec)
switch len(parts) {
case 0:
case 2:

Check failure on line 335 in pkg/analyzer/connections.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Magic number: 2, in <case> detected (mnd)
parsedPort := intstr.Parse(parts[1])
port = &parsedPort
fallthrough
case 1:
var err error
protocol, err = parseProtocol(parts[0], lineNum)
if err != nil {
return nil, err
}
default:
return nil, syntaxError("port definition should have the form \"<protocol> [<port>]\"", lineNum)
}

ret := &SvcNetworkAttr{Protocol: protocol}
if port != nil {
ret.TargetPort = *port
}

return ret, nil
}

func parseProtocol(protocol string, lineNum int) (core.Protocol, error) {
protocols := []string{string(core.ProtocolTCP), string(core.ProtocolUDP), string(core.ProtocolSCTP)}
if !slices.Contains(protocols, protocol) {
return "", syntaxError("protocol must be one of TCP, UDP, SCTP", lineNum)
}
return core.Protocol(protocol), nil
}

func strMatch(str, pattern string) bool {
return pattern == wildcardToken || str == pattern
}

func syntaxError(errorStr string, lineNum int) error {
return fmt.Errorf("syntax error in line %d: %s", lineNum, errorStr)
}
49 changes: 49 additions & 0 deletions pkg/analyzer/connections_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package analyzer

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)

func TestSelector(t *testing.T) {
testStr := "key1=val1, key2=val2"
reqs, err := labels.ParseToRequirements(testStr)
if err != nil {
t.Fatalf("Conversion error: %v", err)
}

res := map[string]string{}
for _, req := range reqs {
if req.Operator() != selection.Equals {
t.Fatalf("Wrong operator: %s", req.Operator())
}
res[req.Key()] = req.Values().List()[0]
}

t.Logf("labels: %v", res)
}

func TestConnectionsFile(t *testing.T) {
logger := NewDefaultLogger()
sockshopDir := filepath.Join(getTestsDir(), "sockshop")
manifestsDir := filepath.Join(sockshopDir, "manifests")
mf := manifestFinder{logger, false, filepath.WalkDir}
manifestFiles, fileErrors := mf.searchForManifestsInDirs([]string{manifestsDir})
require.Empty(t, fileErrors)

resAcc := newResourceAccumulator(logger, false)
parseErrors := resAcc.parseK8sYamls(manifestFiles)
require.Empty(t, parseErrors)

ce := connectionExtractor{workloads: resAcc.workloads, services: resAcc.services, logger: logger}
connections := ce.discoverConnections()
require.NotEmpty(t, connections)
connFilePath := filepath.Join(sockshopDir, "connections.txt")
fileConns, err := ce.connectionsFromFile(connFilePath)
require.Nil(t, err)
require.Len(t, fileConns, 15)
}
Loading
Loading