diff --git a/channel/mock_app.go b/channel/mock_app.go index 0bc60942..86990dfa 100644 --- a/channel/mock_app.go +++ b/channel/mock_app.go @@ -116,17 +116,29 @@ func (a MockApp) NewData() Data { // ValidTransition checks the transition for validity. func (a MockApp) ValidTransition(params *Params, from, to *State, actor Index) error { - return a.execMockOp(from.Data.(*MockOp)) + op, ok := from.Data.(*MockOp) + if !ok { + return fmt.Errorf("wrong data type: expected *MockOp, got %T", from.Data) + } + return a.execMockOp(op) } // ValidInit checks the initial state for validity. func (a MockApp) ValidInit(params *Params, state *State) error { - return a.execMockOp(state.Data.(*MockOp)) + op, ok := state.Data.(*MockOp) + if !ok { + return fmt.Errorf("wrong data type: expected *MockOp, got %T", state.Data) + } + return a.execMockOp(op) } // ValidAction checks the action for validity. func (a MockApp) ValidAction(params *Params, state *State, part Index, act Action) error { - return a.execMockOp(act.(*MockOp)) + op, ok := act.(*MockOp) + if !ok { + return fmt.Errorf("wrong data type: expected *MockOp, got %T", act) + } + return a.execMockOp(op) } // ApplyActions applies the actions unto a copy of state and returns the result or an error. @@ -134,12 +146,22 @@ func (a MockApp) ApplyActions(params *Params, state *State, acts []Action) (*Sta ret := state.Clone() ret.Version++ - return ret, a.execMockOp(acts[0].(*MockOp)) + op, ok := acts[0].(*MockOp) + if !ok { + return nil, fmt.Errorf("wrong data type: expected *MockOp, got %T", acts[0]) + } + + return ret, a.execMockOp(op) } // InitState Checks for the validity of the passed arguments as initial state. func (a MockApp) InitState(params *Params, rawActs []Action) (Allocation, Data, error) { - return Allocation{}, nil, a.execMockOp(rawActs[0].(*MockOp)) + op, ok := rawActs[0].(*MockOp) + if !ok { + return Allocation{}, nil, fmt.Errorf("wrong data type: expected *MockOp, got %T", rawActs[0]) + } + + return Allocation{}, nil, a.execMockOp(op) } // execMockOp executes the operation indicated by the MockOp from the MockOp. diff --git a/channel/persistence/keyvalue/restorer.go b/channel/persistence/keyvalue/restorer.go index 3f71aed4..1319fda8 100644 --- a/channel/persistence/keyvalue/restorer.go +++ b/channel/persistence/keyvalue/restorer.go @@ -160,7 +160,7 @@ func eatExpect(r io.Reader, tok string) error { return errors.WithMessage(err, "reading") } if string(buf) != tok { - return errors.Errorf("expected %s, got %s.", tok, string(buf)) + return errors.Errorf("expected %s, got %s", tok, string(buf)) } return nil } diff --git a/channel/persistence/test/channel.go b/channel/persistence/test/channel.go index bbf64301..9cd8857d 100644 --- a/channel/persistence/test/channel.go +++ b/channel/persistence/test/channel.go @@ -37,7 +37,7 @@ type Channel struct { *persistence.StateMachine pr persistence.PersistRestorer - ctx context.Context + ctx context.Context //nolint:containedctx // This is just done for testing. Could be revised. } // NewRandomChannel creates a random channel with the requested persister and diff --git a/channel/persistence/test/persistrestorertest.go b/channel/persistence/test/persistrestorertest.go index 1f3364b1..97c2f2ef 100644 --- a/channel/persistence/test/persistrestorertest.go +++ b/channel/persistence/test/persistrestorertest.go @@ -36,7 +36,7 @@ type Client struct { rng *rand.Rand pr persistence.PersistRestorer - ctx context.Context + ctx context.Context //nolint:containedctx // This is just done for testing. Could be revised. } // number of peers in a channel that are used for the tests. diff --git a/client/channelconn.go b/client/channelconn.go index d889e7bd..ea1e9d5e 100644 --- a/client/channelconn.go +++ b/client/channelconn.go @@ -60,8 +60,14 @@ func newChannelConn(id channel.ID, peers []wire.Address, idx channel.Index, sub }() isUpdateRes := func(e *wire.Envelope) bool { - ok := e.Msg.Type() == wire.ChannelUpdateAcc || e.Msg.Type() == wire.ChannelUpdateRej - return ok && e.Msg.(ChannelMsg).ID() == id + switch msg := e.Msg.(type) { + case *ChannelUpdateAccMsg: + return msg.ID() == id + case *ChannelUpdateRejMsg: + return msg.ID() == id + default: + return false + } } if err = sub.Subscribe(relay, isUpdateRes); err != nil { @@ -157,5 +163,9 @@ func (r *channelMsgRecv) Next(ctx context.Context) (channel.Index, ChannelMsg, e if idx == -1 { return 0, nil, errors.Errorf("channel connection received message from unexpected peer %v", env.Sender) } - return channel.Index(idx), env.Msg.(ChannelMsg), nil // predicate must guarantee that the conversion is safe + msg, ok := env.Msg.(ChannelMsg) + if !ok { + return 0, nil, errors.Errorf("unexpected message type: expected ChannelMsg, got %T", env.Msg) + } + return channel.Index(idx), msg, nil // predicate must guarantee that the conversion is safe } diff --git a/client/proposal.go b/client/proposal.go index 6310c608..8288a037 100644 --- a/client/proposal.go +++ b/client/proposal.go @@ -315,10 +315,14 @@ func (c *Client) proposeTwoPartyChannel( proposalID := proposal.Base().ProposalID isResponse := func(e *wire.Envelope) bool { - acc, isAcc := e.Msg.(ChannelProposalAccept) - return (isAcc && acc.Base().ProposalID == proposalID) || - (e.Msg.Type() == wire.ChannelProposalRej && - e.Msg.(*ChannelProposalRejMsg).ProposalID == proposalID) + switch msg := e.Msg.(type) { + case ChannelProposalAccept: + return msg.Base().ProposalID == proposalID + case ChannelProposalRejMsg: + return msg.ProposalID == proposalID + default: + return false + } } receiver := wire.NewReceiver() defer receiver.Close() @@ -606,9 +610,11 @@ func (c *Client) mpcppParts( ) (parts []wallet.Address) { switch p := prop.(type) { case *LedgerChannelProposalMsg: - parts = participants( - p.Participant, - acc.(*LedgerChannelProposalAccMsg).Participant) + ledgerAcc, ok := acc.(*LedgerChannelProposalAccMsg) + if !ok { + c.log.Panicf("unexpected message type: expected *LedgerChannelProposalAccMsg, got %T", acc) + } + parts = participants(p.Participant, ledgerAcc.Participant) case *SubChannelProposalMsg: ch, ok := c.channels.Channel(p.Parent) if !ok { @@ -616,10 +622,11 @@ func (c *Client) mpcppParts( } parts = ch.Params().Parts case *VirtualChannelProposalMsg: - parts = participants( - p.Proposer, - acc.(*VirtualChannelProposalAccMsg).Responder, - ) + virtualAcc, ok := acc.(*VirtualChannelProposalAccMsg) + if !ok { + c.log.Panicf("unexpected message type: expected *VirtualChannelProposalAccMsg, got %T", acc) + } + parts = participants(p.Proposer, virtualAcc.Responder) default: c.log.Panicf("unhandled %T", p) } @@ -697,8 +704,8 @@ func (c *Client) fundSubchannel(ctx context.Context, prop *SubChannelProposalMsg // enableVer0Cache enables caching of incoming version 0 signatures. func enableVer0Cache(c wire.Cacher) *wire.Predicate { p := func(m *wire.Envelope) bool { - return m.Msg.Type() == wire.ChannelUpdateAcc && - m.Msg.(*ChannelUpdateAccMsg).Version == 0 + msg, ok := m.Msg.(*ChannelUpdateAccMsg) + return ok && msg.Version == 0 } c.Cache(&p) return &p diff --git a/client/proposalopts.go b/client/proposalopts.go index 9e88612e..ab1fbd48 100644 --- a/client/proposalopts.go +++ b/client/proposalopts.go @@ -32,7 +32,11 @@ var optNames = struct{ nonce, app, appData, fundingAgreement string }{nonce: "no // App returns the option's configured app. func (o ProposalOpts) App() channel.App { if v := o[optNames.app]; v != nil { - return v.(channel.App) + app, ok := v.(channel.App) + if !ok { + log.Panicf("wrong type: expected channel.App, got %T", v) + } + return app } return channel.NoApp() } @@ -40,7 +44,11 @@ func (o ProposalOpts) App() channel.App { // AppData returns the option's configured app data. func (o ProposalOpts) AppData() channel.Data { if v := o[optNames.appData]; v != nil { - return v.(channel.Data) + data, ok := v.(channel.Data) + if !ok { + log.Panicf("wrong type: expected channel.Data, got %T", v) + } + return data } return channel.NoData() } @@ -63,7 +71,11 @@ func (o ProposalOpts) fundingAgreement() channel.Balances { if !ok { panic("Option FundingAgreement not set") } - return a.(channel.Balances) + bals, ok := a.(channel.Balances) + if !ok { + log.Panicf("wrong type: expected channel.Balances, got %T", a) + } + return bals } // nonce returns the option's configured nonce share, or a random nonce share. @@ -73,7 +85,11 @@ func (o ProposalOpts) nonce() NonceShare { n = WithRandomNonce().nonce() o[optNames.nonce] = n } - return n.(NonceShare) + share, ok := n.(NonceShare) + if !ok { + log.Panicf("wrong type: expected NonceShare, got %T", n) + } + return share } // isNonce returns whether a ProposalOpts contains a manually set nonce. diff --git a/client/sync.go b/client/sync.go index b1792647..efe60e81 100644 --- a/client/sync.go +++ b/client/sync.go @@ -73,7 +73,8 @@ func (c *Client) syncChannel(ctx context.Context, ch *persistence.Channel, p wir defer recv.Close() // ignore error id := ch.ID() err = c.conn.Subscribe(recv, func(m *wire.Envelope) bool { - return m.Msg.Type() == wire.ChannelSync && m.Msg.(ChannelMsg).ID() == id + msg, ok := m.Msg.(*ChannelSyncMsg) + return ok && msg.ID() == id }) if err != nil { return errors.WithMessage(err, "subscribing on relay") diff --git a/client/test/backend.go b/client/test/backend.go index 978e6e07..e76a392e 100644 --- a/client/test/backend.go +++ b/client/test/backend.go @@ -396,7 +396,7 @@ func (b *MockBackend) removeSubscription(ch channel.ID, sub *MockSubscription) { // MockSubscription is a subscription for MockBackend. type MockSubscription struct { - ctx context.Context + ctx context.Context //nolint:containedctx // This is just done for testing. Could be revised. events chan channel.AdjudicatorEvent err chan error onClose func() diff --git a/client/update.go b/client/update.go index 31e21a34..171986b8 100644 --- a/client/update.go +++ b/client/update.go @@ -411,8 +411,7 @@ func (c *Channel) OnUpdate(cb func(from, to *channel.State)) { // * Sub-allocations do not change. func (c *Channel) validTwoPartyUpdate(up ChannelUpdate, sigIdx channel.Index) error { if up.ActorIdx != sigIdx { - return errors.Errorf( - "Currently, only update proposals with the proposing peer as actor are allowed.") + return errors.New("invalid proposer") } if err := channel.SubAllocsAssertEqual(c.machine.State().Locked, up.State.Locked); err != nil { return errors.WithMessage(err, "sub-allocation changed") diff --git a/watcher/local/watcher_test.go b/watcher/local/watcher_test.go index 3b8419f0..fc6872b3 100644 --- a/watcher/local/watcher_test.go +++ b/watcher/local/watcher_test.go @@ -119,360 +119,360 @@ func Test_StartWatching(t *testing.T) { }) } -func Test_Watcher_Working(t *testing.T) { +func Test_Watcher_WithoutSubchannel(t *testing.T) { rng := test.Prng(t) - t.Run("ledger_channel_without_sub_channel", func(t *testing.T) { - // Send a registered event on the adjudicator subscription, with the latest state. - // Watcher should relay the event and not refute. - t.Run("happy/latest_state_registered", func(t *testing.T) { - // Setup - params, txs := randomTxsForSingleCh(rng, 3) - adjSub := &mocks.AdjudicatorSubscription{} - trigger := setExpectationNextCall(adjSub, makeRegisteredEvents(txs[2])...) - - rs := &mocks.RegisterSubscriber{} - setExpectationSubscribeCall(rs, adjSub, nil) - w := newWatcher(t, rs) - - // Publish both the states to the watcher. - statesPub, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State)) - require.NoError(t, statesPub.Publish(context.Background(), txs[1])) - require.NoError(t, statesPub.Publish(context.Background(), txs[2])) - - // Trigger events and assert. - triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) - rs.AssertExpectations(t) - }) - - // Send a registered event on the adjudicator subscription, - // with a state newer than the latest state (published to the watcher), - // Watch should relay the event and not refute. - t.Run("happy/newer_than_latest_state_registered", func(t *testing.T) { - // Setup - params, txs := randomTxsForSingleCh(rng, 3) - adjSub := &mocks.AdjudicatorSubscription{} - trigger := setExpectationNextCall(adjSub, makeRegisteredEvents(txs[2])...) - - rs := &mocks.RegisterSubscriber{} - setExpectationSubscribeCall(rs, adjSub, nil) - w := newWatcher(t, rs) - - // Publish only one of the two newly created off-chain states to the watcher. - statesPub, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State)) - require.NoError(t, statesPub.Publish(context.Background(), txs[1])) - - // Trigger adjudicator events with a state newer than the latest state (published to the watcher). - triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) - rs.AssertExpectations(t) - }) - - // First, send a registered event on the adjudicator subscription, - // with a state older than the latest state (published to the watcher), - // Watch should relay the event and refute by registering the latest state. - // - // Next, send a registered event on adjudicator subscription, - // with the state that was registered. - // This time, watcher should relay the event and not dispute. - t.Run("happy/older_state_registered", func(t *testing.T) { - // Setup - params, txs := randomTxsForSingleCh(rng, 3) - adjSub := &mocks.AdjudicatorSubscription{} - trigger := setExpectationNextCall(adjSub, makeRegisteredEvents(txs[1], txs[2])...) - - rs := &mocks.RegisterSubscriber{} - setExpectationSubscribeCall(rs, adjSub, nil) - setExpectationRegisterCalls(t, rs, &channelTree{txs[2], []channel.Transaction{}}) - w := newWatcher(t, rs) - - // Publish both the states to the watcher. - statesPub, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State)) - require.NoError(t, statesPub.Publish(context.Background(), txs[1])) - require.NoError(t, statesPub.Publish(context.Background(), txs[2])) - - // Trigger adjudicator events with an older state and assert if Register was called once. - triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) - time.Sleep(50 * time.Millisecond) // Wait for the watcher to refute. - rs.AssertNumberOfCalls(t, "Register", 1) - - // Trigger adjudicator events with the registered state and assert. - triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) - rs.AssertExpectations(t) - }) - - testIfEventsAreRelayed := func( - t *testing.T, - eventConstructor func(txs ...channel.Transaction, - ) []channel.AdjudicatorEvent, - ) { - t.Helper() - // Setup: Generate the params and off-chain states for a ledger channel. - params, txs := randomTxsForSingleCh(rng, 2) + // Send a registered event on the adjudicator subscription, with the latest state. + // Watcher should relay the event and not refute. + t.Run("happy/latest_state_registered", func(t *testing.T) { + // Setup + params, txs := randomTxsForSingleCh(rng, 3) + adjSub := &mocks.AdjudicatorSubscription{} + trigger := setExpectationNextCall(adjSub, makeRegisteredEvents(txs[2])...) - // Setup: Adjudicator event subscription for the ledger. - adjSub := &mocks.AdjudicatorSubscription{} - trigger := setExpectationNextCall(adjSub, eventConstructor(txs[1])...) - rs := &mocks.RegisterSubscriber{} - setExpectationSubscribeCall(rs, adjSub, nil) + rs := &mocks.RegisterSubscriber{} + setExpectationSubscribeCall(rs, adjSub, nil) + w := newWatcher(t, rs) - // Setup: Initialize the watcher and start watching for the ledger. - w := newWatcher(t, rs) - _, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State)) + // Publish both the states to the watcher. + statesPub, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State)) + require.NoError(t, statesPub.Publish(context.Background(), txs[1])) + require.NoError(t, statesPub.Publish(context.Background(), txs[2])) - // Trigger the event for ledger channel and assert if they are relayed to the adjudicator subscription - // (eventsForClient). - triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) - rs.AssertExpectations(t) - } - // Test if progressed events are relayed to the adjudicator subscription. - t.Run("happy/progressed_event", func(t *testing.T) { - testIfEventsAreRelayed(t, makeProgressedEvents) - }) - // Test if concluded events are relayed to the adjudicator subscription. - t.Run("happy/concluded_event", func(t *testing.T) { - testIfEventsAreRelayed(t, makeConcludedEvents) - }) + // Trigger events and assert. + triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) + rs.AssertExpectations(t) }) - t.Run("ledger_channel_with_sub_channel", func(t *testing.T) { - // For both, the parent and the sub-channel, - // send a registered event on the adjudicator subscription, with the latest state. - // Watcher should relay the events and not refute. - t.Run("happy/latest_state_registered", func(t *testing.T) { - // Setup - parentParams, parentTxs := randomTxsForSingleCh(rng, 3) - childParams, childTxs := randomTxsForSingleCh(rng, 3) - // Add sub-channel to allocation. This transaction represents funding of the sub-channel. - parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} - - adjSubParent := &mocks.AdjudicatorSubscription{} - triggerParent := setExpectationNextCall(adjSubParent, makeRegisteredEvents(parentTxs[2])...) - adjSubChild := &mocks.AdjudicatorSubscription{} - triggerChild := setExpectationNextCall(adjSubChild, makeRegisteredEvents(childTxs[2])...) - rs := &mocks.RegisterSubscriber{} - setExpectationSubscribeCall(rs, adjSubParent, nil) - setExpectationSubscribeCall(rs, adjSubChild, nil) - - w := newWatcher(t, rs) - // Parent: Publish both the states to the watcher. - parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) - statesPubParent, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) - require.NoError(t, statesPubParent.Publish(context.Background(), parentTxs[1])) - require.NoError(t, statesPubParent.Publish(context.Background(), parentTxs[2])) - - // Child: Publish both the states to the watcher. - childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) - statesPubChild, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) - require.NoError(t, statesPubChild.Publish(context.Background(), childTxs[1])) - require.NoError(t, statesPubChild.Publish(context.Background(), childTxs[2])) + // Send a registered event on the adjudicator subscription, + // with a state newer than the latest state (published to the watcher), + // Watch should relay the event and not refute. + t.Run("happy/newer_than_latest_state_registered", func(t *testing.T) { + // Setup + params, txs := randomTxsForSingleCh(rng, 3) + adjSub := &mocks.AdjudicatorSubscription{} + trigger := setExpectationNextCall(adjSub, makeRegisteredEvents(txs[2])...) - // Parent and child: Trigger adjudicator events with the latest states and assert. - triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) - triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) - rs.AssertExpectations(t) - }) + rs := &mocks.RegisterSubscriber{} + setExpectationSubscribeCall(rs, adjSub, nil) + w := newWatcher(t, rs) - // For both, the parent and the sub-channel, - // send a registered event on the adjudicator subscription, - // with a state newer than the latest state (published to the watcher), - // Watch should relay the event and not refute. - t.Run("happy/newer_than_latest_state_registered", func(t *testing.T) { - // Setup - parentParams, parentTxs := randomTxsForSingleCh(rng, 3) - childParams, childTxs := randomTxsForSingleCh(rng, 3) - // Add sub-channel to allocation. This transaction represents funding of the sub-channel. - parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} + // Publish only one of the two newly created off-chain states to the watcher. + statesPub, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State)) + require.NoError(t, statesPub.Publish(context.Background(), txs[1])) - adjSubParent := &mocks.AdjudicatorSubscription{} - triggerParent := setExpectationNextCall(adjSubParent, makeRegisteredEvents(parentTxs[2])...) - adjSubChild := &mocks.AdjudicatorSubscription{} - triggerChild := setExpectationNextCall(adjSubChild, makeRegisteredEvents(childTxs[2])...) + // Trigger adjudicator events with a state newer than the latest state (published to the watcher). + triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) + rs.AssertExpectations(t) + }) - rs := &mocks.RegisterSubscriber{} - setExpectationSubscribeCall(rs, adjSubParent, nil) - setExpectationSubscribeCall(rs, adjSubChild, nil) + // First, send a registered event on the adjudicator subscription, + // with a state older than the latest state (published to the watcher), + // Watch should relay the event and refute by registering the latest state. + // + // Next, send a registered event on adjudicator subscription, + // with the state that was registered. + // This time, watcher should relay the event and not dispute. + t.Run("happy/older_state_registered", func(t *testing.T) { + // Setup + params, txs := randomTxsForSingleCh(rng, 3) + adjSub := &mocks.AdjudicatorSubscription{} + trigger := setExpectationNextCall(adjSub, makeRegisteredEvents(txs[1], txs[2])...) - w := newWatcher(t, rs) + rs := &mocks.RegisterSubscriber{} + setExpectationSubscribeCall(rs, adjSub, nil) + setExpectationRegisterCalls(t, rs, &channelTree{txs[2], []channel.Transaction{}}) + w := newWatcher(t, rs) - // Parent: Publish only one of the two newly created off-chain states to the watcher. - parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) - statesPubParent, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) - require.NoError(t, statesPubParent.Publish(context.Background(), parentTxs[1])) + // Publish both the states to the watcher. + statesPub, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State)) + require.NoError(t, statesPub.Publish(context.Background(), txs[1])) + require.NoError(t, statesPub.Publish(context.Background(), txs[2])) - // Child: Publish only one of the two newly created off-chain states to the watcher. - childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) - statesPubChild, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) - require.NoError(t, statesPubChild.Publish(context.Background(), childTxs[1])) + // Trigger adjudicator events with an older state and assert if Register was called once. + triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) + time.Sleep(50 * time.Millisecond) // Wait for the watcher to refute. + rs.AssertNumberOfCalls(t, "Register", 1) - // Parent, Child: Trigger adjudicator events with a state newer than - // the latest state (published to the watcher). - triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) - triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) - rs.AssertExpectations(t) - }) - - // For both, the parent and the sub-channel, - // - // First, send a registered event on the adjudicator subscription, - // with a state older than the latest state (published to the watcher), - // Watch should relay the event and refute by registering the latest state. - // - // Next, send a registered event on adjudicator subscription, - // with the state that was registered. - // This time, watcher should relay the event and not dispute. - t.Run("happy/older_state_registered", func(t *testing.T) { - // Setup - parentParams, parentTxs := randomTxsForSingleCh(rng, 3) - childParams, childTxs := randomTxsForSingleCh(rng, 3) - // Add sub-channel to allocation. This transaction represents funding of the sub-channel. - parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} - - adjSubParent := &mocks.AdjudicatorSubscription{} - triggerParent := setExpectationNextCall(adjSubParent, makeRegisteredEvents(parentTxs[1], parentTxs[2])...) - adjSubChild := &mocks.AdjudicatorSubscription{} - triggerChild := setExpectationNextCall(adjSubChild, makeRegisteredEvents(childTxs[1], childTxs[2])...) + // Trigger adjudicator events with the registered state and assert. + triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) + rs.AssertExpectations(t) + }) - rs := &mocks.RegisterSubscriber{} - setExpectationSubscribeCall(rs, adjSubParent, nil) - setExpectationSubscribeCall(rs, adjSubChild, nil) - setExpectationRegisterCalls(t, rs, &channelTree{parentTxs[2], []channel.Transaction{childTxs[2]}}) + testIfEventsAreRelayed := func( + t *testing.T, + eventConstructor func(txs ...channel.Transaction, + ) []channel.AdjudicatorEvent, + ) { + t.Helper() + // Setup: Generate the params and off-chain states for a ledger channel. + params, txs := randomTxsForSingleCh(rng, 2) + + // Setup: Adjudicator event subscription for the ledger. + adjSub := &mocks.AdjudicatorSubscription{} + trigger := setExpectationNextCall(adjSub, eventConstructor(txs[1])...) + rs := &mocks.RegisterSubscriber{} + setExpectationSubscribeCall(rs, adjSub, nil) + + // Setup: Initialize the watcher and start watching for the ledger. + w := newWatcher(t, rs) + _, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State)) - w := newWatcher(t, rs) + // Trigger the event for ledger channel and assert if they are relayed to the adjudicator subscription + // (eventsForClient). + triggerAdjEventAndExpectNotification(t, trigger, eventsForClient) + rs.AssertExpectations(t) + } + // Test if progressed events are relayed to the adjudicator subscription. + t.Run("happy/progressed_event", func(t *testing.T) { + testIfEventsAreRelayed(t, makeProgressedEvents) + }) + // Test if concluded events are relayed to the adjudicator subscription. + t.Run("happy/concluded_event", func(t *testing.T) { + testIfEventsAreRelayed(t, makeConcludedEvents) + }) +} - // Parent: Publish both the states to the watcher. - parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) - parentStatesPub, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) - require.NoError(t, parentStatesPub.Publish(context.Background(), parentTxs[1])) - require.NoError(t, parentStatesPub.Publish(context.Background(), parentTxs[2])) +func Test_Watcher_WithSubchannel(t *testing.T) { + rng := test.Prng(t) + // For both, the parent and the sub-channel, + // send a registered event on the adjudicator subscription, with the latest state. + // Watcher should relay the events and not refute. + t.Run("happy/latest_state_registered", func(t *testing.T) { + // Setup + parentParams, parentTxs := randomTxsForSingleCh(rng, 3) + childParams, childTxs := randomTxsForSingleCh(rng, 3) + // Add sub-channel to allocation. This transaction represents funding of the sub-channel. + parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} + + adjSubParent := &mocks.AdjudicatorSubscription{} + triggerParent := setExpectationNextCall(adjSubParent, makeRegisteredEvents(parentTxs[2])...) + adjSubChild := &mocks.AdjudicatorSubscription{} + triggerChild := setExpectationNextCall(adjSubChild, makeRegisteredEvents(childTxs[2])...) + rs := &mocks.RegisterSubscriber{} + setExpectationSubscribeCall(rs, adjSubParent, nil) + setExpectationSubscribeCall(rs, adjSubChild, nil) - // Child: Publish both the states to the watcher. - childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) - childStatesPub, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) - require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[1])) - require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[2])) + w := newWatcher(t, rs) - // Parent and Child: Trigger adjudicator events with an older state and assert if Register was called once. - triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) - triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) - time.Sleep(50 * time.Millisecond) // Wait for the watcher to refute. - rs.AssertNumberOfCalls(t, "Register", 1) + // Parent: Publish both the states to the watcher. + parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) + statesPubParent, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) + require.NoError(t, statesPubParent.Publish(context.Background(), parentTxs[1])) + require.NoError(t, statesPubParent.Publish(context.Background(), parentTxs[2])) + + // Child: Publish both the states to the watcher. + childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) + statesPubChild, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) + require.NoError(t, statesPubChild.Publish(context.Background(), childTxs[1])) + require.NoError(t, statesPubChild.Publish(context.Background(), childTxs[2])) + + // Parent and child: Trigger adjudicator events with the latest states and assert. + triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) + triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) + rs.AssertExpectations(t) + }) - // Parent and Child: Trigger adjudicator events with the registered state and assert. - triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) - triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) - rs.AssertExpectations(t) - }) + // For both, the parent and the sub-channel, + // send a registered event on the adjudicator subscription, + // with a state newer than the latest state (published to the watcher), + // Watch should relay the event and not refute. + t.Run("happy/newer_than_latest_state_registered", func(t *testing.T) { + // Setup + parentParams, parentTxs := randomTxsForSingleCh(rng, 3) + childParams, childTxs := randomTxsForSingleCh(rng, 3) + // Add sub-channel to allocation. This transaction represents funding of the sub-channel. + parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} - // First, for both, the parent and the sub-channel, - // Send a registered event on the adjudicator subscription, - // with a state older than the latest state (published to the watcher), - // Watch should relay the event and refute by registering the latest state. - // - // Next, for the sub-channel, publish another off-chain state to the watcher. - // - // Next, for both, the parent and the sub-channel, - // Send a registered event on adjudicator subscription, - // with the state that was registered. - // This time, because the registered state is older than the latest state for the sub-channel, - // Watch should relay the event and refute by registering the latest state. - // - // Next, for the sub-channel, send a registered event on the adjudicator - // subscription. This time, the watcher should relay the event and not - // dispute. - t.Run("happy/older_state_registered_then_newer_state_received", func(t *testing.T) { - // Setup - parentParams, parentTxs := randomTxsForSingleCh(rng, 3) - childParams, childTxs := randomTxsForSingleCh(rng, 4) - // Add sub-channel to allocation. This transaction represents funding of the sub-channel. - parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} + adjSubParent := &mocks.AdjudicatorSubscription{} + triggerParent := setExpectationNextCall(adjSubParent, makeRegisteredEvents(parentTxs[2])...) + adjSubChild := &mocks.AdjudicatorSubscription{} + triggerChild := setExpectationNextCall(adjSubChild, makeRegisteredEvents(childTxs[2])...) - adjSubParent := &mocks.AdjudicatorSubscription{} - triggerParent := setExpectationNextCall(adjSubParent, makeRegisteredEvents(parentTxs[1], parentTxs[2])...) - adjSubChild := &mocks.AdjudicatorSubscription{} - triggerChild := setExpectationNextCall(adjSubChild, makeRegisteredEvents(childTxs[1], childTxs[2], childTxs[3])...) + rs := &mocks.RegisterSubscriber{} + setExpectationSubscribeCall(rs, adjSubParent, nil) + setExpectationSubscribeCall(rs, adjSubChild, nil) - rs := &mocks.RegisterSubscriber{} - setExpectationSubscribeCall(rs, adjSubParent, nil) - setExpectationSubscribeCall(rs, adjSubChild, nil) - setExpectationRegisterCalls(t, rs, - &channelTree{parentTxs[2], []channel.Transaction{childTxs[2]}}, - &channelTree{parentTxs[2], []channel.Transaction{childTxs[3]}}) + w := newWatcher(t, rs) - w := newWatcher(t, rs) + // Parent: Publish only one of the two newly created off-chain states to the watcher. + parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) + statesPubParent, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) + require.NoError(t, statesPubParent.Publish(context.Background(), parentTxs[1])) + + // Child: Publish only one of the two newly created off-chain states to the watcher. + childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) + statesPubChild, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) + require.NoError(t, statesPubChild.Publish(context.Background(), childTxs[1])) + + // Parent, Child: Trigger adjudicator events with a state newer than + // the latest state (published to the watcher). + triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) + triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) + rs.AssertExpectations(t) + }) - // Parent: Publish both the states to the watcher. - parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) - parentStatesPub, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) - require.NoError(t, parentStatesPub.Publish(context.Background(), parentTxs[1])) - require.NoError(t, parentStatesPub.Publish(context.Background(), parentTxs[2])) + // For both, the parent and the sub-channel, + // + // First, send a registered event on the adjudicator subscription, + // with a state older than the latest state (published to the watcher), + // Watch should relay the event and refute by registering the latest state. + // + // Next, send a registered event on adjudicator subscription, + // with the state that was registered. + // This time, watcher should relay the event and not dispute. + t.Run("happy/older_state_registered", func(t *testing.T) { + // Setup + parentParams, parentTxs := randomTxsForSingleCh(rng, 3) + childParams, childTxs := randomTxsForSingleCh(rng, 3) + // Add sub-channel to allocation. This transaction represents funding of the sub-channel. + parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} - // Child: Publish both the states to the watcher. - childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) - childStatesPub, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) - require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[1])) - require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[2])) + adjSubParent := &mocks.AdjudicatorSubscription{} + triggerParent := setExpectationNextCall(adjSubParent, makeRegisteredEvents(parentTxs[1], parentTxs[2])...) + adjSubChild := &mocks.AdjudicatorSubscription{} + triggerChild := setExpectationNextCall(adjSubChild, makeRegisteredEvents(childTxs[1], childTxs[2])...) - // Parent and Child: Trigger adjudicator events with an older state and assert if Register was called once. - triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) - triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) - time.Sleep(50 * time.Millisecond) // Wait for the watcher to refute. - rs.AssertNumberOfCalls(t, "Register", 1) + rs := &mocks.RegisterSubscriber{} + setExpectationSubscribeCall(rs, adjSubParent, nil) + setExpectationSubscribeCall(rs, adjSubChild, nil) + setExpectationRegisterCalls(t, rs, &channelTree{parentTxs[2], []channel.Transaction{childTxs[2]}}) - // Child: After register was called, publish a new state to the watcher. - require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[3])) + w := newWatcher(t, rs) - // Parent and Child: Trigger adjudicator events with the registered state and assert if Register was called once. - triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) - triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) - time.Sleep(50 * time.Millisecond) // Wait for the watcher to refute. - rs.AssertNumberOfCalls(t, "Register", 2) + // Parent: Publish both the states to the watcher. + parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) + parentStatesPub, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) + require.NoError(t, parentStatesPub.Publish(context.Background(), parentTxs[1])) + require.NoError(t, parentStatesPub.Publish(context.Background(), parentTxs[2])) + + // Child: Publish both the states to the watcher. + childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) + childStatesPub, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) + require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[1])) + require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[2])) + + // Parent and Child: Trigger adjudicator events with an older state and assert if Register was called once. + triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) + triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) + time.Sleep(50 * time.Millisecond) // Wait for the watcher to refute. + rs.AssertNumberOfCalls(t, "Register", 1) + + // Parent and Child: Trigger adjudicator events with the registered state and assert. + triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) + triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) + rs.AssertExpectations(t) + }) - // Child: Trigger adjudicator events with the new state and assert. - triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) - rs.AssertExpectations(t) - }) + // First, for both, the parent and the sub-channel, + // Send a registered event on the adjudicator subscription, + // with a state older than the latest state (published to the watcher), + // Watch should relay the event and refute by registering the latest state. + // + // Next, for the sub-channel, publish another off-chain state to the watcher. + // + // Next, for both, the parent and the sub-channel, + // Send a registered event on adjudicator subscription, + // with the state that was registered. + // This time, because the registered state is older than the latest state for the sub-channel, + // Watch should relay the event and refute by registering the latest state. + // + // Next, for the sub-channel, send a registered event on the adjudicator + // subscription. This time, the watcher should relay the event and not + // dispute. + t.Run("happy/older_state_registered_then_newer_state_received", func(t *testing.T) { + // Setup + parentParams, parentTxs := randomTxsForSingleCh(rng, 3) + childParams, childTxs := randomTxsForSingleCh(rng, 4) + // Add sub-channel to allocation. This transaction represents funding of the sub-channel. + parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} + + adjSubParent := &mocks.AdjudicatorSubscription{} + triggerParent := setExpectationNextCall(adjSubParent, makeRegisteredEvents(parentTxs[1], parentTxs[2])...) + adjSubChild := &mocks.AdjudicatorSubscription{} + triggerChild := setExpectationNextCall(adjSubChild, makeRegisteredEvents(childTxs[1], childTxs[2], childTxs[3])...) + + rs := &mocks.RegisterSubscriber{} + setExpectationSubscribeCall(rs, adjSubParent, nil) + setExpectationSubscribeCall(rs, adjSubChild, nil) + setExpectationRegisterCalls(t, rs, + &channelTree{parentTxs[2], []channel.Transaction{childTxs[2]}}, + &channelTree{parentTxs[2], []channel.Transaction{childTxs[3]}}) - testIfEventsAreRelayed := func( - t *testing.T, - eventConstructor func(txs ...channel.Transaction, - ) []channel.AdjudicatorEvent, - ) { - t.Helper() - // Setup: Generate the params and off-chain states for a ledger channel and a sub-channel. - parentParams, parentTxs := randomTxsForSingleCh(rng, 2) - childParams, childTxs := randomTxsForSingleCh(rng, 2) - parentTxs[1].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} // Add sub-channel to allocation. + w := newWatcher(t, rs) - // Setup: Adjudicator event subscription for the ledger and sub-channel. - adjSubParent := &mocks.AdjudicatorSubscription{} - triggerParent := setExpectationNextCall(adjSubParent, eventConstructor(parentTxs[1])...) - adjSubChild := &mocks.AdjudicatorSubscription{} - triggerChild := setExpectationNextCall(adjSubChild, eventConstructor(childTxs[1])...) - rs := &mocks.RegisterSubscriber{} - setExpectationSubscribeCall(rs, adjSubParent, nil) - setExpectationSubscribeCall(rs, adjSubChild, nil) + // Parent: Publish both the states to the watcher. + parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) + parentStatesPub, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) + require.NoError(t, parentStatesPub.Publish(context.Background(), parentTxs[1])) + require.NoError(t, parentStatesPub.Publish(context.Background(), parentTxs[2])) + + // Child: Publish both the states to the watcher. + childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) + childStatesPub, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) + require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[1])) + require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[2])) + + // Parent and Child: Trigger adjudicator events with an older state and assert if Register was called once. + triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) + triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) + time.Sleep(50 * time.Millisecond) // Wait for the watcher to refute. + rs.AssertNumberOfCalls(t, "Register", 1) + + // Child: After register was called, publish a new state to the watcher. + require.NoError(t, childStatesPub.Publish(context.Background(), childTxs[3])) + + // Parent and Child: Trigger adjudicator events with the registered state and assert if Register was called once. + triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) + triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) + time.Sleep(50 * time.Millisecond) // Wait for the watcher to refute. + rs.AssertNumberOfCalls(t, "Register", 2) + + // Child: Trigger adjudicator events with the new state and assert. + triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) + rs.AssertExpectations(t) + }) - // Setup: Initialize the watcher and start watching for the ledger and sub-channel. - w := newWatcher(t, rs) - parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) - _, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) - childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) - _, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) + testIfEventsAreRelayed := func( + t *testing.T, + eventConstructor func(txs ...channel.Transaction, + ) []channel.AdjudicatorEvent, + ) { + t.Helper() + // Setup: Generate the params and off-chain states for a ledger channel and a sub-channel. + parentParams, parentTxs := randomTxsForSingleCh(rng, 2) + childParams, childTxs := randomTxsForSingleCh(rng, 2) + parentTxs[1].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} // Add sub-channel to allocation. + + // Setup: Adjudicator event subscription for the ledger and sub-channel. + adjSubParent := &mocks.AdjudicatorSubscription{} + triggerParent := setExpectationNextCall(adjSubParent, eventConstructor(parentTxs[1])...) + adjSubChild := &mocks.AdjudicatorSubscription{} + triggerChild := setExpectationNextCall(adjSubChild, eventConstructor(childTxs[1])...) + rs := &mocks.RegisterSubscriber{} + setExpectationSubscribeCall(rs, adjSubParent, nil) + setExpectationSubscribeCall(rs, adjSubChild, nil) + + // Setup: Initialize the watcher and start watching for the ledger and sub-channel. + w := newWatcher(t, rs) + parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State) + _, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState) + childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State) + _, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID) - // Trigger the events for both (the ledger channel and the sub-channel) and, assert if they are relayed to - // the adjudicator subscription (eventsForClient). - triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) - triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) + // Trigger the events for both (the ledger channel and the sub-channel) and, assert if they are relayed to + // the adjudicator subscription (eventsForClient). + triggerAdjEventAndExpectNotification(t, triggerParent, eventsForClientParent) + triggerAdjEventAndExpectNotification(t, triggerChild, eventsForClientChild) - rs.AssertExpectations(t) - } - // Test if progressed events are relayed to the adjudicator subscription. - t.Run("happy/progressed_event", func(t *testing.T) { - testIfEventsAreRelayed(t, makeProgressedEvents) - }) - // Test if concluded events are relayed to the adjudicator subscription. - t.Run("happy/concluded_event", func(t *testing.T) { - testIfEventsAreRelayed(t, makeConcludedEvents) - }) + rs.AssertExpectations(t) + } + // Test if progressed events are relayed to the adjudicator subscription. + t.Run("happy/progressed_event", func(t *testing.T) { + testIfEventsAreRelayed(t, makeProgressedEvents) + }) + // Test if concluded events are relayed to the adjudicator subscription. + t.Run("happy/concluded_event", func(t *testing.T) { + testIfEventsAreRelayed(t, makeConcludedEvents) }) } diff --git a/wire/cache.go b/wire/cache.go index c43975ce..0810dd33 100644 --- a/wire/cache.go +++ b/wire/cache.go @@ -52,16 +52,16 @@ func (c *Cache) Release(p *Predicate) { // If it matches several predicates, it is still only added once to the cache. func (c *Cache) Put(e *Envelope) bool { // we filter the predicates for non-active and lazily remove them - any := false + found := false for p := range c.preds { - any = any || (*p)(e) + found = found || (*p)(e) } - if any { + if found { c.msgs = append(c.msgs, e) } - return any + return found } // Messages retrieves all messages from the cache that match the predicate. They are diff --git a/wire/relay.go b/wire/relay.go index d130a2eb..9e5afe91 100644 --- a/wire/relay.go +++ b/wire/relay.go @@ -160,15 +160,15 @@ func (p *Relay) Put(e *Envelope) { return } - any := false + found := false for _, sub := range p.consumers { if sub.predicate(e) { sub.consumer.Put(e) - any = true + found = true } } - if !any { + if !found { if !p.cache.Put(e) { p.defaultMsgHandler(e) }