Skip to content

Commit

Permalink
Merge pull request #3164 from buildkite/jitter-within-loops
Browse files: browse the repository at this point in the history
Jitter within ping, status, log loops
  • Loading branch information
DrJosh9000 authored Jan 23, 2025
2 parents 837e6f3 + cbde7d0 commit 437dfd9
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 93 deletions.
155 changes: 84 additions & 71 deletions agent/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math/rand/v2"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -291,85 +292,102 @@ func (a *AgentWorker) runPingLoop(ctx context.Context, idleMonitor *IdleMonitor)
pingTicker := time.NewTicker(pingInterval)
defer pingTicker.Stop()

first := make(chan struct{}, 1)
first <- struct{}{}

lastActionTime := time.Now()
a.logger.Info("Waiting for work...")

// Continue this loop until the closing of the stop channel signals termination
for {
setStat("😴 Waiting until next ping interval tick")
select {
case <-first:
// continue below
case <-pingTicker.C:
// continue below
case <-a.stop:
return nil
}

// Within the interval, wait a random amount of time to avoid
// spontaneous synchronisation across agents.
jitter := rand.N(pingInterval)
setStat(fmt.Sprintf("🫨 Jittering for %v", jitter))
select {
case <-time.After(jitter):
// continue below
case <-a.stop:
return nil
}

a.stopMutex.Lock()
stopping := a.stopping
a.stopMutex.Unlock()
if !stopping {
setStat("📡 Pinging Buildkite for work")
job, err := a.Ping(ctx)
if err != nil {
if errors.Is(err, &errUnrecoverable{}) {
a.logger.Error("%v", err)
} else {
a.logger.Warn("%v", err)
}
} else if job != nil {
// Let other agents know this agent is now busy and
// not to idle terminate
idleMonitor.MarkBusy(a.agent.UUID)

setStat("💼 Accepting job")
if stopping {
return nil
}

// Runs the job, only errors if something goes wrong
if runErr := a.AcceptAndRunJob(ctx, job); runErr != nil {
a.logger.Error("%v", runErr)
} else {
if a.agentConfiguration.DisconnectAfterJob {
a.logger.Info("Job finished. Disconnecting...")
return nil
}
lastActionTime = time.Now()

// Observation: jobs are rarely the last within a pipeline,
// thus if this worker just completed a job,
// there is likely another immediately available.
// Skip waiting for the ping interval until
// a ping without a job has occurred,
// but in exchange, ensure the next ping must wait a full
// pingInterval to avoid too much server load.

pingTicker.Reset(pingInterval)

continue
}
setStat("✅ Finished job")
setStat("📡 Pinging Buildkite for work")
job, err := a.Ping(ctx)
if err != nil {
if errors.Is(err, &errUnrecoverable{}) {
a.logger.Error("%v", err)
} else {
a.logger.Warn("%v", err)
}
} else if job != nil {
// Let other agents know this agent is now busy and
// not to idle terminate
idleMonitor.MarkBusy(a.agent.UUID)

// Handle disconnect after idle timeout (and deprecated disconnect-after-job-timeout)
if a.agentConfiguration.DisconnectAfterIdleTimeout > 0 {
idleDeadline := lastActionTime.Add(time.Second *
time.Duration(a.agentConfiguration.DisconnectAfterIdleTimeout))

if time.Now().After(idleDeadline) {
// Let other agents know this agent is now idle and termination
// is possible
idleMonitor.MarkIdle(a.agent.UUID)

// But only terminate if everyone else is also idle
if idleMonitor.Idle() {
a.logger.Info("All agents have been idle for %d seconds. Disconnecting...",
a.agentConfiguration.DisconnectAfterIdleTimeout)
return nil
} else {
a.logger.Debug("Agent has been idle for %.f seconds, but other agents haven't",
time.Since(lastActionTime).Seconds())
}
setStat("💼 Accepting job")

// Runs the job, only errors if something goes wrong
if runErr := a.AcceptAndRunJob(ctx, job); runErr != nil {
a.logger.Error("%v", runErr)
} else {
if a.agentConfiguration.DisconnectAfterJob {
a.logger.Info("Job finished. Disconnecting...")
return nil
}
lastActionTime = time.Now()

// Observation: jobs are rarely the last within a pipeline,
// thus if this worker just completed a job,
// there is likely another immediately available.
// Skip waiting for the ping interval until
// a ping without a job has occurred,
// but in exchange, ensure the next ping must wait a full
// pingInterval to avoid too much server load.

pingTicker.Reset(pingInterval)

continue
}
setStat("✅ Finished job")
}

setStat("😴 Sleeping for a bit")

select {
case <-pingTicker.C:
continue
case <-a.stop:
return nil
// Handle disconnect after idle timeout (and deprecated disconnect-after-job-timeout)
if a.agentConfiguration.DisconnectAfterIdleTimeout > 0 {
idleDeadline := lastActionTime.Add(time.Second *
time.Duration(a.agentConfiguration.DisconnectAfterIdleTimeout))

if time.Now().After(idleDeadline) {
// Let other agents know this agent is now idle and termination
// is possible
idleMonitor.MarkIdle(a.agent.UUID)

// But only terminate if everyone else is also idle
if idleMonitor.Idle() {
a.logger.Info("All agents have been idle for %d seconds. Disconnecting...",
a.agentConfiguration.DisconnectAfterIdleTimeout)
return nil
} else {
a.logger.Debug("Agent has been idle for %.f seconds, but other agents haven't",
time.Since(lastActionTime).Seconds())
}
}
}
}
}
Expand Down Expand Up @@ -552,13 +570,8 @@ func (a *AgentWorker) Ping(ctx context.Context) (*api.Job, error) {
func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error {
ctx, cancel := context.WithCancel(ctx)
go func() {
for {
time.Sleep(500 * time.Millisecond)
if a.stopping {
cancel()
return
}
}
<-a.stop
cancel()
}()

job, err := a.client.AcquireJob(ctx, jobId)
Expand Down
2 changes: 2 additions & 0 deletions agent/integration/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/buildkite/agent/v3/agent"
"github.com/buildkite/agent/v3/api"
Expand Down Expand Up @@ -69,6 +70,7 @@ func runJob(t *testing.T, ctx context.Context, cfg testRunJobConfig) error {
JWKS: cfg.verificationJWKS,
AgentConfiguration: cfg.agentCfg,
MetricsScope: scope,
JobStatusInterval: 1 * time.Second,
})

if err != nil {
Expand Down
42 changes: 31 additions & 11 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand/v2"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -697,7 +698,37 @@ func (r *JobRunner) jobCancellationChecker(ctx context.Context, wg *sync.WaitGro
return
}

intervalTicker := time.NewTicker(r.conf.JobStatusInterval)
defer intervalTicker.Stop()
first := make(chan struct{}, 1)
first <- struct{}{}

for {
setStat("😴 Waiting for next job status interval tick")
select {
case <-first:
// continue below
case <-intervalTicker.C:
// continue below
case <-ctx.Done():
return
case <-r.process.Done():
return
}

// Within the interval, wait a random amount of time to avoid
// spontaneous synchronisation across agents.
jitter := rand.N(r.conf.JobStatusInterval)
setStat(fmt.Sprintf("🫨 Jittering for %v", jitter))
select {
case <-time.After(jitter):
// continue below
case <-ctx.Done():
return
case <-r.process.Done():
return
}

setStat("📡 Fetching job state from Buildkite")

// Re-get the job and check its status to see if it's been cancelled
Expand All @@ -718,17 +749,6 @@ func (r *JobRunner) jobCancellationChecker(ctx context.Context, wg *sync.WaitGro
r.agentLogger.Error("Unexpected error canceling process as requested by server (job: %s) (err: %s)", r.conf.Job.ID, err)
}
}

setStat("😴 Sleeping for a bit")

// Sleep for a bit, or until the job is finished
select {
case <-time.After(r.conf.JobStatusInterval):
case <-ctx.Done():
return
case <-r.process.Done():
return
}
}
}

Expand Down
43 changes: 32 additions & 11 deletions agent/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"os"
"regexp"
"strconv"
Expand Down Expand Up @@ -416,7 +417,38 @@ func (r *JobRunner) streamJobLogsAfterProcessStart(ctx context.Context, wg *sync
return
}

const processInterval = 1 * time.Second // TODO: make configurable?
intervalTicker := time.NewTicker(processInterval)
defer intervalTicker.Stop()
first := make(chan struct{}, 1)
first <- struct{}{}

for {
setStat("😴 Waiting for next log processing interval tick")
select {
case <-first:
// continue below
case <-intervalTicker.C:
// continue below
case <-ctx.Done():
return
case <-r.process.Done():
return
}

// Within the interval, wait a random amount of time to avoid
// spontaneous synchronisation across agents.
jitter := rand.N(processInterval)
setStat(fmt.Sprintf("🫨 Jittering for %v", jitter))
select {
case <-time.After(jitter):
// continue below
case <-ctx.Done():
return
case <-r.process.Done():
return
}

setStat("📨 Sending process output to log streamer")

// Send the output of the process to the log streamer for processing
Expand All @@ -430,17 +462,6 @@ func (r *JobRunner) streamJobLogsAfterProcessStart(ctx context.Context, wg *sync
r.output.Close()
return
}

setStat("😴 Sleeping for a bit")

// Sleep for a bit, or until the job is finished
select {
case <-time.After(1 * time.Second):
case <-ctx.Done():
return
case <-r.process.Done():
return
}
}

// The final output after the process has finished is processed in Run().
Expand Down

0 comments on commit 437dfd9

Please sign in to comment.