Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lock to fix flaky test #1261

Merged
merged 18 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,13 @@ func (cw *commandUnit) Status() *StatusFileData {
// UnredactedStatus returns a copy of the status currently loaded in memory, including secrets.
func (cw *commandUnit) UnredactedStatus() *StatusFileData {
cw.GetStatusLock().RLock()
defer cw.GetStatusLock().RUnlock()
status := cw.GetStatusWithoutExtraData()
ed, ok := cw.GetStatusCopy().ExtraData.(*CommandExtraData)
if ok {
edCopy := *ed
status.ExtraData = &edCopy
}
cw.GetStatusLock().RUnlock()

return status
}
Expand Down
36 changes: 34 additions & 2 deletions pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@
podOptions.SinceTime = &metav1.Time{Time: sinceTime}
}

KubeAPIWrapperLock.Lock()
logReq := KubeAPIWrapperInstance.GetLogs(kw.clientset, podNamespace, podName, podOptions)
KubeAPIWrapperLock.Unlock()
// get logstream, with retry
for retries := 5; retries > 0; retries-- {
logStream, err = logReq.Stream(kw.GetContext())
Expand Down Expand Up @@ -333,7 +335,9 @@

// get pod, with retry
for retries := 5; retries > 0; retries-- {
KubeAPIWrapperLock.Lock()
kw.pod, err = KubeAPIWrapperInstance.Get(kw.GetContext(), kw.clientset, podNamespace, podName, metav1.GetOptions{})
KubeAPIWrapperLock.Unlock()
if err == nil {
break
}
Expand Down Expand Up @@ -522,8 +526,10 @@
pod.Spec.Containers[0].Env = evs
}

KubeAPIWrapperLock.Lock()
// get pod and store to kw.pod
kw.pod, err = KubeAPIWrapperInstance.Create(kw.GetContext(), kw.clientset, ked.KubeNamespace, pod, metav1.CreateOptions{})
KubeAPIWrapperLock.Unlock()
if err != nil {
return err
}
Expand All @@ -541,8 +547,8 @@
status.ExtraData.(*KubeExtraData).PodName = kw.pod.Name
})

// Wait for the pod to be running
KubeAPIWrapperLock.Lock()
// Wait for the pod to be running
fieldSelector := KubeAPIWrapperInstance.OneTermEqualSelector("metadata.name", kw.pod.Name).String()
KubeAPIWrapperLock.Unlock()
lw := &cache.ListWatch{
Expand Down Expand Up @@ -684,7 +690,9 @@
default:
}

KubeAPIWrapperLock.Lock()

Check warning on line 693 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L693

Added line #L693 was not covered by tests
kw.pod, err = KubeAPIWrapperInstance.Get(kw.GetContext(), kw.clientset, podNamespace, podName, metav1.GetOptions{})
KubeAPIWrapperLock.Unlock()

Check warning on line 695 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L695

Added line #L695 was not covered by tests
if err == nil {
break
}
Expand All @@ -709,7 +717,9 @@
// Attach stdin stream to the pod
var exec remotecommand.Executor
if !skipStdin {
KubeAPIWrapperLock.Lock()
req := KubeAPIWrapperInstance.SubResource(kw.clientset, podName, podNamespace)
KubeAPIWrapperLock.Unlock()

req.VersionedParams(
&corev1.PodExecOptions{
Expand All @@ -722,7 +732,9 @@
scheme.ParameterCodec,
)
var err error
KubeAPIWrapperLock.Lock()
exec, err = KubeAPIWrapperInstance.NewSPDYExecutor(kw.config, "POST", req.URL())
KubeAPIWrapperLock.Unlock()
if err != nil {
errMsg := fmt.Sprintf("Error creating SPDY executor: %s", err)
kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
Expand Down Expand Up @@ -816,10 +828,12 @@

var err error
for retries := 5; retries > 0; retries-- {
KubeAPIWrapperLock.Lock()

Check warning on line 831 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L831

Added line #L831 was not covered by tests
err = KubeAPIWrapperInstance.StreamWithContext(kw.GetContext(), exec, remotecommand.StreamOptions{
Stdin: stdin,
Tty: false,
})
KubeAPIWrapperLock.Unlock()

Check warning on line 836 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L836

Added line #L836 was not covered by tests
if err != nil {
// NOTE: io.EOF for stdin is handled by remotecommand and will not trigger this
kw.GetWorkceptor().nc.GetLogger().Warning(
Expand Down Expand Up @@ -1194,8 +1208,12 @@
var err error
ked := kw.UnredactedStatus().ExtraData.(*KubeExtraData)
if ked.KubeConfig == "" {
KubeAPIWrapperLock.Lock()

Check warning on line 1211 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1211

Added line #L1211 was not covered by tests
clr := KubeAPIWrapperInstance.NewDefaultClientConfigLoadingRules()
KubeAPIWrapperLock.Unlock()
KubeAPIWrapperLock.Lock()

Check warning on line 1214 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1213-L1214

Added lines #L1213 - L1214 were not covered by tests
kw.config, err = KubeAPIWrapperInstance.BuildConfigFromFlags("", clr.GetDefaultFilename())
KubeAPIWrapperLock.Unlock()

Check warning on line 1216 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1216

Added line #L1216 was not covered by tests
if ked.KubeNamespace == "" {
c, err := clr.Load()
if err != nil {
Expand All @@ -1211,7 +1229,9 @@
}
}
} else {
KubeAPIWrapperLock.Lock()

Check warning on line 1232 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1232

Added line #L1232 was not covered by tests
cfg, err := KubeAPIWrapperInstance.NewClientConfigFromBytes([]byte(ked.KubeConfig))
KubeAPIWrapperLock.Unlock()

Check warning on line 1234 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1234

Added line #L1234 was not covered by tests
if err != nil {
return err
}
Expand All @@ -1238,7 +1258,9 @@

func (kw *KubeUnit) connectUsingIncluster() error {
var err error
KubeAPIWrapperLock.Lock()
kw.config, err = KubeAPIWrapperInstance.InClusterConfig()
KubeAPIWrapperLock.Unlock()
if err != nil {
return err
}
Expand Down Expand Up @@ -1330,16 +1352,22 @@
if ok {
switch envRateLimiter {
case "never":
KubeAPIWrapperLock.Lock()

Check warning on line 1355 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1355

Added line #L1355 was not covered by tests
kw.config.RateLimiter = KubeAPIWrapperInstance.NewFakeNeverRateLimiter()
KubeAPIWrapperLock.Unlock()

Check warning on line 1357 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1357

Added line #L1357 was not covered by tests
case "always":
KubeAPIWrapperLock.Lock()

Check warning on line 1359 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1359

Added line #L1359 was not covered by tests
kw.config.RateLimiter = KubeAPIWrapperInstance.NewFakeAlwaysRateLimiter()
KubeAPIWrapperLock.Unlock()

Check warning on line 1361 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1361

Added line #L1361 was not covered by tests
default:
}
kw.GetWorkceptor().nc.GetLogger().Debug("RateLimiter: %s", envRateLimiter)
}

kw.GetWorkceptor().nc.GetLogger().Debug("QPS: %f, Burst: %d", kw.config.QPS, kw.config.Burst)
KubeAPIWrapperLock.Lock()
kw.clientset, err = KubeAPIWrapperInstance.NewForConfig(kw.config)
KubeAPIWrapperLock.Unlock()
if err != nil {
return err
}
Expand Down Expand Up @@ -1463,13 +1491,13 @@
// Status returns a copy of the status currently loaded in memory.
func (kw *KubeUnit) UnredactedStatus() *StatusFileData {
kw.GetStatusLock().RLock()
defer kw.GetStatusLock().RUnlock()
status := kw.GetStatusWithoutExtraData()
ked, ok := kw.GetStatusCopy().ExtraData.(*KubeExtraData)
if ok {
kedCopy := *ked
status.ExtraData = &kedCopy
}
kw.GetStatusLock().RUnlock()

return status
}
Expand Down Expand Up @@ -1508,7 +1536,9 @@
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Warning("Pod %s could not be deleted: %s", ked.PodName, err.Error())
} else {
KubeAPIWrapperLock.Lock()

Check warning on line 1539 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1539

Added line #L1539 was not covered by tests
err := KubeAPIWrapperInstance.Delete(context.Background(), kw.clientset, ked.KubeNamespace, ked.PodName, metav1.DeleteOptions{})
KubeAPIWrapperLock.Unlock()

Check warning on line 1541 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1541

Added line #L1541 was not covered by tests
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Warning("Pod %s could not be deleted: %s", ked.PodName, err.Error())
}
Expand All @@ -1533,7 +1563,9 @@
kw.CancelContext()
kw.UpdateBasicStatus(WorkStateCanceled, "Canceled", -1)
if kw.pod != nil {
KubeAPIWrapperLock.Lock()

Check warning on line 1566 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1566

Added line #L1566 was not covered by tests
err := KubeAPIWrapperInstance.Delete(context.Background(), kw.clientset, kw.pod.Namespace, kw.pod.Name, metav1.DeleteOptions{})
KubeAPIWrapperLock.Unlock()

Check warning on line 1568 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1568

Added line #L1568 was not covered by tests
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Error("Error deleting pod %s: %s", kw.pod.Name, err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/workceptor/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ func TestKubeLoggingWithReconnect(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
tt.expectedCalls()
ku.Start()
time.Sleep(10 * time.Millisecond)
kw.CreatePod(nil)
wg := &sync.WaitGroup{}
wg.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/workceptor/remote_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ func (rw *remoteUnit) Status() *StatusFileData {
// UnredactedStatus returns a copy of the status currently loaded in memory, including secrets.
func (rw *remoteUnit) UnredactedStatus() *StatusFileData {
rw.GetStatusLock().RLock()
defer rw.GetStatusLock().RUnlock()
status := rw.GetStatusWithoutExtraData()
ed, ok := rw.GetStatusCopy().ExtraData.(*RemoteExtraData)
if ok {
Expand All @@ -558,6 +557,7 @@ func (rw *remoteUnit) UnredactedStatus() *StatusFileData {
}
status.ExtraData = &edCopy
}
rw.GetStatusLock().RUnlock()

return status
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/workceptor/workunitbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,9 @@ func (bwu *BaseWorkUnit) MonitorLocalStatus() {
watcherErrors = make(chan error)

if bwu.watcher != nil {
bwu.statusLock.Lock()
err := bwu.watcher.Add(statusFile)
bwu.statusLock.Unlock()
if err == nil {
defer func() {
werr := bwu.watcher.Close()
Expand Down