Skip to content

Commit

Permalink
Refactor labelLocalhost variables for clarity and consistency
Browse files Browse the repository at this point in the history
Renamed `LabelNode` to `LabelLocalhost` across the codebase.
  • Loading branch information
cgalibern committed Jan 10, 2025
1 parent f0f0d3e commit 5646134
Show file tree
Hide file tree
Showing 16 changed files with 57 additions and 57 deletions.
8 changes: 4 additions & 4 deletions daemon/ccfg/main_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (t *Manager) pubClusterConfig() {
t.handleConfigChanges()

t.state = *state.DeepCopy()
labelLocalNode := pubsub.Label{"node", t.localhost}
labelLocalhost := pubsub.Label{"node", t.localhost}

removed, added := stringslice.Diff(previousNodes, state.Nodes)
if len(added) > 0 {
Expand All @@ -47,13 +47,13 @@ func (t *Manager) pubClusterConfig() {
cluster.ConfigData.Set(&state)
clusternode.Set(state.Nodes)

t.bus.Pub(&msgbus.ClusterConfigUpdated{Node: t.localhost, Value: state, NodesAdded: added, NodesRemoved: removed}, labelLocalNode)
t.bus.Pub(&msgbus.ClusterConfigUpdated{Node: t.localhost, Value: state, NodesAdded: added, NodesRemoved: removed}, labelLocalhost)

for _, v := range added {
t.bus.Pub(&msgbus.JoinSuccess{Node: v}, labelLocalNode, pubsub.Label{"added", v})
t.bus.Pub(&msgbus.JoinSuccess{Node: v}, labelLocalhost, pubsub.Label{"added", v})
}
for _, v := range removed {
t.bus.Pub(&msgbus.LeaveSuccess{Node: v}, labelLocalNode, pubsub.Label{"removed", v})
t.bus.Pub(&msgbus.LeaveSuccess{Node: v}, labelLocalhost, pubsub.Label{"removed", v})
}
}

Expand Down
4 changes: 2 additions & 2 deletions daemon/cstat/main_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func (o *T) onNodeStatusUpdated(c *msgbus.NodeStatusUpdated) {
if o.change {
o.change = false
localhost := hostname.Hostname()
labelLocalNode := pubsub.Label{"node", localhost}
o.bus.Pub(&msgbus.ClusterStatusUpdated{Node: localhost, Value: o.state}, labelLocalNode)
labelLocalhost := pubsub.Label{"node", localhost}
o.bus.Pub(&msgbus.ClusterStatusUpdated{Node: localhost, Value: o.state}, labelLocalhost)
}
}

Expand Down
6 changes: 3 additions & 3 deletions daemon/daemonapi/get_daemon_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,16 @@ func (a *DaemonAPI) getLocalDaemonEvents(ctx echo.Context, params api.GetDaemonE
// "hidden" because such messages don't require to be forwarded
// to response event stream.
createdMsg := &msgbus.ObjectCreated{}
createdMsg.AddLabels(a.LabelNode)
createdMsg.AddLabels(a.LabelLocalhost)
if !needForwardEvent("ObjectCreated", createdMsg) {
log.Debugf("add hidden filtering: ObjectCreated")
sub.AddFilter(&msgbus.ObjectCreated{})
}
deleteMsg := &msgbus.ObjectDeleted{}
deleteMsg.AddLabels(a.LabelNode)
deleteMsg.AddLabels(a.LabelLocalhost)
if !needForwardEvent("ObjectDeleted", deleteMsg) {
log.Debugf("add hidden filtering: ObjectDeleted,node=%s", a.localhost)
sub.AddFilter(&msgbus.ObjectDeleted{}, a.LabelNode)
sub.AddFilter(&msgbus.ObjectDeleted{}, a.LabelLocalhost)
}
}
sub.Start()
Expand Down
14 changes: 7 additions & 7 deletions daemon/daemonapi/lib_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type (
EventBus *pubsub.Bus
JWTcreator JWTCreater

LabelNode pubsub.Label
LabelLocalhost pubsub.Label

localhost string
SubQS pubsub.QueueSizer
Expand All @@ -46,12 +46,12 @@ var (
func New(ctx context.Context) *DaemonAPI {
localhost := hostname.Hostname()
return &DaemonAPI{
Daemondata: daemondata.FromContext(ctx),
EventBus: pubsub.BusFromContext(ctx),
JWTcreator: daemonauth.JWTCreatorFromContext(ctx),
LabelNode: pubsub.Label{"node", localhost},
localhost: localhost,
SubQS: SubQS(ctx),
Daemondata: daemondata.FromContext(ctx),
EventBus: pubsub.BusFromContext(ctx),
JWTcreator: daemonauth.JWTCreatorFromContext(ctx),
LabelLocalhost: pubsub.Label{"node", localhost},
localhost: localhost,
SubQS: SubQS(ctx),
}
}

Expand Down
4 changes: 2 additions & 2 deletions daemon/daemonapi/lib_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
)

func (a *DaemonAPI) announceSub(name string) {
a.EventBus.Pub(&msgbus.ClientSubscribed{Time: time.Now(), Name: name}, a.LabelNode, labelAPI)
a.EventBus.Pub(&msgbus.ClientSubscribed{Time: time.Now(), Name: name}, a.LabelLocalhost, labelAPI)
}

func (a *DaemonAPI) announceUnsub(name string) {
a.EventBus.Pub(&msgbus.ClientUnsubscribed{Time: time.Now(), Name: name}, a.LabelNode, labelAPI)
a.EventBus.Pub(&msgbus.ClientUnsubscribed{Time: time.Now(), Name: name}, a.LabelLocalhost, labelAPI)
}

func (a *DaemonAPI) announceNodeState(log *plog.Logger, state node.MonitorState) {
Expand Down
2 changes: 1 addition & 1 deletion daemon/daemonapi/post_cluster_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func (a *DaemonAPI) PostClusterAction(eCtx echo.Context, globalExpect node.Monit
}
msg, err := msgbus.NewSetNodeMonitorWithErr(ctx, a.localhost, value)

a.EventBus.Pub(msg, a.LabelNode, labelAPI)
a.EventBus.Pub(msg, a.LabelLocalhost, labelAPI)
return JSONFromSetNodeMonitorError(eCtx, &value, err.Receive())
}
2 changes: 1 addition & 1 deletion daemon/daemonapi/post_daemon_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ func (a *DaemonAPI) PostDaemonJoin(ctx echo.Context, params api.PostDaemonJoinPa
return JSONProblem(ctx, http.StatusBadRequest, "Invalid parameters", "Missing node param")
}
log.Infof("publish join request for node %s", node)
a.EventBus.Pub(&msgbus.JoinRequest{Node: node}, a.LabelNode, labelAPI)
a.EventBus.Pub(&msgbus.JoinRequest{Node: node}, a.LabelLocalhost, labelAPI)
return ctx.JSON(http.StatusOK, nil)
}
2 changes: 1 addition & 1 deletion daemon/daemonapi/post_daemon_leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ func (a *DaemonAPI) PostDaemonLeave(ctx echo.Context, params api.PostDaemonLeave
return JSONProblem(ctx, http.StatusBadRequest, "Invalid parameters", "Missing node param")
}
log.Infof("publish leave request for node %s", node)
a.EventBus.Pub(&msgbus.LeaveRequest{Node: node}, a.LabelNode, labelAPI)
a.EventBus.Pub(&msgbus.LeaveRequest{Node: node}, a.LabelLocalhost, labelAPI)
return ctx.JSON(http.StatusOK, nil)
}
4 changes: 2 additions & 2 deletions daemon/daemonapi/post_daemon_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (a *DaemonAPI) localPostDaemonShutdown(eCtx echo.Context, params api.PostDa
a.announceNodeState(log, node.MonitorStateShutting)

sub := a.EventBus.Sub(fmt.Sprintf("api.post_daemon_shutdown %s", eCtx.Get("uuid")))
sub.AddFilter(&msgbus.InstanceMonitorUpdated{}, a.LabelNode)
sub.AddFilter(&msgbus.InstanceMonitorUpdated{}, a.LabelLocalhost)
sub.Start()
defer func() {
if err := sub.Stop(); err != nil {
Expand Down Expand Up @@ -183,7 +183,7 @@ func (a *DaemonAPI) localPostDaemonShutdown(eCtx echo.Context, params api.PostDa
a.announceNodeState(log, node.MonitorStateShutdown)
log.Infof("ask daemon do stop")
a.EventBus.Pub(&msgbus.DaemonCtl{Component: "daemon", Action: "stop"},
pubsub.Label{"id", "daemon"}, a.LabelNode, labelAPI)
pubsub.Label{"id", "daemon"}, a.LabelLocalhost, labelAPI)
log.Infof("succeed")
return JSONProblem(eCtx, http.StatusOK, "all objects are now shutdown, daemon will stop", "")
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/daemonapi/post_daemon_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ func (a *DaemonAPI) localPostDaemonStop(ctx echo.Context) error {
a.announceNodeState(log, node.MonitorStateMaintenance)

a.EventBus.Pub(&msgbus.DaemonCtl{Component: "daemon", Action: "stop"},
pubsub.Label{"id", "daemon"}, a.LabelNode, labelAPI)
pubsub.Label{"id", "daemon"}, a.LabelLocalhost, labelAPI)
return ctx.JSON(http.StatusOK, api.DaemonPid{Pid: os.Getpid()})
}
2 changes: 1 addition & 1 deletion daemon/daemonapi/post_instance_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (a *DaemonAPI) PostInstanceProgress(ctx echo.Context, namespace string, kin
}
a.EventBus.Pub(&msgbus.ProgressInstanceMonitor{Path: p, Node: a.localhost, SessionID: payload.SessionID, State: state, IsPartial: isPartial},
pubsub.Label{"path", p.String()},
a.LabelNode,
a.LabelLocalhost,
labelAPI,
)
return ctx.JSON(http.StatusOK, nil)
Expand Down
2 changes: 1 addition & 1 deletion daemon/daemonapi/post_instance_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (a *DaemonAPI) PostInstanceStatus(ctx echo.Context, namespace string, kind
}
a.EventBus.Pub(&msgbus.InstanceStatusPost{Path: p, Node: a.localhost, Value: payload},
pubsub.Label{"path", p.String()},
a.LabelNode,
a.LabelLocalhost,
labelAPI,
)
return ctx.JSON(http.StatusOK, nil)
Expand Down
2 changes: 1 addition & 1 deletion daemon/daemonapi/post_node_action_drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (a *DaemonAPI) localNodeActionDrain(eCtx echo.Context) error {
}

msg, errReceiver := msgbus.NewSetNodeMonitorWithErr(ctx, a.localhost, value)
a.EventBus.Pub(msg, a.LabelNode, labelAPI)
a.EventBus.Pub(msg, a.LabelLocalhost, labelAPI)

return JSONFromSetNodeMonitorError(eCtx, &value, errReceiver.Receive())
}
2 changes: 1 addition & 1 deletion daemon/daemondata/daemon_hb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (d *data) setDaemonHeartbeat() {
}
subHb.UpdatedAt = time.Now()
daemonsubsystem.DataHeartbeat.Set(d.localNode, subHb.DeepCopy())
d.bus.Pub(&msgbus.DaemonHeartbeatUpdated{Node: d.localNode, Value: *subHb.DeepCopy()}, d.labelLocalNode)
d.bus.Pub(&msgbus.DaemonHeartbeatUpdated{Node: d.localNode, Value: *subHb.DeepCopy()}, d.labelLocalhost)
}

func (d *data) setHbMsgPatchLength(node string, length int) {
Expand Down
56 changes: 28 additions & 28 deletions daemon/daemondata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type (
// set to false after a hb message is created
needMsg bool

labelLocalNode pubsub.Label
labelLocalhost pubsub.Label
}

gens map[string]uint64
Expand Down Expand Up @@ -287,7 +287,7 @@ func (d *data) run(ctx context.Context, cmdC <-chan Caller, hbRecvQ <-chan *hbty
gens[s] = v
}
d.bus.Pub(&msgbus.NodeStatusGenUpdates{Node: d.localNode, Value: gens},
d.labelLocalNode,
d.labelLocalhost,
)
}
if isCtxDone() {
Expand Down Expand Up @@ -386,40 +386,40 @@ func gensEqual(a, b gens) bool {
// or that must be forwarded to peers
func (d *data) startSubscriptions(qs pubsub.QueueSizer) {
sub := d.bus.Sub("daemon.data", qs)
sub.AddFilter(&msgbus.ClusterConfigUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.ClusterStatusUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.ClusterConfigUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.ClusterStatusUpdated{}, d.labelLocalhost)

sub.AddFilter(&msgbus.DaemonCollectorUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.DaemonDataUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.DaemonDnsUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.DaemonHeartbeatUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.DaemonListenerUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.DaemonRunnerImonUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.DaemonSchedulerUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.DaemonStatusUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.DaemonCollectorUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.DaemonDataUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.DaemonDnsUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.DaemonHeartbeatUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.DaemonListenerUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.DaemonRunnerImonUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.DaemonSchedulerUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.DaemonStatusUpdated{}, d.labelLocalhost)

sub.AddFilter(&msgbus.InstanceConfigDeleted{}, d.labelLocalNode)
sub.AddFilter(&msgbus.InstanceConfigFor{}, d.labelLocalNode)
sub.AddFilter(&msgbus.InstanceConfigUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.InstanceConfigDeleted{}, d.labelLocalhost)
sub.AddFilter(&msgbus.InstanceConfigFor{}, d.labelLocalhost)
sub.AddFilter(&msgbus.InstanceConfigUpdated{}, d.labelLocalhost)

sub.AddFilter(&msgbus.InstanceMonitorDeleted{}, d.labelLocalNode)
sub.AddFilter(&msgbus.InstanceMonitorUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.InstanceMonitorDeleted{}, d.labelLocalhost)
sub.AddFilter(&msgbus.InstanceMonitorUpdated{}, d.labelLocalhost)

sub.AddFilter(&msgbus.InstanceStatusUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.InstanceStatusDeleted{}, d.labelLocalNode)
sub.AddFilter(&msgbus.InstanceStatusUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.InstanceStatusDeleted{}, d.labelLocalhost)

sub.AddFilter(&msgbus.NodeConfigUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.NodeConfigUpdated{}, d.labelLocalhost)

sub.AddFilter(&msgbus.NodeMonitorDeleted{}, d.labelLocalNode)
sub.AddFilter(&msgbus.NodeMonitorUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.NodeOsPathsUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.NodeStatsUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.NodeStatusUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.NodeMonitorDeleted{}, d.labelLocalhost)
sub.AddFilter(&msgbus.NodeMonitorUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.NodeOsPathsUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.NodeStatsUpdated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.NodeStatusUpdated{}, d.labelLocalhost)

// need forward to peers
sub.AddFilter(&msgbus.ObjectCreated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.ObjectStatusDeleted{}, d.labelLocalNode)
sub.AddFilter(&msgbus.ObjectStatusUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.ObjectCreated{}, d.labelLocalhost)
sub.AddFilter(&msgbus.ObjectStatusDeleted{}, d.labelLocalhost)
sub.AddFilter(&msgbus.ObjectStatusUpdated{}, d.labelLocalhost)
sub.Start()
d.sub = sub
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/daemondata/data_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newData() *data {
previousRemoteInfo: make(map[string]remoteInfo),
hbMsgPatchLength: map[string]int{localNode: 0},
hbMsgType: map[string]string{localNode: initialMsgType},
labelLocalNode: pubsub.Label{"node", hostname.Hostname()},
labelLocalhost: pubsub.Label{"node", hostname.Hostname()},
}
}

Expand Down

0 comments on commit 5646134

Please sign in to comment.