Skip to content

Commit

Permalink
chore: make unit test more stable
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Dec 8, 2023
1 parent 129d68a commit 49fbf6b
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 63 deletions.
2 changes: 1 addition & 1 deletion connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (c *connection) rw2r() {
case opreadwrite:
c.operator.Control(PollRW2R)
case opwrite:
c.operator.Control(PollW2RW)
c.operator.Control(PollW2Hup)
}
c.triggerWrite(nil)
}
Expand Down
18 changes: 7 additions & 11 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,18 +499,15 @@ func TestConnDetach(t *testing.T) {
func TestParallelShortConnection(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
MustNil(t, err)
defer ln.Close()

var received int64
el, err := NewEventLoop(func(ctx context.Context, connection Connection) error {
data, err := connection.Reader().Next(connection.Reader().Len())
if err != nil {
return err
}
Assert(t, err == nil || errors.Is(err, ErrEOF))
atomic.AddInt64(&received, int64(len(data)))
//t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
return nil
})
defer el.Shutdown(context.Background())
go func() {
el.Serve(ln)
}()
Expand All @@ -536,10 +533,11 @@ func TestParallelShortConnection(t *testing.T) {
}
wg.Wait()

for atomic.LoadInt64(&received) < int64(totalSize) {
t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize)
start := time.Now()
for atomic.LoadInt64(&received) < int64(totalSize) && time.Now().Sub(start) < time.Second {
time.Sleep(time.Millisecond * 100)
}
Equal(t, atomic.LoadInt64(&received), int64(totalSize))
}

func TestConnectionServerClose(t *testing.T) {
Expand Down Expand Up @@ -643,8 +641,6 @@ func TestConnectionServerClose(t *testing.T) {
func TestConnectionDailTimeoutAndClose(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
MustNil(t, err)
defer ln.Close()

el, err := NewEventLoop(
func(ctx context.Context, connection Connection) error {
_, err = connection.Reader().Next(connection.Reader().Len())
Expand All @@ -668,7 +664,7 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
go func() {
defer wg.Done()
conn, err := DialConnection("tcp", ":12345", time.Nanosecond)
Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout"))
Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout"), err)
_ = conn
}()
}
Expand Down
45 changes: 18 additions & 27 deletions mux/shard_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,33 @@ package mux

import (
"net"
"sync"
"testing"
"time"

"github.com/cloudwego/netpoll"
)

func TestShardQueue(t *testing.T) {
var svrConn net.Conn
accepted := make(chan struct{})

network, address := "tcp", ":18888"
ln, err := net.Listen("tcp", ":18888")
MustNil(t, err)
stop := make(chan int, 1)
defer close(stop)
count, pkgsize := 16, 11
var wg sync.WaitGroup
wg.Add(1)
go func() {
var err error
for {
select {
case <-stop:
err = ln.Close()
MustNil(t, err)
return
default:
}
svrConn, err = ln.Accept()
MustNil(t, err)
accepted <- struct{}{}
}
defer wg.Done()
svrConn, err := ln.Accept()
MustNil(t, err)
accepted <- struct{}{}

total := count * pkgsize
recv := make([]byte, total)
rn, err := svrConn.Read(recv)
MustNil(t, err)
Equal(t, rn, total)
}()

conn, err := netpoll.DialConnection(network, address, time.Second)
Expand All @@ -56,8 +54,7 @@ func TestShardQueue(t *testing.T) {

// test
queue := NewShardQueue(4, conn)
count, pkgsize := 16, 11
for i := 0; i < int(count); i++ {
for i := 0; i < count; i++ {
var getter WriterGetter = func() (buf netpoll.Writer, isNil bool) {
buf = netpoll.NewLinkBuffer(pkgsize)
buf.Malloc(pkgsize)
Expand All @@ -68,14 +65,8 @@ func TestShardQueue(t *testing.T) {

err = queue.Close()
MustNil(t, err)
total := count * pkgsize
recv := make([]byte, total)
rn, err := svrConn.Read(recv)
MustNil(t, err)
Equal(t, rn, total)
}

// TODO: need mock flush
func BenchmarkShardQueue(b *testing.B) {
b.Skip()
wg.Wait()
err = ln.Close()
MustNil(t, err)
}
13 changes: 3 additions & 10 deletions net_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,8 @@ func TestDialerTCP(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
MustNil(t, err)

stop := make(chan int, 1)
defer close(stop)

go func() {
for {
select {
case <-stop:
err := ln.Close()
MustNil(t, err)
return
default:
}
conn, err := ln.Accept()
if conn == nil && err == nil {
continue
Expand All @@ -61,6 +51,9 @@ func TestDialerTCP(t *testing.T) {
MustNil(t, err)
MustTrue(t, strings.HasPrefix(conn.LocalAddr().String(), "127.0.0.1:"))
Equal(t, conn.RemoteAddr().String(), "127.0.0.1:1234")

err = ln.Close()
MustNil(t, err)
}

func TestDialerUnix(t *testing.T) {
Expand Down
14 changes: 3 additions & 11 deletions net_polldesc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,8 @@ func TestZeroTimer(t *testing.T) {
func TestRuntimePoll(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
MustNil(t, err)

stop := make(chan int, 1)
defer close(stop)

go func() {
for {
select {
case <-stop:
err := ln.Close()
MustNil(t, err)
return
default:
}
conn, err := ln.Accept()
if conn == nil && err == nil {
continue
Expand All @@ -54,4 +43,7 @@ func TestRuntimePoll(t *testing.T) {
MustNil(t, err)
conn.Close()
}

err = ln.Close()
MustNil(t, err)
}
7 changes: 4 additions & 3 deletions poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ const (
// PollRW2W is used to remove the readable monitor of FDOperator.
PollRW2W PollEvent = 0x6
// PollW2RW is used to add the readable monitor of FDOperator, generally used with PollRW2W.
PollW2RW PollEvent = 0x7
PollW2RW PollEvent = 0x7
PollW2Hup PollEvent = 0x8

// PollR2Hup is used to remove the readable monitor of FDOperator.
PollR2Hup PollEvent = 0x8
PollR2Hup PollEvent = 0x9
// PollHup2R is used to add the readable monitor of FDOperator, generally used with PollR2Hup.
PollHup2R PollEvent = 0x9
PollHup2R PollEvent = 0x10
)

0 comments on commit 49fbf6b

Please sign in to comment.