Skip to content

Commit

Permalink
ipam-node: Remove stale pool entries from the store
Browse files Browse the repository at this point in the history
remove the pool data from the store if the pool has
no allocations and if information about the pool is
unavailable in the Kubernetes API

Signed-off-by: Yury Kulazhenkov <[email protected]>
  • Loading branch information
ykulazhenkov committed Aug 7, 2023
1 parent d45ca3f commit 9d0c23e
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/ipam-node/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
}
return
}
c := cleaner.New(mgr.GetClient(), store, time.Minute, 3)
c := cleaner.New(mgr.GetClient(), store, poolManager, time.Minute, 3)
c.Start(innerCtx)
logger.Info("cleaner stopped")
}()
Expand Down
36 changes: 26 additions & 10 deletions pkg/ipam-node/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ import (

storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/pool"
)

// Cleaner is the interface of the cleaner package.
// The cleaner periodically scan the store and check for allocations which doesn't have
// related Pod in the k8s API. If allocation has no Pod for more than X checks, then the cleaner
// will release the allocation.
// will release the allocation. Also, the cleaner will remove pool entries in the store if the pool has no
// allocation and pool configuration is unavailable in the Kubernetes API.
type Cleaner interface {
// Start starts the cleaner loop.
// The cleaner loop discovers stale allocations and clean up them.
Expand All @@ -43,12 +45,13 @@ type Cleaner interface {
// New creates and initialize new cleaner instance
// "checkInterval" defines delay between checks for stale allocations.
// "checkCountBeforeRelease: defines how many check to do before remove the allocation
func New(client client.Client, store storePkg.Store,
func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigReader,
checkInterval time.Duration,
checkCountBeforeRelease int) Cleaner {
return &cleaner{
client: client,
store: store,
poolConfReader: poolConfReader,
checkInterval: checkInterval,
checkCountBeforeRelease: checkCountBeforeRelease,
staleAllocations: make(map[string]int),
Expand All @@ -58,6 +61,7 @@ func New(client client.Client, store storePkg.Store,
type cleaner struct {
client client.Client
store storePkg.Store
poolConfReader pool.ConfigReader
checkInterval time.Duration
checkCountBeforeRelease int
// key is <pool_name>|<container_id>|<interface_name>, value is count of failed checks
Expand All @@ -84,13 +88,19 @@ func (c *cleaner) Start(ctx context.Context) {

func (c *cleaner) loop(ctx context.Context) error {
logger := logr.FromContextOrDiscard(ctx)
store, err := c.store.Open(ctx)
session, err := c.store.Open(ctx)
if err != nil {
return fmt.Errorf("failed to open store: %v", err)
}
allReservations := map[string]struct{}{}
for _, poolName := range store.ListPools() {
for _, reservation := range store.ListReservations(poolName) {
emptyPools := []string{}
for _, poolName := range session.ListPools() {
poolReservations := session.ListReservations(poolName)
if len(poolReservations) == 0 {
emptyPools = append(emptyPools, poolName)
continue
}
for _, reservation := range poolReservations {
resLogger := logger.WithValues("pool", poolName,
"container_id", reservation.ContainerID, "interface_name", reservation.InterfaceName)
key := c.getStaleAllocKey(poolName, reservation)
Expand All @@ -105,7 +115,7 @@ func (c *cleaner) loop(ctx context.Context) error {
Name: reservation.Metadata.PodName,
}, pod)
if err != nil && !apiErrors.IsNotFound(err) {
store.Cancel()
session.Cancel()
return fmt.Errorf("failed to read Pod info from the cache: %v", err)
}
if apiErrors.IsNotFound(err) ||
Expand All @@ -128,13 +138,19 @@ func (c *cleaner) loop(ctx context.Context) error {
// release reservations which were marked as stale multiple times
if count > c.checkCountBeforeRelease {
keyFields := strings.SplitN(k, "|", 3)
pool, containerID, ifName := keyFields[0], keyFields[1], keyFields[2]
logger.Info("stale reservation released", "pool", pool,
poolName, containerID, ifName := keyFields[0], keyFields[1], keyFields[2]
logger.Info("stale reservation released", "poolName", poolName,
"container_id", containerID, "interface_name", ifName)
store.ReleaseReservationByID(pool, containerID, ifName)
session.ReleaseReservationByID(poolName, containerID, ifName)
}
}
// remove empty pools if they don't have configuration in the k8s API
for _, emptyPool := range emptyPools {
if p := c.poolConfReader.GetPoolByName(emptyPool); p == nil {
session.RemovePool(emptyPool)
}
}
if err := store.Commit(); err != nil {
if err := session.Commit(); err != nil {
return fmt.Errorf("failed to commit changes to the store: %v", err)
}
return nil
Expand Down
42 changes: 29 additions & 13 deletions pkg/ipam-node/cleaner/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
cleanerPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/cleaner"
storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types"
poolPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool"
poolMockPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool/mocks"
)

const (
Expand All @@ -35,6 +37,7 @@ const (
testPodName2 = "test-pod2"
testPool1 = "pool1"
testPool2 = "pool2"
testPool3 = "pool3"
testIFName = "net0"
)

Expand All @@ -54,62 +57,75 @@ var _ = Describe("Cleaner", func() {
It("Cleanup test", func() {
done := make(chan interface{})
go func() {
defer GinkgoRecover()
defer close(done)
storePath := filepath.Join(GinkgoT().TempDir(), "test_store")
storeMgr := storePkg.New(storePath)
cleaner := cleanerPkg.New(k8sClient, storeMgr, time.Millisecond*100, 3)
store := storePkg.New(storePath)

pod1UID := createPod(testPodName1, testNamespace)
_ = createPod(testPodName2, testNamespace)
poolManager := poolMockPkg.NewManager(GinkgoT())
// pool2 has no config in the k8s API
poolManager.On("GetPoolByName", testPool2).Return(nil)
// pool3 has config in the k8s API
poolManager.On("GetPoolByName", testPool3).Return(&poolPkg.IPPool{})

store, err := storeMgr.Open(ctx)
session, err := store.Open(ctx)
Expect(err).NotTo(HaveOccurred())
// this will create empty pool config
session.SetLastReservedIP(testPool3, net.ParseIP("192.168.33.100"))

cleaner := cleanerPkg.New(k8sClient, store, poolManager, time.Millisecond*100, 3)

pod1UID := createPod(testPodName1, testNamespace)
_ = createPod(testPodName2, testNamespace)

// should keep these reservations
Expect(store.Reserve(testPool1, "id1", testIFName, types.ReservationMetadata{
Expect(session.Reserve(testPool1, "id1", testIFName, types.ReservationMetadata{
CreateTime: time.Now().Format(time.RFC3339Nano),
PodUUID: pod1UID,
PodName: testPodName1,
PodNamespace: testNamespace,
}, net.ParseIP("192.168.1.100"))).NotTo(HaveOccurred())

Expect(store.Reserve(testPool1, "id2", testIFName, types.ReservationMetadata{},
Expect(session.Reserve(testPool1, "id2", testIFName, types.ReservationMetadata{},
net.ParseIP("192.168.1.101"))).NotTo(HaveOccurred())

// should remove these reservations
Expect(store.Reserve(testPool1, "id3", testIFName, types.ReservationMetadata{
Expect(session.Reserve(testPool1, "id3", testIFName, types.ReservationMetadata{
CreateTime: time.Now().Format(time.RFC3339Nano),
PodName: "unknown",
PodNamespace: testNamespace,
}, net.ParseIP("192.168.1.102"))).NotTo(HaveOccurred())
Expect(store.Reserve(testPool2, "id4", testIFName, types.ReservationMetadata{
Expect(session.Reserve(testPool2, "id4", testIFName, types.ReservationMetadata{
CreateTime: time.Now().Format(time.RFC3339Nano),
PodName: "unknown2",
PodNamespace: testNamespace,
}, net.ParseIP("192.168.2.100"))).NotTo(HaveOccurred())
Expect(store.Reserve(testPool2, "id5", testIFName, types.ReservationMetadata{
Expect(session.Reserve(testPool2, "id5", testIFName, types.ReservationMetadata{
CreateTime: time.Now().Format(time.RFC3339Nano),
PodUUID: "something", // differ from the reservation
PodName: testPodName2,
PodNamespace: testNamespace,
}, net.ParseIP("192.168.2.101"))).NotTo(HaveOccurred())

Expect(store.Commit()).NotTo(HaveOccurred())
Expect(session.Commit()).NotTo(HaveOccurred())

go func() {
cleaner.Start(ctx)
}()
Eventually(func(g Gomega) {
store, err := storeMgr.Open(ctx)
store, err := store.Open(ctx)
g.Expect(err).NotTo(HaveOccurred())
defer store.Cancel()
g.Expect(store.GetReservationByID(testPool1, "id1", testIFName)).NotTo(BeNil())
g.Expect(store.GetReservationByID(testPool1, "id2", testIFName)).NotTo(BeNil())
g.Expect(store.GetReservationByID(testPool1, "id3", testIFName)).To(BeNil())
g.Expect(store.GetReservationByID(testPool2, "id4", testIFName)).To(BeNil())
g.Expect(store.GetReservationByID(testPool2, "id5", testIFName)).To(BeNil())
g.Expect(store.ListPools()).To(And(
ContainElements(testPool1, testPool3),
Not(ContainElement(testPool2))))
}, 10).Should(Succeed())

close(done)
}()
Eventually(done, time.Minute).Should(BeClosed())
})
Expand Down
33 changes: 33 additions & 0 deletions pkg/ipam-node/store/mocks/Session.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/ipam-node/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Session interface {
ListReservations(pool string) []types.Reservation
// ListPools return list with names of all known pools
ListPools() []string
// RemovePool removes information about the pool from the store
RemovePool(pool string)
// GetLastReservedIP returns last reserved IP for the pool or nil
GetLastReservedIP(pool string) net.IP
// SetLastReservedIP set last reserved IP fot the pool
Expand Down Expand Up @@ -256,6 +258,13 @@ func (s *session) ListPools() []string {
return pools
}

// RemovePool is the Session interface implementation for session
func (s *session) RemovePool(pool string) {
s.checkClosed()
delete(s.tmpData.Pools, pool)
s.isModified = true
}

// GetLastReservedIP is the Session interface implementation for session
func (s *session) GetLastReservedIP(pool string) net.IP {
s.checkClosed()
Expand Down
8 changes: 8 additions & 0 deletions pkg/ipam-node/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,12 @@ var _ = Describe("Store", func() {
s.Reserve(testPoolName, "other", testNetIfName,
types.ReservationMetadata{}, net.ParseIP(testIP))).To(MatchError(storePkg.ErrIPAlreadyReserved))
})
It("Remove pool data", func() {
s, err := store.Open(context.Background())
Expect(err).NotTo(HaveOccurred())
createTestReservation(s)
Expect(s.ListReservations(testPoolName)).NotTo(BeEmpty())
s.RemovePool(testPoolName)
Expect(s.ListReservations(testPoolName)).To(BeEmpty())
})
})

0 comments on commit 9d0c23e

Please sign in to comment.