Skip to content

Commit

Permalink
chore: make unit test more stable
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Dec 8, 2023
1 parent 129d68a commit cae1cf0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 34 deletions.
14 changes: 6 additions & 8 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,18 +499,15 @@ func TestConnDetach(t *testing.T) {
func TestParallelShortConnection(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
MustNil(t, err)
defer ln.Close()

var received int64
el, err := NewEventLoop(func(ctx context.Context, connection Connection) error {
data, err := connection.Reader().Next(connection.Reader().Len())
if err != nil {
return err
}
Assert(t, err == nil || errors.Is(err, ErrEOF))
atomic.AddInt64(&received, int64(len(data)))
//t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
return nil
})
defer el.Shutdown(context.Background())
go func() {
el.Serve(ln)
}()
Expand All @@ -536,10 +533,11 @@ func TestParallelShortConnection(t *testing.T) {
}
wg.Wait()

for atomic.LoadInt64(&received) < int64(totalSize) {
t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize)
start := time.Now()
for atomic.LoadInt64(&received) < int64(totalSize) && time.Now().Sub(start) < time.Second {
time.Sleep(time.Millisecond * 100)
}
Equal(t, atomic.LoadInt64(&received), int64(totalSize))
}

func TestConnectionServerClose(t *testing.T) {
Expand Down
47 changes: 21 additions & 26 deletions mux/shard_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,38 @@ package mux

import (
"net"
"sync"
"testing"
"time"

"github.com/cloudwego/netpoll"
)

func TestShardQueue(t *testing.T) {
var svrConn net.Conn
accepted := make(chan struct{})

network, address := "tcp", ":18888"
ln, err := net.Listen("tcp", ":18888")
MustNil(t, err)
stop := make(chan int, 1)
defer close(stop)
count, pkgsize := 16, 11
var wg sync.WaitGroup
wg.Add(1)
go func() {
var err error
for {
select {
case <-stop:
err = ln.Close()
MustNil(t, err)
return
default:
}
svrConn, err = ln.Accept()
MustNil(t, err)
accepted <- struct{}{}
}
defer wg.Done()
svrConn, err := ln.Accept()
MustNil(t, err)
accepted <- struct{}{}

total := count * pkgsize
recv := make([]byte, total)
rn, err := svrConn.Read(recv)
MustNil(t, err)
Equal(t, rn, total)

<-stop
err = ln.Close()
MustNil(t, err)
}()

conn, err := netpoll.DialConnection(network, address, time.Second)
Expand All @@ -56,8 +59,7 @@ func TestShardQueue(t *testing.T) {

// test
queue := NewShardQueue(4, conn)
count, pkgsize := 16, 11
for i := 0; i < int(count); i++ {
for i := 0; i < count; i++ {
var getter WriterGetter = func() (buf netpoll.Writer, isNil bool) {
buf = netpoll.NewLinkBuffer(pkgsize)
buf.Malloc(pkgsize)
Expand All @@ -68,14 +70,7 @@ func TestShardQueue(t *testing.T) {

err = queue.Close()
MustNil(t, err)
total := count * pkgsize
recv := make([]byte, total)
rn, err := svrConn.Read(recv)
MustNil(t, err)
Equal(t, rn, total)
}

// TODO: need mock flush
func BenchmarkShardQueue(b *testing.B) {
b.Skip()
close(stop)
wg.Wait()
}

0 comments on commit cae1cf0

Please sign in to comment.