use hashmap to refactor (#558)
* use hashmap to refactor

* use hashmap in helium

* refactor service discovery

* replace sync.Map in engine factory

* add lock to ensure no dirty data

* remove the last timer
CMGS authored Mar 9, 2022
1 parent 36d1c03 commit 67360b6
Showing 9 changed files with 186 additions and 744 deletions.
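
The thread of the commit: every `sync.Map` in the client resolver, the engine factory, and the helium service-discovery layer becomes a `github.com/cornelk/hashmap.HashMap`, a mutex is added where related state is mutated together, and the remaining timer/sleep polling is replaced by tickers. Below is a minimal sketch of the `sync.Map` → `cornelk/hashmap` mapping as this diff uses it, restricted to the calls that actually appear in the hunks (Set, GetStringKey, Del, Iter); the map contents and printed strings are illustrative only.

```go
package main

import (
	"fmt"

	"github.com/cornelk/hashmap"
)

func main() {
	m := hashmap.HashMap{} // zero value is usable, as in the diff

	// sync.Map.Store(k, v)  ->  HashMap.Set(k, v)
	m.Set("endpoint-1", struct{}{})

	// sync.Map.Load(k)      ->  HashMap.GetStringKey(k) for string keys
	if _, ok := m.GetStringKey("endpoint-1"); ok {
		fmt.Println("endpoint-1 present")
	}

	// sync.Map.Range(f)     ->  for kv := range HashMap.Iter()
	for kv := range m.Iter() {
		key, ok := kv.Key.(string) // keys/values come back as interface{}, so assert types
		if !ok {
			continue
		}
		fmt.Println("saw", key)
	}

	// sync.Map.Delete(k)    ->  HashMap.Del(k)
	m.Del("endpoint-1")
}
```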
89 changes: 45 additions & 44 deletions client/utils/servicepusher.go
@@ -8,16 +8,17 @@ import (
"sync"
"time"

"github.com/projecteru2/core/log"

"github.com/cornelk/hashmap"
"github.com/go-ping/ping"
"github.com/projecteru2/core/log"
)

// EndpointPusher pushes endpoints to registered channels if the ep is L3 reachable
type EndpointPusher struct {
sync.Mutex
chans []chan []string
pendingEndpoints sync.Map
availableEndpoints sync.Map
pendingEndpoints hashmap.HashMap
availableEndpoints hashmap.HashMap
}

// NewEndpointPusher .
@@ -37,54 +38,54 @@ func (p *EndpointPusher) Push(endpoints []string) {
}

func (p *EndpointPusher) delOutdated(endpoints []string) {
newEps := make(map[string]struct{})
for _, e := range endpoints {
newEps[e] = struct{}{}
p.Lock()
defer p.Unlock()
newEndpoints := make(map[string]struct{}) // TODO after go 1.18, use slice package to search endpoints
for _, endpoint := range endpoints {
newEndpoints[endpoint] = struct{}{}
}

p.pendingEndpoints.Range(func(key, value interface{}) bool {
ep, ok := key.(string)
for kv := range p.pendingEndpoints.Iter() {
endpoint, ok := kv.Key.(string)
if !ok {
log.Error("[EruResolver] failed to cast key while ranging pendingEndpoints")
return true
continue
}
cancel, ok := value.(context.CancelFunc)
cancel, ok := kv.Value.(context.CancelFunc)
if !ok {
log.Error("[EruResolver] failed to cast value while ranging pendingEndpoints")
}
if _, ok := newEps[ep]; !ok {
if _, ok := newEndpoints[endpoint]; !ok {
cancel()
p.pendingEndpoints.Delete(ep)
log.Debugf(nil, "[EruResolver] pending endpoint deleted: %s", ep) //nolint
p.pendingEndpoints.Del(endpoint)
log.Debugf(nil, "[EruResolver] pending endpoint deleted: %s", endpoint) //nolint
}
return true
})
}

p.availableEndpoints.Range(func(key, _ interface{}) bool {
ep, ok := key.(string)
for kv := range p.availableEndpoints.Iter() {
endpoint, ok := kv.Key.(string)
if !ok {
log.Error("[EruResolver] failed to cast key while ranging availableEndpoints")
return true
continue
}
if _, ok := newEps[ep]; !ok {
p.availableEndpoints.Delete(ep)
log.Debugf(nil, "[EruResolver] available endpoint deleted: %s", ep) //nolint
if _, ok := newEndpoints[endpoint]; !ok {
p.availableEndpoints.Del(endpoint)
log.Debugf(nil, "[EruResolver] available endpoint deleted: %s", endpoint) //nolint
}
return true
})
}
}

func (p *EndpointPusher) addCheck(endpoints []string) {
for _, endpoint := range endpoints {
if _, ok := p.pendingEndpoints.Load(endpoint); ok {
if _, ok := p.pendingEndpoints.GetStringKey(endpoint); ok {
continue
}
if _, ok := p.availableEndpoints.Load(endpoint); ok {
if _, ok := p.availableEndpoints.GetStringKey(endpoint); ok {
continue
}

ctx, cancel := context.WithCancel(context.TODO())
p.pendingEndpoints.Store(endpoint, cancel)
p.pendingEndpoints.Set(endpoint, cancel)
go p.pollReachability(ctx, endpoint)
log.Debugf(ctx, "[EruResolver] pending endpoint added: %s", endpoint)
}
@@ -97,24 +98,25 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string)
return
}

ticker := time.NewTicker(time.Second) // TODO config from outside?
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Debugf(ctx, "[EruResolver] reachability goroutine ends: %s", endpoint)
return
default:
}

time.Sleep(time.Second)
if err := p.checkReachability(parts[0]); err != nil {
continue
case <-ticker.C:
p.Lock()
defer p.Unlock()
if err := p.checkReachability(parts[0]); err != nil {
continue
}
p.pendingEndpoints.Del(endpoint)
p.availableEndpoints.Set(endpoint, struct{}{})
p.pushEndpoints()
log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint)
return
}

p.pendingEndpoints.Delete(endpoint)
p.availableEndpoints.Store(endpoint, struct{}{})
p.pushEndpoints()
log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint)
return
}
}

@@ -140,15 +142,14 @@ func (p *EndpointPusher) checkReachability(host string) (err error) {

func (p *EndpointPusher) pushEndpoints() {
endpoints := []string{}
p.availableEndpoints.Range(func(key, value interface{}) bool {
endpoint, ok := key.(string)
for kv := range p.availableEndpoints.Iter() {
endpoint, ok := kv.Key.(string)
if !ok {
log.Error("[EruResolver] failed to cast key while ranging availableEndpoints")
return true
continue
}
endpoints = append(endpoints, endpoint)
return true
})
}
for _, ch := range p.chans {
ch <- endpoints
}
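
One behavioural detail of the new `pollReachability` loop above is worth spelling out: `defer p.Unlock()` sits inside the `case <-ticker.C:` branch, but `defer` only fires when the whole goroutine returns, so a failed `checkReachability` that hits `continue` carries the held mutex into the next tick. The sketch below shows one way to scope the lock to a single tick by pushing the locked section into its own small helper; `checkOnce` and the toy `reachable` closure are illustrative names, not code from the repository.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// checkOnce mirrors the shape of the tick branch in pollReachability, but the
// deferred Unlock fires when this helper returns, i.e. once per tick rather
// than once per goroutine.
func checkOnce(mu *sync.Mutex, reachable func() bool) bool {
	mu.Lock()
	defer mu.Unlock()
	return reachable()
}

func main() {
	var mu sync.Mutex
	attempts := 0
	reachable := func() bool { attempts++; return attempts >= 3 } // "succeeds" on the 3rd tick

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if checkOnce(&mu, reachable) {
			fmt.Println("endpoint became available after", attempts, "ticks")
			return
		}
		// the lock is already released here, so the next tick can acquire it again
	}
}
```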
3 changes: 3 additions & 0 deletions cluster/calcium/calcium_test.go
@@ -53,6 +53,9 @@ func NewTestCluster() *Calcium {
MaxShare: -1,
ShareBase: 100,
},
GRPCConfig: types.GRPCConfig{
ServiceDiscoveryPushInterval: 15 * time.Second,
},
WALFile: filepath.Join(walDir, "core.wal.log"),
MaxConcurrency: 10,
HAKeepaliveInterval: 16 * time.Second,
14 changes: 7 additions & 7 deletions cluster/calcium/create.go
@@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/cornelk/hashmap"
"github.com/projecteru2/core/cluster"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
@@ -158,7 +159,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWorkloadMessage, opts *types.DeployOptions, plans []resourcetypes.ResourcePlans, deployMap map[string]int) (_ map[string][]int, err error) {
wg := sync.WaitGroup{}
wg.Add(len(deployMap))
syncRollbackMap := sync.Map{}
syncRollbackMap := hashmap.HashMap{}

seq := 0
rollbackMap := make(map[string][]int)
@@ -172,7 +173,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWo
return func() {
defer wg.Done()
if indices, err := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, plans, seq); err != nil {
syncRollbackMap.Store(nodename, indices)
syncRollbackMap.Set(nodename, indices)
}
}
}(nodename, deploy, seq))
@@ -181,12 +182,11 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWo
}

wg.Wait()
syncRollbackMap.Range(func(key, value interface{}) bool {
nodename := key.(string)
indices := value.([]int)
for kv := range syncRollbackMap.Iter() {
nodename := kv.Key.(string)
indices := kv.Value.([]int)
rollbackMap[nodename] = indices
return true
})
}
log.Debugf(ctx, "[Calcium.doDeployWorkloads] rollbackMap: %+v", rollbackMap)
if len(rollbackMap) != 0 {
err = types.ErrRollbackMapIsNotEmpty
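
The `syncRollbackMap` change above keeps the same fan-out/collect shape: each node goroutine records its failed indices concurrently, and after `wg.Wait()` the results are drained into an ordinary map. A self-contained sketch of that pattern with `cornelk/hashmap` follows; the node names and indices are made up, and the error handling of the real `doDeployWorkloadsOnNode` is elided.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/cornelk/hashmap"
)

func main() {
	deployMap := map[string][]int{"node-1": {0, 1}, "node-2": {2}}

	wg := sync.WaitGroup{}
	wg.Add(len(deployMap))
	syncRollbackMap := hashmap.HashMap{}

	for nodename, indices := range deployMap {
		go func(nodename string, indices []int) {
			defer wg.Done()
			// pretend every deployment on this node failed, so all indices roll back
			syncRollbackMap.Set(nodename, indices)
		}(nodename, indices)
	}
	wg.Wait()

	// single-threaded drain once every worker has finished
	rollbackMap := make(map[string][]int)
	for kv := range syncRollbackMap.Iter() {
		rollbackMap[kv.Key.(string)] = kv.Value.([]int)
	}
	fmt.Printf("rollbackMap: %+v\n", rollbackMap)
}
```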
1 change: 0 additions & 1 deletion cluster/calcium/service.go
@@ -19,7 +19,6 @@ func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceS
utils.SentryGo(func() {
<-ctx.Done()
c.watcher.Unsubscribe(id)
close(ch)
})
return ch, nil
}
62 changes: 34 additions & 28 deletions discovery/helium/helium.go
@@ -5,28 +5,30 @@ import (
"sync"
"time"

"github.com/cornelk/hashmap"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"

"github.com/google/uuid"
)

const interval = 15 * time.Second

// Helium .
type Helium struct {
sync.Once
lock *sync.RWMutex
config types.GRPCConfig
stor store.Store
subs sync.Map
stor store.Store
subs hashmap.HashMap
interval time.Duration
}

// New .
func New(config types.GRPCConfig, stor store.Store) *Helium {
h := &Helium{}
h.config = config
h.stor = stor
h.lock = &sync.RWMutex{}
h := &Helium{interval: config.ServiceDiscoveryPushInterval, stor: stor, subs: hashmap.HashMap{}}
if h.interval < time.Second {
h.interval = interval
}
h.Do(func() {
h.start(context.TODO()) // TODO rewrite ctx here, because this will run only once!
})
@@ -35,18 +37,24 @@ func New(config types.GRPCConfig, stor store.Store) *Helium {

// Subscribe .
func (h *Helium) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
h.lock.Lock()
defer h.lock.Unlock()
id := uuid.New()
_, _ = h.subs.LoadOrStore(id, ch)
key := id.ID()
h.subs.Set(key, ch)
return id
}

// Unsubscribe .
func (h *Helium) Unsubscribe(id uuid.UUID) {
h.lock.Lock()
defer h.lock.Unlock()
h.subs.Delete(id)
v, ok := h.subs.GetUintKey(uintptr(id.ID()))
if !ok {
return
}
ch, ok := v.(chan<- types.ServiceStatus)
if !ok {
return
}
close(ch)
h.subs.Del(id.ID())
}

func (h *Helium) start(ctx context.Context) {
@@ -60,7 +68,8 @@ func (h *Helium) start(ctx context.Context) {
log.Info("[WatchServiceStatus] service discovery start")
defer log.Error("[WatchServiceStatus] service discovery exited")
var latestStatus types.ServiceStatus
timer := time.NewTimer(h.config.ServiceDiscoveryPushInterval)
ticker := time.NewTicker(h.interval)
defer ticker.Stop()
for {
select {
case addresses, ok := <-ch:
@@ -71,32 +80,29 @@ func (h *Helium) start(ctx context.Context) {

latestStatus = types.ServiceStatus{
Addresses: addresses,
Interval: h.config.ServiceDiscoveryPushInterval * 2,
Interval: h.interval * 2,
}
case <-timer.C:
case <-ticker.C:
}
h.dispatch(latestStatus)
timer.Stop()
timer.Reset(h.config.ServiceDiscoveryPushInterval)
}
}()
}

func (h *Helium) dispatch(status types.ServiceStatus) {
h.lock.RLock()
defer h.lock.RUnlock()
h.subs.Range(func(k, v interface{}) bool {
f := func(kv hashmap.KeyValue) {
defer func() {
if err := recover(); err != nil {
log.Errorf(context.TODO(), "[dispatch] dispatch %s failed, err: %v", k, err)
log.Errorf(context.TODO(), "[dispatch] dispatch %v failed, err: %v", kv.Key, err)
}
}()
c, ok := v.(chan<- types.ServiceStatus)
ch, ok := kv.Value.(chan<- types.ServiceStatus)
if !ok {
log.Error("[WatchServiceStatus] failed to cast channel from map")
return true
}
c <- status
return true
})
ch <- status
}
for kv := range h.subs.Iter() {
f(kv)
}
}
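
Taken together with the `close(ch)` dropped from `WatchServiceStatus` above and the `close(chStatus)` commented out in the test below, this hunk hands ownership of the subscriber channel to Helium: `Unsubscribe` now looks the channel up, closes it, and removes it, while the push loop runs off a ticker seeded from `config.ServiceDiscoveryPushInterval` (falling back to 15 seconds when the configured value is under one second). A hedged sketch of the subscriber side under that contract; the wrapper function and its wiring are illustrative, and only `Subscribe`/`Unsubscribe` and the `types.ServiceStatus` fields come from the diff.

```go
package example

import (
	"fmt"

	"github.com/projecteru2/core/discovery/helium"
	"github.com/projecteru2/core/types"
)

// watchCores subscribes to helium's pushes and returns a detach function.
// After this commit the discovery layer owns the channel's lifetime, so the
// caller must NOT close ch itself; Unsubscribe closes it exactly once.
func watchCores(h *helium.Helium) (detach func()) {
	ch := make(chan types.ServiceStatus)
	id := h.Subscribe(ch)

	go func() {
		for status := range ch { // the loop ends when Unsubscribe closes ch
			fmt.Printf("eru cores: %v (next push within %v)\n", status.Addresses, status.Interval)
		}
	}()

	return func() { h.Unsubscribe(id) }
}
```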
2 changes: 1 addition & 1 deletion discovery/helium/helium_test.go
@@ -65,7 +65,7 @@ func TestPanic(t *testing.T) {
uuid := service.Subscribe(chStatus)
time.Sleep(time.Second)
service.Unsubscribe(uuid)
close(chStatus)
//close(chStatus)
}()
}
