refactor static pod registration code
AnishShah authored and haircommander committed Oct 3, 2024
1 parent 5c45f28 commit bd429bf
Showing 7 changed files with 118 additions and 154 deletions.
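
For orientation: the heart of the refactor is replacing the node-informer event handler (which pushed a signal through nodeRegistrationCh into syncLoopIteration) with a dedicated goroutine, fastStaticPodsRegistration, that polls until the node is registered and then reconciles mirror pods directly. Below is a minimal, self-contained sketch of that flow in plain Go; the types and signatures are simplified stand-ins for the kubelet internals, not the real APIs.

package main

import (
	"errors"
	"fmt"
	"time"
)

type pod struct{ name string }

// kubeletSketch models only the pieces of the kubelet relevant to
// static pod registration.
type kubeletSketch struct {
	getNode      func() error            // stand-in for (*Kubelet).GetNode
	staticPods   map[*pod]*pod           // static pod -> mirror pod (nil if missing)
	createMirror func(static *pod) error // stand-in for mirrorPodClient.CreateMirrorPod
}

// fastStaticPodsRegistration mirrors the commit's approach: poll until the
// node object exists, then ensure every static pod has a mirror pod.
func (kl *kubeletSketch) fastStaticPodsRegistration() {
	for kl.getNode() != nil {
		time.Sleep(100 * time.Millisecond) // node not registered yet; retry
	}
	for static, mirror := range kl.staticPods {
		if mirror == nil {
			if err := kl.createMirror(static); err != nil {
				fmt.Println("failed to create mirror pod:", err)
			}
		}
	}
}

func main() {
	polls := 0
	kl := &kubeletSketch{
		getNode: func() error {
			polls++
			if polls < 3 {
				return errors.New("node not found") // simulate registration delay
			}
			return nil
		},
		staticPods: map[*pod]*pod{{name: "etcd"}: nil},
		createMirror: func(p *pod) error {
			fmt.Println("created mirror pod for", p.name)
			return nil
		},
	}
	kl.fastStaticPodsRegistration() // prints: created mirror pod for etcd
}

Compared with the deleted event-handler plumbing, this shape needs no informer handle cleanup and no extra case in syncLoopIteration, which accounts for many of the removed lines below.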
155 changes: 77 additions & 78 deletions pkg/kubelet/kubelet.go
@@ -403,8 +403,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

var nodeHasSynced cache.InformerSynced
var nodeLister corelisters.NodeLister
var cleanupNodeEventHandler func()
nodeRegistrationCh := make(chan struct{})

// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
// If not nil, we are running as part of a cluster and should sync w/API
@@ -413,22 +411,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
}))
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
nodeInformer := kubeInformers.Core().V1().Nodes().Informer()
nodeHasSynced = func() bool {
return nodeInformer.HasSynced()
}
handle, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
nodeRegistrationCh <- struct{}{}
},
})
if err != nil {
return nil, err
}
cleanupNodeEventHandler = func() {
if err := nodeInformer.RemoveEventHandler(handle); err != nil {
klog.ErrorS(err, "failed to remove event handler from node informer")
}
return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
}
kubeInformers.Start(wait.NeverStop)
klog.InfoS("Attempting to sync node with API server")
@@ -437,7 +421,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
cleanupNodeEventHandler = func() {}
klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
}

@@ -603,8 +586,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeStatusMaxImages: nodeStatusMaxImages,
tracer: tracer,
nodeStartupLatencyTracker: kubeDeps.NodeStartupLatencyTracker,
nodeRegistrationCh: nodeRegistrationCh,
cleanupNodeEventHandler: cleanupNodeEventHandler,
}

if klet.cloud != nil {
@@ -1380,15 +1361,13 @@ type Kubelet struct {
// Track node startup latencies
nodeStartupLatencyTracker util.NodeStartupLatencyTracker

// sends an update when node is registered.
nodeRegistrationCh chan struct{}

// clean up EventHandlers added to node informers.
cleanupNodeEventHandler func()

// whether the initial static pod registration was completed
// before the node became ready.
staticPodsRegistered bool
initialStaticPodsRegistered bool

// Lock to ensure concurrency-safe access to the
// initialStaticPodsRegistered field.
initialStaticPodsRegisteredLock sync.Mutex
}

// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
@@ -1714,6 +1693,13 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {

// start syncing lease
go kl.nodeLeaseController.Run(context.Background())

// During node startup, it is possible that a static pod is already running but its mirror pod was not
// created, either because the node was not registered fast enough or because the node informers had not
// synced. The mirror pods will eventually be created when the static pods are resynced 1-1.5 minutes later.
// However, this also affects node readiness latency, because node readiness is gated on the initial static
// pods being registered. To improve node readiness latency, we ensure mirror pods exist as soon as the node
// is registered.
go kl.fastStaticPodsRegistration()
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

@@ -1929,37 +1915,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
}

// Create Mirror Pod for Static Pod if it doesn't already exist
if kubetypes.IsStaticPod(pod) {
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(pod)
var err error
deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil {
klog.V(4).ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else if node.DeletionTimestamp != nil {
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
}
}
}
}
kl.ensureMirrorPodExists(pod, mirrorPod)

// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
@@ -2556,19 +2512,6 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
}
klog.V(4).InfoS("SyncLoop (housekeeping) end", "duration", duration.Round(time.Millisecond))
}
case <-kl.nodeRegistrationCh:
// It is possible that static pods are running but their corresponding mirror
// pods are not created because node was not registered. We want to sync static
// pods again quickly on node registration so that their resource requests are
// visible to the scheduler and avoid delaying node readiness.
defer kl.cleanupNodeEventHandler()
if kl.kubeClient != nil {
staticPods, mirrorPods := kl.podManager.GetStaticPodsAndMirrorPods()
if len(staticPods) != len(mirrorPods) {
handler.HandlePodSyncs(staticPods)
}
}

}
return true
}
@@ -3135,20 +3078,76 @@ func (kl *Kubelet) warnCgroupV1Usage() {

// initialStaticPodsRegistration ensures that all static pods are registered to the apiserver
// before the node becomes ready.
func (kl *Kubelet) staticPodsRegistration() error {
func (kl *Kubelet) initialStaticPodsRegistration() error {
kl.initialStaticPodsRegisteredLock.Lock()
defer kl.initialStaticPodsRegisteredLock.Unlock()
// kubelet running in standalone mode does not register static pods to the apiserver.
if kl.kubeClient == nil {
return nil
}
// Check if initial static pod registration has already been completed.
if kl.staticPodsRegistered {
if kl.initialStaticPodsRegistered {
return nil
}
allStaticPods, allMirrorPods := kl.podManager.GetStaticPodsAndMirrorPods()
if len(allStaticPods) == len(allMirrorPods) {
kl.staticPodsRegistered = true
return nil
staticPodToMirrorPodMap := kl.podManager.GetStaticPodToMirrorPodMap()
for _, mirrorPod := range staticPodToMirrorPodMap {
if mirrorPod == nil {
return fmt.Errorf("not all static pods are registered")
}
}

return fmt.Errorf("all static pods are not registered")
kl.initialStaticPodsRegistered = true
return nil
}

// Ensure Mirror Pod for Static Pod exists.
func (kl *Kubelet) ensureMirrorPodExists(staticPod, mirrorPod *v1.Pod) {
if kubetypes.IsStaticPod(staticPod) {
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, staticPod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(staticPod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(staticPod)
var err error
deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil {
klog.V(4).ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else if node.DeletionTimestamp != nil {
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod))
if err := kl.mirrorPodClient.CreateMirrorPod(staticPod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(staticPod))
}
}
}
}
}

// Ensure Mirror Pod for Static Pod exists as soon as the node is registered.
func (kl *Kubelet) fastStaticPodsRegistration() {
for {
_, err := kl.GetNode()
if err == nil {
break
}
klog.V(5).ErrorS(err, "unable to ensure mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName)))
time.Sleep(time.Second)
}

staticPodToMirrorPodMap := kl.podManager.GetStaticPodToMirrorPodMap()
for staticPod, mirrorPod := range staticPodToMirrorPodMap {
kl.ensureMirrorPodExists(staticPod, mirrorPod)
}
}
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet_node_status.go
@@ -795,7 +795,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(context.Context, *v1.Node) er
nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors,
kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent, kl.staticPodsRegistration,
kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent, kl.initialStaticPodsRegistration,
kl.supportLocalStorageCapacityIsolation()),
nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
31 changes: 5 additions & 26 deletions pkg/kubelet/kubelet_test.go
@@ -465,27 +465,6 @@ func TestSyncPodsStartPod(t *testing.T) {
fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)})
}

func TestSyncStaticPodsOnNodeRegistration(t *testing.T) {
ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
pods := []*v1.Pod{
podWithUIDNameNsSpec("12345678", "foo", "new", v1.PodSpec{}),
podWithUIDNameNsSpec("1234", "bar", "kube-system", v1.PodSpec{}),
}
pods[1].Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
kubelet.podManager.SetPods(pods)
kubelet.cleanupNodeEventHandler = func() {}
kubelet.nodeRegistrationCh = make(chan struct{}, 1)
kubelet.nodeRegistrationCh <- struct{}{}

ok := kubelet.syncLoopIteration(ctx, make(chan kubetypes.PodUpdate), kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1))
require.True(t, ok, "Expected syncLoopIteration to return ok")
fakeRuntime.AssertStartedPods([]string{string(pods[1].UID)})
}

func TestHandlePodCleanupsPerQOS(t *testing.T) {
ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
@@ -3302,7 +3281,7 @@ func TestSyncPodSpans(t *testing.T) {
}
}

func TestStaticPodsRegistration(t *testing.T) {
func TestInitialStaticPodsRegistration(t *testing.T) {
normalPod := podWithUIDNameNsSpec("12345678", "foo", "new", v1.PodSpec{})
staticPod := podWithUIDNameNsSpec("123456789", "kube-system", "new", v1.PodSpec{})
staticPod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
Expand Down Expand Up @@ -3349,13 +3328,13 @@ func TestStaticPodsRegistration(t *testing.T) {
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.podManager.SetPods(tc.pods)
kubelet.staticPodsRegistered = tc.alreadyRegistered
err := kubelet.staticPodsRegistration()
kubelet.initialStaticPodsRegistered = tc.alreadyRegistered
err := kubelet.initialStaticPodsRegistration()
if tc.wantErr && err == nil {
t.Fatal("staticPodsRegistration() did not return any error, want error")
t.Fatal("initialStaticPodsRegistration() did not return any error, want error")
}
if !tc.wantErr && err != nil {
t.Fatal("staticPodsRegistration() returned error, want nil")
t.Fatal("initialStaticPodsRegistration() returned error, want nil")
}
})
}
4 changes: 2 additions & 2 deletions pkg/kubelet/nodestatus/setters.go
@@ -532,7 +532,7 @@ func ReadyCondition(
cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
nodeShutdownManagerErrorsFunc func() error, // typically kubelet.shutdownManager.errors.
recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
staticPodsRegistrationFunc func() error, // typically Kubelet.staticPodsRegistration
initialStaticPodsRegistrationFunc func() error, // typically Kubelet.initialStaticPodsRegistration
localStorageCapacityIsolation bool,
) Setter {
return func(ctx context.Context, node *v1.Node) error {
@@ -547,7 +547,7 @@
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc(), nodeShutdownManagerErrorsFunc(), staticPodsRegistrationFunc()}
errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc(), nodeShutdownManagerErrorsFunc(), initialStaticPodsRegistrationFunc()}
requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
if localStorageCapacityIsolation {
requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)
16 changes: 8 additions & 8 deletions pkg/kubelet/nodestatus/setters_test.go
@@ -1638,7 +1638,7 @@ func TestReadyCondition(t *testing.T) {
storageErrors error
cmStatus cm.Status
nodeShutdownManagerErrors error
staticPodsRegistrationErrors error
initialStaticPodsRegistrationErrors error
expectConditions []v1.NodeCondition
expectEvents []testEvent
disableLocalStorageCapacityIsolation bool
@@ -1690,10 +1690,10 @@
expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)},
},
{
desc: "new, not ready: static pods not registered",
node: withCapacity.DeepCopy(),
staticPodsRegistrationErrors: errors.New("all static pods are not registered"),
expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "all static pods are not registered", now, now)},
desc: "new, not ready: initial static pods not registered",
node: withCapacity.DeepCopy(),
initialStaticPodsRegistrationErrors: errors.New("all static pods are not registered"),
expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "all static pods are not registered", now, now)},
},
// the transition tests ensure timestamps are set correctly, no need to test the entire condition matrix in this section
{
@@ -1767,8 +1767,8 @@
nodeShutdownErrorsFunc := func() error {
return tc.nodeShutdownManagerErrors
}
staticPodsRegistrationErrorsFunc := func() error {
return tc.staticPodsRegistrationErrors
initialStaticPodsRegistrationErrorsFunc := func() error {
return tc.initialStaticPodsRegistrationErrors
}
events := []testEvent{}
recordEventFunc := func(eventType, event string) {
@@ -1778,7 +1778,7 @@
})
}
// construct setter
setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, cmStatusFunc, nodeShutdownErrorsFunc, recordEventFunc, staticPodsRegistrationErrorsFunc, !tc.disableLocalStorageCapacityIsolation)
setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, cmStatusFunc, nodeShutdownErrorsFunc, recordEventFunc, initialStaticPodsRegistrationErrorsFunc, !tc.disableLocalStorageCapacityIsolation)
// call setter on node
if err := setter(ctx, tc.node); err != nil {
t.Fatalf("unexpected error: %v", err)
20 changes: 9 additions & 11 deletions pkg/kubelet/pod/pod_manager.go
@@ -70,10 +70,11 @@ type Manager interface {
// the pod fullnames of any orphaned mirror pods.
GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string)

// GetStaticPodsAndMirrorPods return all static pods and mirror pods. It is possible that
// there is no mirror pod for a static pod if kubelet is running in standalone mode
// or is in the process of creating the mirror pod.
GetStaticPodsAndMirrorPods() (allStaticPods []*v1.Pod, allMirrorPods []*v1.Pod)
// GetStaticPodToMirrorPodMap returns a map from each static pod to its
// corresponding mirror pod. A static pod may have no mirror pod if the
// kubelet is running in standalone mode or is still in the process of
// creating the mirror pod; in that case, the map value is nil.
GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod

// SetPods replaces the internal pods with the new pods.
// It is currently only used for testing.
@@ -231,19 +232,16 @@ func (pm *basicManager) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods
return allPods, allMirrorPods, orphanedMirrorPodFullnames
}

func (pm *basicManager) GetStaticPodsAndMirrorPods() (allStaticPods []*v1.Pod, allMirrorPods []*v1.Pod) {
func (pm *basicManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod {
pm.lock.RLock()
defer pm.lock.RUnlock()
staticPodsMapToMirrorPods := make(map[*v1.Pod]*v1.Pod)
for _, pod := range podsMapToPods(pm.podByUID) {
if kubetypes.IsStaticPod(pod) {
allStaticPods = append(allStaticPods, pod)
mirrorPod, ok := pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)]
if ok {
allMirrorPods = append(allMirrorPods, mirrorPod)
}
staticPodsMapToMirrorPods[pod] = pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)]
}
}
return allStaticPods, allMirrorPods
return staticPodsMapToMirrorPods
}

func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
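A closing usage note on the reworked pod-manager API: returning a map pairs each static pod with its (possibly nil) mirror pod, so the "all static pods registered" check becomes a direct nil scan instead of comparing the lengths of two slices. A hedged sketch of that check, using a simplified pod type rather than the real *v1.Pod:

package main

import "fmt"

type pod struct{ name string }

// allRegistered demonstrates the check that initialStaticPodsRegistration
// performs over GetStaticPodToMirrorPodMap: every static pod must have a
// non-nil mirror pod.
func allRegistered(staticToMirror map[*pod]*pod) error {
	for static, mirror := range staticToMirror {
		if mirror == nil {
			return fmt.Errorf("static pod %s is not registered yet", static.name)
		}
	}
	return nil
}

func main() {
	pods := map[*pod]*pod{
		{name: "kube-apiserver"}: {name: "mirror-kube-apiserver"},
		{name: "etcd"}:           nil, // mirror pod not created yet
	}
	if err := allRegistered(pods); err != nil {
		fmt.Println(err) // static pod etcd is not registered yet
	}
}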