Skip to content

Commit

Permalink
Add nodelatencystats REST implementation (#6479)
Browse files Browse the repository at this point in the history
Implement REST server for NodeLatencyStats in v1alpha1.stats.antrea.io
With this change the feature is now usable.
`kubectl get nodelatencystats` will display the latest latency information.

For #5514 

Signed-off-by: Asklv <[email protected]>
Signed-off-by: Antonin Bas <[email protected]>
Co-authored-by: Antonin Bas <[email protected]>
  • Loading branch information
IRONICBo and antoninbas authored Jul 25, 2024
1 parent fa1ceb7 commit 4d9953f
Show file tree
Hide file tree
Showing 2 changed files with 302 additions and 12 deletions.
108 changes: 96 additions & 12 deletions pkg/apiserver/registry/stats/nodelatencystats/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@ package nodelatencystats

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metatable "k8s.io/apimachinery/pkg/api/meta/table"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/tools/cache"

statsv1alpha1 "antrea.io/antrea/pkg/apis/stats/v1alpha1"
)

type REST struct {
indexer cache.Indexer
}

var (
Expand All @@ -39,7 +45,9 @@ var (

// NewREST returns a REST object that will work against API services.
func NewREST() *REST {
return &REST{}
return &REST{
indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}),
}
}

func (r *REST) New() runtime.Object {
Expand All @@ -50,38 +58,114 @@ func (r *REST) Destroy() {
}

func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
// TODO: fill this function in next PR
return &statsv1alpha1.NodeLatencyStats{}, nil
// Update will add the object if the key does not exist.
summary := obj.(*statsv1alpha1.NodeLatencyStats)
if err := r.indexer.Update(summary); err != nil {
return nil, errors.NewInternalError(err)
}

return summary, nil
}

func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
// TODO: fill this function in next PR
return &statsv1alpha1.NodeLatencyStats{}, nil
obj, exists, err := r.indexer.GetByKey(name)
if err != nil {
return nil, errors.NewInternalError(err)
}
if !exists {
return nil, errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), name)
}

return obj.(*statsv1alpha1.NodeLatencyStats), nil
}

func (r *REST) NewList() runtime.Object {
return &statsv1alpha1.NodeLatencyStats{}
}

func (r *REST) List(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) {
// TODO: fill this function in next PR
return &statsv1alpha1.NodeLatencyStatsList{}, nil
objs := r.indexer.List()

// Due to the unordered nature of map iteration and the complexity of controlling 'continue',
// we will ignore paging here and plan to implement it in the future.

entries := make([]statsv1alpha1.NodeLatencyStats, 0, len(objs))
for _, obj := range objs {
entries = append(entries, *obj.(*statsv1alpha1.NodeLatencyStats))
}

return &statsv1alpha1.NodeLatencyStatsList{Items: entries}, nil
}

func (r *REST) ConvertToTable(ctx context.Context, obj runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
// TODO: fill this function in next PR
table := &metav1.Table{
ColumnDefinitions: []metav1.TableColumnDefinition{
{Name: "SourceNodeName", Type: "string", Format: "name", Description: "Source node name."},
{Name: "NodeLatencyStatsList", Type: "array", Format: "string", Description: "Node IP latency list."},
{Name: "Node Name", Type: "string", Format: "name", Description: "Name of Node from which latency was measured."},
{Name: "Num Latency Entries", Type: "integer", Format: "int64", Description: "Number of peers for which latency measurements are available."},
{Name: "Avg Latency", Type: "string", Format: "", Description: "Average latency value across all peers."},
{Name: "Max Latency", Type: "string", Format: "", Description: "Largest latency value across all peers."},
},
}
if m, err := meta.ListAccessor(obj); err == nil {
table.ResourceVersion = m.GetResourceVersion()
table.Continue = m.GetContinue()
table.RemainingItemCount = m.GetRemainingItemCount()
} else {
if m, err := meta.CommonAccessor(obj); err == nil {
table.ResourceVersion = m.GetResourceVersion()
}
}

return table, nil
var err error
table.Rows, err = metatable.MetaToTableRow(obj, func(obj runtime.Object, m metav1.Object, name, age string) ([]interface{}, error) {
summary := obj.(*statsv1alpha1.NodeLatencyStats)

// Calculate the max and average latency values.
peerNodeLatencyEntriesCount := len(summary.PeerNodeLatencyStats)
var targetIPLatencyCount int64
var maxLatency int64
var avgLatency int64

for i := range summary.PeerNodeLatencyStats {
targetIPLatency := summary.PeerNodeLatencyStats[i]

for j := range targetIPLatency.TargetIPLatencyStats {
targetIPLatencyCount++
currentLatency := targetIPLatency.TargetIPLatencyStats[j].LastMeasuredRTTNanoseconds
if currentLatency > maxLatency {
maxLatency = currentLatency
}

// Due to int64 max value is enough for the sum of all latencies,
// we don't need to check overflow in this case.
avgLatency += currentLatency
}
}

if targetIPLatencyCount > 0 {
avgLatency = avgLatency / targetIPLatencyCount
}

return []interface{}{name, peerNodeLatencyEntriesCount, time.Duration(avgLatency).String(), time.Duration(maxLatency).String()}, nil
})
return table, err
}

func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
return &statsv1alpha1.NodeLatencyStats{}, true, nil
// Ignore the deleteValidation and options for now.
obj, exists, err := r.indexer.GetByKey(name)
if err != nil {
return nil, false, errors.NewInternalError(err)
}
if !exists {
return nil, false, errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), name)
}

if err = r.indexer.Delete(obj); err != nil {
return nil, false, errors.NewInternalError(err)
}

return obj.(*statsv1alpha1.NodeLatencyStats), true, nil
}

func (r *REST) NamespaceScoped() bool {
Expand Down
206 changes: 206 additions & 0 deletions pkg/apiserver/registry/stats/nodelatencystats/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,209 @@
// limitations under the License.

package nodelatencystats

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

statsv1alpha1 "antrea.io/antrea/pkg/apis/stats/v1alpha1"
)

func TestREST(t *testing.T) {
r := NewREST()
assert.Equal(t, &statsv1alpha1.NodeLatencyStats{}, r.New())
assert.Equal(t, &statsv1alpha1.NodeLatencyStats{}, r.NewList())
assert.False(t, r.NamespaceScoped())
}

func TestRESTCreate(t *testing.T) {
summary := &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
}
expectedObj := &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
}

r := NewREST()
ctx := context.Background()

obj, err := r.Create(ctx, summary, nil, nil)
require.NoError(t, err)
assert.Equal(t, expectedObj, obj)
}

func TestRESTGet(t *testing.T) {
tests := []struct {
name string
summary *statsv1alpha1.NodeLatencyStats
nodeName string
expectedObj runtime.Object
err error
}{
{
name: "get summary",
summary: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
nodeName: "node1",
expectedObj: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
err: nil,
},
{
name: "get summary not found",
summary: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
nodeName: "node2",
expectedObj: nil,
err: errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), "node2"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := NewREST()
ctx := context.Background()

_, err := r.Create(ctx, tt.summary, nil, nil)
require.NoError(t, err)

obj, err := r.Get(ctx, tt.nodeName, nil)
if tt.err != nil {
assert.EqualError(t, tt.err, err.Error())
} else {
require.NoError(t, err)
assert.Equal(t, tt.expectedObj, obj)
}
})
}
}

func TestRESTDelete(t *testing.T) {
tests := []struct {
name string
summary *statsv1alpha1.NodeLatencyStats
nodeName string
expectedObj runtime.Object
err error
}{
{
name: "delete summary",
summary: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
nodeName: "node1",
expectedObj: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
err: nil,
},
{
name: "delete summary not found",
summary: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
nodeName: "node2",
expectedObj: nil,
err: errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), "node2"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := NewREST()
ctx := context.Background()

_, err := r.Create(ctx, tt.summary, nil, nil)
require.NoError(t, err)
obj, deleted, err := r.Delete(ctx, tt.nodeName, nil, nil)
if tt.err != nil {
assert.EqualError(t, tt.err, err.Error())
} else {
require.NoError(t, err)
assert.True(t, deleted)
assert.Equal(t, tt.expectedObj, obj)
}
})
}
}

func TestRESTList(t *testing.T) {
summary := &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
}
expectedObj := &statsv1alpha1.NodeLatencyStatsList{
Items: []statsv1alpha1.NodeLatencyStats{
{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
},
}

r := NewREST()
ctx := context.Background()

_, err := r.Create(ctx, summary, nil, nil)
require.NoError(t, err)
objs, err := r.List(ctx, nil)
require.NoError(t, err)
assert.Equal(t, expectedObj, objs)
}

func TestRESTConvertToTable(t *testing.T) {
mockTime := time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC)
summary := &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: []statsv1alpha1.PeerNodeLatencyStats{
{
NodeName: "node2",
TargetIPLatencyStats: []statsv1alpha1.TargetIPLatencyStats{
{
TargetIP: "192.168.0.2",
LastSendTime: metav1.Time{Time: mockTime},
LastRecvTime: metav1.Time{Time: mockTime},
LastMeasuredRTTNanoseconds: 1000000,
},
},
},
{
NodeName: "node3",
TargetIPLatencyStats: []statsv1alpha1.TargetIPLatencyStats{
{
TargetIP: "192.168.0.3",
LastSendTime: metav1.Time{Time: mockTime},
LastRecvTime: metav1.Time{Time: mockTime},
LastMeasuredRTTNanoseconds: 2000000,
},
},
},
},
}
expectedCells := []interface{}{"node1", 2, "1.5ms", "2ms"}

r := NewREST()
ctx := context.Background()

_, err := r.Create(ctx, summary, nil, nil)
require.NoError(t, err)
obj, err := r.ConvertToTable(ctx, summary, nil)
require.NoError(t, err)
assert.Equal(t, expectedCells, obj.Rows[0].Cells)
}

0 comments on commit 4d9953f

Please sign in to comment.