diff --git a/go.mod b/go.mod index 431e4bb..98614be 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,14 @@ module github.com/cosi-project/state-etcd go 1.22.2 require ( - github.com/cosi-project/runtime v0.4.1 + github.com/cosi-project/runtime v0.4.2 github.com/siderolabs/gen v0.4.8 github.com/stretchr/testify v1.9.0 go.etcd.io/etcd/api/v3 v3.5.13 go.etcd.io/etcd/client/v3 v3.5.13 go.etcd.io/etcd/server/v3 v3.5.13 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 google.golang.org/grpc v1.62.2 ) @@ -65,7 +66,6 @@ require ( go.opentelemetry.io/otel/trace v1.20.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sync v0.6.0 // indirect diff --git a/go.sum b/go.sum index c39d795..2262566 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/cosi-project/runtime v0.4.1 h1:9lJWw5cl3Lz1qP32bl2vxAsJs6LM8KdUGLCc9t/EGqw= -github.com/cosi-project/runtime v0.4.1/go.mod h1:eXVAHf9QzzSVblLUtHHPFOZ7JBuz+GypHbao1vw+SdQ= +github.com/cosi-project/runtime v0.4.2 h1:kJkhorzDWierDDbXn1BDHS6iQ7ai9AdvQOnK5uG/g8g= +github.com/cosi-project/runtime v0.4.2/go.mod h1:eXVAHf9QzzSVblLUtHHPFOZ7JBuz+GypHbao1vw+SdQ= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/pkg/state/impl/etcd/watch_test.go b/pkg/state/impl/etcd/watch_test.go index 
b96719a..db08daf 100644
--- a/pkg/state/impl/etcd/watch_test.go
+++ b/pkg/state/impl/etcd/watch_test.go
@@ -145,3 +145,72 @@ func TestWatchSpuriousEvents(t *testing.T) {
 		}
 	})
 }
+
+// TestWatchDeadCancel is a regression test: it cancels many watchers that share one context,
+// then checks that a fresh Watch on the same resource still subscribes and delivers events.
+func TestWatchDeadCancel(t *testing.T) {
+	t.Cleanup(func() { goleak.VerifyNone(t, goleak.IgnoreCurrent()) })
+
+	withEtcd(t, func(s state.State) {
+		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+		defer cancel()
+
+		const N = 50
+
+		// the key to reproduce the bug is to create enough watchers (>25 seems to be enough),
+		// and cancel all of them at the same time
+		// after that trigger a couple of updates, and the next Watch will hang forever
+		deadCtx, deadCancel := context.WithCancel(ctx)
+		deadCh := make(chan state.Event)
+
+		for range N {
+			require.NoError(t, s.Watch(deadCtx, conformance.NewPathResource("default", "path-0").Metadata(), deadCh))
+		}
+
+		t.Log("dead cancel!")
+		deadCancel()
+
+		require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-0")))
+		require.NoError(t, s.Destroy(ctx, conformance.NewPathResource("default", "path-0").Metadata()))
+
+		watchCh := make(chan state.Event)
+
+		// when the bug is triggered, s.Watch will hang forever
+		//
+		// the bug seems to be related to etcd client `*watcher` object getting into a deadlock
+		// the bug doesn't reproduce if we use a real gRPC connection for the embedded etcd instead of "passthrough" mode
+		require.NoError(t, s.Watch(ctx, conformance.NewPathResource("default", "path-0").Metadata(), watchCh))
+
+		// nextEvent returns the next event from watchCh, failing the test if none arrives within a second;
+		// format/args describe the awaited event so that a timeout reports which step stalled
+		nextEvent := func(format string, args ...any) state.Event {
+			t.Helper()
+
+			select {
+			case <-time.After(time.Second):
+				t.Fatalf("timeout waiting for event: "+format, args...)
+			case ev := <-watchCh:
+				return ev
+			}
+
+			return state.Event{} // not reached: t.Fatalf stops the test goroutine
+		}
+
+		// the resource was created and destroyed before this watch started
+		ev := nextEvent("initial state")
+		assert.Equal(t, state.Destroyed, ev.Type, "ev: %v", ev.Resource)
+
+		// same number of create/destroy rounds as the two back-to-back N-iteration loops this replaces
+		for i := range 2 * N {
+			require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-0")))
+
+			ev = nextEvent("created, iteration %d", i)
+			assert.Equal(t, state.Created, ev.Type, "ev: %v", ev.Resource)
+
+			require.NoError(t, s.Destroy(ctx, conformance.NewPathResource("default", "path-0").Metadata()))
+
+			ev = nextEvent("destroyed, iteration %d", i)
+			assert.Equal(t, state.Destroyed, ev.Type, "ev: %v", ev.Resource)
+		}
+	})
+}
diff --git a/pkg/util/testhelpers/testhelpers.go b/pkg/util/testhelpers/testhelpers.go index d4709ec..b891388 100644 --- a/pkg/util/testhelpers/testhelpers.go +++ b/pkg/util/testhelpers/testhelpers.go @@ -14,7 +14,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" - "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -28,7 +27,15 @@ func WithEtcd(t *testing.T, f func(*clientv3.Client)) { cfg.EnableGRPCGateway = false cfg.LogLevel = "info" - cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))) + cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder( + zaptest.NewLogger( + t, + zaptest.Level(zap.InfoLevel), + zaptest.WrapOptions( + zap.Fields(zap.String("component", "etcd-server")), + ), + ), + ) cfg.AuthToken = "" cfg.AutoCompactionMode = "periodic" cfg.AutoCompactionRetention = "5h" @@ -76,7 +83,20 @@ func WithEtcd(t *testing.T, f func(*clientv3.Client)) { } }() - cli := v3client.New(e.Server) + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{e.Clients[0].Addr().String()}, + DialTimeout: time.Second, + Logger: 
zaptest.NewLogger( + t, + zaptest.Level(zap.InfoLevel), + zaptest.WrapOptions( + zap.Fields(zap.String("component", "etcd-client")), + ), + ), + }) + if err != nil { + t.Fatalf("failed to create etcd client: %v", err) + } defer func() { err := cli.Close()