Skip to content

Commit

Permalink
Fix logs wss publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
pustovalov committed Dec 7, 2023
1 parent 9cdd25a commit dd28fe5
Show file tree
Hide file tree
Showing 2 changed files with 377 additions and 1 deletion.
2 changes: 1 addition & 1 deletion rpc/node/events/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (eb *EventBroker) Start() {
// v.logsCh is buffered, use non-blocking send to protect the broker:
// timeout preferred instead of default to be able to tolerate slight delays
select {
case s.GetLogsCh() <- logs:
case s.GetLogsCh() <- matchedLogs:
case <-time.After(10 * time.Millisecond):
eb.l.Warn().Msg("Publishing to Logs channel fall into DEFAULT!")
}
Expand Down
376 changes: 376 additions & 0 deletions rpc/node/events/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/aurora-is-near/relayer2-base/broker"
"github.com/aurora-is-near/relayer2-base/rpc/node/events"
"github.com/aurora-is-near/relayer2-base/types/common"
"github.com/aurora-is-near/relayer2-base/types/event"
"github.com/aurora-is-near/relayer2-base/types/primitives"
"github.com/aurora-is-near/relayer2-base/types/request"
Expand Down Expand Up @@ -95,6 +96,381 @@ func TestBrokerFlows(t *testing.T) {

}

func TestBrokerReturnsCorrectEventsEmptyParams(t *testing.T) {
eb := events.NewEventBroker()
go eb.Start()

rcvEventCounter := 0
eventCounterCh := make(chan int)
go func() {
for range eventCounterCh {
rcvEventCounter++
}
}()

filterParams := request.LogSubscriptionOptions{}

clientLogSub := createClientAndSubscribeLogs(eb, eventCounterCh, filterParams)

time.Sleep(1 * time.Second)

sentNumMsgCh := make(chan int)
go func() {
sentMsgCounter := 0
allLogs := make(event.Logs, 0)

for i := 0; i < 10; i++ {
tmpLRS := make(event.Logs, 1)
tmpLRS[0] = GenerateLogResponse()
allLogs = append(allLogs, tmpLRS[0])
sentMsgCounter++
}

eb.PublishLogs(allLogs)
sentNumMsgCh <- sentMsgCounter
}()

sentEventCounter := <-sentNumMsgCh

time.Sleep(1 * time.Second)

assert.NotEqual(t, 0, sentEventCounter)
assert.Equal(t, 10, rcvEventCounter, "Number of received events does not match expected value")

for i := 0; i < numClients; i++ {
eb.UnsubscribeFromLogs(clientLogSub)
}

time.Sleep(1 * time.Second)
}

func TestBrokerReturnsCorrectEventsWithAddressAndTopics(t *testing.T) {
eb := events.NewEventBroker()
go eb.Start()

rcvEventCounter := 0
eventCounterCh := make(chan int)
go func() {
for range eventCounterCh {
rcvEventCounter++
}
}()

contractAddresses := []common.Address{
common.BytesToAddress(primitives.Data20FromHex("0x2f41af687164062f118297ca10751f4b55478ae1").Bytes()),
common.BytesToAddress(primitives.Data20FromHex("0x03b666f3488a7992b2385b12df7f35156d7b29cd").Bytes()),
common.BytesToAddress(primitives.Data20FromHex("0x20f8aefb5697b77e0bb835a8518be70775cda1b0").Bytes()),
common.BytesToAddress(primitives.Data20FromHex("0x63da4db6ef4e7c62168ab03982399f9588fcd198").Bytes()),
common.BytesToAddress(primitives.Data20FromHex("0x61c9e05d1cdb1b70856c7a2c53fa9c220830633c").Bytes()),
}

topics := request.Topics{
{},
{},
{[]byte(`0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e`)},
}

filterParams := request.LogSubscriptionOptions{
Address: contractAddresses,
Topics: topics,
}

clientLogSub := createClientAndSubscribeLogs(eb, eventCounterCh, filterParams)

time.Sleep(1 * time.Second)

sentNumMsgCh := make(chan int)
go func() {
sentMsgCounter := 0
timeout := time.After(eventTimeoutSeconds * time.Second)

allLogs := make(event.Logs, 0)

for {
tmpLRS := make(event.Logs, 1)
tmpLRS[0] = GenerateLogResponse()
allLogs = append(allLogs, tmpLRS[0])
sentMsgCounter += 1

time.Sleep(10 * time.Millisecond)
select {
case <-timeout:
tmpLog1 := &response.Log{
Removed: false,
LogIndex: primitives.HexUint(5),
TransactionIndex: primitives.HexUint(0),
TransactionHash: primitives.Data32FromHex("0x29d3cd070a26eb34cd1c8abb70cb1e966819a342bc03965a4cd662442f712615"),
BlockHash: primitives.Data32FromHex("0x0579fb6c14a212998fc0e3792c2994f5f0179d8f64aa6e9059edd1f69df05155"),
BlockNumber: primitives.HexUint(107219211),
Address: primitives.Data20FromHex("0x63da4db6ef4e7c62168ab03982399f9588fcd198"),
Data: primitives.VarDataFromHex("0x0000000000000000000000000000000000000000000b6afb14c2d46e19ffffc40000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000305d9662647959"),
Topics: []primitives.Data32{
primitives.Data32FromHex("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"),
primitives.Data32FromHex("0x0000000000000000000000002cb45edb4517d5947afde3beabf95a582506858b"),
primitives.Data32FromHex("0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e"),
},
}
allLogs = append(allLogs, tmpLog1)
sentMsgCounter += 1

tmpLog2 := &response.Log{
Removed: false,
LogIndex: primitives.HexUint(5),
TransactionIndex: primitives.HexUint(0),
TransactionHash: primitives.Data32FromHex("0x29d3cd070a26eb34cd1c8abb70cb1e966819a342bc03965a4cd662442f712615"),
BlockHash: primitives.Data32FromHex("0x0579fb6c14a212998fc0e3792c2994f5f0179d8f64aa6e9059edd1f69df05155"),
BlockNumber: primitives.HexUint(107219211),
Address: primitives.Data20FromHex("0x63da4db6ef4e7c62168ab03982399f9588fcd198"),
Data: primitives.VarDataFromHex("0x0000000000000000000000000000000000000000000b6afb14c2d46e19ffffc40000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000305d9662647959"),
Topics: []primitives.Data32{
primitives.Data32FromHex("0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e"),
primitives.Data32FromHex("0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e"),
primitives.Data32FromHex("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"),
},
}

allLogs = append(allLogs, tmpLog2)
sentMsgCounter += 1

eb.PublishLogs(allLogs)

sentNumMsgCh <- sentMsgCounter
return
default:
}
}
}()

sentEventCounter := <-sentNumMsgCh
time.Sleep(1 * time.Second)

assert.NotEqual(t, 0, sentEventCounter)
assert.Equal(t, 1, rcvEventCounter, "Number of received events does not match expected value")

for i := 0; i < numClients; i++ {
eb.UnsubscribeFromLogs(clientLogSub)
}

time.Sleep(1 * time.Second)
}

func TestBrokerReturnsCorrectEventsWithTopics(t *testing.T) {
eb := events.NewEventBroker()
go eb.Start()

rcvEventCounter := 0
eventCounterCh := make(chan int)
go func() {
for range eventCounterCh {
rcvEventCounter++
}
}()

topics := request.Topics{
{[]byte(`0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e`)},
}

filterParams := request.LogSubscriptionOptions{
Topics: topics,
}

clientLogSub := createClientAndSubscribeLogs(eb, eventCounterCh, filterParams)

time.Sleep(1 * time.Second)

sentNumMsgCh := make(chan int)
go func() {
sentMsgCounter := 0
timeout := time.After(eventTimeoutSeconds * time.Second)

allLogs := make(event.Logs, 0)

for {
tmpLRS := make(event.Logs, 1)
tmpLRS[0] = GenerateLogResponse()
allLogs = append(allLogs, tmpLRS[0])
sentMsgCounter += 1

time.Sleep(10 * time.Millisecond)
select {
case <-timeout:
tmpLog1 := &response.Log{
Removed: false,
LogIndex: primitives.HexUint(5),
TransactionIndex: primitives.HexUint(0),
TransactionHash: primitives.Data32FromHex("0x29d3cd070a26eb34cd1c8abb70cb1e966819a342bc03965a4cd662442f712615"),
BlockHash: primitives.Data32FromHex("0x0579fb6c14a212998fc0e3792c2994f5f0179d8f64aa6e9059edd1f69df05155"),
BlockNumber: primitives.HexUint(107219211),
Address: primitives.Data20FromHex("0x63da4db6ef4e7c62168ab03982399f9588fcd198"),
Data: primitives.VarDataFromHex("0x0000000000000000000000000000000000000000000b6afb14c2d46e19ffffc40000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000305d9662647959"),
Topics: []primitives.Data32{
primitives.Data32FromHex("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"),
primitives.Data32FromHex("0x0000000000000000000000002cb45edb4517d5947afde3beabf95a582506858b"),
primitives.Data32FromHex("0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e"),
},
}
allLogs = append(allLogs, tmpLog1)
sentMsgCounter += 1

tmpLog2 := &response.Log{
Removed: false,
LogIndex: primitives.HexUint(5),
TransactionIndex: primitives.HexUint(0),
TransactionHash: primitives.Data32FromHex("0x29d3cd070a26eb34cd1c8abb70cb1e966819a342bc03965a4cd662442f712615"),
BlockHash: primitives.Data32FromHex("0x0579fb6c14a212998fc0e3792c2994f5f0179d8f64aa6e9059edd1f69df05155"),
BlockNumber: primitives.HexUint(107219211),
Address: primitives.Data20FromHex("0x63da4db6ef4e7c62168ab03982399f9588fcd198"),
Data: primitives.VarDataFromHex("0x0000000000000000000000000000000000000000000b6afb14c2d46e19ffffc40000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000305d9662647959"),
Topics: []primitives.Data32{
primitives.Data32FromHex("0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e"),
primitives.Data32FromHex("0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e"),
primitives.Data32FromHex("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"),
},
}

allLogs = append(allLogs, tmpLog2)
sentMsgCounter += 1

eb.PublishLogs(allLogs)

sentNumMsgCh <- sentMsgCounter
return
default:
}
}
}()

sentEventCounter := <-sentNumMsgCh
time.Sleep(1 * time.Second)

assert.NotEqual(t, 0, sentEventCounter)
assert.Equal(t, 1, rcvEventCounter, "Number of received events does not match expected value")

for i := 0; i < numClients; i++ {
eb.UnsubscribeFromLogs(clientLogSub)
}

time.Sleep(1 * time.Second)
}

func TestBrokerReturnsCorrectEventsWithAddress(t *testing.T) {
eb := events.NewEventBroker()
go eb.Start()

rcvEventCounter := 0
eventCounterCh := make(chan int)
go func() {
for range eventCounterCh {
rcvEventCounter++
}
}()

contractAddresses := []common.Address{
common.BytesToAddress(primitives.Data20FromHex("0x2f41af687164062f118297ca10751f4b55478ae1").Bytes()),
common.BytesToAddress(primitives.Data20FromHex("0x03b666f3488a7992b2385b12df7f35156d7b29cd").Bytes()),
common.BytesToAddress(primitives.Data20FromHex("0x20f8aefb5697b77e0bb835a8518be70775cda1b0").Bytes()),
common.BytesToAddress(primitives.Data20FromHex("0x63da4db6ef4e7c62168ab03982399f9588fcd198").Bytes()),
common.BytesToAddress(primitives.Data20FromHex("0x61c9e05d1cdb1b70856c7a2c53fa9c220830633c").Bytes()),
}

filterParams := request.LogSubscriptionOptions{
Address: contractAddresses,
}

clientLogSub := createClientAndSubscribeLogs(eb, eventCounterCh, filterParams)

time.Sleep(1 * time.Second)

sentNumMsgCh := make(chan int)
go func() {
sentMsgCounter := 0
timeout := time.After(eventTimeoutSeconds * time.Second)

allLogs := make(event.Logs, 0)

for {
tmpLRS := make(event.Logs, 1)
tmpLRS[0] = GenerateLogResponse()
allLogs = append(allLogs, tmpLRS[0])
sentMsgCounter += 1

time.Sleep(10 * time.Millisecond)
select {
case <-timeout:
tmpLog1 := &response.Log{
Removed: false,
LogIndex: primitives.HexUint(5),
TransactionIndex: primitives.HexUint(0),
TransactionHash: primitives.Data32FromHex("0x29d3cd070a26eb34cd1c8abb70cb1e966819a342bc03965a4cd662442f712615"),
BlockHash: primitives.Data32FromHex("0x0579fb6c14a212998fc0e3792c2994f5f0179d8f64aa6e9059edd1f69df05155"),
BlockNumber: primitives.HexUint(107219211),
Address: primitives.Data20FromHex("0x20f8aefb5697b77e0bb835a8518be70775cda1b0"),
Data: primitives.VarDataFromHex("0x0000000000000000000000000000000000000000000b6afb14c2d46e19ffffc40000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000305d9662647959"),
Topics: []primitives.Data32{
primitives.Data32FromHex("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"),
primitives.Data32FromHex("0x0000000000000000000000002cb45edb4517d5947afde3beabf95a582506858b"),
primitives.Data32FromHex("0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e"),
},
}
allLogs = append(allLogs, tmpLog1)
sentMsgCounter += 1

tmpLog2 := &response.Log{
Removed: false,
LogIndex: primitives.HexUint(5),
TransactionIndex: primitives.HexUint(0),
TransactionHash: primitives.Data32FromHex("0x29d3cd070a26eb34cd1c8abb70cb1e966819a342bc03965a4cd662442f712615"),
BlockHash: primitives.Data32FromHex("0x0579fb6c14a212998fc0e3792c2994f5f0179d8f64aa6e9059edd1f69df05155"),
BlockNumber: primitives.HexUint(107219211),
Address: primitives.Data20FromHex("0x63da4db6ef4e7c62168ab03982399f9588fcd198"),
Data: primitives.VarDataFromHex("0x0000000000000000000000000000000000000000000b6afb14c2d46e19ffffc40000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000305d9662647959"),
Topics: []primitives.Data32{
primitives.Data32FromHex("0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e"),
primitives.Data32FromHex("0x0000000000000000000000005eec60f348cb1d661e4a5122cf4638c7db7a886e"),
primitives.Data32FromHex("0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"),
},
}

allLogs = append(allLogs, tmpLog2)
sentMsgCounter += 1

eb.PublishLogs(allLogs)

sentNumMsgCh <- sentMsgCounter
return
default:
}
}
}()

sentEventCounter := <-sentNumMsgCh
time.Sleep(1 * time.Second)

assert.NotEqual(t, 0, sentEventCounter)
assert.Equal(t, 2, rcvEventCounter, "Number of received events does not match expected value")

for i := 0; i < numClients; i++ {
eb.UnsubscribeFromLogs(clientLogSub)
}

time.Sleep(1 * time.Second)
}

func createClientAndSubscribeLogs(eb *events.EventBroker, eventCounterCh chan int, subOptions request.LogSubscriptionOptions) broker.Subscription {
clientLogCh := make(chan event.Logs)
subsLog := eb.SubscribeLogs(subOptions, clientLogCh)

go func() {
for logs := range clientLogCh {
for range logs {
eventCounterCh <- 1
}
}
}()

return subsLog
}

func createClientAndSubscribe(eb *events.EventBroker, eventCounterCh chan int) (broker.Subscription, broker.Subscription) {
clientNHCh := make(chan event.Block)
subsNH := eb.SubscribeNewHeads(clientNHCh)
Expand Down

0 comments on commit dd28fe5

Please sign in to comment.