Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RPC modifications to include the lock work #17697

Merged
merged 55 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
16cd69d
temp: remove when real work is added
jrasell Jun 22, 2023
fa0543d
func: add acquier/release to apply rpc function
Juanadelacuesta Jun 22, 2023
a6bbaaf
Delete lock.go
Juanadelacuesta Jun 23, 2023
8ad7bbd
func: add new rpc endpoint
Juanadelacuesta Jun 26, 2023
c8cbec7
func: add http wrapper for command agent
Juanadelacuesta Jun 26, 2023
a8c2aad
func: remove lock related item
Juanadelacuesta Jun 27, 2023
d5af7a0
fix: sync msg types
Juanadelacuesta Jun 27, 2023
f40bc7a
func: remove the lock key form the url
Juanadelacuesta Jun 28, 2023
a58e815
fix: fix the flow of the preapply function
Juanadelacuesta Jun 28, 2023
3441ae6
remove raft message for renew lock
Juanadelacuesta Jun 29, 2023
2b285eb
style: refactor and spell mistakes
Juanadelacuesta Jun 29, 2023
3a46135
func: separate validation for lock acquier
Juanadelacuesta Jun 30, 2023
582d67d
fix: update agent command for lock after rpc endpoint update from que…
Juanadelacuesta Jun 30, 2023
4696bd0
func: add test for validate lock
Juanadelacuesta Jul 3, 2023
11685ac
style: clean unsed code and update comments
Juanadelacuesta Jul 5, 2023
7a6a091
func: remove raft message type for renew lock, not needed
Juanadelacuesta Jul 5, 2023
94484c6
fix: update logic to distinguish upsert variable and lock operation o…
Juanadelacuesta Jul 5, 2023
7bf5014
fix: adjust the look up for query params to only be done on put and p…
Juanadelacuesta Jul 5, 2023
f068f17
fix: move the rpc metrics up on the renew lock to include auth errors
Juanadelacuesta Jul 5, 2023
90fb2a4
func: add acquire and release lock functions on fsm
Juanadelacuesta Jul 4, 2023
439b77e
func: fix the query for missing lock to avoid nil points in case of a…
Juanadelacuesta Jul 5, 2023
c829e2b
func: add happy path test for acquire and release flows on non existi…
Juanadelacuesta Jul 5, 2023
379d6ee
fix: solve rebase with rpc conflicts
Juanadelacuesta Jul 5, 2023
2a63039
func: add testing for release lock
Juanadelacuesta Jul 5, 2023
f60991c
fix: remove debug file
Juanadelacuesta Jul 5, 2023
e09d32f
Update nomad/fsm.go
Juanadelacuesta Jul 6, 2023
997f7dd
func: change the return when lock ID is wrong from error to conflict
Juanadelacuesta Jul 6, 2023
5e29ff9
func: add lock ID verification for variable upserts
Juanadelacuesta Jul 18, 2023
e4353fd
fix: check for empty lock instead of empty lock ID on setCas to avoid…
Juanadelacuesta Jul 18, 2023
1b36b06
fix: separate the table update for the lock release, the var update c…
Juanadelacuesta Jul 19, 2023
6253296
func: move lock ID check to the inner most tx function, not the CAS w…
Juanadelacuesta Jul 19, 2023
5c3b8a3
func: remove the lock ID check from the CAS delete
Juanadelacuesta Jul 19, 2023
0d6d59a
func: allow for users to overide the variable data when locking, the …
Juanadelacuesta Jul 24, 2023
1fb2643
fix: make acquire lock function somewhat idempotente
Juanadelacuesta Jul 27, 2023
7747fd5
func: add timer operations to variable locks
Juanadelacuesta Jul 21, 2023
070d489
style: remove redundant code
Juanadelacuesta Jul 21, 2023
22c1fee
Add acquire and release lock functions on the state store (#17794)
Juanadelacuesta Jul 24, 2023
103a8f4
func: merge the timer functions with the leader functions and add loc…
Juanadelacuesta Jul 24, 2023
d7e7149
fix: add a restriction for CAS and lock, only one at the time and avo…
Juanadelacuesta Jul 25, 2023
c471356
fix: add existing timer check for lock acquire
Juanadelacuesta Jul 26, 2023
e7277f1
func: add tests for the creation and renewal of the timers
Juanadelacuesta Jul 26, 2023
c3e10f3
fix: add path to renew lock http handler
Juanadelacuesta Jul 27, 2023
83d9d7d
fix: avoif panics by reading path and namespace from req and not resp…
Juanadelacuesta Jul 27, 2023
6440af0
func: add functional tests to lock
Juanadelacuesta Jul 27, 2023
c769d5b
fix: remove duplicated code after rebase
Juanadelacuesta Jul 27, 2023
276b9f5
fix: remove the lock info from the correct return value on listing an…
Juanadelacuesta Jul 28, 2023
8124801
fix: update test after making TimerNum public
Juanadelacuesta Jul 28, 2023
950749a
func: only return values after the renew was successful, and add tests
Juanadelacuesta Jul 28, 2023
ef7cffe
style: typo
Juanadelacuesta Jul 28, 2023
35b309a
fix: add empty lock when no lock is provided for a lock acquire to av…
Juanadelacuesta Jul 31, 2023
8496b6d
fix: use lockID function instead of calling the lock ID directly to a…
Juanadelacuesta Jul 31, 2023
6c0eab9
func: add tests for command agent lock actions
Juanadelacuesta Jul 31, 2023
dfd9507
Update nomad/variables_endpoint.go
Juanadelacuesta Jul 31, 2023
dfc61ad
Update nomad/variables_endpoint.go
Juanadelacuesta Jul 31, 2023
d46dd2a
fix: typo
Juanadelacuesta Jul 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 75 additions & 29 deletions command/agent/variable_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package agent

import (
"errors"
"fmt"
"net/http"
"net/url"
Expand All @@ -13,10 +14,11 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
renewLockQueryParam = "renew"
acquireLockQueryParam = "acquire"
releaseLockQueryParam = "release"
var (
renewLockQueryParam = "lock-renew"

acquireLockQueryParam = string(structs.VarOpLockAcquire)
releaseLockQueryParam = string(structs.VarOpLockRelease)
)

func (s *HTTPServer) VariablesListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand Down Expand Up @@ -52,67 +54,83 @@ func (s *HTTPServer) VariableSpecificRequest(resp http.ResponseWriter, req *http
case http.MethodGet:
return s.variableQuery(resp, req, path)
case http.MethodPut, http.MethodPost:
urlParams := req.URL.Query()
lockOperation, err := getLockOperation(urlParams)
if err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}

queryParams := req.URL.Query()
_, renewLock := queryParams[renewLockQueryParam]
_, acquireLock := queryParams[acquireLockQueryParam]
_, releaseLock := queryParams[releaseLockQueryParam]
cq := req.URL.Query().Get("cas")

if cq != "" && lockOperation != "" {
return nil, CodedError(http.StatusBadRequest, "CAS can't be used with lock operations")
}

if renewLock || acquireLock || releaseLock {
if !isOneAndOnlyOneSet(renewLock, acquireLock, releaseLock) {
return nil, CodedError(http.StatusBadRequest, "multiple lock operations")
}
return s.variableLockOperation(resp, req, queryParams)
if lockOperation == "" {
return s.variableUpsert(resp, req, path)
}

return s.variableUpsert(resp, req, path)
if lockOperation == renewLockQueryParam {
return s.variableLockRenew(resp, req, path)
}

return s.variableLockOperation(resp, req, path, lockOperation)

case http.MethodDelete:
return s.variableDelete(resp, req, path)
default:
return nil, CodedError(http.StatusBadRequest, ErrInvalidMethod)
}
}

func (s *HTTPServer) variableLockOperation(resp http.ResponseWriter, req *http.Request,
operation url.Values) (interface{}, error) {
func (s *HTTPServer) variableLockRenew(resp http.ResponseWriter, req *http.Request, path string) (interface{}, error) {

// Parse the Variable
var Variable structs.VariableDecrypted
if err := decodeBody(req, &Variable); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}

if operation[renewLockQueryParam][0] == renewLockQueryParam {
args := structs.VariablesRenewLockRequest{
Path: Variable.Path,
args := structs.VariablesRenewLockRequest{
Path: path,
LockID: Variable.LockID(),
}

LockID: Variable.Lock.ID,
}
s.parseWriteRequest(req, &args.WriteRequest)

s.parseWriteRequest(req, &args.WriteRequest)
var out structs.VariablesRenewLockResponse
if err := s.agent.RPC(structs.VariablesRenewLockRPCMethod, &args, &out); err != nil {
return nil, err
}

var out structs.VariablesRenewLockResponse
if err := s.agent.RPC(structs.VariablesRenewLockRPCMethod, &args, &out); err != nil {
return nil, err
}
return out.VarMeta, nil
}

func (s *HTTPServer) variableLockOperation(resp http.ResponseWriter, req *http.Request,
path, operation string) (interface{}, error) {

return out.VarMeta, nil
// Parse the Variable
var Variable structs.VariableDecrypted
if err := decodeBody(req, &Variable); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}

// At this point, the operation can be either acquire or release, and they are
// both handled by the VariablesApplyRPCMethod.
args := structs.VariablesApplyRequest{
Op: structs.VarOpSet,
Op: structs.VarOp(operation),
Var: &Variable,
}

Variable.Path = path

s.parseWriteRequest(req, &args.WriteRequest)

var out structs.VariablesApplyResponse
err := s.agent.RPC(structs.VariablesApplyRPCMethod, &args, &out)
defer setIndex(resp, out.WriteMeta.Index)
if err != nil {
return nil, CodedError(http.StatusInternalServerError, err.Error())
return nil, err
}

if out.Conflict != nil {
Expand Down Expand Up @@ -153,6 +171,7 @@ func (s *HTTPServer) variableUpsert(resp http.ResponseWriter, req *http.Request,
if err := decodeBody(req, &Variable); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}

if len(Variable.Items) == 0 {
return nil, CodedError(http.StatusBadRequest, "variable missing required Items object")
}
Expand Down Expand Up @@ -263,3 +282,30 @@ func parseCAS(req *http.Request) (bool, uint64, error) {
func isOneAndOnlyOneSet(a, b, c bool) bool {
return (a || b || c) && !a != !b != !c != !(a && b && c)
}

// getLockOperation returns the lock operation to be performed in case there is
// one. It returns error if more than one is set.
func getLockOperation(queryParams url.Values) (string, error) {
_, renewLock := queryParams[renewLockQueryParam]
_, acquireLock := queryParams[acquireLockQueryParam]
_, releaseLock := queryParams[releaseLockQueryParam]

if !renewLock && !acquireLock && !releaseLock {
return "", nil
}

if !isOneAndOnlyOneSet(renewLock, acquireLock, releaseLock) {
return "", errors.New("multiple lock operations")
}

switch {
case renewLock:
return renewLockQueryParam, nil
case acquireLock:
return acquireLockQueryParam, nil
case releaseLock:
return releaseLockQueryParam, nil
default:
return "", errors.New("unspecified lock operation")
}
}
162 changes: 160 additions & 2 deletions command/agent/variable_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ func TestHTTP_Variables(t *testing.T) {
require.Equal(t, &svU, out)
}
})
t.Run("update-cas", func(t *testing.T) {

t.Run("update_cas", func(t *testing.T) {
sv := mock.Variable()
require.NoError(t, rpcWriteSV(s, sv, sv))

Expand Down Expand Up @@ -375,7 +376,164 @@ func TestHTTP_Variables(t *testing.T) {
require.Equal(t, fmt.Sprint(svU.Items), fmt.Sprint(out.Items))
}
})
rpcResetSV(s)

t.Run("error_cas_and_acquire_lock", func(t *testing.T) {
svLA := sv1.Copy()
svLA.Items["new"] = "new"

// break the request body
badBuf := encodeBrokenReq(&svLA)

req, err := http.NewRequest("PUT", "/v1/var/"+sv1.Path+"?cas=1&"+acquireLockQueryParam, badBuf)
require.NoError(t, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For new tests we're trying to use shoenig/test

respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.VariableSpecificRequest(respW, req)
require.EqualError(t, err, "CAS can't be used with lock operations")

var cErr HTTPCodedError
require.ErrorAs(t, err, &cErr)
require.Equal(t, http.StatusBadRequest, cErr.Code())
require.Nil(t, obj)
})
t.Run("error_parse_acquire_lock", func(t *testing.T) {
svLA := sv1.Copy()
svLA.Items["new"] = "new"

// break the request body
badBuf := encodeBrokenReq(&svLA)

req, err := http.NewRequest("PUT", "/v1/var/"+sv1.Path+"?"+acquireLockQueryParam, badBuf)
require.NoError(t, err)
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.VariableSpecificRequest(respW, req)
require.EqualError(t, err, "unexpected EOF")
var cErr HTTPCodedError
require.ErrorAs(t, err, &cErr)
require.Equal(t, http.StatusBadRequest, cErr.Code())
require.Nil(t, obj)
})
t.Run("error_rpc_acquire_lock", func(t *testing.T) {
svLA := sv1.Copy()
svLA.Items["new"] = "new"

// test broken rpc error
buf := encodeReq(&svLA)
req, err := http.NewRequest("PUT", "/v1/var/"+sv1.Path+"?region=bad&"+acquireLockQueryParam, buf)
require.NoError(t, err)
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.VariableSpecificRequest(respW, req)
require.EqualError(t, err, "No path to region")
require.Nil(t, obj)
})

t.Run("acquire_lock", func(t *testing.T) {
svLA := sv1

svLA.Items["new"] = "new"
// Make the HTTP request
buf := encodeReq(&svLA)
req, err := http.NewRequest("PUT", "/v1/var/"+svLA.Path+"?"+acquireLockQueryParam, buf)
require.NoError(t, err)
respW := httptest.NewRecorder()

obj, err := s.Server.VariableSpecificRequest(respW, req)
require.NoError(t, err)

// Test the returned object and rehydrate to a VariableDecrypted
require.NotNil(t, obj)
out, ok := obj.(*structs.VariableDecrypted)
require.True(t, ok, "Unable to convert obj to VariableDecrypted")

// Check for the index
require.NotZero(t, respW.HeaderMap.Get("X-Nomad-Index"))
require.Equal(t, fmt.Sprint(out.ModifyIndex), respW.HeaderMap.Get("X-Nomad-Index"))

// Check for the lock
require.NotNil(t, out.VariableMetadata.Lock)
require.NotEmpty(t, out.LockID())

// Check that written varible does not equal the input to rule out input mutation
require.NotEqual(t, &svLA.Items, out.Items)

// Update the lock information for the following tests
sv1.VariableMetadata = out.VariableMetadata
})

t.Run("error_rpc_renew_lock", func(t *testing.T) {
svRL := sv1.Copy()

// test broken rpc error
buf := encodeReq(&svRL)
req, err := http.NewRequest("PUT", "/v1/var/"+sv1.Path+"?region=bad&"+renewLockQueryParam, buf)
require.NoError(t, err)
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.VariableSpecificRequest(respW, req)
require.EqualError(t, err, "No path to region")
require.Nil(t, obj)
})

t.Run("renew_lock", func(t *testing.T) {
svRL := sv1.Copy()

// Make the HTTP request
buf := encodeReq(&svRL)
req, err := http.NewRequest("PUT", "/v1/var/"+svRL.Path+"?"+renewLockQueryParam, buf)
require.NoError(t, err)
respW := httptest.NewRecorder()

obj, err := s.Server.VariableSpecificRequest(respW, req)
require.NoError(t, err)

// Test the returned object and rehydrate to a VariableDecrypted
require.NotNil(t, obj)
out, ok := obj.(*structs.VariableMetadata)
require.True(t, ok, "Unable to convert obj to VariableDecrypted")

// Check for the lock
require.NotNil(t, out.Lock)
require.Equal(t, sv1.LockID(), out.Lock.ID)
})

t.Run("release_lock", func(t *testing.T) {
svLR := sv1

svLR.Items["new"] = "new"
// Make the HTTP request
buf := encodeReq(&svLR)
req, err := http.NewRequest("PUT", "/v1/var/"+svLR.Path+"?"+releaseLockQueryParam, buf)
require.NoError(t, err)
respW := httptest.NewRecorder()

obj, err := s.Server.VariableSpecificRequest(respW, req)
require.NoError(t, err)

// Test the returned object and rehydrate to a VariableDecrypted
require.NotNil(t, obj)
out, ok := obj.(*structs.VariableDecrypted)
require.True(t, ok, "Unable to convert obj to VariableDecrypted")

// Check for the index
require.NotZero(t, respW.HeaderMap.Get("X-Nomad-Index"))
require.Equal(t, fmt.Sprint(out.ModifyIndex), respW.HeaderMap.Get("X-Nomad-Index"))

// Check for the lock
require.Nil(t, out.VariableMetadata.Lock)
require.Empty(t, out.LockID())

// Check that written variable is equal the input
require.Equal(t, sv1.Items, out.Items)

// Remove the lock information from the mock variable for the following tests
sv1.VariableMetadata = out.VariableMetadata
})

t.Run("error_rpc_delete", func(t *testing.T) {
sv1 := mock.Variable()
Expand Down
Loading