From 39408aa7d36bac407c0d11e8c0d5a71cae13180b Mon Sep 17 00:00:00 2001 From: nbisson Date: Fri, 26 Apr 2024 15:00:47 +0200 Subject: [PATCH 01/11] Add compatability with calico in BGP mode --- cmd/kg/handlers.go | 2 +- pkg/k8s/backend.go | 54 ++++++++++++++++++++++++++++++++++++++++++++ pkg/mesh/backend.go | 26 +++++++++++++++++++++ pkg/mesh/mesh.go | 48 +++++++++++++++++++++++++++++++++++++++ pkg/mesh/topology.go | 10 +++++++- 5 files changed, 138 insertions(+), 2 deletions(-) diff --git a/cmd/kg/handlers.go b/cmd/kg/handlers.go index be00cee7..4a40fe0f 100644 --- a/cmd/kg/handlers.go +++ b/cmd/kg/handlers.go @@ -65,7 +65,7 @@ func (h *graphHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { peers[p.Name] = p } } - topo, err := mesh.NewTopology(nodes, peers, h.granularity, *h.hostname, 0, wgtypes.Key{}, h.subnet, h.serviceCIDRs, nodes[*h.hostname].PersistentKeepalive, nil) + topo, err := mesh.NewTopology(nodes, peers, nil, h.granularity, *h.hostname, 0, wgtypes.Key{}, h.subnet, h.serviceCIDRs, nodes[*h.hostname].PersistentKeepalive, nil) if err != nil { http.Error(w, fmt.Sprintf("failed to create topology: %v", err), http.StatusInternalServerError) return diff --git a/pkg/k8s/backend.go b/pkg/k8s/backend.go index e94a6646..4fe25432 100644 --- a/pkg/k8s/backend.go +++ b/pkg/k8s/backend.go @@ -565,3 +565,57 @@ func normalizeIP(ip string) *net.IPNet { ipNet.IP = i.To16() return ipNet } + +// Init implements mesh.PodBackend. +func (p *podBackend) Init(ctx context.Context) error { + go p.informer.Run(ctx.Done()) + if ok := cache.WaitForCacheSync(ctx.Done(), func() bool { + return p.informer.HasSynced() + }); !ok { + return errors.New("failed to sync pod cache") + } + p.informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, obj interface{}) { + n, ok := obj.(*v1.Pod) + if !ok { + // Failed to decode Pod; ignoring... + return + } + o, ok := old.(*v1.Pod) + if !ok { + // Failed to decode Pod; ignoring... + return + } + p.events <- &mesh.PodEvent{Type: mesh.UpdateEvent, Pod: translatePod(n), Old: translatePod(o)} + }, + DeleteFunc: func(obj interface{}) { + n, ok := obj.(*v1.Pod) + if !ok { + // Failed to decode Pod; ignoring... + return + } + p.events <- &mesh.PodEvent{Type: mesh.DeleteEvent, Pod: translatePod(n)} + }, + }, + ) + return nil +} + +// List implements mesh.PodBackend. +func (pb *podBackend) List() ([]*mesh.Pod, error) { + ps, err := pb.lister.List(labels.Everything()) + if err != nil { + return nil, err + } + pods := make([]*mesh.Pod, len(ps)) + for i := range ps { + pods[i] = translatePod(ps[i]) + } + return pods, nil +} + +// Watch implements mesh.PodBackend. +func (p *podBackend) Watch() <-chan *mesh.PodEvent { + return p.events +} diff --git a/pkg/mesh/backend.go b/pkg/mesh/backend.go index 203661d1..7fbd18cf 100644 --- a/pkg/mesh/backend.go +++ b/pkg/mesh/backend.go @@ -20,6 +20,7 @@ import ( "time" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/wireguard" ) @@ -106,6 +107,18 @@ func (p *Peer) Ready() bool { p.PublicKey != wgtypes.Key{} // If Key was not set, it will be wgtypes.Key{}. } +type Pod struct { + Uid types.UID + Name string + Namespace string + NodeName string + IP *net.IPNet +} + +func (p *Pod) Ready() bool { + return p != nil +} + // EventType describes what kind of an action an event represents. type EventType string @@ -132,6 +145,12 @@ type PeerEvent struct { Old *Peer } +type PodEvent struct { + Type EventType + Pod *Pod + Old *Pod +} + // Backend can create clients for all of the // primitive types that Kilo deals with, namely: // * nodes; and @@ -139,6 +158,7 @@ type PeerEvent struct { type Backend interface { Nodes() NodeBackend Peers() PeerBackend + Pods() PodBackend } // NodeBackend can get nodes by name, init itself, @@ -168,3 +188,9 @@ type PeerBackend interface { Set(context.Context, string, *Peer) error Watch() <-chan *PeerEvent } + +type PodBackend interface { + Init(context.Context) error + List() ([]*Pod, error) + Watch() <-chan *PodEvent +} diff --git a/pkg/mesh/mesh.go b/pkg/mesh/mesh.go index 3057d2a2..48dc140b 100644 --- a/pkg/mesh/mesh.go +++ b/pkg/mesh/mesh.go @@ -32,6 +32,7 @@ import ( "github.com/vishvananda/netlink" "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/encapsulation" "github.com/squat/kilo/pkg/iproute" @@ -78,6 +79,7 @@ type Mesh struct { // and need to be guarded. nodes map[string]*Node peers map[string]*Peer + pods map[types.UID]*Pod mu sync.Mutex errorCounter *prometheus.CounterVec @@ -182,6 +184,7 @@ func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularit kiloIfaceName: iface, nodes: make(map[string]*Node), peers: make(map[string]*Peer), + pods: make(map[types.UID]*Pod), port: port, priv: private, privIface: privIface, @@ -241,6 +244,9 @@ func (m *Mesh) Run(ctx context.Context) error { if err := m.Peers().Init(ctx); err != nil { return fmt.Errorf("failed to initialize peer backend: %v", err) } + if err := m.Pods().Init(ctx); err != nil { + return fmt.Errorf("failed to initialize pod backend: %v", err) + } ipTablesErrors, err := m.ipTables.Run(ctx.Done()) if err != nil { return fmt.Errorf("failed to watch for IP tables updates: %v", err) @@ -271,14 +277,18 @@ func (m *Mesh) Run(ctx context.Context) error { checkIn := time.NewTimer(checkInPeriod) nw := m.Nodes().Watch() pw := m.Peers().Watch() + po := m.Pods().Watch() var ne *NodeEvent var pe *PeerEvent + var poe *PodEvent for { select { case ne = <-nw: m.syncNodes(ctx, ne) case pe = <-pw: m.syncPeers(pe) + case poe = <-po: + m.syncPods(poe) case <-checkIn.C: m.checkIn(ctx) checkIn.Reset(checkInPeriod) @@ -365,6 +375,34 @@ func (m *Mesh) syncPeers(e *PeerEvent) { } } +func (m *Mesh) syncPods(e *PodEvent) { + logger := log.With(m.logger, "event", e.Type) + level.Debug(logger).Log("msg", "syncing pods", "event", e.Type) + var diff bool + m.mu.Lock() + // Pod are indexed by uid. + key := e.Pod.Uid + + switch e.Type { + case UpdateEvent: + if !podsAreEqual(m.pods[key], e.Pod) { + // Pod have IP => add allowed + if e.Pod.IP != nil { + m.pods[key] = e.Pod + diff = true + } + } + case DeleteEvent: + // Remove allowed + delete(m.pods, key) + diff = true + } + m.mu.Unlock() + if diff { + m.applyTopology() + } +} + // checkIn will try to update the local node's LastSeen timestamp // in the backend. func (m *Mesh) checkIn(ctx context.Context) { @@ -720,6 +758,16 @@ func peersAreEqual(a, b *Peer) bool { (a.PersistentKeepaliveInterval == nil || *a.PersistentKeepaliveInterval == *b.PersistentKeepaliveInterval) } +func podsAreEqual(a, b *Pod) bool { + if !(a != nil) == (b != nil) { + return false + } + if a == b { + return true + } + return a.Name == b.Name +} + func ipNetsEqual(a, b *net.IPNet) bool { if a == nil && b == nil { return true diff --git a/pkg/mesh/topology.go b/pkg/mesh/topology.go index ca22bf61..7c66fa8c 100644 --- a/pkg/mesh/topology.go +++ b/pkg/mesh/topology.go @@ -23,6 +23,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/wireguard" ) @@ -96,7 +97,7 @@ type segment struct { } // NewTopology creates a new Topology struct from a given set of nodes and peers. -func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Granularity, hostname string, port int, key wgtypes.Key, subnet *net.IPNet, serviceCIDRs []*net.IPNet, persistentKeepalive time.Duration, logger log.Logger) (*Topology, error) { +func NewTopology(nodes map[string]*Node, peers map[string]*Peer, pods map[types.UID]*Pod, granularity Granularity, hostname string, port int, key wgtypes.Key, subnet *net.IPNet, serviceCIDRs []*net.IPNet, persistentKeepalive time.Duration, logger log.Logger) (*Topology, error) { if logger == nil { logger = log.NewNopLogger() } @@ -176,6 +177,13 @@ func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Gra } cidrs = append(cidrs, node.Subnet) hostnames = append(hostnames, node.Name) + + for k := range pods { + if pods[k].NodeName == node.Name { + level.Debug(t.logger).Log("msg", "Add ip pod on allowedip wireguard", "nodename", node.Name, "allowedip", *pods[k].IP) + allowedIPs = append(allowedIPs, *pods[k].IP) + } + } } // The sorting has no function, but makes testing easier. sort.Slice(allowedLocationIPs, func(i, j int) bool { From 0aebe085c060c24ce175ecadc54a7afd567b90dc Mon Sep 17 00:00:00 2001 From: nbisson Date: Thu, 2 May 2024 16:30:56 +0200 Subject: [PATCH 02/11] Fix podsAreEqual by comparing with Uid --- pkg/mesh/mesh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mesh/mesh.go b/pkg/mesh/mesh.go index 48dc140b..a8ec2d79 100644 --- a/pkg/mesh/mesh.go +++ b/pkg/mesh/mesh.go @@ -765,7 +765,7 @@ func podsAreEqual(a, b *Pod) bool { if a == b { return true } - return a.Name == b.Name + return a.Uid == b.Uid } func ipNetsEqual(a, b *net.IPNet) bool { From e1229c89edc5166ac36ce1bda622a8ca896e60dc Mon Sep 17 00:00:00 2001 From: nbisson Date: Thu, 2 May 2024 17:02:55 +0200 Subject: [PATCH 03/11] use go fmt --- pkg/mesh/mesh.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/mesh/mesh.go b/pkg/mesh/mesh.go index a8ec2d79..212d697d 100644 --- a/pkg/mesh/mesh.go +++ b/pkg/mesh/mesh.go @@ -385,7 +385,7 @@ func (m *Mesh) syncPods(e *PodEvent) { switch e.Type { case UpdateEvent: - if !podsAreEqual(m.pods[key], e.Pod) { + if !podsAreEqual(m.pods[key], e.Pod) { // Pod have IP => add allowed if e.Pod.IP != nil { m.pods[key] = e.Pod @@ -393,7 +393,7 @@ func (m *Mesh) syncPods(e *PodEvent) { } } case DeleteEvent: - // Remove allowed + // Remove allowed delete(m.pods, key) diff = true } From 216fb43cade44c47d177351799ed138c2da40f1c Mon Sep 17 00:00:00 2001 From: nbisson Date: Thu, 2 May 2024 17:14:27 +0200 Subject: [PATCH 04/11] Add IP comparation in podsAreEqual method --- pkg/mesh/mesh.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/mesh/mesh.go b/pkg/mesh/mesh.go index 212d697d..8f79077b 100644 --- a/pkg/mesh/mesh.go +++ b/pkg/mesh/mesh.go @@ -765,7 +765,8 @@ func podsAreEqual(a, b *Pod) bool { if a == b { return true } - return a.Uid == b.Uid + return a.Uid == b.Uid && + a.IP == b.IP } func ipNetsEqual(a, b *net.IPNet) bool { From 87877ce9db501422b86a248a9f35c9bf1123ff19 Mon Sep 17 00:00:00 2001 From: nbisson Date: Thu, 2 May 2024 17:38:37 +0200 Subject: [PATCH 05/11] refacto podsAreEqual function --- pkg/mesh/mesh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mesh/mesh.go b/pkg/mesh/mesh.go index 8f79077b..d771a5d2 100644 --- a/pkg/mesh/mesh.go +++ b/pkg/mesh/mesh.go @@ -759,7 +759,7 @@ func peersAreEqual(a, b *Peer) bool { } func podsAreEqual(a, b *Pod) bool { - if !(a != nil) == (b != nil) { + if (a != nil) != (b != nil) { return false } if a == b { From 39b79df54f59bca806c7c56cb505d74b64726e2e Mon Sep 17 00:00:00 2001 From: nbisson Date: Fri, 3 May 2024 14:53:00 +0200 Subject: [PATCH 06/11] Add podBackend and get all pods at mesh initialization --- pkg/k8s/backend.go | 33 ++++++++++++++++++++++++++++++++- pkg/mesh/mesh.go | 22 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/pkg/k8s/backend.go b/pkg/k8s/backend.go index 4fe25432..8812ad02 100644 --- a/pkg/k8s/backend.go +++ b/pkg/k8s/backend.go @@ -74,6 +74,7 @@ var logger = log.NewNopLogger() type backend struct { nodes *nodeBackend peers *peerBackend + pods *podBackend } // Nodes implements the mesh.Backend interface. @@ -86,6 +87,11 @@ func (b *backend) Peers() mesh.PeerBackend { return b.peers } +// Pods implements the mesh.Backend interface. +func (b *backend) Pods() mesh.PodBackend { + return b.pods +} + type nodeBackend struct { client kubernetes.Interface events chan *mesh.NodeEvent @@ -102,10 +108,18 @@ type peerBackend struct { lister v1alpha1listers.PeerLister } +type podBackend struct { + client kubernetes.Interface + events chan *mesh.PodEvent + informer cache.SharedIndexInformer + lister v1listers.PodLister +} + // New creates a new instance of a mesh.Backend. func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Interface, topologyLabel string, l log.Logger) mesh.Backend { ni := v1informers.NewNodeInformer(c, 5*time.Minute, nil) pi := v1alpha1informers.NewPeerInformer(kc, 5*time.Minute, nil) + po := v1informers.NewPodInformer(c, "", 5*time.Minute, nil) logger = l @@ -124,6 +138,12 @@ func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Inter informer: pi, lister: v1alpha1listers.NewPeerLister(pi.GetIndexer()), }, + &podBackend{ + client: c, + events: make(chan *mesh.PodEvent), + informer: po, + lister: v1listers.NewPodLister(po.GetIndexer()), + }, } } @@ -430,6 +450,17 @@ func translatePeer(peer *v1alpha1.Peer) *mesh.Peer { } } +// translatePod translates a Peer CRD to a mesh.Peer. +func translatePod(pod *v1.Pod) *mesh.Pod { + return &mesh.Pod{ + Uid: pod.UID, + Name: pod.Name, + Namespace: pod.Namespace, + NodeName: pod.Spec.NodeName, + IP: normalizeIP(pod.Status.PodIP + "/32"), + } +} + // CleanUp removes configuration applied to the backend. func (pb *peerBackend) CleanUp(_ context.Context, _ string) error { return nil @@ -602,7 +633,7 @@ func (p *podBackend) Init(ctx context.Context) error { return nil } -// List implements mesh.PodBackend. +// List gets all the Pods in the cluster. func (pb *podBackend) List() ([]*mesh.Pod, error) { ps, err := pb.lister.List(labels.Everything()) if err != nil { diff --git a/pkg/mesh/mesh.go b/pkg/mesh/mesh.go index d771a5d2..4384ab11 100644 --- a/pkg/mesh/mesh.go +++ b/pkg/mesh/mesh.go @@ -247,6 +247,15 @@ func (m *Mesh) Run(ctx context.Context) error { if err := m.Pods().Init(ctx); err != nil { return fmt.Errorf("failed to initialize pod backend: %v", err) } + + // Get all pods during mesh initialization + ps, err := m.Pods().List() + for _, p := range ps { + if p.IP != nil { + m.pods[p.Uid] = p + } + } + ipTablesErrors, err := m.ipTables.Run(ctx.Done()) if err != nil { return fmt.Errorf("failed to watch for IP tables updates: %v", err) @@ -511,6 +520,16 @@ func (m *Mesh) applyTopology() { peers[k] = m.peers[k] readyPeers++ } + pods := make(map[types.UID]*Pod) + //var readyPods float64 + for k := range m.pods { + if !m.pods[k].Ready() { + continue + } + // Make it point the pod without copy. + pods[k] = m.pods[k] + //readyPods++ + } m.nodesGuage.Set(readyNodes) m.peersGuage.Set(readyPeers) // We cannot do anything with the topology until the local node is available. @@ -543,7 +562,8 @@ func (m *Mesh) applyTopology() { natEndpoints := discoverNATEndpoints(nodes, peers, wgDevice, m.logger) nodes[m.hostname].DiscoveredEndpoints = natEndpoints - t, err := NewTopology(nodes, peers, m.granularity, m.hostname, nodes[m.hostname].Endpoint.Port(), m.priv, m.subnet, m.serviceCIDRs, nodes[m.hostname].PersistentKeepalive, m.logger) + + t, err := NewTopology(nodes, peers, pods, m.granularity, m.hostname, nodes[m.hostname].Endpoint.Port(), m.priv, m.subnet, m.serviceCIDRs, nodes[m.hostname].PersistentKeepalive, m.logger) if err != nil { level.Error(m.logger).Log("error", err) m.errorCounter.WithLabelValues("apply").Inc() From c00b69ace4bcd1ae2a83971f1eebf7b46e285e0e Mon Sep 17 00:00:00 2001 From: nbisson Date: Fri, 3 May 2024 14:56:59 +0200 Subject: [PATCH 07/11] make podsareequal more simpler --- pkg/mesh/mesh.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/mesh/mesh.go b/pkg/mesh/mesh.go index 4384ab11..157aa26b 100644 --- a/pkg/mesh/mesh.go +++ b/pkg/mesh/mesh.go @@ -782,9 +782,6 @@ func podsAreEqual(a, b *Pod) bool { if (a != nil) != (b != nil) { return false } - if a == b { - return true - } return a.Uid == b.Uid && a.IP == b.IP } From 30cb89a1000a0f459b5e8bbcaaf601246b109124 Mon Sep 17 00:00:00 2001 From: nbisson Date: Fri, 3 May 2024 15:01:14 +0200 Subject: [PATCH 08/11] Add pods in handlers --- cmd/kg/handlers.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/cmd/kg/handlers.go b/cmd/kg/handlers.go index 4a40fe0f..e90f0496 100644 --- a/cmd/kg/handlers.go +++ b/cmd/kg/handlers.go @@ -25,6 +25,7 @@ import ( "os/exec" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/mesh" ) @@ -48,6 +49,11 @@ func (h *graphHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("failed to list peers: %v", err), http.StatusInternalServerError) return } + pos, err := h.mesh.Pods().List() + if err != nil { + http.Error(w, fmt.Sprintf("failed to list pods: %v", err), http.StatusInternalServerError) + return + } nodes := make(map[string]*mesh.Node) for _, n := range ns { @@ -65,7 +71,14 @@ func (h *graphHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { peers[p.Name] = p } } - topo, err := mesh.NewTopology(nodes, peers, nil, h.granularity, *h.hostname, 0, wgtypes.Key{}, h.subnet, h.serviceCIDRs, nodes[*h.hostname].PersistentKeepalive, nil) + pods := make(map[types.UID]*mesh.Pod) + for _, p := range pos { + if p.Ready() { + pods[p.Uid] = p + } + } + + topo, err := mesh.NewTopology(nodes, peers, pods, h.granularity, *h.hostname, 0, wgtypes.Key{}, h.subnet, h.serviceCIDRs, nodes[*h.hostname].PersistentKeepalive, nil) if err != nil { http.Error(w, fmt.Sprintf("failed to create topology: %v", err), http.StatusInternalServerError) return From a72ad7d50577007755ffc5780a9d4504046c76c6 Mon Sep 17 00:00:00 2001 From: nbisson Date: Tue, 7 May 2024 09:26:35 +0200 Subject: [PATCH 09/11] add calico-bgp compatibility mode --- cmd/kg/main.go | 9 +++++++-- pkg/k8s/backend.go | 23 +++++++++++++++-------- pkg/mesh/mesh.go | 22 +++++++++++++--------- 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/cmd/kg/main.go b/cmd/kg/main.go index 66e39a09..5b96809c 100644 --- a/cmd/kg/main.go +++ b/cmd/kg/main.go @@ -60,6 +60,7 @@ var ( }, ", ") availableCompatibilities = strings.Join([]string{ "flannel", + "calico-bgp", }, ", ") availableEncapsulations = strings.Join([]string{ string(encapsulation.Never), @@ -214,11 +215,15 @@ func runRoot(_ *cobra.Command, _ []string) error { } var enc encapsulation.Encapsulator + var watchPods = false switch compatibility { case "flannel": enc = encapsulation.NewFlannel(e) case "cilium": enc = encapsulation.NewCilium(e) + case "calico-bgp": + watchPods = true + fallthrough default: enc = encapsulation.NewIPIP(e) } @@ -241,7 +246,7 @@ func runRoot(_ *cobra.Command, _ []string) error { c := kubernetes.NewForConfigOrDie(config) kc := kiloclient.NewForConfigOrDie(config) ec := apiextensions.NewForConfigOrDie(config) - b = k8s.New(c, kc, ec, topologyLabel, log.With(logger, "component", "k8s backend")) + b = k8s.New(c, kc, ec, topologyLabel, log.With(logger, "component", "k8s backend"), watchPods) default: return fmt.Errorf("backend %v unknown; possible values are: %s", backend, availableBackends) } @@ -259,7 +264,7 @@ func runRoot(_ *cobra.Command, _ []string) error { serviceCIDRs = append(serviceCIDRs, s) } - m, err := mesh.New(b, enc, gr, hostname, port, s, local, cni, cniPath, iface, cleanUp, cleanUpIface, createIface, mtu, resyncPeriod, prioritisePrivateAddr, iptablesForwardRule, serviceCIDRs, log.With(logger, "component", "kilo"), registry) + m, err := mesh.New(b, enc, gr, hostname, port, s, local, cni, cniPath, iface, cleanUp, cleanUpIface, createIface, mtu, resyncPeriod, prioritisePrivateAddr, iptablesForwardRule, serviceCIDRs, log.With(logger, "component", "kilo"), registry, watchPods) if err != nil { return fmt.Errorf("failed to create Kilo mesh: %v", err) } diff --git a/pkg/k8s/backend.go b/pkg/k8s/backend.go index 8812ad02..26dab538 100644 --- a/pkg/k8s/backend.go +++ b/pkg/k8s/backend.go @@ -116,10 +116,22 @@ type podBackend struct { } // New creates a new instance of a mesh.Backend. -func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Interface, topologyLabel string, l log.Logger) mesh.Backend { +func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Interface, topologyLabel string, l log.Logger, watchPods bool) mesh.Backend { ni := v1informers.NewNodeInformer(c, 5*time.Minute, nil) pi := v1alpha1informers.NewPeerInformer(kc, 5*time.Minute, nil) - po := v1informers.NewPodInformer(c, "", 5*time.Minute, nil) + + var pb *podBackend + if watchPods { + po := v1informers.NewPodInformer(c, "", 5*time.Minute, nil) + pb = &podBackend{ + client: c, + events: make(chan *mesh.PodEvent), + informer: po, + lister: v1listers.NewPodLister(po.GetIndexer()), + } + } else { + pb = &podBackend{} + } logger = l @@ -138,12 +150,7 @@ func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Inter informer: pi, lister: v1alpha1listers.NewPeerLister(pi.GetIndexer()), }, - &podBackend{ - client: c, - events: make(chan *mesh.PodEvent), - informer: po, - lister: v1listers.NewPodLister(po.GetIndexer()), - }, + pb, } } diff --git a/pkg/mesh/mesh.go b/pkg/mesh/mesh.go index 157aa26b..161e8b34 100644 --- a/pkg/mesh/mesh.go +++ b/pkg/mesh/mesh.go @@ -73,6 +73,7 @@ type Mesh struct { serviceCIDRs []*net.IPNet subnet *net.IPNet table *route.Table + watchPods bool wireGuardIP *net.IPNet // nodes and peers are mutable fields in the struct @@ -91,7 +92,7 @@ type Mesh struct { } // New returns a new Mesh instance. -func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularity, hostname string, port int, subnet *net.IPNet, local, cni bool, cniPath, iface string, cleanup bool, cleanUpIface bool, createIface bool, mtu uint, resyncPeriod time.Duration, prioritisePrivateAddr, iptablesForwardRule bool, serviceCIDRs []*net.IPNet, logger log.Logger, registerer prometheus.Registerer) (*Mesh, error) { +func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularity, hostname string, port int, subnet *net.IPNet, local, cni bool, cniPath, iface string, cleanup bool, cleanUpIface bool, createIface bool, mtu uint, resyncPeriod time.Duration, prioritisePrivateAddr, iptablesForwardRule bool, serviceCIDRs []*net.IPNet, logger log.Logger, registerer prometheus.Registerer, watchPods bool) (*Mesh, error) { if err := os.MkdirAll(kiloPath, 0700); err != nil { return nil, fmt.Errorf("failed to create directory to store configuration: %v", err) } @@ -190,6 +191,7 @@ func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularit privIface: privIface, pub: public, resyncPeriod: resyncPeriod, + watchPods: watchPods, iptablesForwardRule: iptablesForwardRule, local: local, serviceCIDRs: serviceCIDRs, @@ -244,15 +246,17 @@ func (m *Mesh) Run(ctx context.Context) error { if err := m.Peers().Init(ctx); err != nil { return fmt.Errorf("failed to initialize peer backend: %v", err) } - if err := m.Pods().Init(ctx); err != nil { - return fmt.Errorf("failed to initialize pod backend: %v", err) - } + if m.watchPods { + if err := m.Pods().Init(ctx); err != nil { + return fmt.Errorf("failed to initialize pod backend: %v", err) + } - // Get all pods during mesh initialization - ps, err := m.Pods().List() - for _, p := range ps { - if p.IP != nil { - m.pods[p.Uid] = p + // Get all pods during mesh initialization + ps, _ := m.Pods().List() + for _, p := range ps { + if p.IP != nil { + m.pods[p.Uid] = p + } } } From 8382c1eab14693b7aec8dca95a9aef75ad8f3e19 Mon Sep 17 00:00:00 2001 From: nbisson Date: Tue, 7 May 2024 10:44:41 +0200 Subject: [PATCH 10/11] Resolve tests --- cmd/kgctl/graph.go | 4 +++- cmd/kgctl/main.go | 2 +- cmd/kgctl/showconf.go | 9 +++++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/cmd/kgctl/graph.go b/cmd/kgctl/graph.go index cb104089..d99925e4 100644 --- a/cmd/kgctl/graph.go +++ b/cmd/kgctl/graph.go @@ -19,6 +19,7 @@ import ( "github.com/spf13/cobra" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/mesh" ) @@ -67,7 +68,8 @@ func runGraph(_ *cobra.Command, _ []string) error { peers[p.Name] = p } } - t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, 0, wgtypes.Key{}, subnet, nil, nodes[hostname].PersistentKeepalive, nil) + pods := make(map[types.UID]*mesh.Pod) + t, err := mesh.NewTopology(nodes, peers, pods, opts.granularity, hostname, 0, wgtypes.Key{}, subnet, nil, nodes[hostname].PersistentKeepalive, nil) if err != nil { return fmt.Errorf("failed to create topology: %w", err) } diff --git a/cmd/kgctl/main.go b/cmd/kgctl/main.go index 9a6c58ac..a753c2d9 100644 --- a/cmd/kgctl/main.go +++ b/cmd/kgctl/main.go @@ -94,7 +94,7 @@ func runRoot(c *cobra.Command, _ []string) error { c := kubernetes.NewForConfigOrDie(config) opts.kc = kiloclient.NewForConfigOrDie(config) ec := apiextensions.NewForConfigOrDie(config) - opts.backend = k8s.New(c, opts.kc, ec, topologyLabel, log.NewNopLogger()) + opts.backend = k8s.New(c, opts.kc, ec, topologyLabel, log.NewNopLogger(), false) default: return fmt.Errorf("backend %s unknown; posible values are: %s", backend, availableBackends) } diff --git a/cmd/kgctl/showconf.go b/cmd/kgctl/showconf.go index 79169f7b..5ed1ca67 100644 --- a/cmd/kgctl/showconf.go +++ b/cmd/kgctl/showconf.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/json" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/k8s/apis/kilo/v1alpha1" "github.com/squat/kilo/pkg/mesh" @@ -152,7 +153,9 @@ func runShowConfNode(_ *cobra.Command, args []string) error { } } - t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, int(opts.port), wgtypes.Key{}, subnet, nil, nodes[hostname].PersistentKeepalive, nil) + pods := make(map[types.UID]*mesh.Pod) + + t, err := mesh.NewTopology(nodes, peers, pods, opts.granularity, hostname, int(opts.port), wgtypes.Key{}, subnet, nil, nodes[hostname].PersistentKeepalive, nil) if err != nil { return fmt.Errorf("failed to create topology: %w", err) } @@ -251,11 +254,13 @@ func runShowConfPeer(_ *cobra.Command, args []string) error { return fmt.Errorf("did not find any peer named %q in the cluster", peer) } + pods := make(map[types.UID]*mesh.Pod) + pka := time.Duration(0) if p := peers[peer].PersistentKeepaliveInterval; p != nil { pka = *p } - t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, mesh.DefaultKiloPort, wgtypes.Key{}, subnet, nil, pka, nil) + t, err := mesh.NewTopology(nodes, peers, pods, opts.granularity, hostname, mesh.DefaultKiloPort, wgtypes.Key{}, subnet, nil, pka, nil) if err != nil { return fmt.Errorf("failed to create topology: %w", err) } From 377febaacb550e63be54d18468e36e6d2b5488de Mon Sep 17 00:00:00 2001 From: nbisson Date: Tue, 7 May 2024 11:14:40 +0200 Subject: [PATCH 11/11] add kilo-k3s-calico-bgp exemple --- manifests/kilo-k3s-calico-bgp.yaml | 176 +++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 manifests/kilo-k3s-calico-bgp.yaml diff --git a/manifests/kilo-k3s-calico-bgp.yaml b/manifests/kilo-k3s-calico-bgp.yaml new file mode 100644 index 00000000..04aafcdb --- /dev/null +++ b/manifests/kilo-k3s-calico-bgp.yaml @@ -0,0 +1,176 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: kilo + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: kilo +rules: +- apiGroups: + - "" + resources: + - nodes + verbs: + - list + - patch + - watch +- apiGroups: + - kilo.squat.ai + resources: + - peers + verbs: + - list + - watch +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions + verbs: + - get +- apiGroups: + - "" + resources: + - pods + verbs: + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: kilo +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: kilo +subjects: + - kind: ServiceAccount + name: kilo + namespace: kube-system +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: kilo-scripts + namespace: kube-system +data: + init.sh: | + #!/bin/sh + cat > /etc/kubernetes/kubeconfig <