diff --git a/test/logstream/README.md b/test/logstream/README.md index b644fa744a..5bece38b37 100644 --- a/test/logstream/README.md +++ b/test/logstream/README.md @@ -12,11 +12,16 @@ This is a guide to start using `logstream` in your e2e testing. and linking it like [this](https://github.com/knative/serving/blob/e797247322b5aa35001152d2a2715dbc20a86cc4/test/conformance.go#L20-L23) -2) Test resources must be named with +2. Test resources must be named with [`test.ObjectNameForTest(t)`](https://github.com/knative/networking/blob/40ef99aa5db0d38730a89a1de7e5b28b8ef6eed5/vendor/knative.dev/pkg/test/helpers/name.go#L50) 3. At the start of your test add: `t.Cleanup(logstream.Start(t))` +4. To enable log capture from containers across multiple namespaces, configure SYSTEM_NAMESPACE + to contain a comma-separated list of namespaces (e.g. `knative-serving,knative-tests`). Specific, well + known containers that do not produce key-decorated logs (see detailed description below) need + to be enumerated in `wellKnownContainers` in stream.go. + With that, you will start getting logs from the processes in the system namespace interleaved into your test output via `t.Log`. diff --git a/test/logstream/interface.go b/test/logstream/interface.go index 6c2109d8fb..1ca333b593 100644 --- a/test/logstream/interface.go +++ b/test/logstream/interface.go @@ -19,6 +19,7 @@ package logstream import ( "context" "os" + "strings" "sync" "knative.dev/pkg/system" @@ -51,7 +52,9 @@ func Start(t ti) Canceler { return } - stream = &shim{logstreamv2.FromNamespace(context.TODO(), kc, ns)} + // handle case when ns contains a csv list + namespaces := strings.Split(ns, ",") + stream = &shim{logstreamv2.FromNamespaces(context.Background(), kc, namespaces)} } else { // Otherwise set up a null stream.
diff --git a/test/logstream/v2/stream.go b/test/logstream/v2/stream.go index 815519fedd..e22963115f 100644 --- a/test/logstream/v2/stream.go +++ b/test/logstream/v2/stream.go @@ -34,19 +34,28 @@ import ( "knative.dev/pkg/ptr" ) +func FromNamespaces(ctx context.Context, c kubernetes.Interface, namespaces []string) Source { + return &namespaceSource{ + ctx: ctx, + kc: c, + namespaces: namespaces, + keys: make(map[string]Callback, 1), + } +} + func FromNamespace(ctx context.Context, c kubernetes.Interface, namespace string) Source { return &namespaceSource{ - ctx: ctx, - kc: c, - namespace: namespace, - keys: make(map[string]Callback, 1), + ctx: ctx, + kc: c, + namespaces: []string{namespace}, + keys: make(map[string]Callback, 1), } } type namespaceSource struct { - namespace string - kc kubernetes.Interface - ctx context.Context + namespaces []string + kc kubernetes.Interface + ctx context.Context m sync.RWMutex once sync.Once @@ -57,7 +66,7 @@ type namespaceSource struct { func (s *namespaceSource) StartStream(name string, l Callback) (Canceler, error) { s.once.Do(func() { s.watchErr = s.watchPods() }) if s.watchErr != nil { - return nil, fmt.Errorf("failed to watch pods in namespace %q: %w", s.namespace, s.watchErr) + return nil, fmt.Errorf("failed to watch pods in one of the namespace(s) %q: %w", s.namespaces, s.watchErr) } // Register a key @@ -74,38 +83,40 @@ func (s *namespaceSource) StartStream(name string, l Callback) (Canceler, error) } func (s *namespaceSource) watchPods() error { - wi, err := s.kc.CoreV1().Pods(s.namespace).Watch(s.ctx, metav1.ListOptions{}) - if err != nil { - return err - } - - go func() { - defer wi.Stop() - watchedPods := sets.NewString() + for _, ns := range s.namespaces { + wi, err := s.kc.CoreV1().Pods(ns).Watch(s.ctx, metav1.ListOptions{}) + if err != nil { + return err + } - for { - select { - case <-s.ctx.Done(): - return - case ev := <-wi.ResultChan(): - // We have reports of this being randomly nil. 
- if ev.Object == nil || reflect.ValueOf(ev.Object).IsNil() { - continue - } - p := ev.Object.(*corev1.Pod) - switch ev.Type { - case watch.Deleted: - watchedPods.Delete(p.Name) - case watch.Added, watch.Modified: - if !watchedPods.Has(p.Name) && isPodReady(p) { - watchedPods.Insert(p.Name) - s.startForPod(p) + go func() { + defer wi.Stop() + watchedPods := sets.NewString() + + for { + select { + case <-s.ctx.Done(): + return + case ev := <-wi.ResultChan(): + // We have reports of this being randomly nil. + if ev.Object == nil || reflect.ValueOf(ev.Object).IsNil() { + continue + } + p := ev.Object.(*corev1.Pod) + switch ev.Type { + case watch.Deleted: + watchedPods.Delete(p.Name) + case watch.Added, watch.Modified: + if !watchedPods.Has(p.Name) && isPodReady(p) { + watchedPods.Insert(p.Name) + s.startForPod(p) + } } - } + } } - } - }() + }() + } return nil } @@ -119,9 +130,12 @@ func (s *namespaceSource) startForPod(pod *corev1.Pod) { psn, pn, cn := pod.Namespace, pod.Name, container.Name handleLine := s.handleLine - if cn == ChaosDuck { - // Specialcase logs from chaosduck to be able to easily see when pods - // have been killed throughout all tests. + if wellKnownContainers.Has(cn) { + // Specialcase logs from chaosduck, queueproxy etc. + // - ChaosDuck logs enable easy + // monitoring of killed pods throughout all tests. + // - QueueProxy logs enable + // debugging troubleshooting data plane request handling issues. handleLine = s.handleGenericLine } @@ -137,13 +151,13 @@ func (s *namespaceSource) startForPod(pod *corev1.Pod) { req := s.kc.CoreV1().Pods(psn).GetLogs(pn, options) stream, err := req.Stream(context.Background()) if err != nil { - s.handleGenericLine([]byte(err.Error()), pn) + s.handleGenericLine([]byte(err.Error()), pn, cn) return } defer stream.Close() // Read this container's stream. 
for scanner := bufio.NewScanner(stream); scanner.Scan(); { - handleLine(scanner.Bytes(), pn) + handleLine(scanner.Bytes(), pn, cn) } // Pods get killed with chaos duck, so logs might end // before the test does. So don't report an error here. @@ -167,9 +181,16 @@ const ( timeFormat = "15:04:05.000" // ChaosDuck is the well known name for the chaosduck. ChaosDuck = "chaosduck" + // QueueProxy is the well known name for the queueproxy. + QueueProxy = "queueproxy" ) -func (s *namespaceSource) handleLine(l []byte, pod string) { +// Names of well known containers that do not produce nicely formatted logs that +// could be easily filtered and parsed by handleLine. Logs from these containers +// are captured without filtering. +var wellKnownContainers = sets.NewString(ChaosDuck, QueueProxy) + +func (s *namespaceSource) handleLine(l []byte, pod string, _ string) { // This holds the standard structure of our logs. var line struct { Level string `json:"severity"` @@ -231,12 +252,12 @@ func (s *namespaceSource) handleLine(l []byte, pod string) { // handleGenericLine prints the given logline to all active tests as it cannot be parsed // and/or doesn't contain any correlation data (like the chaosduck for example). 
-func (s *namespaceSource) handleGenericLine(l []byte, pod string) { +func (s *namespaceSource) handleGenericLine(l []byte, pod string, cn string) { s.m.RLock() defer s.m.RUnlock() for _, logf := range s.keys { // I 15:04:05.000 webhook-699b7b668d-9smk2 this is my message - logf("I %s %s %s", time.Now().Format(timeFormat), pod, string(l)) + logf("I %s %s %s %s", time.Now().Format(timeFormat), pod, cn, string(l)) } } diff --git a/test/logstream/v2/stream_test.go b/test/logstream/v2/stream_test.go index 202b33712c..4244adc0ee 100644 --- a/test/logstream/v2/stream_test.go +++ b/test/logstream/v2/stream_test.go @@ -29,6 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/fake" @@ -41,19 +42,79 @@ import ( ) const ( - noLogTimeout = 100 * time.Millisecond - testKey = "horror-movie-2020" - testLine = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"controller/controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/controller":"revision-controller","knative.dev/key":"default/horror-movie-2020", "error":"el-otoño-eternal" }` + knativeContainer = "knativeContainer" + userContainer = "userContainer" + noLogTimeout = 100 * time.Millisecond + testKey = "horror-movie-2020" + // default test controller line with all matchin keys and attributes + testLine = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"controller/controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 
1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/controller":"revision-controller","knative.dev/key":"default/horror-movie-2020", "error":"el-otoño-eternal" }` + + // test controller line with mismatched key entry (knative.dev/key) + testLineWithMissmatchedKey = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"controller/controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/controller":"revision-controller","knative.dev/key":"default/romcom-1990", "error":"el-otoño-eternal" }` + + // test controller line with missing key entry (knative.dev/key) + testLineWithMissingKey = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"controller/controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/controller":"revision-controller", "error":"el-otoño-eternal" }` + + testNonJSONLine = `Some non-json string produced by controller` + + // this line doesn't have json entry for knative.dev/controller so we expect + // log parsing s to fallback to using "caller" attribute. 
+ testNonControllerLine = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"non_controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/key":"default/horror-movie-2020", "error":"non_controller_error" }` + + testChaosDuckLine = `Some non-json Chaos Duck string` + testQueueProxyLine = `Some non-json Queueproxy string` + testUserContainerLine = `Some non-json user container string` + + testLinePattern = "el-otoño-eternal" + testNonControllerLinePattern = "non_controller_error" ) -var pod = &corev1.Pod{ +// This map determines test log lines to be produced by each fake container +var ( + logProductionMap = map[string][]string{ + knativeContainer: {testLine, testLineWithMissmatchedKey, testLineWithMissingKey, testNonJSONLine, testNonControllerLine}, + logstream.ChaosDuck: {testChaosDuckLine}, + logstream.QueueProxy: {testQueueProxyLine}, + userContainer: {testUserContainerLine}, + } + + singlePod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "RandomPodName", + Namespace: "defaultNameSpace", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: knativeContainer, + }}, + }, + } + + knativePod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "RandomPodName", + Namespace: "defaultNameSpace", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: knativeContainer, + }, { + Name: logstream.ChaosDuck, + }}, + }, + } +) + +var userPod = &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: logstream.ChaosDuck, - Namespace: "default", + Name: "SomeOtherRandomPodName", + Namespace: "usertestNamespace", }, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: logstream.ChaosDuck, + Name: logstream.QueueProxy, + }, { + Name: userContainer, }}, }, } @@ -69,15 +130,15 @@ var readyStatus = corev1.PodStatus{ func 
TestWatchErr(t *testing.T) { f := newK8sFake(fake.NewSimpleClientset(), errors.New("lookin' good"), nil) stream := logstream.FromNamespace(context.Background(), f, "a-namespace") - _, err := stream.StartStream(pod.Name, nil) + _, err := stream.StartStream(knativePod.Name, nil) if err == nil { t.Fatal("LogStream creation should have failed") } } func TestFailToStartStream(t *testing.T) { - pod := pod.DeepCopy() - pod.Status = readyStatus + singlePod := singlePod.DeepCopy() + singlePod.Status = readyStatus const want = "hungry for apples" f := newK8sFake(fake.NewSimpleClientset(), nil, /*watcher*/ @@ -92,8 +153,8 @@ func TestFailToStartStream(t *testing.T) { close(logFuncInvoked) } ctx, cancel := context.WithCancel(context.Background()) - stream := logstream.FromNamespace(ctx, f, pod.Namespace) - streamC, err := stream.StartStream(pod.Name, logFunc) + stream := logstream.FromNamespace(ctx, f, singlePod.Namespace) + streamC, err := stream.StartStream(singlePod.Name, logFunc) if err != nil { t.Fatal("Failed to start the stream: ", err) } @@ -101,8 +162,8 @@ func TestFailToStartStream(t *testing.T) { streamC() cancel() }) - podClient := f.CoreV1().Pods(pod.Namespace) - if _, err := podClient.Create(context.Background(), pod, metav1.CreateOptions{}); err != nil { + podClient := f.CoreV1().Pods(singlePod.Namespace) + if _, err := podClient.Create(context.Background(), singlePod, metav1.CreateOptions{}); err != nil { t.Fatal("CreatePod()=", err) } @@ -113,27 +174,64 @@ func TestFailToStartStream(t *testing.T) { } } +func processLogEntries(t *testing.T, logFuncInvoked <-chan string, patterns []string) { + expectedLogMatchesSet := sets.NewString(patterns...) + +OUTER: + for len(expectedLogMatchesSet) > 0 { + // we expect exactly len(expectedLogMatchesSet) log entries + // each need to be matched with exactly one pattern from + // patterns... 
+ select { + case <-time.After(noLogTimeout): + t.Error("Timed out: log message wasn't received") + case logLine := <-logFuncInvoked: + + // classify string that we got here + for _, s := range sets.StringKeySet(expectedLogMatchesSet).List() { + if strings.Contains(logLine, s) { + expectedLogMatchesSet.Delete(s) + continue OUTER + } + } + t.Fatal("Unexpected log entry received:", logLine) + } + } + + // now we expected timeout without any logs + select { + case <-time.After(noLogTimeout): + case logLine := <-logFuncInvoked: + t.Fatal("No more logs expected at this point, got:", logLine) + } +} + func TestNamespaceStream(t *testing.T) { - pod := pod.DeepCopy() // Needed to run the test multiple times in a row + knativePod := knativePod.DeepCopy() // Needed to run the test multiple times in a row + userPod := userPod.DeepCopy() f := newK8sFake(fake.NewSimpleClientset(), nil, nil) - logFuncInvoked := make(chan struct{}) + logFuncInvoked := make(chan string) t.Cleanup(func() { close(logFuncInvoked) }) logFunc := func(format string, args ...interface{}) { - logFuncInvoked <- struct{}{} + logFuncInvoked <- fmt.Sprintf(format, args) } ctx, cancel := context.WithCancel(context.Background()) - stream := logstream.FromNamespace(ctx, f, pod.Namespace) + stream := logstream.FromNamespaces(ctx, f, []string{knativePod.Namespace, userPod.Namespace}) streamC, err := stream.StartStream(testKey, logFunc) if err != nil { t.Fatal("Failed to start the stream: ", err) } t.Cleanup(streamC) - podClient := f.CoreV1().Pods(pod.Namespace) - if _, err := podClient.Create(context.Background(), pod, metav1.CreateOptions{}); err != nil { + podClient := f.CoreV1().Pods(knativePod.Namespace) + if _, err := podClient.Create(context.Background(), knativePod, metav1.CreateOptions{}); err != nil { + t.Fatal("CreatePod()=", err) + } + userPodClient := f.CoreV1().Pods(userPod.Namespace) + if _, err := userPodClient.Create(context.Background(), userPod, metav1.CreateOptions{}); err != nil { 
t.Fatal("CreatePod()=", err) } @@ -143,18 +241,26 @@ func TestNamespaceStream(t *testing.T) { t.Error("Unready pod should not report logs") } - pod.Status = readyStatus - if _, err := podClient.Update(context.Background(), pod, metav1.UpdateOptions{}); err != nil { + knativePod.Status = readyStatus + if _, err := podClient.Update(context.Background(), knativePod, metav1.UpdateOptions{}); err != nil { t.Fatal("UpdatePod()=", err) } - - select { - case <-time.After(noLogTimeout): - t.Error("Timed out: log message wasn't received") - case <-logFuncInvoked: + userPod.Status = readyStatus + if _, err := userPodClient.Update(context.Background(), userPod, metav1.UpdateOptions{}); err != nil { + t.Fatal("UpdatePod()=", err) } - if _, err := podClient.Update(context.Background(), pod, metav1.UpdateOptions{}); err != nil { + // We are expecting to get back 4 log entries: + // 1. non filtered non json entries from queueproxy + // 2. non filtered non json entries from chaosduck + // 3. nicely formatted, filtered(with matching key) entry from knativeContainer + // 4. 
nicely formatted, filtered(with matching key) entry from knativeContainer (fallback to caller attribubute) + processLogEntries(t, logFuncInvoked, []string{testLinePattern, testNonControllerLinePattern, testChaosDuckLine, testQueueProxyLine}) + + if _, err := podClient.Update(context.Background(), knativePod, metav1.UpdateOptions{}); err != nil { + t.Fatal("UpdatePod()=", err) + } + if _, err := userPodClient.Update(context.Background(), userPod, metav1.UpdateOptions{}); err != nil { t.Fatal("UpdatePod()=", err) } @@ -164,7 +270,10 @@ func TestNamespaceStream(t *testing.T) { t.Error("Repeat updates to the same pod should not trigger GetLogs") } - if err := podClient.Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil { + if err := podClient.Delete(context.Background(), knativePod.Name, metav1.DeleteOptions{}); err != nil { + t.Fatal("UpdatePod()=", err) + } + if err := userPodClient.Delete(context.Background(), userPod.Name, metav1.DeleteOptions{}); err != nil { t.Fatal("UpdatePod()=", err) } @@ -174,9 +283,9 @@ func TestNamespaceStream(t *testing.T) { t.Error("Deletion should not trigger GetLogs") } - pod.Spec.Containers[0].Name = "goose-with-a-flair" + knativePod.Spec.Containers[0].Name = "goose-with-a-flair" // Create pod with the same name? Why not. And let's make it ready from the get go. - if _, err := podClient.Create(context.Background(), pod, metav1.CreateOptions{}); err != nil { + if _, err := podClient.Create(context.Background(), knativePod, metav1.CreateOptions{}); err != nil { t.Fatal("CreatePod()=", err) } @@ -187,7 +296,7 @@ func TestNamespaceStream(t *testing.T) { } // Delete again. - if err := podClient.Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil { + if err := podClient.Delete(context.Background(), knativePod.Name, metav1.DeleteOptions{}); err != nil { t.Fatal("UpdatePod()=", err) } // Kill the context. 
@@ -196,7 +305,7 @@ func TestNamespaceStream(t *testing.T) { // We can't assume that the cancel signal doesn't race the pod creation signal, so // we retry a few times to give some leeway. if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) { - if _, err := podClient.Create(context.Background(), pod, metav1.CreateOptions{}); err != nil { + if _, err := podClient.Create(context.Background(), knativePod, metav1.CreateOptions{}); err != nil { return false, err } @@ -205,7 +314,7 @@ func TestNamespaceStream(t *testing.T) { return true, nil case <-logFuncInvoked: t.Log("Log was still produced, trying again...") - if err := podClient.Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil { + if err := podClient.Delete(context.Background(), knativePod.Name, metav1.DeleteOptions{}); err != nil { return false, err } return false, nil @@ -258,19 +367,32 @@ func (f *fakeclient) Pods(ns string) v1.PodInterface { } } -func (f *fakePods) GetLogs(name string, opts *corev1.PodLogOptions) *restclient.Request { +func logsForContainer(container string) string { + result := "" + + for _, s := range logProductionMap[container] { + if len(result) > 0 { + result += "\n" + } + result += s + + } + return result +} + +func (f *fakePods) GetLogs(podName string, opts *corev1.PodLogOptions) *restclient.Request { fakeClient := &fakerest.RESTClient{ Client: fakerest.CreateHTTPClient(func(request *http.Request) (*http.Response, error) { resp := &http.Response{ StatusCode: http.StatusOK, Body: ioutil.NopCloser( - strings.NewReader(testLine)), + strings.NewReader(logsForContainer(opts.Container))), } return resp, nil }), NegotiatedSerializer: scheme.Codecs.WithoutConversion(), GroupVersion: schema.GroupVersion{Version: "v1"}, - VersionedAPIPath: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log", f.ns, name), + VersionedAPIPath: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log", f.ns, podName), } ret := fakeClient.Request() if f.logsErr != nil {