Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

start: allow users to call job start command to start stopped jobs #24150

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/24150.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
cli: Added job start command to allow starting a stopped job from the cli
```
10 changes: 10 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"job start": func() (cli.Command, error) {
return &JobStartCommand{
Meta: meta,
}, nil
},
"job tag": func() (cli.Command, error) {
return &JobTagCommand{
Meta: meta,
Expand Down Expand Up @@ -1101,6 +1106,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"start": func() (cli.Command, error) {
return &JobStartCommand{
Meta: meta,
}, nil
},
"system": func() (cli.Command, error) {
return &SystemCommand{
Meta: meta,
Expand Down
240 changes: 240 additions & 0 deletions command/job_start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package command

import (
"fmt"
"os"
"strings"
"sync"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)

type JobStartCommand struct {
Meta
versionSelected uint64
}

func (c *JobStartCommand) Help() string {
helpText := `
Usage: nomad job start [options] <job>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we don't have any clue here to the user that they can use more than one job ID, or that if they do they all need to be in the same namespace.

Alias: nomad start

Start an existing stopped job. This command is used to start a previously stopped job's
most recent version. Upon successful start, an interactive
monitor session will start to display log lines as the job starts its
allocations based on its most recent version. It is safe to exit the monitor
early using ctrl+c.
Comment on lines +27 to +31
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: the output of these commands are what get printed literally to the terminal. There's no automatic reflowing in mitchellh/cli. So we should try to manually reflow these to 80 cols.


When ACLs are enabled, this command requires a token with the 'submit-job'
and 'read-job' capabilities for the job's namespace. The 'list-jobs'
capability is required to run the command with job prefixes instead of exact
job IDs.

General Options:

` + generalOptionsUsage(usageOptsDefault) + `

Start Options:

-detach
Return immediately instead of entering monitor mode. After the
job start command is submitted, a new evaluation ID is printed to the
screen, which can be used to examine the evaluation using the eval-status
command.

-consul-token
The Consul token used to verify that the caller has access to the Service
Identity policies associated in the targeted version of the job.

-vault-token
The Vault token used to verify that the caller has access to the Vault
policies in the targeted version of the job.

-verbose
Display full information.
`
return strings.TrimSpace(helpText)
}

func (c *JobStartCommand) Synopsis() string {
return "Start a stopped job"
}

func (c *JobStartCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-detach": complete.PredictNothing,
"-verbose": complete.PredictNothing,
})
}

func (c *JobStartCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}

resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Jobs, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Jobs]
})
}
func (c *JobStartCommand) Name() string { return "job start" }

func (c *JobStartCommand) Run(args []string) int {
var detach, verbose bool
var consulToken, vaultToken string

flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
flags.StringVar(&consulToken, "consul-token", "", "")
flags.StringVar(&vaultToken, "vault-token", "", "")

if err := flags.Parse(args); err != nil {
return 1
}

// Check that we got at least one job
args = flags.Args()
if len(args) < 1 {
c.Ui.Error("This command takes at least one argument: <job>")
c.Ui.Error(commandErrorText(c))
return 1
}

var jobIDs []string
for _, jobID := range flags.Args() {
jobIDs = append(jobIDs, strings.TrimSpace(jobID))
}

// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}

statusCh := make(chan int, len(jobIDs))
var wg sync.WaitGroup

for _, jobIDPrefix := range jobIDs {

wg.Add(1)
go func() {
defer wg.Done()

// Truncate the id unless full length is requested
length := shortId
if verbose {
length = fullId
}

job, err := c.JobByPrefix(client, jobIDPrefix, nil)
if err != nil {
c.Ui.Error(err.Error())
statusCh <- 1
return
}

if *job.Status != "dead" {
c.Ui.Info(fmt.Sprintf("Job %v has not been stopped and has the following status: %v", *job.Name, *job.Status))
statusCh <- 0
return

}

// Get all versions associated to current job
q := &api.QueryOptions{Namespace: *job.Namespace}

versions, _, _, err := client.Jobs().Versions(*job.ID, true, q)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error retrieving job versions: %s", err))
statusCh <- 1
}

// Find the most recent version for this job that has not been stopped
var chosenVersion uint64
versionAvailable := false
for i := range versions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do for _, version := range versions here and then not have to dereference by index.

if !*versions[i].Stop {
Copy link
Contributor Author

@martisah martisah Nov 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In light of some bug fixing associated to selecting the correct version, I realized that we should actually be selecting the previously non-stopped version. Also I re-evaluated whether we should be specifically looking for a running version, as often the previously non stopped version tends to be pending (in testing). Is it the expected behavior to always revert to only running versions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as often the previously non stopped version tends to be pending (in testing). Is it the expected behavior to always revert to only running versions?

Oh that's a good call. Yeah we shouldn't revert to pending versions because we don't know that they're stable. That'll complicate testing a bit because you'll need to make sure the job can mark itself running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know of a method I can use to ensure the jobs are running in my tests? I've been trying to wait for the evaluation to succeed after running each command, but that still hasn't proved to work in terms of getting them to be marked running in tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job should be marked running as soon as at lease one allocation is placed. So as long as the allocations can start you should be able to poll api/Jobs.Info for the running status. But if the allocations exit then the job will be dead, so you need to make sure they stay running or they could exit before you can poll and you'll get weird test flakes.

The testJob uses the mock driver, so you may want to look at the configuration of the test job to make sure it'll still be up and running.

chosenVersion = *versions[i].Version
versionAvailable = true
break
}

}
c.versionSelected = chosenVersion
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could potentially have multiple jobs we're going to revert, so we'd be overwriting this field from multiple goroutines, which is a race condition. The production code doesn't ever read this value, only tests, so we probably want to get rid of it if we can.

I guess there's no way in the API to tell after the fact which version we reverted?

One way to solve this would be to refactor the body of the goroutine into a method that returns the chosen version, and then have the tests call that method directly. But for this it'd be nice to have the tests exercise the goroutines. Maybe this could be a buffered channel that the goroutine writes into and then the test can pull out the value? That'd be safe from data races.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh I see, I'll give that a shot!


if !versionAvailable {
c.Ui.Error(fmt.Sprintf("No previous available versions of job %v", *job.Name))
statusCh <- 1
return
}

if consulToken == "" {
consulToken = os.Getenv("CONSUL_HTTP_TOKEN")
}

if vaultToken == "" {
vaultToken = os.Getenv("VAULT_TOKEN")
}
Comment on lines +184 to +190
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: this is not going to change between jobs, so we can lift these two assignments above the loop.


// Revert to most recent running version!
m := &api.WriteOptions{Namespace: *job.Namespace}

resp, _, err := client.Jobs().Revert(*job.ID, chosenVersion, nil, m, consulToken, vaultToken)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error retrieving job version %v for job %s: %s,", chosenVersion, *job.ID, err))
statusCh <- 1
return
}

// Nothing to do: periodic or dispatch job
evalCreated := resp.EvalID != ""

if !evalCreated {
statusCh <- 0
return
}

if detach {
c.Ui.Output("Evaluation ID: " + resp.EvalID)
statusCh <- 0
return
}

mon := newMonitor(c.Ui, client, length)
statusCh <- mon.monitor(resp.EvalID)

}()
}
// users will still see
// errors if any while we
// wait for the goroutines
// to finish processing
wg.Wait()

// close the channel to ensure
// the range statement below
// doesn't go on indefinitely
close(statusCh)

// return a non-zero exit code
// if even a single job start fails
for status := range statusCh {
if status != 0 {
return status
}
}
return 0
}
Loading
Loading