Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nodelatencystats rest apis implementation. #6479

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 96 additions & 12 deletions pkg/apiserver/registry/stats/nodelatencystats/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@ package nodelatencystats

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metatable "k8s.io/apimachinery/pkg/api/meta/table"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/tools/cache"

statsv1alpha1 "antrea.io/antrea/pkg/apis/stats/v1alpha1"
)

type REST struct {
indexer cache.Indexer
}

var (
Expand All @@ -39,7 +45,9 @@ var (

// NewREST returns a REST object that will work against API services.
func NewREST() *REST {
return &REST{}
return &REST{
indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}),
}
}

func (r *REST) New() runtime.Object {
Expand All @@ -50,38 +58,114 @@ func (r *REST) Destroy() {
}

func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
// TODO: fill this function in next PR
return &statsv1alpha1.NodeLatencyStats{}, nil
// Update will add the object if the key does not exist.
summary := obj.(*statsv1alpha1.NodeLatencyStats)
if err := r.indexer.Update(summary); err != nil {
IRONICBo marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.NewInternalError(err)
}

return summary, nil
}

func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
// TODO: fill this function in next PR
return &statsv1alpha1.NodeLatencyStats{}, nil
obj, exists, err := r.indexer.GetByKey(name)
if err != nil {
return nil, errors.NewInternalError(err)
}
if !exists {
return nil, errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), name)
}

return obj.(*statsv1alpha1.NodeLatencyStats), nil
}

func (r *REST) NewList() runtime.Object {
return &statsv1alpha1.NodeLatencyStats{}
}

func (r *REST) List(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) {
// TODO: fill this function in next PR
return &statsv1alpha1.NodeLatencyStatsList{}, nil
objs := r.indexer.List()

// Due to the unordered nature of map iteration and the complexity of controlling 'continue',
// we will ignore paging here and plan to implement it in the future.

entries := make([]statsv1alpha1.NodeLatencyStats, 0, len(objs))
for _, obj := range objs {
entries = append(entries, *obj.(*statsv1alpha1.NodeLatencyStats))
}

return &statsv1alpha1.NodeLatencyStatsList{Items: entries}, nil
}

func (r *REST) ConvertToTable(ctx context.Context, obj runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
IRONICBo marked this conversation as resolved.
Show resolved Hide resolved
// TODO: fill this function in next PR
table := &metav1.Table{
ColumnDefinitions: []metav1.TableColumnDefinition{
{Name: "SourceNodeName", Type: "string", Format: "name", Description: "Source node name."},
{Name: "NodeLatencyStatsList", Type: "array", Format: "string", Description: "Node IP latency list."},
{Name: "Node Name", Type: "string", Format: "name", Description: "Name of Node from which latency was measured."},
{Name: "Num Latency Entries", Type: "integer", Format: "int64", Description: "Number of peers for which latency measurements are available."},
{Name: "Avg Latency", Type: "string", Format: "", Description: "Average latency value across all peers."},
{Name: "Max Latency", Type: "string", Format: "", Description: "Largest latency value across all peers."},
},
}
if m, err := meta.ListAccessor(obj); err == nil {
table.ResourceVersion = m.GetResourceVersion()
table.Continue = m.GetContinue()
table.RemainingItemCount = m.GetRemainingItemCount()
} else {
if m, err := meta.CommonAccessor(obj); err == nil {
table.ResourceVersion = m.GetResourceVersion()
}
}

return table, nil
var err error
table.Rows, err = metatable.MetaToTableRow(obj, func(obj runtime.Object, m metav1.Object, name, age string) ([]interface{}, error) {
summary := obj.(*statsv1alpha1.NodeLatencyStats)

// Calculate the max and average latency values.
peerNodeLatencyEntriesCount := len(summary.PeerNodeLatencyStats)
var targetIPLatencyCount int64
var maxLatency int64
var avgLatency int64

for i := range summary.PeerNodeLatencyStats {
targetIPLatency := summary.PeerNodeLatencyStats[i]

for j := range targetIPLatency.TargetIPLatencyStats {
targetIPLatencyCount++
currentLatency := targetIPLatency.TargetIPLatencyStats[j].LastMeasuredRTTNanoseconds
if currentLatency > maxLatency {
maxLatency = currentLatency
}

// Due to int64 max value is enough for the sum of all latencies,
// we don't need to check overflow in this case.
avgLatency += currentLatency
}
}

if targetIPLatencyCount > 0 {
avgLatency = avgLatency / targetIPLatencyCount
}

return []interface{}{name, peerNodeLatencyEntriesCount, time.Duration(avgLatency).String(), time.Duration(maxLatency).String()}, nil
})
return table, err
}

func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
return &statsv1alpha1.NodeLatencyStats{}, true, nil
// Ignore the deleteValidation and options for now.
obj, exists, err := r.indexer.GetByKey(name)
if err != nil {
return nil, false, errors.NewInternalError(err)
}
if !exists {
return nil, false, errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), name)
}

if err = r.indexer.Delete(obj); err != nil {
return nil, false, errors.NewInternalError(err)
}

return obj.(*statsv1alpha1.NodeLatencyStats), true, nil
}

func (r *REST) NamespaceScoped() bool {
Expand Down
206 changes: 206 additions & 0 deletions pkg/apiserver/registry/stats/nodelatencystats/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,209 @@
// limitations under the License.

package nodelatencystats

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

statsv1alpha1 "antrea.io/antrea/pkg/apis/stats/v1alpha1"
)

func TestREST(t *testing.T) {
r := NewREST()
assert.Equal(t, &statsv1alpha1.NodeLatencyStats{}, r.New())
assert.Equal(t, &statsv1alpha1.NodeLatencyStats{}, r.NewList())
assert.False(t, r.NamespaceScoped())
}

func TestRESTCreate(t *testing.T) {
summary := &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
}
expectedObj := &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
}

r := NewREST()
ctx := context.Background()

obj, err := r.Create(ctx, summary, nil, nil)
require.NoError(t, err)
assert.Equal(t, expectedObj, obj)
}

func TestRESTGet(t *testing.T) {
tests := []struct {
name string
summary *statsv1alpha1.NodeLatencyStats
nodeName string
expectedObj runtime.Object
err error
}{
{
name: "get summary",
summary: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
nodeName: "node1",
expectedObj: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
err: nil,
},
{
name: "get summary not found",
summary: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
nodeName: "node2",
expectedObj: nil,
err: errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), "node2"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := NewREST()
ctx := context.Background()

_, err := r.Create(ctx, tt.summary, nil, nil)
require.NoError(t, err)

obj, err := r.Get(ctx, tt.nodeName, nil)
if tt.err != nil {
assert.EqualError(t, tt.err, err.Error())
} else {
require.NoError(t, err)
assert.Equal(t, tt.expectedObj, obj)
}
})
}
}

func TestRESTDelete(t *testing.T) {
tests := []struct {
name string
summary *statsv1alpha1.NodeLatencyStats
nodeName string
expectedObj runtime.Object
err error
}{
{
name: "delete summary",
summary: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
nodeName: "node1",
expectedObj: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
err: nil,
},
{
name: "delete summary not found",
summary: &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
nodeName: "node2",
expectedObj: nil,
err: errors.NewNotFound(statsv1alpha1.Resource("nodelatencystats"), "node2"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := NewREST()
ctx := context.Background()

_, err := r.Create(ctx, tt.summary, nil, nil)
require.NoError(t, err)
obj, deleted, err := r.Delete(ctx, tt.nodeName, nil, nil)
if tt.err != nil {
assert.EqualError(t, tt.err, err.Error())
} else {
require.NoError(t, err)
assert.True(t, deleted)
assert.Equal(t, tt.expectedObj, obj)
}
})
}
}

func TestRESTList(t *testing.T) {
summary := &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
}
expectedObj := &statsv1alpha1.NodeLatencyStatsList{
Items: []statsv1alpha1.NodeLatencyStats{
{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: nil,
},
},
}

r := NewREST()
ctx := context.Background()

_, err := r.Create(ctx, summary, nil, nil)
require.NoError(t, err)
objs, err := r.List(ctx, nil)
require.NoError(t, err)
assert.Equal(t, expectedObj, objs)
}

func TestRESTConvertToTable(t *testing.T) {
mockTime := time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC)
summary := &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
PeerNodeLatencyStats: []statsv1alpha1.PeerNodeLatencyStats{
{
NodeName: "node2",
TargetIPLatencyStats: []statsv1alpha1.TargetIPLatencyStats{
{
TargetIP: "192.168.0.2",
LastSendTime: metav1.Time{Time: mockTime},
LastRecvTime: metav1.Time{Time: mockTime},
LastMeasuredRTTNanoseconds: 1000000,
},
},
},
{
NodeName: "node3",
TargetIPLatencyStats: []statsv1alpha1.TargetIPLatencyStats{
{
TargetIP: "192.168.0.3",
LastSendTime: metav1.Time{Time: mockTime},
LastRecvTime: metav1.Time{Time: mockTime},
LastMeasuredRTTNanoseconds: 2000000,
},
},
},
},
}
expectedCells := []interface{}{"node1", 2, "1.5ms", "2ms"}

r := NewREST()
ctx := context.Background()

_, err := r.Create(ctx, summary, nil, nil)
require.NoError(t, err)
obj, err := r.ConvertToTable(ctx, summary, nil)
require.NoError(t, err)
assert.Equal(t, expectedCells, obj.Rows[0].Cells)
}