diff --git a/pkg/contracts/contracts.go b/pkg/contracts/contracts.go index 424d059f2..2ca71e742 100644 --- a/pkg/contracts/contracts.go +++ b/pkg/contracts/contracts.go @@ -27,3 +27,10 @@ func SafeInit(v any) { initer.Init() } } + +// Equaler is implemented by any entity that can be compared with another entity of the same type. +// The Equal method should return true if the receiver is equal to the other entity. +type Equaler interface { + // Equal returns whether the receiver is equal to the other entity. + Equal(other any) bool +} diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index e370fd03a..507d5a008 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -53,7 +53,7 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan database desired := EntitiesById{} // only read from desiredCh (so far) var update EntitiesById - if delta.Subject.WithChecksum() { + if _, ok := delta.Subject.Entity().(contracts.Equaler); ok || delta.Subject.WithChecksum() { update = EntitiesById{} // read from actualCh and desiredCh with mismatching checksums } @@ -120,5 +120,9 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan database // checksumsMatch returns whether the checksums of two entities are the same. // Both entities must implement contracts.Checksumer. func checksumsMatch(a, b database.Entity) bool { - return cmp.Equal(a.(contracts.Checksumer).Checksum(), b.(contracts.Checksumer).Checksum()) + if _, ok := a.(contracts.Checksumer); ok { + return cmp.Equal(a.(contracts.Checksumer).Checksum(), b.(contracts.Checksumer).Checksum()) + } + + return a.(contracts.Equaler).Equal(b) } diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 6b39ee64f..b353c7848 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -148,9 +148,18 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU()) // Let errors from CreateEntities cancel our group. com.ErrgroupReceive(g, errs) - entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU()) - // Let errors from SetChecksums cancel our group. - com.ErrgroupReceive(g, errs) + + var entities <-chan database.Entity + // Apply the checksums only if the sync subject supports it, i.e, it implements contracts.Checksumer. + // This is necessary because not only entities that implement contracts.Checksumer can be updated, but + // also entities that implement contracts.Equaler interface. + if delta.Subject.WithChecksum() { + entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU()) + // Let errors from SetChecksums cancel our group. + com.ErrgroupReceive(g, errs) + } else { + entities = entitiesWithoutChecksum + } g.Go(func() error { // Using upsert here on purpose as this is the fastest way to do bulk updates. diff --git a/pkg/icingadb/v1/dependency.go b/pkg/icingadb/v1/dependency.go index 523166867..9a0d8f9d4 100644 --- a/pkg/icingadb/v1/dependency.go +++ b/pkg/icingadb/v1/dependency.go @@ -1,6 +1,8 @@ package v1 import ( + "bytes" + "github.com/google/go-cmp/cmp" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/types" ) @@ -30,6 +32,18 @@ func (r RedundancygroupState) TableName() string { return "redundancy_group_state" } +// Equal implements the [contracts.Equaler] interface. +func (r RedundancygroupState) Equal(other any) bool { + if other, ok := other.(*RedundancygroupState); ok { + return bytes.Equal(r.RedundancyGroupId, other.RedundancyGroupId) && + cmp.Equal(r.Failed, other.Failed) && + cmp.Equal(r.IsReachable, other.IsReachable) && + r.LastStateChange.Time().Equal(other.LastStateChange.Time()) + } + + return false +} + type DependencyNode struct { EntityWithoutChecksum `json:",inline"` EnvironmentMeta `json:",inline"` @@ -44,6 +58,15 @@ type DependencyEdgeState struct { Failed types.Bool `json:"failed"` } +// Equal implements the [contracts.Equaler] interface. +func (es DependencyEdgeState) Equal(other any) bool { + if other, ok := other.(*DependencyEdgeState); ok { + return bytes.Equal(es.Id, other.Id) && cmp.Equal(es.Failed, other.Failed) + } + + return false +} + type DependencyEdge struct { EntityWithoutChecksum `json:",inline"` EnvironmentMeta `json:",inline"`