Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tolerate failed checkins / ready #392

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/kg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ var cmd = &cobra.Command{

var (
backend string
checkIn bool
cleanUp bool
cleanUpIface bool
createIface bool
Expand Down Expand Up @@ -134,6 +135,7 @@ var (

func init() {
cmd.Flags().StringVar(&backend, "backend", k8s.Backend, fmt.Sprintf("The backend for the mesh. Possible values: %s", availableBackends))
cmd.Flags().BoolVar(&checkIn, "check-in", true, "Should Kilo regularly check in with the backend")
cmd.Flags().BoolVar(&cleanUp, "clean-up", true, "Should kilo clean up network modifications on shutdown?")
cmd.Flags().BoolVar(&cleanUpIface, "clean-up-interface", false, "Should Kilo delete its interface when it shuts down?")
cmd.Flags().BoolVar(&createIface, "create-interface", true, "Should kilo create an interface on startup?")
Expand Down Expand Up @@ -248,7 +250,7 @@ func runRoot(_ *cobra.Command, _ []string) error {
c := kubernetes.NewForConfigOrDie(config)
kc := kiloclient.NewForConfigOrDie(config)
ec := apiextensions.NewForConfigOrDie(config)
b = k8s.New(c, kc, ec, topologyLabel, log.With(logger, "component", "k8s backend"))
b = k8s.New(c, kc, ec, topologyLabel, checkIn, log.With(logger, "component", "k8s backend"))
default:
return fmt.Errorf("backend %v unknown; possible values are: %s", backend, availableBackends)
}
Expand All @@ -266,7 +268,7 @@ func runRoot(_ *cobra.Command, _ []string) error {
serviceCIDRs = append(serviceCIDRs, s)
}

m, err := mesh.New(b, enc, gr, hostname, port, s, local, cni, cniPath, iface, cleanUp, cleanUpIface, createIface, mtu, resyncPeriod, prioritisePrivateAddr, iptablesForwardRule, serviceCIDRs, log.With(logger, "component", "kilo"), registry)
m, err := mesh.New(b, enc, gr, hostname, port, s, local, cni, cniPath, iface, checkIn, cleanUp, cleanUpIface, createIface, mtu, resyncPeriod, prioritisePrivateAddr, iptablesForwardRule, serviceCIDRs, log.With(logger, "component", "kilo"), registry)
if err != nil {
return fmt.Errorf("failed to create Kilo mesh: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/kgctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ var (
port int
}
backend string
checkIn bool
granularity string
kubeconfig string
topologyLabel string
Expand Down Expand Up @@ -94,7 +95,7 @@ func runRoot(c *cobra.Command, _ []string) error {
c := kubernetes.NewForConfigOrDie(config)
opts.kc = kiloclient.NewForConfigOrDie(config)
ec := apiextensions.NewForConfigOrDie(config)
opts.backend = k8s.New(c, opts.kc, ec, topologyLabel, log.NewNopLogger())
opts.backend = k8s.New(c, opts.kc, ec, topologyLabel, checkIn, log.NewNopLogger())
default:
return fmt.Errorf("backend %s unknown; possible values are: %s", backend, availableBackends)
}
Expand All @@ -119,6 +120,7 @@ func main() {
SilenceErrors: true,
}
cmd.PersistentFlags().StringVar(&backend, "backend", k8s.Backend, fmt.Sprintf("The backend for the mesh. Possible values: %s", availableBackends))
cmd.PersistentFlags().BoolVar(&checkIn, "check-in", true, "Should Kilo prune nodes that have not checked in with the backend")
cmd.PersistentFlags().StringVar(&granularity, "mesh-granularity", string(mesh.AutoGranularity), fmt.Sprintf("The granularity of the network mesh to create. Possible values: %s", availableGranularities))
defaultKubeconfig := os.Getenv("KUBECONFIG")
if _, err := os.Stat(defaultKubeconfig); os.IsNotExist(err) {
Expand Down
1 change: 1 addition & 0 deletions docs/kg.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Available Commands:

Flags:
--backend string The backend for the mesh. Possible values: kubernetes (default "kubernetes")
--check-in Should Kilo regularly check in with the backend (default true)
--clean-up Should kilo clean up network modifications on shutdown? (default true)
--clean-up-interface Should Kilo delete its interface when it shuts down?
--cni Should Kilo manage the node's CNI configuration? (default true)
Expand Down
17 changes: 10 additions & 7 deletions pkg/k8s/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type nodeBackend struct {
informer cache.SharedIndexInformer
lister v1listers.NodeLister
topologyLabel string
checkIn bool
}

type peerBackend struct {
Expand All @@ -103,7 +104,7 @@ type peerBackend struct {
}

// New creates a new instance of a mesh.Backend.
func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Interface, topologyLabel string, l log.Logger) mesh.Backend {
func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Interface, topologyLabel string, checkIn bool, l log.Logger) mesh.Backend {
ni := v1informers.NewNodeInformer(c, 5*time.Minute, nil)
pi := v1alpha1informers.NewPeerInformer(kc, 5*time.Minute, nil)

Expand All @@ -116,6 +117,7 @@ func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Inter
informer: ni,
lister: v1listers.NewNodeLister(ni.GetIndexer()),
topologyLabel: topologyLabel,
checkIn: checkIn,
},
&peerBackend{
client: kc,
Expand Down Expand Up @@ -150,7 +152,7 @@ func (nb *nodeBackend) Get(name string) (*mesh.Node, error) {
if err != nil {
return nil, err
}
return translateNode(n, nb.topologyLabel), nil
return translateNode(n, nb.topologyLabel, nb.checkIn), nil
}

// Init initializes the backend; for this backend that means
Expand All @@ -170,7 +172,7 @@ func (nb *nodeBackend) Init(ctx context.Context) error {
// Failed to decode Node; ignoring...
return
}
nb.events <- &mesh.NodeEvent{Type: mesh.AddEvent, Node: translateNode(n, nb.topologyLabel)}
nb.events <- &mesh.NodeEvent{Type: mesh.AddEvent, Node: translateNode(n, nb.topologyLabel, nb.checkIn)}
},
UpdateFunc: func(old, obj interface{}) {
n, ok := obj.(*v1.Node)
Expand All @@ -183,15 +185,15 @@ func (nb *nodeBackend) Init(ctx context.Context) error {
// Failed to decode Node; ignoring...
return
}
nb.events <- &mesh.NodeEvent{Type: mesh.UpdateEvent, Node: translateNode(n, nb.topologyLabel), Old: translateNode(o, nb.topologyLabel)}
nb.events <- &mesh.NodeEvent{Type: mesh.UpdateEvent, Node: translateNode(n, nb.topologyLabel, nb.checkIn), Old: translateNode(o, nb.topologyLabel, nb.checkIn)}
},
DeleteFunc: func(obj interface{}) {
n, ok := obj.(*v1.Node)
if !ok {
// Failed to decode Node; ignoring...
return
}
nb.events <- &mesh.NodeEvent{Type: mesh.DeleteEvent, Node: translateNode(n, nb.topologyLabel)}
nb.events <- &mesh.NodeEvent{Type: mesh.DeleteEvent, Node: translateNode(n, nb.topologyLabel, nb.checkIn)}
},
},
)
Expand All @@ -206,7 +208,7 @@ func (nb *nodeBackend) List() ([]*mesh.Node, error) {
}
nodes := make([]*mesh.Node, len(ns))
for i := range ns {
nodes[i] = translateNode(ns[i], nb.topologyLabel)
nodes[i] = translateNode(ns[i], nb.topologyLabel, nb.checkIn)
}
return nodes, nil
}
Expand Down Expand Up @@ -265,7 +267,7 @@ func (nb *nodeBackend) Watch() <-chan *mesh.NodeEvent {
}

// translateNode translates a Kubernetes Node to a mesh.Node.
func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
func translateNode(node *v1.Node, topologyLabel string, checkIn bool) *mesh.Node {
if node == nil {
return nil
}
Expand Down Expand Up @@ -354,6 +356,7 @@ func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
InternalIP: internalIP,
Key: key,
LastSeen: lastSeen,
CheckLastSeen: checkIn,
Leader: leader,
Location: location,
Name: node.Name,
Expand Down
63 changes: 43 additions & 20 deletions pkg/k8s/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,19 @@ func TestTranslateNode(t *testing.T) {
{
name: "empty",
annotations: nil,
out: &mesh.Node{},
out: &mesh.Node{
CheckLastSeen: true,
},
},
{
name: "invalid ips",
annotations: map[string]string{
endpointAnnotationKey: "10.0.0.1",
internalIPAnnotationKey: "foo",
},
out: &mesh.Node{},
out: &mesh.Node{
CheckLastSeen: true,
},
},
{
name: "valid ips",
Expand All @@ -80,8 +84,9 @@ func TestTranslateNode(t *testing.T) {
internalIPAnnotationKey: "10.0.0.2/32",
},
out: &mesh.Node{
Endpoint: wireguard.NewEndpoint(net.ParseIP("10.0.0.1").To4(), mesh.DefaultKiloPort),
InternalIP: &net.IPNet{IP: net.ParseIP("10.0.0.2").To4(), Mask: net.CIDRMask(32, 32)},
Endpoint: wireguard.NewEndpoint(net.ParseIP("10.0.0.1").To4(), mesh.DefaultKiloPort),
InternalIP: &net.IPNet{IP: net.ParseIP("10.0.0.2").To4(), Mask: net.CIDRMask(32, 32)},
CheckLastSeen: true,
},
},
{
Expand All @@ -91,29 +96,34 @@ func TestTranslateNode(t *testing.T) {
internalIPAnnotationKey: "ff60::10/64",
},
out: &mesh.Node{
Endpoint: wireguard.NewEndpoint(net.ParseIP("ff10::10").To16(), mesh.DefaultKiloPort),
InternalIP: &net.IPNet{IP: net.ParseIP("ff60::10").To16(), Mask: net.CIDRMask(64, 128)},
Endpoint: wireguard.NewEndpoint(net.ParseIP("ff10::10").To16(), mesh.DefaultKiloPort),
InternalIP: &net.IPNet{IP: net.ParseIP("ff60::10").To16(), Mask: net.CIDRMask(64, 128)},
CheckLastSeen: true,
},
},
{
name: "invalid subnet",
annotations: map[string]string{},
out: &mesh.Node{},
subnet: "foo",
out: &mesh.Node{
CheckLastSeen: true,
},
subnet: "foo",
},
{
name: "normalize subnet",
annotations: map[string]string{},
out: &mesh.Node{
Subnet: &net.IPNet{IP: net.ParseIP("10.2.0.0").To4(), Mask: net.CIDRMask(24, 32)},
Subnet: &net.IPNet{IP: net.ParseIP("10.2.0.0").To4(), Mask: net.CIDRMask(24, 32)},
CheckLastSeen: true,
},
subnet: "10.2.0.1/24",
},
{
name: "valid subnet",
annotations: map[string]string{},
out: &mesh.Node{
Subnet: &net.IPNet{IP: net.ParseIP("10.2.1.0").To4(), Mask: net.CIDRMask(24, 32)},
Subnet: &net.IPNet{IP: net.ParseIP("10.2.1.0").To4(), Mask: net.CIDRMask(24, 32)},
CheckLastSeen: true,
},
subnet: "10.2.1.0/24",
},
Expand All @@ -123,7 +133,8 @@ func TestTranslateNode(t *testing.T) {
RegionLabelKey: "a",
},
out: &mesh.Node{
Location: "a",
Location: "a",
CheckLastSeen: true,
},
},
{
Expand All @@ -135,7 +146,8 @@ func TestTranslateNode(t *testing.T) {
RegionLabelKey: "a",
},
out: &mesh.Node{
Location: "b",
Location: "b",
CheckLastSeen: true,
},
},
{
Expand All @@ -145,7 +157,8 @@ func TestTranslateNode(t *testing.T) {
forceEndpointAnnotationKey: "-10.0.0.2:51821",
},
out: &mesh.Node{
Endpoint: wireguard.NewEndpoint(net.ParseIP("10.0.0.1").To4(), mesh.DefaultKiloPort),
Endpoint: wireguard.NewEndpoint(net.ParseIP("10.0.0.1").To4(), mesh.DefaultKiloPort),
CheckLastSeen: true,
},
},
{
Expand All @@ -155,7 +168,8 @@ func TestTranslateNode(t *testing.T) {
forceEndpointAnnotationKey: "10.0.0.2:51821",
},
out: &mesh.Node{
Endpoint: wireguard.NewEndpoint(net.ParseIP("10.0.0.2").To4(), 51821),
Endpoint: wireguard.NewEndpoint(net.ParseIP("10.0.0.2").To4(), 51821),
CheckLastSeen: true,
},
},
{
Expand All @@ -165,6 +179,7 @@ func TestTranslateNode(t *testing.T) {
},
out: &mesh.Node{
PersistentKeepalive: 25 * time.Second,
CheckLastSeen: true,
},
},
{
Expand All @@ -174,8 +189,9 @@ func TestTranslateNode(t *testing.T) {
forceInternalIPAnnotationKey: "-10.1.0.2/24",
},
out: &mesh.Node{
InternalIP: &net.IPNet{IP: net.ParseIP("10.1.0.1").To4(), Mask: net.CIDRMask(24, 32)},
NoInternalIP: false,
InternalIP: &net.IPNet{IP: net.ParseIP("10.1.0.1").To4(), Mask: net.CIDRMask(24, 32)},
NoInternalIP: false,
CheckLastSeen: true,
},
},
{
Expand All @@ -185,16 +201,19 @@ func TestTranslateNode(t *testing.T) {
forceInternalIPAnnotationKey: "10.1.0.2/24",
},
out: &mesh.Node{
InternalIP: &net.IPNet{IP: net.ParseIP("10.1.0.2").To4(), Mask: net.CIDRMask(24, 32)},
NoInternalIP: false,
InternalIP: &net.IPNet{IP: net.ParseIP("10.1.0.2").To4(), Mask: net.CIDRMask(24, 32)},
NoInternalIP: false,
CheckLastSeen: true,
},
},
{
name: "invalid time",
annotations: map[string]string{
lastSeenAnnotationKey: "foo",
},
out: &mesh.Node{},
out: &mesh.Node{
CheckLastSeen: true,
},
},
{
name: "complete",
Expand All @@ -219,6 +238,7 @@ func TestTranslateNode(t *testing.T) {
InternalIP: &net.IPNet{IP: net.ParseIP("10.1.0.2").To4(), Mask: net.CIDRMask(32, 32)},
Key: fooKey,
LastSeen: 1000000000,
CheckLastSeen: true,
Leader: true,
Location: "b",
PersistentKeepalive: 25 * time.Second,
Expand Down Expand Up @@ -250,6 +270,7 @@ func TestTranslateNode(t *testing.T) {
InternalIP: &net.IPNet{IP: net.ParseIP("10.1.0.2"), Mask: net.CIDRMask(32, 32)},
Key: fooKey,
LastSeen: 1000000000,
CheckLastSeen: true,
Leader: true,
Location: "b",
PersistentKeepalive: 25 * time.Second,
Expand Down Expand Up @@ -277,6 +298,7 @@ func TestTranslateNode(t *testing.T) {
InternalIP: nil,
Key: fooKey,
LastSeen: 1000000000,
CheckLastSeen: true,
Leader: false,
Location: "b",
PersistentKeepalive: 25 * time.Second,
Expand Down Expand Up @@ -306,6 +328,7 @@ func TestTranslateNode(t *testing.T) {
InternalIP: nil,
Key: fooKey,
LastSeen: 1000000000,
CheckLastSeen: true,
Leader: false,
Location: "b",
PersistentKeepalive: 25 * time.Second,
Expand All @@ -319,7 +342,7 @@ func TestTranslateNode(t *testing.T) {
n.ObjectMeta.Annotations = tc.annotations
n.ObjectMeta.Labels = tc.labels
n.Spec.PodCIDR = tc.subnet
node := translateNode(n, RegionLabelKey)
node := translateNode(n, RegionLabelKey, true)
if diff := pretty.Compare(node, tc.out); diff != "" {
t.Errorf("test case %q: got diff: %v", tc.name, diff)
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/mesh/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Node struct {
// LastSeen is a Unix time for the last time
// the node confirmed it was live.
LastSeen int64
// CheckLastSeen indicates whether Ready should check the LastSeen value.
CheckLastSeen bool
// Leader is a suggestion to Kilo that
// the node wants to lead its segment.
Leader bool
Expand All @@ -81,11 +83,17 @@ type Node struct {
// Ready indicates whether or not the node is ready.
func (n *Node) Ready() bool {
// Nodes that are not leaders will not have WireGuardIPs, so it is not required.
var checkedIn bool
if (n != nil) && (n.Key != wgtypes.Key{}) && (n.Subnet != nil) && (n.CheckLastSeen) {
checkedIn = time.Now().Unix()-n.LastSeen < int64(checkInPeriod)*2/int64(time.Second)
} else {
checkedIn = true
}
Comment on lines +86 to +91
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to add more conditions to the check-in logic.
Let's keep this scoped to the change at hand for now. If we need to add more safety let's consider that in a different PR / issue.

Suggested change
var checkedIn bool
if (n != nil) && (n.Key != wgtypes.Key{}) && (n.Subnet != nil) && (n.CheckLastSeen) {
checkedIn = time.Now().Unix()-n.LastSeen < int64(checkInPeriod)*2/int64(time.Second)
} else {
checkedIn = true
}
var checkedIn bool
if n.CheckLastSeen {
checkedIn = time.Now().Unix()-n.LastSeen < int64(checkInPeriod)*2/int64(time.Second)
} else {
checkedIn = true
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those tests are here to protect against nil values.
They do not add any logic as they are also in the return statement and would return false anyway.

That was my first implementation, but it did not pass the tests. I agree it is not pretty now (the one-liner return), but I did not want to refactor too much.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agh yes, I woke up in the middle of the night today and literally said "I understand now". I have a little refactor in mind that might be more legible

return n != nil &&
n.Endpoint.Ready() &&
n.Key != wgtypes.Key{} &&
n.Subnet != nil &&
time.Now().Unix()-n.LastSeen < int64(checkInPeriod)*2/int64(time.Second)
checkedIn
}

// Peer represents a peer in the network.
Expand Down
Loading