Skip to content

Commit

Permalink
hc: Replay the current service state on Subscribe call
Browse files Browse the repository at this point in the history
Currently, Subscribe only publishes events observed after the
subscription. As a result, subscribers may miss some events depending on
the timing of the subscription. To avoid this issue, replay the latest
state on subscription.

The potential concern here is that we hold the Service's writer lock
during this operation. This is necessary to prevent parallel service
updates. However, it may cause long blocking in large clusters.
That said, Subscribe is typically called only once on startup, so there
is not much impact on overall performance.

Signed-off-by: Yutaro Hayakawa <[email protected]>
  • Loading branch information
YutaroHayakawa committed Aug 22, 2024
1 parent ab090b2 commit b72618e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 10 deletions.
32 changes: 22 additions & 10 deletions pkg/service/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ func (s *Service) HealthCheckCallback(event int, data any) {
func (s *Service) Subscribe(ctx context.Context, updateCB HealthUpdateCallback) {
s.Lock()
defer s.Unlock()

// Replay the current state to let the subscriber to catch up
for _, svc := range s.svcByHash {
if svc == nil {
continue
}
updateCB(s.healthUpdateFromSvcInfo(svc))
}

s.healthCheckSubscribers = append(s.healthCheckSubscribers, HealthSubscriber{
Ctx: ctx,
Callback: updateCB,
Expand All @@ -70,6 +79,14 @@ func (s *Service) notifyHealthCheckUpdateSubscribers(svcAddr lb.L3n4Addr) {
// Service not found in case it was deleted.
return
}
for _, subscriber := range s.healthCheckSubscribers {
if subscriber.Ctx.Err() == nil {
subscriber.Callback(s.healthUpdateFromSvcInfo(info))
}
}
}

func (s *Service) healthUpdateFromSvcInfo(info *svcInfo) HealthUpdateSvcInfo {
activeBes := make([]lb.Backend, 0, len(info.backends))
for _, backend := range info.backends {
if backend == nil {
Expand All @@ -80,15 +97,10 @@ func (s *Service) notifyHealthCheckUpdateSubscribers(svcAddr lb.L3n4Addr) {
activeBes = append(activeBes, *backend)
}
}
for _, subscriber := range s.healthCheckSubscribers {
if subscriber.Ctx.Err() == nil {
svcInfo := HealthUpdateSvcInfo{
Name: info.svcName,
Addr: svcAddr,
SvcType: info.svcType,
ActiveBackends: activeBes,
}
subscriber.Callback(svcInfo)
}
return HealthUpdateSvcInfo{
Name: info.svcName,
Addr: info.frontend.L3n4Addr,
SvcType: info.svcType,
ActiveBackends: activeBes,
}
}
33 changes: 33 additions & 0 deletions pkg/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2419,6 +2419,39 @@ func TestHealthCheckCB(t *testing.T) {
require.Equal(t, m.lbmap.SvcActiveBackendsCount[uint16(id1)], 1)
}

// TestHealthCheckInitialSync verifies that Subscribe replays the services
// that already exist at subscription time: a subscriber registered after a
// service has been upserted must still receive an update for that service.
func TestHealthCheckInitialSync(t *testing.T) {
	m := setupManagerTestSuite(t)

	// Deep-copy every backends1 entry instead of handing the fixture's own
	// pointers to the service. Iterating over the fixture (rather than
	// indexing entries 0 and 1 by hand) keeps the slice free of nil entries
	// regardless of how many backends the fixture contains.
	backends := make([]*lb.Backend, 0, len(backends1))
	for _, be := range backends1 {
		backends = append(backends, be.DeepCopy())
	}
	p1 := &lb.SVC{
		Frontend: frontend1,
		Backends: backends,
		Type:     lb.SVCTypeClusterIP,
		Name:     lb.ServiceName{Name: "svc1", Namespace: "ns1"},
	}

	// Upsert the service before subscription.
	_, _, err := m.svc.UpsertService(p1)
	require.NoError(t, err)

	// Upsert the very same service once more. The replay asserted below
	// must still contain exactly one event for it.
	_, _, err = m.svc.UpsertService(p1)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Test that the Subscribe call replays the current state. The callback
	// appends to this slice; the assertions below read it right after
	// Subscribe returns.
	var receivedServices []lb.ServiceName
	m.svc.Subscribe(ctx, func(update HealthUpdateSvcInfo) {
		receivedServices = append(receivedServices, update.Name)
	})

	require.Len(t, receivedServices, 1, "Unexpected number of events received")
	// testify expects (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	require.Equal(t, p1.Name, receivedServices[0], "Received an unexpected service")
}

func TestNotifyHealthCheckUpdatesSubscriber(t *testing.T) {
m := setupManagerTestSuite(t)

Expand Down

0 comments on commit b72618e

Please sign in to comment.