Skip to content

Commit

Permalink
chore: migrate uncompressed resources data
Browse files Browse the repository at this point in the history
Abusing generics to compress uncompressed data in `*omni.ConfigPatch`, `*omni.ClusterMachineConfig`, `*omni.RedactedClusterMachineConfig` and `*omni.ClusterMachineConfigPatches`.

Fixes #853

Signed-off-by: Dmitriy Matrenichev <[email protected]>
  • Loading branch information
DmitriyMV committed Jan 27, 2025
1 parent fd888ab commit e2e5f08
Show file tree
Hide file tree
Showing 3 changed files with 292 additions and 44 deletions.
4 changes: 4 additions & 0 deletions internal/backend/runtime/omni/migration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func NewManager(state state.State, logger *zap.Logger) *Manager {
callback: removeMaintenanceConfigPatchFinalizers,
name: "removeMaintenanceConfigPatchFinalizers",
},
{
callback: compressMachineConfigsAndPatches,
name: "compressMachineConfigsAndPatches",
},
},
}
}
Expand Down
239 changes: 195 additions & 44 deletions internal/backend/runtime/omni/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ package migration_test

import (
"context"
"encoding/hex"
"errors"
"fmt"
"testing"
"time"

"github.com/cosi-project/runtime/pkg/controller/generic"
"github.com/cosi-project/runtime/pkg/controller/runtime"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/protobuf"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/cosi-project/runtime/pkg/resource/typed"
"github.com/cosi-project/runtime/pkg/safe"
Expand All @@ -23,9 +26,11 @@ import (
"github.com/google/uuid"
"github.com/siderolabs/gen/pair"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/gen/xtesting/must"
"github.com/siderolabs/go-pointer"
"github.com/siderolabs/talos/pkg/machinery/config/types/v1alpha1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -86,11 +91,7 @@ func (suite *MigrationSuite) TestClusterInfo() {

cluster, machine := suite.createCluster(ctx, "c1")

suite.Require().NoError(suite.manager.Run(ctx), migration.WithFilter(
func(name string) bool {
return name == "clusterInfo"
},
))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("clusterInfo", "clusterInstallImageToTalosVersion"))))

var err error

Expand All @@ -116,11 +117,7 @@ func (suite *MigrationSuite) TestClusterInfo() {

// This shouldn't happen: create old version of the cluster again and see it not being updated
// as the DB version is already up-to-date.
suite.Require().NoError(suite.manager.Run(ctx), migration.WithFilter(
func(name string) bool {
return name == "clusterInfo"
},
))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("clusterInfo", "clusterInstallImageToTalosVersion"))))

cluster, err = safe.StateGet[*omni.Cluster](ctx, suite.state, cluster.Metadata())

Expand Down Expand Up @@ -212,9 +209,7 @@ func (suite *MigrationSuite) Test_changePublicKeyOwner() {
}

// test migration in isolation
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(func(name string) bool {
return name == "changePublicKeyOwner"
})))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("changePublicKeyOwner"))))

keyVerifier := func(key *authres.PublicKey, expectedVersion int) {
result, err := safe.StateGet[*authres.PublicKey](ctx, suite.state, key.Metadata())
Expand Down Expand Up @@ -439,9 +434,7 @@ func (suite *MigrationSuite) TestUserDefaultScopes() {
suite.Require().NoError(suite.state.Create(ctx, user3))

// test migration in isolation
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(func(name string) bool {
return name == "addDefaultScopesToUsers"
})))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("addDefaultScopesToUsers"))))

user1, err = safe.StateGet[*authres.User](ctx, suite.state, user1.Metadata())
suite.Require().NoError(err)
Expand Down Expand Up @@ -743,9 +736,7 @@ func (suite *MigrationSuite) TestAddServiceAccountScopesToUsers() {
suite.Require().NoError(suite.state.Create(ctx, publicKey2))

// test migration in isolation
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(func(name string) bool {
return name == "addServiceAccountScopesToUsers"
})))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("addServiceAccountScopesToUsers"))))

user1, err := safe.StateGet[*authres.User](ctx, suite.state, user1.Metadata())
suite.Require().NoError(err)
Expand Down Expand Up @@ -788,9 +779,7 @@ func (suite *MigrationSuite) TestMigrateLabels() {
machineLabels.Metadata().Labels().Set("user-label", "value")
suite.Require().NoError(suite.state.Create(ctx, machineLabels))

suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(func(name string) bool {
return name == "migrateLabels" || name == "dropOldLabels"
})))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("migrateLabels", "dropOldLabels"))))

updatedMachineStatus, err := suite.state.Get(ctx, machineStatus.Metadata())
suite.Require().NoError(err)
Expand Down Expand Up @@ -860,9 +849,7 @@ func (suite *MigrationSuite) TestConvertScopesToRoles() {
suite.Require().NoError(suite.state.Create(ctx, pubKeyWithUserMgmtScopes))
suite.Require().NoError(suite.state.Create(ctx, pubKeyWithServiceAccountScopes))

suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(func(name string) bool {
return name == "convertScopesToRoles"
})))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("convertScopesToRoles"))))

updatedUserWithNoScopes, err := safe.StateGet[*authres.User](ctx, suite.state, userWithNoScopes.Metadata())
suite.Require().NoError(err)
Expand Down Expand Up @@ -1274,9 +1261,7 @@ func (suite *MigrationSuite) TestClearEmptyConfigPatches() {
suite.Require().NoError(suite.state.Create(ctx, cp1, state.WithCreateOwner("MachineSetStatusController")))
suite.Require().NoError(suite.state.Create(ctx, cp2, state.WithCreateOwner("MachineSetStatusController")))

suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(func(name string) bool {
return name == "clearEmptyConfigPatches"
})))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("clearEmptyConfigPatches"))))

cp1After, err := safe.StateGetByID[*omni.ClusterMachineConfigPatches](ctx, suite.state, "1")
suite.Require().NoError(err)
Expand Down Expand Up @@ -1398,9 +1383,7 @@ func (suite *MigrationSuite) TestSetMachineStatusSnapshotOwner() {
}

// test migration in isolation
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(func(name string) bool {
return name == "setMachineStatusSnapshotOwner"
})))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("setMachineStatusSnapshotOwner"))))

check := func(item *omni.MachineStatusSnapshot, expectedVersion int) {
result, err := safe.StateGet[*omni.MachineStatusSnapshot](ctx, suite.state, item.Metadata())
Expand Down Expand Up @@ -1469,9 +1452,7 @@ func (suite *MigrationSuite) TestMigrateInstallImageConfigIntoGenOptions() {
suite.Require().NoError(suite.state.Create(ctx, res, state.WithCreateOwner(res.Metadata().Owner())))
}

suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(func(name string) bool {
return name == "migrateInstallImageConfigIntoGenOptions"
})))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("migrateInstallImageConfigIntoGenOptions"))))

genOptions, err := safe.StateGet[*omni.MachineConfigGenOptions](ctx, suite.state, omni.NewMachineConfigGenOptions(resources.DefaultNamespace, "test").Metadata())
suite.Require().NoError(err)
Expand Down Expand Up @@ -1588,11 +1569,7 @@ func (suite *MigrationSuite) testDeleteDeprecatedResources(createRes func(id str

suite.Require().NoError(err)

suite.Require().NoError(suite.manager.Run(ctx), migration.WithFilter(
func(name string) bool {
return name == migrationFilter
},
))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith(migrationFilter))))

list, err := suite.state.List(ctx, resource.NewMetadata(res.Metadata().Namespace(), res.Metadata().Type(), "", resource.VersionUndefined))

Expand Down Expand Up @@ -1639,11 +1616,7 @@ func (suite *MigrationSuite) TestRemoveMaintenanceConfigPatchFinalizers() {
suite.Require().NoError(suite.state.Create(ctx, m2Status, state.WithCreateOwner("MachineStatusController")))
suite.Require().NoError(suite.state.Create(ctx, m3Status, state.WithCreateOwner("MachineStatusController")))

suite.Require().NoError(suite.manager.Run(ctx), migration.WithFilter(
func(name string) bool {
return name == "removeMaintenanceConfigPatchFinalizers"
},
))
suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(filterWith("removeMaintenanceConfigPatchFinalizers"))))

rtestutils.AssertAll(ctx, suite.T(), suite.state, func(
res *omni.MachineStatus, assert *assert.Assertions,
Expand All @@ -1652,8 +1625,186 @@ func (suite *MigrationSuite) TestRemoveMaintenanceConfigPatchFinalizers() {
})
}

func (suite *MigrationSuite) TestCompressUncompressMigrations() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

const (
data1 = "Hello world!"
data2 = "Hello Sidero!"
)

encoded1 := must.Value(hex.DecodeString("28b52ffd040061000048656c6c6f20776f726c6421b27dfd7f"))(suite.T())
encoded2 := must.Value(hex.DecodeString("28b52ffd040069000048656c6c6f2053696465726f211ce853bc"))(suite.T())
disabled := specs.WithConfigCompressionOption(specs.CompressionConfig{Enabled: false})

const ns = resources.DefaultNamespace

checkMigrations := []func(t *testing.T){
startMigration(
ctx,
suite.T(),
suite.state,
omni.NewConfigPatch(ns, "config-patch-1"),
fillData[*specs.ConfigPatchSpec](data1, disabled),
checkCompressed[string, *specs.ConfigPatchSpec](encoded1),
),
startMigration(
ctx,
suite.T(),
suite.state,
omni.NewClusterMachineConfig(ns, "machine-config-1"),
fillData[*specs.ClusterMachineConfigSpec](data1, disabled),
checkCompressed[[]byte, *specs.ClusterMachineConfigSpec](encoded1),
),
startMigration(
ctx,
suite.T(),
suite.state,
omni.NewRedactedClusterMachineConfig(ns, "redacted-machine-config-1"),
fillData[*specs.RedactedClusterMachineConfigSpec](data1, disabled),
checkCompressed[string, *specs.RedactedClusterMachineConfigSpec](encoded1),
),
startMigration(
ctx,
suite.T(),
suite.state,
omni.NewConfigPatch(ns, "config-patch-2"),
fillData[*specs.ConfigPatchSpec](data2, disabled),
checkCompressed[string, *specs.ConfigPatchSpec](encoded2),
),
startMigration(
ctx,
suite.T(),
suite.state,
omni.NewClusterMachineConfig(ns, "machine-config-2"),
fillData[*specs.ClusterMachineConfigSpec](data2, disabled),
checkCompressed[[]byte, *specs.ClusterMachineConfigSpec](encoded2),
),
startMigration(
ctx,
suite.T(),
suite.state,
omni.NewClusterMachineConfigPatches(ns, "cluster-machine-config-patches-1"),
func(t *testing.T, spec *omni.ClusterMachineConfigPatchesSpec) {
require.NoError(t, spec.Value.SetUncompressedPatches([]string{data1, data2}, disabled))
},
func(t *assert.Assertions, spec *omni.ClusterMachineConfigPatchesSpec) {
uncompressed := spec.Value.GetPatches()
t.Empty(uncompressed)

patches := spec.Value.GetCompressedPatches()
t.NotEmpty(patches)

for i, data := range [][]byte{encoded1, encoded2} {
t.Equalf(data, patches[i], "%x != %x", data, patches[i])
}
},
),
startMigration(
ctx,
suite.T(),
suite.state,
omni.NewClusterMachineConfigPatches(ns, "cluster-machine-config-patches-2"),
func(t *testing.T, spec *omni.ClusterMachineConfigPatchesSpec) {
require.NoError(t, spec.Value.SetUncompressedPatches([]string{data2, data1}, disabled))
},
func(t *assert.Assertions, spec *omni.ClusterMachineConfigPatchesSpec) {
uncompressed := spec.Value.GetPatches()
t.Empty(uncompressed)

patches := spec.Value.GetCompressedPatches()
t.NotEmpty(patches)

for i, data := range [][]byte{encoded2, encoded1} {
t.Equalf(data, patches[i], "%x != %x", data, patches[i])
}
},
),
}

require.NoError(suite.T(), suite.manager.Run(ctx, migration.WithFilter(filterWith("compressMachineConfigsAndPatches"))))

for _, check := range checkMigrations {
check(suite.T())
}
}

func startMigration[
R interface {
generic.ResourceWithRD
TypedSpec() *protobuf.ResourceSpec[T, S]
},
T any,
S protobuf.Spec[T],
](
ctx context.Context,
t *testing.T,
st state.State,
res R,
fill func(t *testing.T, spec *protobuf.ResourceSpec[T, S]),
check func(t *assert.Assertions, spec *protobuf.ResourceSpec[T, S]),
) func(t *testing.T) {
fill(t, res.TypedSpec())

require.NoError(t, st.Create(ctx, res))

return func(t *testing.T) {
rtestutils.AssertResource(
ctx,
t,
st,
res.Metadata().ID(),
func(found R, assertion *assert.Assertions) { check(assertion, found.TypedSpec()) },
)
}
}

func fillData[
S interface {
SetUncompressedData(data []byte, opts ...specs.CompressionOption) error
protobuf.Spec[T]
},
T any,
](data string, opts ...specs.CompressionOption) func(t *testing.T, spec *protobuf.ResourceSpec[T, S]) {
return func(t *testing.T, spec *protobuf.ResourceSpec[T, S]) {
require.NoError(t, spec.Value.SetUncompressedData([]byte(data), opts...))
}
}

func checkCompressed[
D string | []byte,
S interface {
GetData() D
GetCompressedData() []byte
protobuf.Spec[T]
},
T any,
](data []byte) func(t *assert.Assertions, spec *protobuf.ResourceSpec[T, S]) {
return func(t *assert.Assertions, spec *protobuf.ResourceSpec[T, S]) {
uncompressed := spec.Value.GetData()
t.Empty(uncompressed)

result := spec.Value.GetCompressedData()
t.NotEmpty(result)
t.Equalf(data, result, "%x != %x", data, result)
}
}

func TestMigrationSuite(t *testing.T) {
t.Parallel()

suite.Run(t, new(MigrationSuite))
}

func filterWith(vals ...string) func(string) bool {
return func(cur string) bool {
for _, val := range vals {
if cur == val {
return true
}
}

return false
}
}
Loading

0 comments on commit e2e5f08

Please sign in to comment.