diff --git a/cmd/integration_test.go b/cmd/integration_test.go index 1a64373fa1a..e9b858edffd 100644 --- a/cmd/integration_test.go +++ b/cmd/integration_test.go @@ -958,29 +958,7 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) { export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} `) - // FIXME: when VU initialization is properly synchronized, replace the - // following lines with this line only: - // - // ts := testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger) - // - // See https://github.com/grafana/k6/issues/2790 for details. Right now we - // need the stdOut locking because VU initialization is not properly synchronized: - // when a test is aborted during the init phase, some logs might be emitted - // after the root command returns... - - ts := getSimpleCloudOutputTestState( - t, script, nil, lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, int(exitcodes.ScriptAborted), - ) - newRootCommand(ts.globalState).execute() - - ts.outMutex.Lock() - stdOut := ts.stdOut.String() - ts.outMutex.Unlock() - t.Log(stdOut) - assert.Contains(t, stdOut, "test aborted: foo") - assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`) - assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) - assert.NotContains(t, stdOut, "bogus summary") + testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger) } func TestAbortedByScriptAbortInVUCode(t *testing.T) { @@ -1090,14 +1068,7 @@ func TestAbortedByScriptInitError(t *testing.T) { ) newRootCommand(ts.globalState).execute() - // FIXME: remove this locking after VU initialization accepts a context and - // is properly synchronized: currently when a test is aborted during the - // init phase, some logs might be emitted after the above command returns... - // see: https://github.com/grafana/k6/issues/2790 - ts.outMutex.Lock() stdOut := ts.stdOut.String() - ts.outMutex.Unlock() - t.Log(stdOut) assert.Contains(t, stdOut, `level=error msg="Error: oops in 2\n\tat file:///`) assert.Contains(t, stdOut, `hint="error while initializing VU #2 (script exception)"`) diff --git a/core/local/local.go b/core/local/local.go index 60312b14aee..61c9b1e59a1 100644 --- a/core/local/local.go +++ b/core/local/local.go @@ -173,15 +173,12 @@ func (e *ExecutionScheduler) initVUsConcurrently( ctx context.Context, samplesOut chan<- metrics.SampleContainer, count uint64, concurrency int, logger logrus.FieldLogger, ) chan error { - doneInits := make(chan error, count) // poor man's early-return waitgroup + doneInits := make(chan error, count) // poor man's waitgroup with results limiter := make(chan struct{}) for i := 0; i < concurrency; i++ { go func() { for range limiter { - // TODO: actually pass the context when we initialize VUs here, - // so we can cancel that initialization if there is an error, - // see https://github.com/grafana/k6/issues/2790 newVU, err := e.initVU(ctx, samplesOut, logger) if err == nil { e.state.AddInitializedVU(newVU) @@ -197,6 +194,10 @@ func (e *ExecutionScheduler) initVUsConcurrently( select { case limiter <- struct{}{}: case <-ctx.Done(): + for skipVu := vuNum; skipVu < count; skipVu++ { + // do not even start initializing the remaining VUs + doneInits <- ctx.Err() + } return } } @@ -290,23 +291,31 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics }), ) - // TODO: once VU initialization accepts a context, when a VU init fails, - // cancel the context and actually wait for all VUs to finish before this - // function returns - that way we won't have any trailing logs, see - // https://github.com/grafana/k6/issues/2790 + var initErr error for vuNum := uint64(0); vuNum < vusToInitialize; vuNum++ { + var err error select { - case err := <-doneInits: - if err != nil { - logger.WithError(err).Debug("VU initialization returned with an error, aborting...") - // the context's cancel() is called in a defer above and will - // abort any in-flight VU initializations - return err + case err = <-doneInits: + if err == nil { + atomic.AddUint64(initializedVUs, 1) } - atomic.AddUint64(initializedVUs, 1) case <-ctx.Done(): - return ctx.Err() + err = ctx.Err() + } + + if err == nil || initErr != nil { + // No error or a previous init error was already saved and we are + // just waiting for VUs to finish aborting + continue } + + logger.WithError(err).Debug("VU initialization returned with an error, aborting...") + initErr = err + cancel() + } + + if initErr != nil { + return initErr } e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.InitializedVU, error) {