Skip to content

Commit

Permalink
Enable multi namespace log collection in e2e test (#1959)
Browse files Browse the repository at this point in the history
* enable multi namespace log collection in e2e test

* Update test/logstream/README.md

Co-authored-by: Victor Agababov <[email protected]>

* Update test/logstream/README.md

Co-authored-by: Victor Agababov <[email protected]>

* Update test/logstream/interface.go

Co-authored-by: Victor Agababov <[email protected]>

* var grouping of global variables

* fix broken build

* Update test/logstream/README.md

Co-authored-by: Victor Agababov <[email protected]>

* Update test/logstream/v2/stream_test.go

Co-authored-by: Victor Agababov <[email protected]>

* Update test/logstream/v2/stream_test.go

Co-authored-by: Victor Agababov <[email protected]>

* Update test/logstream/v2/stream.go

Co-authored-by: Victor Agababov <[email protected]>

* minor misc style fixes

* closing brackets spacing

* Update test/logstream/v2/stream.go

Co-authored-by: Victor Agababov <[email protected]>

Co-authored-by: Victor Agababov <[email protected]>
  • Loading branch information
igorbelianski and vagababov authored Dec 15, 2020
1 parent a1d2289 commit ef8048c
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 83 deletions.
7 changes: 6 additions & 1 deletion test/logstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@ This is a guide to start using `logstream` in your e2e testing.
and linking it like
[this](https://github.com/knative/serving/blob/e797247322b5aa35001152d2a2715dbc20a86cc4/test/conformance.go#L20-L23)

2) Test resources must be named with
2. Test resources must be named with
[`test.ObjectNameForTest(t)`](https://github.com/knative/networking/blob/40ef99aa5db0d38730a89a1de7e5b28b8ef6eed5/vendor/knative.dev/pkg/test/helpers/name.go#L50)

3. At the start of your test add: `t.Cleanup(logstream.Start(t))`

4. To enable logcapture from containers across multiple namespaces configure SYSTEM_NAMESPACE
to contains a csv list of namespaces (`knative-serving,knative-test ??????{}`). Specific, well
known containers that do not produce key decorated logs (see detailed description below) need
to be enumerated in WellKnownContainers in stream.go.

With that, you will start getting logs from the processes in the system
namespace interleaved into your test output via `t.Log`.

Expand Down
5 changes: 4 additions & 1 deletion test/logstream/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package logstream
import (
"context"
"os"
"strings"
"sync"

"knative.dev/pkg/system"
Expand Down Expand Up @@ -51,7 +52,9 @@ func Start(t ti) Canceler {
return
}

stream = &shim{logstreamv2.FromNamespace(context.TODO(), kc, ns)}
// handle case when ns contains a csv list
namespaces := strings.Split(ns, ",")
stream = &shim{logstreamv2.FromNamespaces(context.Background(), kc, namespaces)}

} else {
// Otherwise set up a null stream.
Expand Down
109 changes: 65 additions & 44 deletions test/logstream/v2/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,28 @@ import (
"knative.dev/pkg/ptr"
)

func FromNamespaces(ctx context.Context, c kubernetes.Interface, namespaces []string) Source {
return &namespaceSource{
ctx: ctx,
kc: c,
namespaces: namespaces,
keys: make(map[string]Callback, 1),
}
}

func FromNamespace(ctx context.Context, c kubernetes.Interface, namespace string) Source {
return &namespaceSource{
ctx: ctx,
kc: c,
namespace: namespace,
keys: make(map[string]Callback, 1),
ctx: ctx,
kc: c,
namespaces: []string{namespace},
keys: make(map[string]Callback, 1),
}
}

type namespaceSource struct {
namespace string
kc kubernetes.Interface
ctx context.Context
namespaces []string
kc kubernetes.Interface
ctx context.Context

m sync.RWMutex
once sync.Once
Expand All @@ -57,7 +66,7 @@ type namespaceSource struct {
func (s *namespaceSource) StartStream(name string, l Callback) (Canceler, error) {
s.once.Do(func() { s.watchErr = s.watchPods() })
if s.watchErr != nil {
return nil, fmt.Errorf("failed to watch pods in namespace %q: %w", s.namespace, s.watchErr)
return nil, fmt.Errorf("failed to watch pods in one of the namespace(s) %q: %w", s.namespaces, s.watchErr)
}

// Register a key
Expand All @@ -74,38 +83,40 @@ func (s *namespaceSource) StartStream(name string, l Callback) (Canceler, error)
}

func (s *namespaceSource) watchPods() error {
wi, err := s.kc.CoreV1().Pods(s.namespace).Watch(s.ctx, metav1.ListOptions{})
if err != nil {
return err
}

go func() {
defer wi.Stop()
watchedPods := sets.NewString()
for _, ns := range s.namespaces {
wi, err := s.kc.CoreV1().Pods(ns).Watch(s.ctx, metav1.ListOptions{})
if err != nil {
return err
}

for {
select {
case <-s.ctx.Done():
return
case ev := <-wi.ResultChan():
// We have reports of this being randomly nil.
if ev.Object == nil || reflect.ValueOf(ev.Object).IsNil() {
continue
}
p := ev.Object.(*corev1.Pod)
switch ev.Type {
case watch.Deleted:
watchedPods.Delete(p.Name)
case watch.Added, watch.Modified:
if !watchedPods.Has(p.Name) && isPodReady(p) {
watchedPods.Insert(p.Name)
s.startForPod(p)
go func() {
defer wi.Stop()
watchedPods := sets.NewString()

for {
select {
case <-s.ctx.Done():
return
case ev := <-wi.ResultChan():
// We have reports of this being randomly nil.
if ev.Object == nil || reflect.ValueOf(ev.Object).IsNil() {
continue
}
p := ev.Object.(*corev1.Pod)
switch ev.Type {
case watch.Deleted:
watchedPods.Delete(p.Name)
case watch.Added, watch.Modified:
if !watchedPods.Has(p.Name) && isPodReady(p) {
watchedPods.Insert(p.Name)
s.startForPod(p)
}
}
}

}
}
}
}()
}()
}

return nil
}
Expand All @@ -119,9 +130,12 @@ func (s *namespaceSource) startForPod(pod *corev1.Pod) {
psn, pn, cn := pod.Namespace, pod.Name, container.Name

handleLine := s.handleLine
if cn == ChaosDuck {
// Specialcase logs from chaosduck to be able to easily see when pods
// have been killed throughout all tests.
if wellKnownContainers.Has(cn) {
// Specialcase logs from chaosduck, queueproxy etc.
// - ChaosDuck logs enable easy
// monitoring of killed pods throughout all tests.
// - QueueProxy logs enable
// debugging troubleshooting data plane request handling issues.
handleLine = s.handleGenericLine
}

Expand All @@ -137,13 +151,13 @@ func (s *namespaceSource) startForPod(pod *corev1.Pod) {
req := s.kc.CoreV1().Pods(psn).GetLogs(pn, options)
stream, err := req.Stream(context.Background())
if err != nil {
s.handleGenericLine([]byte(err.Error()), pn)
s.handleGenericLine([]byte(err.Error()), pn, cn)
return
}
defer stream.Close()
// Read this container's stream.
for scanner := bufio.NewScanner(stream); scanner.Scan(); {
handleLine(scanner.Bytes(), pn)
handleLine(scanner.Bytes(), pn, cn)
}
// Pods get killed with chaos duck, so logs might end
// before the test does. So don't report an error here.
Expand All @@ -167,9 +181,16 @@ const (
timeFormat = "15:04:05.000"
// ChaosDuck is the well known name for the chaosduck.
ChaosDuck = "chaosduck"
// QueueProxy is the well known name for the queueproxy.
QueueProxy = "queueproxy"
)

func (s *namespaceSource) handleLine(l []byte, pod string) {
// Names of well known containers that do not produce nicely formatted logs that
// could be easily filtered and parsed by handleLine. Logs from these containers
// are captured without filtering.
var wellKnownContainers = sets.NewString(ChaosDuck, QueueProxy)

func (s *namespaceSource) handleLine(l []byte, pod string, _ string) {
// This holds the standard structure of our logs.
var line struct {
Level string `json:"severity"`
Expand Down Expand Up @@ -231,12 +252,12 @@ func (s *namespaceSource) handleLine(l []byte, pod string) {

// handleGenericLine prints the given logline to all active tests as it cannot be parsed
// and/or doesn't contain any correlation data (like the chaosduck for example).
func (s *namespaceSource) handleGenericLine(l []byte, pod string) {
func (s *namespaceSource) handleGenericLine(l []byte, pod string, cn string) {
s.m.RLock()
defer s.m.RUnlock()

for _, logf := range s.keys {
// I 15:04:05.000 webhook-699b7b668d-9smk2 this is my message
logf("I %s %s %s", time.Now().Format(timeFormat), pod, string(l))
logf("I %s %s %s %s", time.Now().Format(timeFormat), pod, cn, string(l))
}
}
Loading

0 comments on commit ef8048c

Please sign in to comment.