From 51df7958ade86acbd1f62e416a67e92a0bf09ae2 Mon Sep 17 00:00:00 2001 From: Matthias Geihs Date: Thu, 23 Sep 2021 14:04:20 +0200 Subject: [PATCH 1/6] :fire: [channel,backend/sim/channel] Remove set backend test There are several problems with `func TestSetBackend`: - It looks like a test function but it is contained within a non-test package. - The test does not give any valuable information: We use it in the sim backend tests. However, the backend tests would fail anyways if the backend is not set correctly. Signed-off-by: Matthias Geihs --- backend/sim/channel/init_test.go | 7 ------- channel/backendtest.go | 29 ----------------------------- 2 files changed, 36 deletions(-) delete mode 100644 channel/backendtest.go diff --git a/backend/sim/channel/init_test.go b/backend/sim/channel/init_test.go index f49edb17..98ef4c98 100644 --- a/backend/sim/channel/init_test.go +++ b/backend/sim/channel/init_test.go @@ -15,12 +15,5 @@ package channel import ( - "testing" - _ "perun.network/go-perun/backend/sim/wallet" // backend init - "perun.network/go-perun/channel" ) - -func TestSetBackend(t *testing.T) { - channel.SetBackendTest(t) -} diff --git a/channel/backendtest.go b/channel/backendtest.go deleted file mode 100644 index 08619848..00000000 --- a/channel/backendtest.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2019 - See NOTICE file for copyright holders. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package channel - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// SetBackendTest is a generic backend test. -func SetBackendTest(t *testing.T) { - assert.Panics(t, func() { SetBackend(nil) }, "nil backend set should panic") - require.NotNil(t, backend, "backend should be already set by init()") - assert.Panics(t, func() { SetBackend(backend) }, "setting a backend twice should panic") -} From 4a39c8d81d1c2a674091fd39034463b9deb6552a Mon Sep 17 00:00:00 2001 From: Matthias Geihs Date: Wed, 22 Sep 2021 22:25:28 +0200 Subject: [PATCH 2/6] :bug: [backend/eth/channel] Fix state app decoding Function `FromEthState` did not set the App correctly on a decoded state. Signed-off-by: Matthias Geihs --- backend/ethereum/channel/backend.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/ethereum/channel/backend.go b/backend/ethereum/channel/backend.go index 20a72fca..427878f5 100644 --- a/backend/ethereum/channel/backend.go +++ b/backend/ethereum/channel/backend.go @@ -263,6 +263,7 @@ func FromEthState(app channel.App, s *adjudicator.ChannelState) channel.State { ID: s.ChannelID, Version: s.Version, Allocation: alloc, + App: app, Data: data, IsFinal: s.IsFinal, } From d4e62068212d29cf6139b948b52aab0fdb5193f0 Mon Sep 17 00:00:00 2001 From: Matthias Geihs Date: Tue, 21 Sep 2021 16:51:11 +0200 Subject: [PATCH 3/6] :white_check_mark: [channel/test] Add DecodeAsset test Signed-off-by: Matthias Geihs --- channel/test/backend.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/channel/test/backend.go b/channel/test/backend.go index 1bdf6c38..a8f1205a 100644 --- a/channel/test/backend.go +++ b/channel/test/backend.go @@ -15,6 +15,7 @@ package test import ( + "bytes" "math/big" "testing" @@ -66,6 +67,10 @@ func GenericBackendTest(t *testing.T, s *Setup) { t.Run("Verify", func(t *testing.T) { genericVerifyTest(t, s) }) + + t.Run("DecodeAsset", func(t *testing.T) { + genericDecodeAssetTest(t, s) + }) } func genericChannelIDTest(t *testing.T, s *Setup) { @@ -339,3 +344,18 @@ func GenericStateEqualTest(t *testing.T, s1, s2 *channel.State) { assert.Error(t, differentState.Equal(s1)) } } + +// genericDecodeAssetTest tests the DecodeAsset function of the initialized backend. +func genericDecodeAssetTest(t *testing.T, s *Setup) { + assets := s.State.Assets + _assets := make([]channel.Asset, len(assets)) + for i, a := range assets { + var buf bytes.Buffer + err := a.Encode(&buf) + assert.NoError(t, err, "encoding") + _assets[i], err = channel.DecodeAsset(&buf) + assert.NoError(t, err, "decoding") + } + err := channel.AssetsAssertEqual(assets, _assets) + assert.NoError(t, err, "equality") +} From bf134c26a55fcc4cb9fb4bf7e7546023f5f06764 Mon Sep 17 00:00:00 2001 From: Matthias Geihs Date: Tue, 21 Sep 2021 19:09:14 +0200 Subject: [PATCH 4/6] :white_check_mark: [channel/test,backend/eth/channel] Generic funder test Signed-off-by: Matthias Geihs --- backend/ethereum/channel/funder_test.go | 62 +++++++++++++++++++++++++ channel/test/funder.go | 55 ++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 channel/test/funder.go diff --git a/backend/ethereum/channel/funder_test.go b/backend/ethereum/channel/funder_test.go index 96a95621..4e45af4a 100644 --- a/backend/ethereum/channel/funder_test.go +++ b/backend/ethereum/channel/funder_test.go @@ -338,6 +338,68 @@ func TestFunder_Fund_multi(t *testing.T) { t.Run("10-party funding", func(t *testing.T) { testFunderFunding(t, 10) }) } +func TestFunder(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + rng := pkgtest.Prng(t) + + numParts := 2 + rng.Intn(maxNumParts-2) + s := test.NewSetup(t, rng, numParts, blockInterval) + + funders := make([]channel.Funder, numParts) + for i := range funders { + funders[i] = s.Funders[i] + } + + _f := &funderSetup{ + setup: s, + funders: funders, + } + channeltest.TestFunder(ctx, t, rng, _f) +} + +// funderSetup represents a funderSetup test setup. +type funderSetup struct { + setup *test.Setup + funders []channel.Funder +} + +const maxNumParts = 8 + +func (f *funderSetup) Funders() []channel.Funder { + return f.funders +} + +func (f *funderSetup) NewFundingRequests(ctx context.Context, t *testing.T, rng *rand.Rand) []channel.FundingReq { + params, state := f.newRandomParamsAndState(rng) + + funders := f.funders + numParts := len(funders) + requests := make([]channel.FundingReq, numParts) + for i := range funders { + requests[i] = *channel.NewFundingReq(params, state, channel.Index(i), state.Balances) + } + return requests +} + +func (f *funderSetup) newRandomParamsAndState(rng *rand.Rand, opts ...channeltest.RandomOpt) (*channel.Params, *channel.State) { + _opts := f.defaultOpts() + _opts = append(_opts, opts...) + return channeltest.NewRandomParamsAndState(rng, _opts...) +} + +func (f *funderSetup) defaultOpts() []channeltest.RandomOpt { + return []channeltest.RandomOpt{ + channeltest.WithChallengeDuration(100), + channeltest.WithParts(f.setup.Parts...), + channeltest.WithAssets((*ethchannel.Asset)(&f.setup.Asset)), + channeltest.WithLedgerChannel(true), + channeltest.WithVirtualChannel(false), + channeltest.WithNumLocked(0), + } +} + func testFunderFunding(t *testing.T, n int) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), defaultTxTimeout*time.Duration(n)) diff --git a/channel/test/funder.go b/channel/test/funder.go new file mode 100644 index 00000000..2fc56e0a --- /dev/null +++ b/channel/test/funder.go @@ -0,0 +1,55 @@ +// Copyright 2021 - See NOTICE file for copyright holders. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "context" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + "perun.network/go-perun/channel" +) + +// Funder represents a funder setup used for testing. +// +// Funders should return a set of funders used for testing. +// NewFundingRequests should return a new set of funding requests for the given funders. +type Funder interface { + Funders() []channel.Funder + NewFundingRequests(context.Context, *testing.T, *rand.Rand) []channel.FundingReq +} + +// TestFunder runs a set of generic funder tests. +func TestFunder(ctx context.Context, t *testing.T, rng *rand.Rand, f Funder) { + funders := f.Funders() + requests := f.NewFundingRequests(ctx, t, rng) + + errs := make(chan error, len(funders)) + for i := range funders { + go func(funder channel.Funder, req channel.FundingReq) { + errs <- funder.Fund(ctx, req) + }(funders[i], requests[i]) + } + + for range funders { + select { + case err := <-errs: + require.NoError(t, err, "funding should work") + case <-ctx.Done(): + t.Error(ctx.Err()) + } + } +} From f71718595fb63cbaafd669457bdcbe07a17003fc Mon Sep 17 00:00:00 2001 From: Matthias Geihs Date: Wed, 22 Sep 2021 13:46:36 +0200 Subject: [PATCH 5/6] :white_check_mark: [channel/test,backend/eth/channel] Generic adjudicator test Signed-off-by: Matthias Geihs --- backend/ethereum/channel/adjudicator_test.go | 123 +++++++++++++++++++ channel/test/adjudicator.go | 74 +++++++++++ 2 files changed, 197 insertions(+) create mode 100644 channel/test/adjudicator.go diff --git a/backend/ethereum/channel/adjudicator_test.go b/backend/ethereum/channel/adjudicator_test.go index 58e89637..2fc6aaad 100644 --- a/backend/ethereum/channel/adjudicator_test.go +++ b/backend/ethereum/channel/adjudicator_test.go @@ -16,6 +16,7 @@ package channel_test import ( "context" + "math/rand" "testing" "time" @@ -33,6 +34,128 @@ import ( pkgtest "perun.network/go-perun/pkg/test" ) +// TestAdjudicator tests the adjudicator. +func TestAdjudicator(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + rng := pkgtest.Prng(t) + numParts := 2 + rng.Intn(maxNumParts-2) + s := test.NewSetup(t, rng, numParts, blockInterval) + a := newAdjudicatorSetup(s) + channeltest.TestAdjudicator(ctx, t, a) +} + +type adjudicatorSetup struct { + setup *test.Setup +} + +func newAdjudicatorSetup(setup *test.Setup) *adjudicatorSetup { + return &adjudicatorSetup{ + setup: setup, + } +} + +type testAdjudicator struct { + *test.SimAdjudicator +} + +// Progress is a no-op because app channels are not supported yet. +func (a *testAdjudicator) Progress(ctx context.Context, req channel.ProgressReq) error { + return nil +} + +func (a *adjudicatorSetup) Adjudicator() channel.Adjudicator { + return &testAdjudicator{a.setup.Adjs[0]} +} + +func (a *adjudicatorSetup) NewRegisterReq(ctx context.Context, rng *rand.Rand) (*channel.AdjudicatorReq, []channel.SignedState) { + req := a.newAdjudicatorReq(ctx, rng, channeltest.WithIsFinal(false)) + return req, nil +} + +func (a *adjudicatorSetup) newAdjudicatorReq(ctx context.Context, rng *rand.Rand, opts ...channeltest.RandomOpt) *channel.AdjudicatorReq { + params, state := a.newRandomParamsAndState(rng, opts...) + + // Fund the channel. + numParts := len(params.Parts) + errs := make(chan error, numParts) + for i, funder := range a.setup.Funders { + req := channel.NewFundingReq(params, state, channel.Index(i), state.Balances) + go func(funder channel.Funder, req channel.FundingReq) { + errs <- funder.Fund(ctx, req) + }(funder, *req) + } + for range a.setup.Funders { + select { + case err := <-errs: + if err != nil { + panic(err) + } + case <-ctx.Done(): + panic(ctx.Err()) + } + } + + tx, err := signState(a.setup.Accs, state) + if err != nil { + panic(err) + } + return &channel.AdjudicatorReq{ + Acc: a.setup.Accs[0], + Idx: 0, + Params: params, + Tx: tx, + } +} + +func (a *adjudicatorSetup) newRandomParamsAndState(rng *rand.Rand, opts ...channeltest.RandomOpt) (*channel.Params, *channel.State) { + _opts := a.defaultOpts() + _opts = append(_opts, opts...) + return channeltest.NewRandomParamsAndState(rng, _opts...) +} + +func (a *adjudicatorSetup) defaultOpts() []channeltest.RandomOpt { + return []channeltest.RandomOpt{ + channeltest.WithChallengeDuration(100), + channeltest.WithParts(a.setup.Parts...), + channeltest.WithAssets((*ethchannel.Asset)(&a.setup.Asset)), + channeltest.WithLedgerChannel(true), + channeltest.WithVirtualChannel(false), + channeltest.WithNumLocked(0), + } +} + +func (a *adjudicatorSetup) NewProgressReq(context.Context, *rand.Rand) *channel.ProgressReq { + return &channel.ProgressReq{} // Progression test is not implemented. +} + +func (a *adjudicatorSetup) NewWithdrawReq(ctx context.Context, rng *rand.Rand) (*channel.AdjudicatorReq, channel.StateMap) { + adj := a.Adjudicator() + req := a.newAdjudicatorReq(ctx, rng, channeltest.WithoutApp()) + subChannels := []channel.SignedState{} + + if !req.Tx.IsFinal { + // Register. + err := adj.Register(ctx, *req, subChannels) + if err != nil { + panic(err) + } + + // Wait until concludable. + sub, err := adj.Subscribe(ctx, req.Params.ID()) + if err != nil { + panic(err) + } + err = sub.Next().Timeout().Wait(ctx) + if err != nil { + panic(err) + } + } + + return req, channeltest.MakeStateMapFromSignedStates(subChannels...) +} + const defaultTxTimeout = 2 * time.Second func testSignState(t *testing.T, accounts []*keystore.Account, state *channel.State) channel.Transaction { diff --git a/channel/test/adjudicator.go b/channel/test/adjudicator.go new file mode 100644 index 00000000..4ab69876 --- /dev/null +++ b/channel/test/adjudicator.go @@ -0,0 +1,74 @@ +// Copyright 2021 - See NOTICE file for copyright holders. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "context" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + "perun.network/go-perun/channel" + pkgtest "perun.network/go-perun/pkg/test" +) + +// Adjudicator represents an adjudicator test setup. +// +// Adjudicator should return the adjudicator instance used for testing. +// NewRegisterReq should return the inputs to a successful register call. +// NewProgressReq should return the inputs to a successful progress call. +// NewWithdrawReq should return the inputs to a successful withdraw call. +type Adjudicator interface { + Adjudicator() channel.Adjudicator + NewRegisterReq(context.Context, *rand.Rand) (*channel.AdjudicatorReq, []channel.SignedState) + NewProgressReq(context.Context, *rand.Rand) *channel.ProgressReq + NewWithdrawReq(context.Context, *rand.Rand) (*channel.AdjudicatorReq, channel.StateMap) +} + +// TestAdjudicator tests the given adjudicator. +func TestAdjudicator(ctx context.Context, t *testing.T, a Adjudicator) { + rng := pkgtest.Prng(t) + + adj := a.Adjudicator() + + t.Run("register and subscribe", func(t *testing.T) { + req, subChannels := a.NewRegisterReq(ctx, rng) + err := adj.Register(ctx, *req, subChannels) + require.NoError(t, err, "registering") + + _, err = adj.Subscribe(ctx, req.Params.ID()) + require.NoError(t, err, "subscribing") + }) + + t.Run("progress", func(t *testing.T) { + req := a.NewProgressReq(ctx, rng) + err := adj.Progress(ctx, *req) + require.NoError(t, err, "progressing") + }) + + t.Run("withdraw", func(t *testing.T) { + req, subChannels := a.NewWithdrawReq(ctx, rng) + err := adj.Withdraw(ctx, *req, subChannels) + require.NoError(t, err, "withdrawing") + }) +} + +func MakeStateMapFromSignedStates(channels ...channel.SignedState) channel.StateMap { + m := channel.MakeStateMap() + for _, c := range channels { + m.Add(c.State) + } + return m +} From 6de987a4de5daec331d79a59e30c34a05e6e80f6 Mon Sep 17 00:00:00 2001 From: Matthias Geihs Date: Wed, 22 Sep 2021 22:25:45 +0200 Subject: [PATCH 6/6] :white_check_mark: [channel/test,backend/eth/channel] Generic subscription test Signed-off-by: Matthias Geihs --- backend/ethereum/channel/subscription_test.go | 113 ++++++++++++++++++ channel/test/subscription.go | 76 ++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 backend/ethereum/channel/subscription_test.go create mode 100644 channel/test/subscription.go diff --git a/backend/ethereum/channel/subscription_test.go b/backend/ethereum/channel/subscription_test.go new file mode 100644 index 00000000..41d54868 --- /dev/null +++ b/backend/ethereum/channel/subscription_test.go @@ -0,0 +1,113 @@ +// Copyright 2021 - See NOTICE file for copyright holders. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package channel_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "perun.network/go-perun/backend/ethereum/channel/test" + "perun.network/go-perun/channel" + channeltest "perun.network/go-perun/channel/test" + pkgtest "perun.network/go-perun/pkg/test" +) + +func TestSubscription(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + rng := pkgtest.Prng(t) + numParts := 2 + rng.Intn(maxNumParts-2) + setup := test.NewSetup(t, rng, numParts, blockInterval) + adjSetup := newAdjudicatorSetup(setup) + adj := adjSetup.Adjudicator() + + opts := []channeltest.RandomOpt{channeltest.WithoutApp(), channeltest.WithIsFinal(false)} + req, subChannels := adjSetup.newAdjudicatorReq(ctx, rng, opts...), []channel.SignedState{} + sub, err := adj.Subscribe(ctx, req.Params.ID()) + require.NoError(t, err, "subscribing") + + subSetup := newSubscriptionSetup(sub, adj, req, subChannels) + channeltest.TestSubscription(ctx, t, subSetup) +} + +type subscriptionSetup struct { + adj channel.Adjudicator + sub *testSubscription + req *channel.AdjudicatorReq + subChannels []channel.SignedState +} + +func newSubscriptionSetup( + sub channel.AdjudicatorSubscription, + adj channel.Adjudicator, + req *channel.AdjudicatorReq, + subChannels []channel.SignedState, +) *subscriptionSetup { + return &subscriptionSetup{ + sub: &testSubscription{ + emitProgressed: false, + req: req, + AdjudicatorSubscription: sub, + }, + adj: adj, + req: req, + subChannels: subChannels, + } +} + +type testSubscription struct { + channel.AdjudicatorSubscription + emitProgressed bool + req *channel.AdjudicatorReq +} + +// Next emulates app channel functionality because app channels are not supported yet. +func (s *testSubscription) Next() channel.AdjudicatorEvent { + if s.emitProgressed { + s.emitProgressed = false + return channel.NewProgressedEvent(s.req.Tx.ID, &channel.ElapsedTimeout{}, s.req.Tx.State, s.req.Idx) + } + return s.AdjudicatorSubscription.Next() +} + +func (s *subscriptionSetup) Subscription() channel.AdjudicatorSubscription { + return s.sub +} + +// EmitRegistered operates the adjudicator so that a registered event should be emitted. +func (s *subscriptionSetup) EmitRegistered(ctx context.Context) (channel.Params, channel.State) { + err := s.adj.Register(ctx, *s.req, s.subChannels) + if err != nil { + panic(err) + } + return *s.req.Params, *s.req.Tx.State +} + +// EmitProgressed emulates app channel functionality because app channels are not supported yet. +func (s *subscriptionSetup) EmitProgressed(ctx context.Context) (channel.Params, channel.State) { + s.sub.emitProgressed = true + return *s.req.Params, *s.req.Tx.State +} + +// EmitRegistered operates the adjudicator so that a concluded event should be emitted. +func (s *subscriptionSetup) EmitConcluded(ctx context.Context) channel.Params { + err := s.adj.Withdraw(ctx, *s.req, channeltest.MakeStateMapFromSignedStates(s.subChannels...)) + if err != nil { + panic(err) + } + return *s.req.Params +} diff --git a/channel/test/subscription.go b/channel/test/subscription.go new file mode 100644 index 00000000..e1bcbed5 --- /dev/null +++ b/channel/test/subscription.go @@ -0,0 +1,76 @@ +// Copyright 2021 - See NOTICE file for copyright holders. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "perun.network/go-perun/channel" +) + +// SubscriptionTestSetup represents a setup for testing a subscription implementation. +// +// Subscription should return an instance of the subscription implementation. +// EmitRegistered should make the subscription emit a registered event. +// EmitProgressed should make the subscription emit a progressed event. +// EmitConcluded should make the subscription emit a concluded event. +type SubscriptionTestSetup interface { + Subscription() channel.AdjudicatorSubscription + EmitRegistered(context.Context) (channel.Params, channel.State) + EmitProgressed(context.Context) (channel.Params, channel.State) + EmitConcluded(context.Context) channel.Params +} + +func TestSubscription(ctx context.Context, t *testing.T, s SubscriptionTestSetup) { + sub := s.Subscription() + + { + params, state := s.EmitRegistered(ctx) + e, ok := sub.Next().(*channel.RegisteredEvent) + assert.True(t, ok, "registered") + assert.True(t, e.ID() == params.ID(), "equal ID") + assert.True(t, e.State.Equal(&state) == nil, "equal state") + err := e.Timeout().Wait(ctx) + assert.NoError(t, err, "registered: waiting") + } + + { + params, state := s.EmitProgressed(ctx) + e, ok := sub.Next().(*channel.ProgressedEvent) + assert.True(t, ok, "progressed") + assert.True(t, e.ID() == params.ID(), "equal ID") + assert.True(t, e.State.Equal(&state) == nil, "equal state") + err := e.Timeout().Wait(ctx) + assert.NoError(t, err, "progressed: waiting") + } + + { + params := s.EmitConcluded(ctx) + e, ok := sub.Next().(*channel.ConcludedEvent) + assert.True(t, ok, "concluded") + assert.True(t, e.ID() == params.ID(), "equal ID") + err := e.Timeout().Wait(ctx) + assert.NoError(t, err, "concluded: waiting") + } + + { + err := sub.Close() + assert.NoError(t, err, "close") + err = sub.Err() + assert.NoError(t, err, "err") + } +}