diff --git a/pkg/apiserver/registry/stats/nodelatencystats/rest.go b/pkg/apiserver/registry/stats/nodelatencystats/rest.go index 189979ce8b5..b7f05e545bc 100644 --- a/pkg/apiserver/registry/stats/nodelatencystats/rest.go +++ b/pkg/apiserver/registry/stats/nodelatencystats/rest.go @@ -16,16 +16,22 @@ package nodelatencystats import ( "context" + "time" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metatable "k8s.io/apimachinery/pkg/api/meta/table" "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/client-go/tools/cache" statsv1alpha1 "antrea.io/antrea/pkg/apis/stats/v1alpha1" ) type REST struct { + indexer cache.Indexer } var ( @@ -39,7 +45,9 @@ var ( // NewREST returns a REST object that will work against API services. func NewREST() *REST { - return &REST{} + return &REST{ + indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}), + } } func (r *REST) New() runtime.Object { @@ -50,13 +58,25 @@ func (r *REST) Destroy() { } func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { - // TODO: fill this function in next PR - return &statsv1alpha1.NodeLatencyStats{}, nil + // Update will add the object if the key does not exist. + summary := obj.(*statsv1alpha1.NodeLatencyStats) + if err := r.indexer.Update(summary); err != nil { + return nil, errors.NewInternalError(err) + } + + return summary, nil } func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { - // TODO: fill this function in next PR - return &statsv1alpha1.NodeLatencyStats{}, nil + obj, exists, err := r.indexer.GetByKey(name) + if err != nil { + return nil, errors.NewInternalError(err) + } + if !exists { + return nil, errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), name) + } + + return obj.(*statsv1alpha1.NodeLatencyStats), nil } func (r *REST) NewList() runtime.Object { @@ -64,24 +84,88 @@ func (r *REST) NewList() runtime.Object { } func (r *REST) List(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) { - // TODO: fill this function in next PR - return &statsv1alpha1.NodeLatencyStatsList{}, nil + objs := r.indexer.List() + + // Due to the unordered nature of map iteration and the complexity of controlling 'continue', + // we will ignore paging here and plan to implement it in the future. + + entries := make([]statsv1alpha1.NodeLatencyStats, 0, len(objs)) + for _, obj := range objs { + entries = append(entries, *obj.(*statsv1alpha1.NodeLatencyStats)) + } + + return &statsv1alpha1.NodeLatencyStatsList{Items: entries}, nil } func (r *REST) ConvertToTable(ctx context.Context, obj runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { - // TODO: fill this function in next PR table := &metav1.Table{ ColumnDefinitions: []metav1.TableColumnDefinition{ - {Name: "SourceNodeName", Type: "string", Format: "name", Description: "Source node name."}, - {Name: "NodeLatencyStatsList", Type: "array", Format: "string", Description: "Node IP latency list."}, + {Name: "Node Name", Type: "string", Format: "name", Description: "Name of Node from which latency was measured."}, + {Name: "Num Latency Entries", Type: "integer", Format: "int64", Description: "Number of peers for which latency measurements are available."}, + {Name: "Avg Latency", Type: "string", Format: "", Description: "Average latency value across all peers."}, + {Name: "Max Latency", Type: "string", Format: "", Description: "Largest latency value across all peers."}, }, } + if m, err := meta.ListAccessor(obj); err == nil { + table.ResourceVersion = m.GetResourceVersion() + table.Continue = m.GetContinue() + table.RemainingItemCount = m.GetRemainingItemCount() + } else { + if m, err := meta.CommonAccessor(obj); err == nil { + table.ResourceVersion = m.GetResourceVersion() + } + } - return table, nil + var err error + table.Rows, err = metatable.MetaToTableRow(obj, func(obj runtime.Object, m metav1.Object, name, age string) ([]interface{}, error) { + summary := obj.(*statsv1alpha1.NodeLatencyStats) + + // Calculate the max and average latency values. + peerNodeLatencyEntriesCount := len(summary.PeerNodeLatencyStats) + var targetIPLatencyCount int64 + var maxLatency int64 + var avgLatency int64 + + for i := range summary.PeerNodeLatencyStats { + targetIPLatency := summary.PeerNodeLatencyStats[i] + + for j := range targetIPLatency.TargetIPLatencyStats { + targetIPLatencyCount++ + currentLatency := targetIPLatency.TargetIPLatencyStats[j].LastMeasuredRTTNanoseconds + if currentLatency > maxLatency { + maxLatency = currentLatency + } + + // Due to int64 max value is enough for the sum of all latencies, + // we don't need to check overflow in this case. + avgLatency += currentLatency + } + } + + if targetIPLatencyCount > 0 { + avgLatency = avgLatency / targetIPLatencyCount + } + + return []interface{}{name, peerNodeLatencyEntriesCount, time.Duration(avgLatency).String(), time.Duration(maxLatency).String()}, nil + }) + return table, err } func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { - return &statsv1alpha1.NodeLatencyStats{}, true, nil + // Ignore the deleteValidation and options for now. + obj, exists, err := r.indexer.GetByKey(name) + if err != nil { + return nil, false, errors.NewInternalError(err) + } + if !exists { + return nil, false, errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), name) + } + + if err = r.indexer.Delete(obj); err != nil { + return nil, false, errors.NewInternalError(err) + } + + return obj.(*statsv1alpha1.NodeLatencyStats), true, nil } func (r *REST) NamespaceScoped() bool { diff --git a/pkg/apiserver/registry/stats/nodelatencystats/rest_test.go b/pkg/apiserver/registry/stats/nodelatencystats/rest_test.go index d610d014187..8a5d1071e90 100644 --- a/pkg/apiserver/registry/stats/nodelatencystats/rest_test.go +++ b/pkg/apiserver/registry/stats/nodelatencystats/rest_test.go @@ -13,3 +13,209 @@ // limitations under the License. package nodelatencystats + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + statsv1alpha1 "antrea.io/antrea/pkg/apis/stats/v1alpha1" +) + +func TestREST(t *testing.T) { + r := NewREST() + assert.Equal(t, &statsv1alpha1.NodeLatencyStats{}, r.New()) + assert.Equal(t, &statsv1alpha1.NodeLatencyStats{}, r.NewList()) + assert.False(t, r.NamespaceScoped()) +} + +func TestRESTCreate(t *testing.T) { + summary := &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + } + expectedObj := &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + } + + r := NewREST() + ctx := context.Background() + + obj, err := r.Create(ctx, summary, nil, nil) + require.NoError(t, err) + assert.Equal(t, expectedObj, obj) +} + +func TestRESTGet(t *testing.T) { + tests := []struct { + name string + summary *statsv1alpha1.NodeLatencyStats + nodeName string + expectedObj runtime.Object + err error + }{ + { + name: "get summary", + summary: &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + }, + nodeName: "node1", + expectedObj: &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + }, + err: nil, + }, + { + name: "get summary not found", + summary: &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + }, + nodeName: "node2", + expectedObj: nil, + err: errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), "node2"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := NewREST() + ctx := context.Background() + + _, err := r.Create(ctx, tt.summary, nil, nil) + require.NoError(t, err) + + obj, err := r.Get(ctx, tt.nodeName, nil) + if tt.err != nil { + assert.EqualError(t, tt.err, err.Error()) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expectedObj, obj) + } + }) + } +} + +func TestRESTDelete(t *testing.T) { + tests := []struct { + name string + summary *statsv1alpha1.NodeLatencyStats + nodeName string + expectedObj runtime.Object + err error + }{ + { + name: "delete summary", + summary: &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + }, + nodeName: "node1", + expectedObj: &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + }, + err: nil, + }, + { + name: "delete summary not found", + summary: &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + }, + nodeName: "node2", + expectedObj: nil, + err: errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), "node2"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := NewREST() + ctx := context.Background() + + _, err := r.Create(ctx, tt.summary, nil, nil) + require.NoError(t, err) + obj, deleted, err := r.Delete(ctx, tt.nodeName, nil, nil) + if tt.err != nil { + assert.EqualError(t, tt.err, err.Error()) + } else { + require.NoError(t, err) + assert.True(t, deleted) + assert.Equal(t, tt.expectedObj, obj) + } + }) + } +} + +func TestRESTList(t *testing.T) { + summary := &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + } + expectedObj := &statsv1alpha1.NodeLatencyStatsList{ + Items: []statsv1alpha1.NodeLatencyStats{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: nil, + }, + }, + } + + r := NewREST() + ctx := context.Background() + + _, err := r.Create(ctx, summary, nil, nil) + require.NoError(t, err) + objs, err := r.List(ctx, nil) + require.NoError(t, err) + assert.Equal(t, expectedObj, objs) +} + +func TestRESTConvertToTable(t *testing.T) { + mockTime := time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC) + summary := &statsv1alpha1.NodeLatencyStats{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + PeerNodeLatencyStats: []statsv1alpha1.PeerNodeLatencyStats{ + { + NodeName: "node2", + TargetIPLatencyStats: []statsv1alpha1.TargetIPLatencyStats{ + { + TargetIP: "192.168.0.2", + LastSendTime: metav1.Time{Time: mockTime}, + LastRecvTime: metav1.Time{Time: mockTime}, + LastMeasuredRTTNanoseconds: 1000000, + }, + }, + }, + { + NodeName: "node3", + TargetIPLatencyStats: []statsv1alpha1.TargetIPLatencyStats{ + { + TargetIP: "192.168.0.3", + LastSendTime: metav1.Time{Time: mockTime}, + LastRecvTime: metav1.Time{Time: mockTime}, + LastMeasuredRTTNanoseconds: 2000000, + }, + }, + }, + }, + } + expectedCells := []interface{}{"node1", 2, "1.5ms", "2ms"} + + r := NewREST() + ctx := context.Background() + + _, err := r.Create(ctx, summary, nil, nil) + require.NoError(t, err) + obj, err := r.ConvertToTable(ctx, summary, nil) + require.NoError(t, err) + assert.Equal(t, expectedCells, obj.Rows[0].Cells) +}