Skip to content

Commit

Permalink
feat: improve config and start process of yurthub component
Browse files Browse the repository at this point in the history
Signed-off-by: rambohe-ch <[email protected]>
  • Loading branch information
rambohe-ch committed Feb 7, 2025
1 parent 5543dde commit 98f1edc
Show file tree
Hide file tree
Showing 31 changed files with 494 additions and 817 deletions.
4 changes: 2 additions & 2 deletions charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ spec:
host: {{ .Values.yurthubBindingAddr }}
path: /v1/readyz
port: 10267
initialDelaySeconds: 120
periodSeconds: 20
initialDelaySeconds: 30
periodSeconds: 15
failureThreshold: 3
resources:
requests:
Expand Down
4 changes: 2 additions & 2 deletions charts/yurthub/templates/yurthub-yurtstaticset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ spec:
host: {{ .Values.yurthubBindingAddr }}
path: /v1/readyz
port: 10267
initialDelaySeconds: 120
periodSeconds: 20
initialDelaySeconds: 30
periodSeconds: 15
failureThreshold: 3
resources:
requests:
Expand Down
245 changes: 123 additions & 122 deletions cmd/yurthub/app/config/config.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/yurthub/app/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestComplete(t *testing.T) {
options.NodeName = "foo"
options.EnableDummyIf = false
options.HubAgentDummyIfIP = "169.254.2.1"
cfg, err := Complete(options)
cfg, err := Complete(options, nil)
if err != nil {
t.Errorf("expect no err, but got %v", err)
} else if cfg == nil {
Expand Down
19 changes: 10 additions & 9 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,16 @@ func (options *YurtHubOptions) Validate() error {
return fmt.Errorf("server-address is empty")
}

if options.WorkingMode != string(util.WorkingModeLocal) {
if !util.IsSupportedWorkingMode(util.WorkingMode(options.WorkingMode)) {
return fmt.Errorf("working mode %s is not supported", options.WorkingMode)
}

switch options.WorkingMode {
case string(util.WorkingModeLocal):
if len(options.HostControlPlaneAddr) == 0 {
return fmt.Errorf("host-control-plane-address is empty")

Check warning on line 146 in cmd/yurthub/app/options/options.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/options/options.go#L144-L146

Added lines #L144 - L146 were not covered by tests
}
default:
if options.BootstrapMode != certificate.KubeletCertificateBootstrapMode {
if len(options.JoinToken) == 0 && len(options.BootstrapFile) == 0 {
return fmt.Errorf("bootstrap token and bootstrap file are empty, one of them must be set")
Expand All @@ -147,10 +156,6 @@ func (options *YurtHubOptions) Validate() error {
return fmt.Errorf("lb mode(%s) is not supported", options.LBMode)
}

if !util.IsSupportedWorkingMode(util.WorkingMode(options.WorkingMode)) {
return fmt.Errorf("working mode %s is not supported", options.WorkingMode)
}

if err := options.verifyDummyIP(); err != nil {
return fmt.Errorf("dummy ip %s is not invalid, %w", options.HubAgentDummyIfIP, err)
}
Expand All @@ -162,10 +167,6 @@ func (options *YurtHubOptions) Validate() error {
if len(options.CACertHashes) == 0 && !options.UnsafeSkipCAVerification {
return fmt.Errorf("set --discovery-token-unsafe-skip-ca-verification flag as true or pass CACertHashes to continue")
}
} else {
if len(options.HostControlPlaneAddr) == 0 {
return fmt.Errorf("host-control-plane-address is empty")
}
}

return nil
Expand Down
140 changes: 48 additions & 92 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@ package app
import (
"context"
"fmt"
"net/url"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/component-base/cli/globalflag"
"k8s.io/klog/v2"

Expand All @@ -35,12 +31,10 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/directclient"
"github.com/openyurtio/openyurt/pkg/yurthub/locallb"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
"github.com/openyurtio/openyurt/pkg/yurthub/tenant"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

Expand All @@ -67,7 +61,7 @@ func NewCmdStartYurtHub(ctx context.Context) *cobra.Command {
klog.Fatalf("validate options: %v", err)
}

yurtHubCfg, err := config.Complete(yurtHubOptions)
yurtHubCfg, err := config.Complete(yurtHubOptions, ctx.Done())

Check warning on line 64 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L64

Added line #L64 was not covered by tests
if err != nil {
klog.Fatalf("complete %s configuration error, %v", projectinfo.GetHubName(), err)
}
Expand All @@ -89,127 +83,89 @@ func NewCmdStartYurtHub(ctx context.Context) *cobra.Command {

// Run runs the YurtHubConfiguration. This should never exit
func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
if cfg.WorkingMode != util.WorkingModeLocal {
defer cfg.CertManager.Stop()
trace := 1
klog.Infof("%d. new transport manager", trace)
transportManager, err := transport.NewTransportManager(cfg.CertManager, ctx.Done())
if err != nil {
return fmt.Errorf("could not new transport manager, %w", err)
}
trace++
klog.Infof("%s works in %s mode", projectinfo.GetHubName(), string(cfg.WorkingMode))

Check warning on line 86 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L86

Added line #L86 was not covered by tests

klog.Infof("%d. prepare cloud kube clients", trace)
cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, transportManager)
switch cfg.WorkingMode {
case util.WorkingModeLocal:
klog.Infof("new locallb manager for node %s ", cfg.NodeName)
locallbMgr, err := locallb.NewLocalLBManager(cfg.TenantKasService, cfg.SharedFactory)

Check warning on line 91 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L88-L91

Added lines #L88 - L91 were not covered by tests
// when local mode yurthub exits, we need to clean configured iptables
defer locallbMgr.CleanIptables()

Check warning on line 93 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L93

Added line #L93 was not covered by tests
if err != nil {
return fmt.Errorf("could not create cloud clients, %w", err)
return fmt.Errorf("could not new locallb manager, %w", err)

Check warning on line 95 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L95

Added line #L95 was not covered by tests
}
trace++
// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())

Check warning on line 98 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L98

Added line #L98 was not covered by tests

case util.WorkingModeCloud, util.WorkingModeEdge:
defer cfg.CertManager.Stop()
trace := 1

Check warning on line 102 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L100-L102

Added lines #L100 - L102 were not covered by tests
// compare cloud working mode, edge working mode need following preparations:
// 1. health checker: periodically check the health status of cloud kube-apiserver
// 1. cache manager: used for caching response on local disk.
// 2. gc: used for garbaging collect unused cache on local disk.
var cloudHealthChecker healthchecker.MultipleBackendsHealthChecker
var storageWrapper cachemanager.StorageWrapper
var cacheManager cachemanager.CacheManager
var err error

Check warning on line 110 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L108-L110

Added lines #L108 - L110 were not covered by tests
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. create health checkers for remote servers", trace)
cloudHealthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, cloudClients, ctx.Done())
cloudHealthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, ctx.Done())

Check warning on line 113 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L113

Added line #L113 was not covered by tests
if err != nil {
return fmt.Errorf("could not new cloud health checker, %w", err)
return fmt.Errorf("could not new health checker for cloud kube-apiserver, %w", err)

Check warning on line 115 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L115

Added line #L115 was not covered by tests
}
} else {
klog.Infof("%d. disable health checker for node %s because it is a cloud node", trace, cfg.NodeName)
// In cloud mode, cloud health checker is not needed.
// This fake checker will always report that the cloud is healthy is unhealthy.
cloudHealthChecker = healthchecker.NewFakeChecker(true, make(map[string]int))
}
trace++

klog.Infof("%d. new direct client manager", trace)
directClientManager, err := directclient.NewRestClientManager(cfg.RemoteServers, transportManager, cloudHealthChecker)
if err != nil {
return fmt.Errorf("could not new restConfig manager, %w", err)
}
trace++
trace++

Check warning on line 117 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L117

Added line #L117 was not covered by tests

var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.ConfigManager)
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
trace++
storageManager, err := disk.NewDiskStorage(cfg.DiskCachePath)
if err != nil {
klog.Errorf("could not create storage manager, %v", err)
return err

Check warning on line 123 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L120-L123

Added lines #L120 - L123 were not covered by tests
}
storageWrapper = cachemanager.NewStorageWrapper(storageManager)
cacheManager = cachemanager.NewCacheManager(storageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.ConfigManager)
cfg.StorageWrapper = storageWrapper
trace++

Check warning on line 128 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L125-L128

Added lines #L125 - L128 were not covered by tests

if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, directClientManager, ctx.Done())
gcMgr, err := gc.NewGCManager(cfg, cloudHealthChecker, ctx.Done())

Check warning on line 131 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L131

Added line #L131 was not covered by tests
if err != nil {
return fmt.Errorf("could not new gc manager, %w", err)
}
gcMgr.Run()
} else {
klog.Infof("%d. disable gc manager for node %s because it is a cloud node", trace, cfg.NodeName)
trace++

Check warning on line 136 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L136

Added line #L136 was not covered by tests
}
trace++

klog.Infof("%d. new tenant sa manager", trace)
tenantMgr := tenant.New(cfg.TenantNs, cfg.SharedFactory, ctx.Done())
trace++
if cfg.NetworkMgr != nil {
klog.Infof("%d. start network manager for ensuing dummy interface", trace)
cfg.NetworkMgr.Run(ctx.Done())
trace++

Check warning on line 142 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L139-L142

Added lines #L139 - L142 were not covered by tests
}

// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())
cfg.NodePoolInformerFactory.Start(ctx.Done())
cfg.DynamicSharedFactory.Start(ctx.Done())

Check warning on line 147 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L147

Added line #L147 was not covered by tests

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
// Start to prepare proxy handler and start server serving.
klog.Infof("%d. new reverse proxy handler for forwarding requests", trace)

Check warning on line 150 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L150

Added line #L150 was not covered by tests
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(
cfg,
cacheMgr,
directClientManager,
transportManager,
cacheManager,

Check warning on line 153 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L153

Added line #L153 was not covered by tests
cloudHealthChecker,
tenantMgr,
ctx.Done())
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
}
trace++

if cfg.NetworkMgr != nil {
cfg.NetworkMgr.Run(ctx.Done())
}

klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName())
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, directClientManager, ctx.Done()); err != nil {
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, cloudHealthChecker, ctx.Done()); err != nil {

Check warning on line 162 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L162

Added line #L162 was not covered by tests
return fmt.Errorf("could not run hub servers, %w", err)
}
} else {
klog.Infof("new locallb manager for node %s ", cfg.NodeName)
locallbMgr, err := locallb.NewLocalLBManager(cfg, cfg.SharedFactory)
// when local mode yurthub exits, we need to clean configured iptables
defer locallbMgr.CleanIptables()
if err != nil {
return fmt.Errorf("could not new locallb manager, %w", err)
}
// Start the informer factory if all informers have been registered
cfg.SharedFactory.Start(ctx.Done())
default:

Check warning on line 165 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L165

Added line #L165 was not covered by tests

}
<-ctx.Done()
klog.Info("hub agent exited")
return nil
}

// createClients will create clients for all cloud APIServer
// It will return a map, mapping cloud APIServer URL to its client
func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) {
cloudClients := make(map[string]kubernetes.Interface)
for i := range remoteServers {
restConf := &rest.Config{
Host: remoteServers[i].String(),
Transport: tp.CurrentTransport(),
Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second,
}
c, err := kubernetes.NewForConfig(restConf)
if err != nil {
return cloudClients, err
}
cloudClients[remoteServers[i].String()] = c
}
return cloudClients, nil
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
var listGvk schema.GroupVersionKind
convertGVK, ok := util.ConvertGVKFrom(ctx)
if ok && convertGVK != nil {
// partial object metadata request
listGvk = schema.GroupVersionKind{
Group: convertGVK.Group,
Version: convertGVK.Version,
Expand Down Expand Up @@ -204,6 +205,7 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
if err != nil {
return nil, err
}

objs, err := cm.storage.List(key)
if err == storage.ErrStorageNotFound && isListRequestWithNameFieldSelector(req) {
// When the request is a list request with FieldSelector "metadata.name", we should not return error
Expand Down
30 changes: 22 additions & 8 deletions pkg/yurthub/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/directclient"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

Expand All @@ -43,15 +44,16 @@ var (
// GCManager is responsible for cleanup garbage of yurthub
type GCManager struct {
store cachemanager.StorageWrapper
manager *directclient.DirectClientManager
healthChecker healthchecker.MultipleBackendsHealthChecker
clientManager transport.Interface
nodeName string
eventsGCFrequency time.Duration
lastTime time.Time
stopCh <-chan struct{}
}

// NewGCManager creates a *GCManager object
func NewGCManager(cfg *config.YurtHubConfiguration, directClientManager *directclient.DirectClientManager, stopCh <-chan struct{}) (*GCManager, error) {
func NewGCManager(cfg *config.YurtHubConfiguration, healthChecker healthchecker.MultipleBackendsHealthChecker, stopCh <-chan struct{}) (*GCManager, error) {

Check warning on line 56 in pkg/yurthub/gc/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/gc/gc.go#L56

Added line #L56 was not covered by tests
gcFrequency := cfg.GCFrequency
if gcFrequency == 0 {
gcFrequency = defaultEventGcInterval
Expand All @@ -60,7 +62,8 @@ func NewGCManager(cfg *config.YurtHubConfiguration, directClientManager *directc
// TODO: use disk storage directly
store: cfg.StorageWrapper,
nodeName: cfg.NodeName,
manager: directClientManager,
healthChecker: healthChecker,
clientManager: cfg.TransportAndDirectClientManager,

Check warning on line 66 in pkg/yurthub/gc/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/gc/gc.go#L65-L66

Added lines #L65 - L66 were not covered by tests
eventsGCFrequency: time.Duration(gcFrequency) * time.Minute,
stopCh: stopCh,
}
Expand All @@ -75,11 +78,16 @@ func (m *GCManager) Run() {
go wait.JitterUntil(func() {
klog.V(2).Infof("start gc events after waiting %v from previous gc", time.Since(m.lastTime))
m.lastTime = time.Now()
kubeClient := m.manager.GetDirectClientset(true)
if kubeClient == nil {
u, err := m.healthChecker.PickHealthyServer()
if err != nil || u == nil {

Check warning on line 82 in pkg/yurthub/gc/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/gc/gc.go#L81-L82

Added lines #L81 - L82 were not covered by tests
klog.Warningf("all remote servers are unhealthy, skip gc events")
return
}
kubeClient := m.clientManager.GetDirectClientset(u)
if kubeClient == nil {
klog.Warningf("couldn't get direct clientset for server %s, skip gc events", u.String())
return

Check warning on line 89 in pkg/yurthub/gc/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/gc/gc.go#L86-L89

Added lines #L86 - L89 were not covered by tests
}

m.gcEvents(kubeClient, "kubelet")
m.gcEvents(kubeClient, "kube-proxy")
Expand All @@ -105,11 +113,17 @@ func (m *GCManager) gcPodsWhenRestart() {
return
}

kubeClient := m.manager.GetDirectClientset(true)
if kubeClient == nil {
// get a clientset of a healthy kube-apiserver
u, err := m.healthChecker.PickHealthyServer()
if err != nil || u == nil {

Check warning on line 118 in pkg/yurthub/gc/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/gc/gc.go#L117-L118

Added lines #L117 - L118 were not covered by tests
klog.Warningf("all remote servers are unhealthy, skip gc pods")
return
}
kubeClient := m.clientManager.GetDirectClientset(u)
if kubeClient == nil {
klog.Warningf("couldn't get direct clientset for server %s, skip gc pods", u.String())
return

Check warning on line 125 in pkg/yurthub/gc/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/gc/gc.go#L122-L125

Added lines #L122 - L125 were not covered by tests
}

listOpts := metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector("spec.nodeName", m.nodeName).String()}
podList, err := kubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), listOpts)
Expand Down
1 change: 0 additions & 1 deletion pkg/yurthub/healthchecker/fake_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (fc *fakeChecker) IsHealthy() bool {
}

func (fc *fakeChecker) RenewKubeletLeaseTime() {
return
}

func (fc *fakeChecker) PickHealthyServer() (*url.URL, error) {
Expand Down
Loading

0 comments on commit 98f1edc

Please sign in to comment.