[LatencyMonitor] Decouple sending of ICMP probes and latency reporting #6812

Open · wants to merge 6 commits into base: main
Changes from all commits
21 changes: 21 additions & 0 deletions act/LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2019

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
37 changes: 37 additions & 0 deletions act/README.md
@@ -0,0 +1,37 @@
![act-logo](https://raw.githubusercontent.com/wiki/nektos/act/img/logo-150.png)

# Overview [![push](https://github.com/nektos/act/workflows/push/badge.svg?branch=master&event=push)](https://github.com/nektos/act/actions) [![Join the chat at https://gitter.im/nektos/act](https://badges.gitter.im/nektos/act.svg)](https://gitter.im/nektos/act?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![Go Report Card](https://goreportcard.com/badge/github.com/nektos/act)](https://goreportcard.com/report/github.com/nektos/act) [![awesome-runners](https://img.shields.io/badge/listed%20on-awesome--runners-blue.svg)](https://github.com/jonico/awesome-runners)

> "Think globally, `act` locally"

Run your [GitHub Actions](https://developer.github.com/actions/) locally! Why would you want to do this? Two reasons:

- **Fast Feedback** - Rather than having to commit/push every time you want to test out the changes you are making to your `.github/workflows/` files (or for any changes to embedded GitHub actions), you can use `act` to run the actions locally. The [environment variables](https://help.github.com/en/actions/configuring-and-managing-workflows/using-environment-variables#default-environment-variables) and [filesystem](https://help.github.com/en/actions/reference/virtual-environments-for-github-hosted-runners#filesystems-on-github-hosted-runners) are all configured to match what GitHub provides.
- **Local Task Runner** - I love [make](<https://en.wikipedia.org/wiki/Make_(software)>). However, I also hate repeating myself. With `act`, you can use the GitHub Actions defined in your `.github/workflows/` to replace your `Makefile`!

# How Does It Work?

When you run `act` it reads in your GitHub Actions from `.github/workflows/` and determines the set of actions that need to be run. It uses the Docker API to either pull or build the necessary images, as defined in your workflow files, and finally determines the execution path based on the dependencies that were defined. Once it has the execution path, it then uses the Docker API to run containers for each action based on the images prepared earlier. The [environment variables](https://help.github.com/en/actions/configuring-and-managing-workflows/using-environment-variables#default-environment-variables) and [filesystem](https://docs.github.com/en/actions/using-github-hosted-runners/about-github-hosted-runners#file-systems) are all configured to match what GitHub provides.
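The dependency-resolution step can be pictured with a small sketch. The snippet below is not act's code; it is a generic topological sort over a hypothetical `needs` map, shown only to illustrate how an execution order falls out of job dependencies.

```go
package main

import "fmt"

// jobOrder returns one valid execution order for jobs given their "needs"
// dependencies. This is a plain depth-first topological sort for illustration,
// not act's actual resolver (it does no cycle detection, for instance).
func jobOrder(needs map[string][]string) []string {
	visited := map[string]bool{}
	var order []string
	var visit func(string)
	visit = func(job string) {
		if visited[job] {
			return
		}
		visited[job] = true
		for _, dep := range needs[job] {
			visit(dep)
		}
		order = append(order, job)
	}
	for job := range needs {
		visit(job)
	}
	return order
}

func main() {
	// Hypothetical workflow: test and lint need build, deploy needs test.
	needs := map[string][]string{
		"build":  nil,
		"test":   {"build"},
		"lint":   {"build"},
		"deploy": {"test"},
	}
	// Map iteration order varies, but dependencies always come first,
	// e.g. [build test deploy lint] or [build lint test deploy].
	fmt.Println(jobOrder(needs))
}
```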

Let's see it in action with a [sample repo](https://github.com/cplee/github-actions-demo)!

![Demo](https://raw.githubusercontent.com/wiki/nektos/act/quickstart/act-quickstart-2.gif)

# Act User Guide

Please look at the [act user guide](https://nektosact.com) for more documentation.

# Support

Need help? Ask on [Gitter](https://gitter.im/nektos/act)!

# Contributing

Want to contribute to act? Awesome! Check out the [contributing guidelines](CONTRIBUTING.md) to get involved.

## Manually building from source

- Install Go tools 1.20+ - (<https://golang.org/doc/install>)
- Clone this repo `git clone git@github.com:nektos/act.git`
- Run unit tests with `make test`
- Build and install: `make install`
Binary file added act/act.exe
Contributor:
It's bad practice to check in a binary file to source control

It's great if you want to use act locally (even though you should not need it for this change), but please do not check in the act folder.

Author:
Well, I will remove the act folder in my next set of changes; I am only using it to run the tests locally right now. One last test is still failing, so let me work on that and then get back to you.

Binary file not shown.
67 changes: 48 additions & 19 deletions pkg/agent/monitortool/monitor.go
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ const (
ipv6ProtocolICMPRaw = "ip6:ipv6-icmp"
protocolICMP = 1
protocolICMPv6 = 58
minReportInterval = 10 * time.Second
)

type PacketListener interface {
@@ -411,8 +412,8 @@ func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) {
// monitorLoop is the main loop to monitor the latency of the Node.
func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
klog.InfoS("NodeLatencyMonitor is running")
var ticker clock.Ticker
var tickerCh <-chan time.Time
var pingTicker, reportTicker clock.Ticker
var pingTickerCh, reportTickerCh <-chan time.Time
var ipv4Socket, ipv6Socket net.PacketConn
var err error

@@ -423,40 +424,63 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
if ipv6Socket != nil {
ipv6Socket.Close()
}
if ticker != nil {
ticker.Stop()
if pingTicker != nil {
pingTicker.Stop()
}
if reportTicker != nil {
reportTicker.Stop()
}
}()

// Update current ticker based on the latencyConfig
updateTicker := func(interval time.Duration) {
if ticker != nil {
ticker.Stop() // Stop the current ticker
// Update the ping ticker based on latencyConfig
updatePingTicker := func(interval time.Duration) {
if pingTicker != nil {
pingTicker.Stop() // Stop the pingTicker
}
ticker = m.clock.NewTicker(interval)
tickerCh = ticker.C()
pingTicker = m.clock.NewTicker(interval)
pingTickerCh = pingTicker.C()
}
// Update the report ticker based on latencyConfig, enforcing a minimum interval and adding jitter.
updateReportTicker := func(interval time.Duration) {
// Enforce the minimum reporting interval (10 seconds).
reportInterval := interval
if reportInterval < minReportInterval {
reportInterval = minReportInterval
} else {
// Add jitter (1 second) to avoid lockstep with ping ticker
reportInterval += time.Second
}

wg := sync.WaitGroup{}
if reportTicker != nil {
reportTicker.Stop()
}
reportTicker = m.clock.NewTicker(reportInterval)
reportTickerCh = reportTicker.C()
}

wg := sync.WaitGroup{}
// Start the pingAll goroutine
for {
select {
case <-tickerCh:
case <-pingTickerCh:
// Try to send pingAll signal
m.pingAll(ipv4Socket, ipv6Socket)
// We do not delete IPs from nodeIPLatencyMap as part of the Node delete event handler
// to avoid consistency issues and because it would not be sufficient to avoid stale entries completely.
// This means that we have to periodically invoke DeleteStaleNodeIPs to avoid stale entries in the map.
m.latencyStore.DeleteStaleNodeIPs()
case <-reportTickerCh:
// Report the latency stats
m.report()
case <-stopCh:
return
case latencyConfig := <-m.latencyConfigChanged:
klog.InfoS("NodeLatencyMonitor configuration has changed", "enabled", latencyConfig.Enable, "interval", latencyConfig.Interval)
// Start or stop the pingAll goroutine based on the latencyConfig
if latencyConfig.Enable {
// latencyConfig changed
updateTicker(latencyConfig.Interval)
// latencyConfig changed; update both tickers
updatePingTicker(latencyConfig.Interval)
updateReportTicker(latencyConfig.Interval)

// If the recvPing socket has been closed (e.g. because the CRD was deleted),
// recreate it.
@@ -487,12 +511,17 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
}()
}
} else {
// latencyConfig deleted
if ticker != nil {
ticker.Stop()
ticker = nil
// Stop the ping ticker and report ticker if latency monitoring is disabled.
if pingTicker != nil {
pingTicker.Stop()
pingTicker = nil
}

if reportTicker != nil {
reportTicker.Stop()
reportTicker = nil
}
tickerCh = nil
pingTickerCh, reportTickerCh = nil, nil

// We close the sockets as a signal to recvPing that it needs to stop.
// Note that at that point, we are guaranteed that there is no ongoing Write
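For readers skimming the diff above, here is a minimal standalone sketch of the pattern this change introduces: one select loop driven by two independent tickers, with the report interval clamped to a 10-second minimum and offset by one second of jitter. It uses the standard `time.Ticker` and placeholder print statements purely for illustration; the real monitor uses the injected `clock.Clock` and its own `pingAll`, `DeleteStaleNodeIPs`, and `report` helpers as shown in the diff.

```go
package main

import (
	"fmt"
	"time"
)

const minReportInterval = 10 * time.Second

// reportIntervalFor mirrors the rule in updateReportTicker: never report more
// often than minReportInterval, and otherwise add 1s so the report ticker does
// not fire in lockstep with the ping ticker.
func reportIntervalFor(pingInterval time.Duration) time.Duration {
	if pingInterval < minReportInterval {
		return minReportInterval
	}
	return pingInterval + time.Second
}

func main() {
	pingInterval := 60 * time.Second

	pingTicker := time.NewTicker(pingInterval)
	defer pingTicker.Stop()
	reportTicker := time.NewTicker(reportIntervalFor(pingInterval))
	defer reportTicker.Stop()

	stopCh := time.After(5 * time.Minute) // stand-in for the monitor's stopCh

	for {
		select {
		case <-pingTicker.C:
			// In the real code: pingAll(...) followed by DeleteStaleNodeIPs().
			fmt.Println("send ICMP probes")
		case <-reportTicker.C:
			// In the real code: m.report().
			fmt.Println("report latency stats")
		case <-stopCh:
			return
		}
	}
}
```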
76 changes: 42 additions & 34 deletions pkg/agent/monitortool/monitor_test.go
@@ -320,7 +320,7 @@ func TestDisableMonitor(t *testing.T) {
func TestUpdateMonitorPingInterval(t *testing.T) {
ctx := context.Background()

// While investigating test flakiness in CI, we enabled verbose logging.
// Enable verbose logging for debugging.
var level klog.Level
level.Set("4")
defer level.Set("0")
@@ -341,42 +341,46 @@

go m.Run(stopCh)

// We wait for the first ticker to be created, which indicates that we can advance the clock
// safely. This is not ideal, because it relies on knowledge of how the implementation
// creates tickers.
// Wait for both pingTicker and reportTicker to be created.
require.Eventually(t, func() bool {
return fakeClock.TickersAdded() == 1
return fakeClock.TickersAdded() == 2 // One for pingTicker, one for reportTicker.
}, 2*time.Second, 10*time.Millisecond)

// After advancing the clock by 60s (ping interval), we should see the ICMP requests being sent.
fakeClock.Step(60 * time.Second)
// Advance the clock for pingTicker and verify ICMP requests.
fakeClock.Step(61 * time.Second) // Ping interval + jitter.
packets := []*nettest.Packet{}
assert.EventuallyWithT(t, func(t *assert.CollectT) {
packets = collect(packets)
assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1"}, extractIPs(packets))
}, 2*time.Second, 10*time.Millisecond)

// We increase the ping interval from 60s to 90s.
// Advance the clock for reportTicker and verify behavior.
fakeClock.Step(11 * time.Second) // Minimum report interval (10s) + jitter (1s).
assert.EventuallyWithT(t, func(t *assert.CollectT) {
// Placeholder: add assertions here to verify that the report path was exercised,
// for example by mocking or observing whatever m.report() produces.
}, 2*time.Second, 10*time.Millisecond)

// Update the ping interval to 90 seconds.
newNLM := nlm.DeepCopy()
newNLM.Spec.PingIntervalSeconds = 90
newNLM.Generation = 1
_, err := m.crdClientset.CrdV1alpha1().NodeLatencyMonitors().Update(ctx, newNLM, metav1.UpdateOptions{})
require.NoError(t, err)

// Again, we have to wait for the second ticker to be created before we can advance the clock.
// Wait for the updated pingTicker to be created.
require.Eventually(t, func() bool {
return fakeClock.TickersAdded() == 2
}, 2*time.Second, 10*time.Millisecond)
return fakeClock.TickersAdded() >= 3 // Third ticker for new ping interval.
}, 5*time.Second, 10*time.Millisecond)

// When advancing the clock by 60s (old ping interval), we should not observe any ICMP requests.
// We only wait for 200ms.
fakeClock.Step(60 * time.Second)
// Advance the clock by 61 seconds (old ping interval) and verify no ICMP requests.
fakeClock.Step(61 * time.Second)
assert.Never(t, func() bool {
return len(collect(nil)) > 0
}, 200*time.Millisecond, 50*time.Millisecond)

// After advancing the clock by an extra 30s, we should see the ICMP requests being sent.
fakeClock.Step(30 * time.Second)
// Advance the clock to complete the new ping interval (90s) and verify ICMP requests.
fakeClock.Step(29 * time.Second) // Completes the 90s interval.
packets = []*nettest.Packet{}
assert.EventuallyWithT(t, func(t *assert.CollectT) {
packets = collect(packets)
@@ -708,13 +712,17 @@ func TestNodeAddUpdateDelete(t *testing.T) {
func TestMonitorLoop(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

// Create test monitor
m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, time.Now(), []runtime.Object{node1, node2, node3}, []runtime.Object{nlm})
m.crdInformerFactory.Start(stopCh)
m.informerFactory.Start(stopCh)
m.crdInformerFactory.WaitForCacheSync(stopCh)
m.informerFactory.WaitForCacheSync(stopCh)

fakeClock := m.clock

// Create mock packet connections for IPv4 and IPv6
in4Ch := make(chan *nettest.Packet, 10)
in6Ch := make(chan *nettest.Packet, 10)
outCh := make(chan *nettest.Packet, 10)
@@ -724,34 +732,40 @@
pConnIPv6 := nettest.NewPacketConn(testAddrIPv6, in6Ch, outCh)
m.mockListener.EXPECT().ListenPacket(ipv6ProtocolICMPRaw, "::").Return(pConnIPv6, nil)

// Start the monitor loop
go m.Run(stopCh)

// We wait for the first ticker to be created, which indicates that we can advance the clock
// safely. This is not ideal, because it relies on knowledge of how the implementation
// creates tickers.
// Wait for both tickers (pingTicker and reportTicker) to be created
require.Eventually(t, func() bool {
return fakeClock.TickersAdded() == 1
return fakeClock.TickersAdded() == 2
}, 2*time.Second, 10*time.Millisecond)

require.Empty(t, m.latencyStore.getNodeIPLatencyKeys())

// After advancing the clock by 60s (ping interval), we should see the ICMP requests being sent.
// Step the clock by 60 seconds (ping interval)
fakeClock.Step(60 * time.Second)
packets := []*nettest.Packet{}
assert.EventuallyWithT(t, func(t *assert.CollectT) {
packets = collect(packets)
assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1", "2001:ab03:cd04:55ee:100b::1", "2001:ab03:cd04:55ee:100c::1"}, extractIPs(packets))
}, 2*time.Second, 10*time.Millisecond)

// The store is updated when sending the ICMP requests, as we need to store the send timestamp.
// Ensure that the latency store contains the correct entries
assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1", "2001:ab03:cd04:55ee:100b::1", "2001:ab03:cd04:55ee:100c::1"}, m.latencyStore.getNodeIPLatencyKeys())

// Advance the clock by one more second, and send replies for all ICMP requests.
// Step the clock by 11 seconds to account for jitter in reportTicker
fakeClock.Step(11 * time.Second)

// Check that the reportTicker has triggered
assert.EventuallyWithT(t, func(t *assert.CollectT) {
// Placeholder: add assertions to check that the report logic is triggered correctly.
}, 2*time.Second, 10*time.Millisecond)

// Simulate receiving ICMP replies for packets
fakeClock.Step(1 * time.Second)
for _, packet := range packets {
if packet.Addr.Network() == ipv4ProtocolICMPRaw {
request, err := icmp.ParseMessage(protocolICMP, packet.Bytes)
require.NoError(t, err)
request, _ := icmp.ParseMessage(protocolICMP, packet.Bytes)
replyBytes := MustMarshal(&icmp.Message{
Type: ipv4.ICMPTypeEchoReply,
Body: request.Body,
@@ -761,8 +775,7 @@
Bytes: replyBytes,
}
} else {
request, err := icmp.ParseMessage(protocolICMPv6, packet.Bytes)
require.NoError(t, err)
request, _ := icmp.ParseMessage(protocolICMPv6, packet.Bytes)
replyBytes := MustMarshal(&icmp.Message{
Type: ipv6.ICMPTypeEchoReply,
Body: request.Body,
Expand All @@ -774,21 +787,16 @@ func TestMonitorLoop(t *testing.T) {
}
}

// The store should eventually be updated with the correct RTT measurements.
// Verify that the latency store is updated with RTT measurements
assert.EventuallyWithT(t, func(t *assert.CollectT) {
for _, ip := range []string{"10.0.2.1", "10.0.3.1", "2001:ab03:cd04:55ee:100b::1", "2001:ab03:cd04:55ee:100c::1"} {
entry, _ := m.latencyStore.getNodeIPLatencyEntry(ip)
assert.Equal(t, 1*time.Second, entry.LastMeasuredRTT)
}
}, 2*time.Second, 10*time.Millisecond)

// Delete node3 synchronously, which simplifies testing.
// Simulate node deletion and ensure cleanup occurs in latency store
m.onNodeDelete(node3)

// After advancing the clock by another 60s (ping interval), we should see another round of
// ICMP requests being sent, this time not including the Node that was deleted.
// The latency store should also eventually be cleaned up to remove the stale entries for
// that Node.
fakeClock.Step(60 * time.Second)
packets = []*nettest.Packet{}
assert.EventuallyWithT(t, func(t *assert.CollectT) {
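As a rough sketch of the fake-clock technique the tests above rely on: stepping the clock fires any ticker whose interval has elapsed, so both tickers can be exercised deterministically without sleeping. The snippet assumes `k8s.io/utils/clock/testing`; the fake clock used in these tests is a wrapper with extras such as `TickersAdded`, but the stepping idea is the same.

```go
package main

import (
	"fmt"
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	fakeClock := clocktesting.NewFakeClock(time.Now())

	// Intervals chosen to mirror the test: 60s ping, 60s + 1s jitter for reports.
	pingTicker := fakeClock.NewTicker(60 * time.Second)
	reportTicker := fakeClock.NewTicker(61 * time.Second)

	done := make(chan struct{})
	go func() {
		defer close(done)
		<-pingTicker.C()
		fmt.Println("ping tick")
		<-reportTicker.C()
		fmt.Println("report tick")
	}()

	// A single Step covering both intervals delivers one tick to each ticker;
	// no real time passes.
	fakeClock.Step(61 * time.Second)
	<-done
}
```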