diff --git a/.changelog/24150.txt b/.changelog/24150.txt new file mode 100644 index 00000000000..cab219ca49a --- /dev/null +++ b/.changelog/24150.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: Added job start command to allow starting a stopped job from the cli +``` diff --git a/command/commands.go b/command/commands.go index 89e0b35a11b..d516b158f27 100644 --- a/command/commands.go +++ b/command/commands.go @@ -521,6 +521,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "job start": func() (cli.Command, error) { + return &JobStartCommand{ + Meta: meta, + }, nil + }, "job tag": func() (cli.Command, error) { return &JobTagCommand{ Meta: meta, @@ -1101,6 +1106,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "start": func() (cli.Command, error) { + return &JobStartCommand{ + Meta: meta, + }, nil + }, "system": func() (cli.Command, error) { return &SystemCommand{ Meta: meta, diff --git a/command/job_start.go b/command/job_start.go new file mode 100644 index 00000000000..cef5ce0b454 --- /dev/null +++ b/command/job_start.go @@ -0,0 +1,240 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package command + +import ( + "fmt" + "os" + "strings" + "sync" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/api/contexts" + "github.com/posener/complete" +) + +type JobStartCommand struct { + Meta + versionSelected uint64 +} + +func (c *JobStartCommand) Help() string { + helpText := ` +Usage: nomad job start [options] <job> +Alias: nomad start <job> + + Start an existing stopped job. This command is used to start a previously stopped job's + most recent version. Upon successful start, an interactive + monitor session will start to display log lines as the job starts its + allocations based on its most recent version. It is safe to exit the monitor + early using ctrl+c.
+ + When ACLs are enabled, this command requires a token with the 'submit-job' + and 'read-job' capabilities for the job's namespace. The 'list-jobs' + capability is required to run the command with job prefixes instead of exact + job IDs. + +General Options: + + ` + generalOptionsUsage(usageOptsDefault) + ` + +Start Options: + + -detach + Return immediately instead of entering monitor mode. After the + job start command is submitted, a new evaluation ID is printed to the + screen, which can be used to examine the evaluation using the eval-status + command. + + -consul-token + The Consul token used to verify that the caller has access to the Service + Identity policies associated in the targeted version of the job. + + -vault-token + The Vault token used to verify that the caller has access to the Vault + policies in the targeted version of the job. + + -verbose + Display full information. +` + return strings.TrimSpace(helpText) +} + +func (c *JobStartCommand) Synopsis() string { + return "Start a stopped job" +} + +func (c *JobStartCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-detach": complete.PredictNothing, + "-verbose": complete.PredictNothing, + }) +} + +func (c *JobStartCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Jobs, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Jobs] + }) +} +func (c *JobStartCommand) Name() string { return "job start" } + +func (c *JobStartCommand) Run(args []string) int { + var detach, verbose bool + var consulToken, vaultToken string + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&detach, "detach", false, "") + flags.BoolVar(&verbose, 
"verbose", false, "") + flags.StringVar(&consulToken, "consul-token", "", "") + flags.StringVar(&vaultToken, "vault-token", "", "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Check that we got at least one job + args = flags.Args() + if len(args) < 1 { + c.Ui.Error("This command takes at least one argument: <job>") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + var jobIDs []string + for _, jobID := range flags.Args() { + jobIDs = append(jobIDs, strings.TrimSpace(jobID)) + } + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + statusCh := make(chan int, len(jobIDs)) + var wg sync.WaitGroup + + for _, jobIDPrefix := range jobIDs { + + wg.Add(1) + go func() { + defer wg.Done() + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + + job, err := c.JobByPrefix(client, jobIDPrefix, nil) + if err != nil { + c.Ui.Error(err.Error()) + statusCh <- 1 + return + } + + if *job.Status != "dead" { + c.Ui.Info(fmt.Sprintf("Job %v has not been stopped and has the following status: %v", *job.Name, *job.Status)) + statusCh <- 0 + return + } + + // Get all versions associated to current job + q := &api.QueryOptions{Namespace: *job.Namespace} + + versions, _, _, err := client.Jobs().Versions(*job.ID, true, q) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error retrieving job versions: %s", err)) + statusCh <- 1 + return + } + + // Find the most recent version for this job that has not been stopped + var chosenVersion uint64 + versionAvailable := false + for i := range versions { + if !*versions[i].Stop { + chosenVersion = *versions[i].Version + versionAvailable = true + break + } + + } + c.versionSelected = chosenVersion + + if !versionAvailable { + c.Ui.Error(fmt.Sprintf("No previous available versions of job %v", *job.Name)) + statusCh <- 1 + return + } + + if consulToken == "" { + consulToken =
os.Getenv("CONSUL_HTTP_TOKEN") + } + + if vaultToken == "" { + vaultToken = os.Getenv("VAULT_TOKEN") + } + + // Revert to most recent running version! + m := &api.WriteOptions{Namespace: *job.Namespace} + + resp, _, err := client.Jobs().Revert(*job.ID, chosenVersion, nil, m, consulToken, vaultToken) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error retrieving job version %v for job %s: %s,", chosenVersion, *job.ID, err)) + statusCh <- 1 + return + } + + // Nothing to do: periodic or dispatch job + evalCreated := resp.EvalID != "" + + if !evalCreated { + statusCh <- 0 + return + } + + if detach { + c.Ui.Output("Evaluation ID: " + resp.EvalID) + statusCh <- 0 + return + } + + mon := newMonitor(c.Ui, client, length) + statusCh <- mon.monitor(resp.EvalID) + + }() + } + // users will still see + // errors if any while we + // wait for the goroutines + // to finish processing + wg.Wait() + + // close the channel to ensure + // the range statement below + // doesn't go on indefinitely + close(statusCh) + + // return a non-zero exit code + // if even a single job start fails + for status := range statusCh { + if status != 0 { + return status + } + } + return 0 +} diff --git a/command/job_start_test.go b/command/job_start_test.go new file mode 100644 index 00000000000..9857cbd12c7 --- /dev/null +++ b/command/job_start_test.go @@ -0,0 +1,245 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package command + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/command/agent" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/mitchellh/cli" + "github.com/posener/complete" + "github.com/shoenig/test/must" +) + +var _ cli.Command = (*JobStartCommand)(nil) + +func TestJobStartCommand_Fails(t *testing.T) { + ci.Parallel(t) + + srv, _, addr := testServer(t, true, func(c *agent.Config) { + c.DevMode = true + }) + defer srv.Shutdown() + + ui := cli.NewMockUi() + cmd := &JobStartCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + code := cmd.Run([]string{"-bad", "-flag"}) + must.One(t, code) + + out := ui.ErrorWriter.String() + must.StrContains(t, out, "flag provided but not defined: -bad") + + ui.ErrorWriter.Reset() + + // Fails on nonexistent job ID + code = cmd.Run([]string{"-address=" + addr, "non-existent"}) + must.One(t, code) + + out = ui.ErrorWriter.String() + must.StrContains(t, out, "No job(s) with prefix or ID") + + ui.ErrorWriter.Reset() + + // Fails on connection failure + code = cmd.Run([]string{"-address=nope", "n"}) + must.One(t, code) + + out = ui.ErrorWriter.String() + must.StrContains(t, out, "Error querying job prefix") + + // Info on attempting to start a job that's not been stopped + jobID := uuid.Generate() + jobFilePath := filepath.Join(os.TempDir(), jobID+".nomad") + + t.Cleanup(func() { + _ = os.Remove(jobFilePath) + }) + job := testJob(jobID) + job.TaskGroups[0].Tasks[0].Resources.MemoryMB = pointer.Of(16) + job.TaskGroups[0].Tasks[0].Resources.DiskMB = pointer.Of(32) + job.TaskGroups[0].Tasks[0].Resources.CPU = pointer.Of(10) + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "30s", + } + + jobJSON, err := json.MarshalIndent(job, "", " ") + 
must.NoError(t, err) + + jobFile := jobFilePath + err = os.WriteFile(jobFile, []byte(jobJSON), 0o644) + must.NoError(t, err) + + runCmd := &JobRunCommand{Meta: Meta{Ui: ui}} + code = runCmd.Run([]string{"-address", addr, "-json", jobFile}) + must.Zero(t, code, + must.Sprintf("job stop stdout: %s", ui.OutputWriter.String()), + must.Sprintf("job stop stderr: %s", ui.ErrorWriter.String()), + ) + + code = cmd.Run([]string{"-address=" + addr, jobID}) + must.Zero(t, code) + out = ui.OutputWriter.String() + must.StrContains(t, out, "has not been stopped and has the following status:") + +} + +func TestStartCommand_ManyJobs(t *testing.T) { + ci.Parallel(t) + + srv, _, addr := testServer(t, true, func(c *agent.Config) { + c.DevMode = true + }) + defer srv.Shutdown() + // the number of jobs we want to run + numJobs := 10 + + // create and run a handful of jobs + jobIDs := make([]string, 0, numJobs) + for i := 0; i < numJobs; i++ { + jobID := uuid.Generate() + jobIDs = append(jobIDs, jobID) + } + + jobFilePath := func(jobID string) string { + return filepath.Join(os.TempDir(), jobID+".nomad") + } + + // cleanup job files we will create + t.Cleanup(func() { + for _, jobID := range jobIDs { + _ = os.Remove(jobFilePath(jobID)) + } + }) + + // record cli output + ui := cli.NewMockUi() + + for _, jobID := range jobIDs { + job := testJob(jobID) + + jobJSON, err := json.MarshalIndent(job, "", " ") + must.NoError(t, err) + + jobFile := jobFilePath(jobID) + err = os.WriteFile(jobFile, []byte(jobJSON), 0o644) + must.NoError(t, err) + + cmd := &JobRunCommand{Meta: Meta{Ui: ui}} + + code := cmd.Run([]string{"-address", addr, "-json", jobFile}) + must.Zero(t, code, + must.Sprintf("job stop stdout: %s", ui.OutputWriter.String()), + must.Sprintf("job stop stderr: %s", ui.ErrorWriter.String()), + ) + + } + + // helper for stopping a list of jobs + stop := func(args ...string) (stdout string, stderr string, code int) { + cmd := &JobStopCommand{Meta: Meta{Ui: ui}} + code = cmd.Run(args) + 
return ui.OutputWriter.String(), ui.ErrorWriter.String(), code + } + // helper for starting a list of jobs + start := func(args ...string) (stdout string, stderr string, code int) { + cmd := &JobStartCommand{Meta: Meta{Ui: ui}} + code = cmd.Run(args) + return ui.OutputWriter.String(), ui.ErrorWriter.String(), code + } + + // stop all jobs in one command + args := []string{"-address", addr, "-detach"} + args = append(args, jobIDs...) + stdout, stderr, code := stop(args...) + must.Zero(t, code, + must.Sprintf("job stop stdout: %s", stdout), + must.Sprintf("job stop stderr: %s", stderr), + ) + + // start all jobs again in one command + stdout, stderr, code = start(args...) + must.Zero(t, code, + must.Sprintf("job start stdout: %s", stdout), + must.Sprintf("job start stderr: %s", stderr), + ) + +} + +func TestStartCommand_MultipleCycles(t *testing.T) { + ci.Parallel(t) + + srv, client, addr := testServer(t, true, func(c *agent.Config) { + c.DevMode = true + }) + + defer srv.Shutdown() + + ui := cli.NewMockUi() + + job1 := testJob("job-start-test") + resp, _, err := client.Jobs().Register(job1, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + + args := []string{"-address", addr, "-detach"} + args = append(args, "job-start-test") + expectedVersions := []uint64{0, 2, 4, 6, 8, 10} + stopCmd := &JobStopCommand{Meta: Meta{Ui: ui}} + startCmd := &JobStartCommand{Meta: Meta{Ui: ui}} + + // check multiple cycles of starting/stopping a job result in the correct version selected + for i := range 6 { + + code := stopCmd.Run(args) + must.Zero(t, code, + must.Sprintf("job stop stdout: %s", ui.OutputWriter.String()), + must.Sprintf("job stop stderr: %s", ui.ErrorWriter.String()), + ) + + code = startCmd.Run(args) + must.Zero(t, code, + must.Sprintf("job start stdout: %s", ui.OutputWriter.String()), + must.Sprintf("job start stderr: %s", 
ui.ErrorWriter.String()), + ) + must.Eq(t, expectedVersions[i], startCmd.versionSelected) + } + +} + +func TestStartCommand_AutocompleteArgs(t *testing.T) { + ci.Parallel(t) + + srv, _, url := testServer(t, true, nil) + defer srv.Shutdown() + + ui := cli.NewMockUi() + cmd := &JobStartCommand{Meta: Meta{Ui: ui, flagAddress: url}} + + // Create a fake job + state := srv.Agent.Server().State() + j := mock.Job() + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j)) + + prefix := j.ID[:len(j.ID)-5] + args := complete.Args{Last: prefix} + predictor := cmd.AutocompleteArgs() + + res := predictor.Predict(args) + must.Len(t, 1, res) + must.Eq(t, j.ID, res[0]) +}