Skip to content

Commit

Permalink
remove useless engine check (#631)
Browse files Browse the repository at this point in the history
* remove engine cache in caller

* dion't check engine on offline node

* fix UT issue

* add debug log for engine check

* fix review issues
  • Loading branch information
yuyang0 authored Jan 5, 2024
1 parent 969435f commit ce65507
Show file tree
Hide file tree
Showing 14 changed files with 240 additions and 64 deletions.
7 changes: 6 additions & 1 deletion cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/projecteru2/core/source/github"
"github.com/projecteru2/core/source/gitlab"
"github.com/projecteru2/core/store"
storefactory "github.com/projecteru2/core/store/factory"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"
Expand All @@ -37,7 +38,7 @@ type Calcium struct {
func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, error) {
logger := log.WithFunc("calcium.New")
// set store
store, err := store.NewStore(config, t)
store, err := storefactory.NewStore(config, t)
if err != nil {
logger.Error(ctx, err)
return nil, err
Expand Down Expand Up @@ -111,3 +112,7 @@ func (c *Calcium) Finalizer() {
func (c *Calcium) GetIdentifier() string {
return c.identifier
}

func (c *Calcium) GetStore() store.Store {
return c.store
}
5 changes: 4 additions & 1 deletion cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/resource/plugins"
resourcetypes "github.com/projecteru2/core/resource/types"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand Down Expand Up @@ -109,7 +110,8 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
// node with resource info
func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions) (<-chan *types.Node, error) {
logger := log.WithFunc("calcium.ListPodNodes").WithField("podname", opts.Podname).WithField("labels", opts.Labels).WithField("all", opts.All).WithField("info", opts.CallInfo)
nodes, err := c.store.GetNodesByPod(ctx, &types.NodeFilter{Podname: opts.Podname, Labels: opts.Labels, All: opts.All})
nf := &types.NodeFilter{Podname: opts.Podname, Labels: opts.Labels, All: opts.All}
nodes, err := c.store.GetNodesByPod(ctx, nf, store.WithoutEngineOption())
if err != nil {
logger.Error(ctx, err)
return nil, err
Expand Down Expand Up @@ -231,6 +233,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
},
// then: update node metadata
func(ctx context.Context) error {
defer enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key)
if err := c.store.UpdateNodes(ctx, n); err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions cluster/calcium/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func TestAddNode(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
factory.InitEngineCache(ctx, c.config)
factory.InitEngineCache(ctx, c.config, nil)

opts := &types.AddNodeOptions{}
// failed by validating
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestListPodNodes(t *testing.T) {
opts := &types.ListNodesOptions{}
store := c.store.(*storemocks.Store)
// failed by GetNodesByPod
store.On("GetNodesByPod", mock.Anything, mock.Anything).Return(nil, types.ErrMockError).Once()
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrMockError).Once()
_, err := c.ListPodNodes(ctx, opts)
assert.Error(t, err)
store.AssertExpectations(t)
Expand All @@ -123,7 +123,7 @@ func TestListPodNodes(t *testing.T) {
{NodeMeta: types.NodeMeta{Name: name1}, Engine: engine, Available: true},
{NodeMeta: types.NodeMeta{Name: name2}, Engine: engine, Available: false},
}
store.On("GetNodesByPod", mock.Anything, mock.Anything).Return(nodes, nil)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
rmgr := c.rmgr.(*resourcemocks.Manager)
rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, types.ErrMockError)
opts.CallInfo = true
Expand Down
7 changes: 4 additions & 3 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ func serve(c *cli.Context) error {
defer log.SentryDefer()
logger := log.WithFunc("main")

// init engine cache and start engine cache checker
factory.InitEngineCache(c.Context, config)

var t *testing.T
if embeddedStorage {
t = &testing.T{}
Expand All @@ -60,6 +57,10 @@ func serve(c *cli.Context) error {
return err
}
defer cluster.Finalizer()

// init engine cache and start engine cache checker
factory.InitEngineCache(c.Context, config, cluster.GetStore())

cluster.DisasterRecover(c.Context)

stop := make(chan struct{}, 1)
Expand Down
81 changes: 72 additions & 9 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"sync"
"time"
"unsafe"

"github.com/alphadose/haxmap"
"github.com/cockroachdb/errors"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/projecteru2/core/engine/systemd"
"github.com/projecteru2/core/engine/virt"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand All @@ -39,26 +39,35 @@ var (
// EngineCache .
type EngineCache struct {
cache *utils.EngineCache
keysToCheck *haxmap.Map[uintptr, engineParams]
keysToCheck *haxmap.Map[string, engineParams]
pool *ants.PoolWithFunc
config types.Config
stor store.Store
}

// NewEngineCache .
func NewEngineCache(config types.Config) *EngineCache {
func NewEngineCache(config types.Config, stor store.Store) *EngineCache {
pool, _ := utils.NewPool(config.MaxConcurrency)
return &EngineCache{
cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute),
keysToCheck: haxmap.New[uintptr, engineParams](),
keysToCheck: haxmap.New[string, engineParams](),
pool: pool,
config: config,
stor: stor,
}
}

// InitEngineCache init engine cache and start engine cache checker
func InitEngineCache(ctx context.Context, config types.Config) {
engineCache = NewEngineCache(config)
func InitEngineCache(ctx context.Context, config types.Config, sto store.Store) {
engineCache = NewEngineCache(config, sto)
// init the cache, we don't care the return values
if sto != nil {
_, _ = engineCache.stor.GetNodesByPod(ctx, &types.NodeFilter{
All: true,
})
}
go engineCache.CheckAlive(ctx)
go engineCache.CheckNodeStatus(ctx)
}

// Get .
Expand All @@ -69,7 +78,7 @@ func (e *EngineCache) Get(key string) engine.API {
// Set .
func (e *EngineCache) Set(params engineParams, client engine.API) {
e.cache.Set(params.getCacheKey(), client)
e.keysToCheck.Set(uintptr(unsafe.Pointer(&params)), params)
e.keysToCheck.Set(params.getCacheKey(), params)
}

// Delete .
Expand All @@ -92,7 +101,7 @@ func (e *EngineCache) CheckAlive(ctx context.Context) {

paramsChan := make(chan engineParams)
go func() {
e.keysToCheck.ForEach(func(k uintptr, v engineParams) bool {
e.keysToCheck.ForEach(func(k string, v engineParams) bool {
paramsChan <- v
return true
})
Expand All @@ -109,12 +118,15 @@ func (e *EngineCache) CheckAlive(ctx context.Context) {
client := e.cache.Get(cacheKey)
if client == nil {
e.cache.Delete(params.getCacheKey())
e.keysToCheck.Del(uintptr(unsafe.Pointer(&params)))
e.keysToCheck.Del(cacheKey)
return
}
if _, ok := client.(*fake.EngineWithErr); ok {
if newClient, err := newEngine(ctx, e.config, params.nodename, params.endpoint, params.ca, params.key, params.cert); err != nil {
logger.Errorf(ctx, err, "engine %+v is still unavailable", cacheKey)
e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err})
// check node status
e.checkOneNodeStatus(ctx, &params)
} else {
e.cache.Set(cacheKey, newClient)
}
Expand All @@ -124,13 +136,50 @@ func (e *EngineCache) CheckAlive(ctx context.Context) {
logger.Errorf(ctx, err, "engine %+v is unavailable, will be replaced and removed", cacheKey)
e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err})
}
logger.Debugf(ctx, "engine %+v is available", cacheKey)
})
}
wg.Wait()
time.Sleep(e.config.ConnectionTimeout)
}
}

func (e *EngineCache) CheckNodeStatus(ctx context.Context) {
logger := log.WithFunc("engine.factory.CheckNodeStatus")
logger.Info(ctx, "check NodeStatus starts")
defer logger.Info(ctx, "check NodeStatus ends")
if e.stor == nil {
logger.Warnf(ctx, "nodeStore is nil")
return
}
for {
select {
case <-ctx.Done():
return
default:
}
ch := e.stor.NodeStatusStream(ctx)

for ns := range ch {
if ns.Alive {
// GetNode will call GetEngine, so GetNode updates the engine cache automatically
if _, err := e.stor.GetNode(ctx, ns.Nodename); err != nil {
logger.Warnf(ctx, "failed to get node %s: %s", ns.Nodename, err)
}
} else {
// a node may have multiple engines, so we need check all key here
e.keysToCheck.ForEach(func(k string, ep engineParams) bool {
if ep.nodename == ns.Nodename {
logger.Infof(ctx, "remove engine %+v from cache", ep.getCacheKey())
RemoveEngineFromCache(ctx, ep.endpoint, ep.ca, ep.cert, ep.key)
}
return true
})
}
}
}
}

// GetEngineFromCache .
func GetEngineFromCache(_ context.Context, endpoint, ca, cert, key string) engine.API {
return engineCache.Get(getEngineCacheKey(endpoint, ca, cert, key))
Expand Down Expand Up @@ -225,3 +274,17 @@ func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
}
return client, nil
}

func (e *EngineCache) checkOneNodeStatus(ctx context.Context, params *engineParams) {
if e.stor == nil {
return
}
logger := log.WithFunc("engine.factory.checkOneNodeStatus")
nodename := params.nodename
cacheKey := params.getCacheKey()
ns, err := e.stor.GetNodeStatus(ctx, nodename)
if (err != nil && errors.Is(err, types.ErrInvaildCount)) || (!ns.Alive) {
logger.Warnf(ctx, "node %s is offline, the cache will be removed", nodename)
e.Delete(cacheKey)
}
}
3 changes: 2 additions & 1 deletion selfmon/selfmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
storefactory "github.com/projecteru2/core/store/factory"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand All @@ -31,7 +32,7 @@ type NodeStatusWatcher struct {
func RunNodeStatusWatcher(ctx context.Context, config types.Config, cluster cluster.Cluster, t *testing.T) {
r := rand.New(rand.NewSource(int64(new(maphash.Hash).Sum64()))) //nolint
ID := r.Int63n(10000) //nolint
store, err := store.NewStore(config, t)
store, err := storefactory.NewStore(config, t)
if err != nil {
log.WithFunc("selfmon.RunNodeStatusWatcher").WithField("ID", ID).Error(ctx, err, "failed to create store")
return
Expand Down
2 changes: 1 addition & 1 deletion store/etcdv3/mercury_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewMercury(t *testing.T) *Mercury {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory.InitEngineCache(ctx, config)
factory.InitEngineCache(ctx, config, nil)

m, err := New(config, t)
assert.NoError(t, err)
Expand Down
47 changes: 37 additions & 10 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/engine/mocks/fakeengine"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand Down Expand Up @@ -65,19 +66,23 @@ func (m *Mercury) GetNodes(ctx context.Context, nodenames []string) ([]*types.No
if err != nil {
return nil, err
}
return m.doGetNodes(ctx, kvs, nil, true)
return m.doGetNodes(ctx, kvs, nil, true, nil)
}

// GetNodesByPod get all nodes bound to pod
// here we use podname instead of pod instance
func (m *Mercury) GetNodesByPod(ctx context.Context, nodeFilter *types.NodeFilter) ([]*types.Node, error) {
func (m *Mercury) GetNodesByPod(ctx context.Context, nodeFilter *types.NodeFilter, opts ...store.Option) ([]*types.Node, error) {
var op store.Op
for _, opt := range opts {
opt(&op)
}
do := func(podname string) ([]*types.Node, error) {
key := fmt.Sprintf(nodePodKey, podname, "")
resp, err := m.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
return nil, err
}
return m.doGetNodes(ctx, resp.Kvs, nodeFilter.Labels, nodeFilter.All)
return m.doGetNodes(ctx, resp.Kvs, nodeFilter.Labels, nodeFilter.All, &op)
}
if nodeFilter.Podname != "" {
return do(nodeFilter.Podname)
Expand Down Expand Up @@ -116,7 +121,6 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
addIfNotEmpty(fmt.Sprintf(nodeCaKey, node.Name), node.Ca)
addIfNotEmpty(fmt.Sprintf(nodeCertKey, node.Name), node.Cert)
addIfNotEmpty(fmt.Sprintf(nodeKeyKey, node.Name), node.Key)
enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key)
}

resp, err := m.BatchPut(ctx, data)
Expand Down Expand Up @@ -213,6 +217,24 @@ func (m *Mercury) NodeStatusStream(ctx context.Context) chan *types.NodeStatus {
return ch
}

func (m *Mercury) LoadNodeCert(ctx context.Context, node *types.Node) (err error) {
keyFormats := []string{nodeCaKey, nodeCertKey, nodeKeyKey}
data := []string{"", "", ""}
for i := 0; i < 3; i++ {
ev, err := m.GetOne(ctx, fmt.Sprintf(keyFormats[i], node.Name))
if err != nil {
if !errors.Is(err, types.ErrInvaildCount) {
log.WithFunc("store.etcdv3.LoadNodeCert").Warn(ctx, err, "Get key failed")
return err
}
continue
}
data[i] = string(ev.Value)
}
node.Ca, node.Cert, node.Key = data[0], data[1], data[2]
return nil
}

func (m *Mercury) makeClient(ctx context.Context, node *types.Node) (client engine.API, err error) {
// try to get from cache without ca/cert/key
if client = enginefactory.GetEngineFromCache(ctx, node.Endpoint, "", "", ""); client != nil {
Expand Down Expand Up @@ -299,7 +321,10 @@ func (m *Mercury) doRemoveNode(ctx context.Context, podname, nodename, endpoint
return err
}

func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels map[string]string, all bool) (nodes []*types.Node, err error) {
func (m *Mercury) doGetNodes(
ctx context.Context, kvs []*mvccpb.KeyValue,
labels map[string]string, all bool, op *store.Op,
) (nodes []*types.Node, err error) {
allNodes := []*types.Node{}
for _, ev := range kvs {
node := &types.Node{}
Expand Down Expand Up @@ -333,11 +358,13 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
return
}

// update engine
if client, err := m.makeClient(ctx, node); err != nil {
logger.Errorf(ctx, err, "failed to make client for %+v", node.Name)
} else {
node.Engine = client
if op == nil || (!op.WithoutEngine) {
// update engine
if client, err := m.makeClient(ctx, node); err != nil {
logger.Errorf(ctx, err, "failed to make client for %+v", node.Name)
} else {
node.Engine = client
}
}
nodesCh <- node
})
Expand Down
Loading

0 comments on commit ce65507

Please sign in to comment.