diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index ba4c9262..798696ea 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -6,8 +6,13 @@ jobs: compatibility-test: strategy: matrix: - go: [ 1.15, "1.21" ] - os: [ X64, ARM64 ] + go: [ 1.15, 1.22 ] + # - "ubuntu-latest" is for Linux with X64 CPU, hosted by GitHub, + # fewer CPUs but high speed international network + # - "ARM64" is for Linux with ARM64 CPU, hosted by bytedance, + # more CPUs but inside CN internet which may download go cache slowly. + # GitHub don't have free runner with ARM CPU. + os: [ ubuntu-latest, ARM64 ] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 @@ -15,14 +20,8 @@ jobs: uses: actions/setup-go@v5 with: go-version: ${{ matrix.go }} -# - uses: actions/cache@v2 -# with: -# path: ~/go/pkg/mod -# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} -# restore-keys: | -# ${{ runner.os }}-go- - name: Unit Test - run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./... + run: go test -timeout=2m -v -race -covermode=atomic -coverprofile=coverage.out ./... - name: Benchmark run: go test -bench=. -benchmem -run=none ./... windows-test: @@ -32,13 +31,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: "1.20" -# - uses: actions/cache@v2 -# with: -# path: ~/go/pkg/mod -# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} -# restore-keys: | -# ${{ runner.os }}-go- + go-version: 1.22 - name: Build Test run: go vet -v ./... style-test: diff --git a/connection_test.go b/connection_test.go index eb28a0e4..163645ab 100644 --- a/connection_test.go +++ b/connection_test.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "math/rand" "net" "os" "runtime" @@ -102,11 +101,13 @@ func TestConnectionLargeWrite(t *testing.T) { func TestConnectionRead(t *testing.T) { r, w := GetSysFdPairs() var rconn, wconn = &connection{}, &connection{} - rconn.init(&netFD{fd: r}, nil) - wconn.init(&netFD{fd: w}, nil) + err := rconn.init(&netFD{fd: r}, nil) + MustNil(t, err) + err = wconn.init(&netFD{fd: w}, nil) + MustNil(t, err) var size = 256 - var cycleTime = 100000 + var cycleTime = 1000 var msg = make([]byte, size) var wg sync.WaitGroup wg.Add(1) @@ -389,34 +390,28 @@ func TestConnectionLargeMemory(t *testing.T) { rconn.init(&netFD{fd: r}, nil) var wg sync.WaitGroup - defer wg.Wait() - var rn, wn = 1024, 1 * 1024 * 1024 wg.Add(1) go func() { defer wg.Done() - rconn.Reader().Next(wn) + _, err := rconn.Reader().Next(wn) + MustNil(t, err) }() var msg = make([]byte, rn) for i := 0; i < wn/rn; i++ { n, err := syscall.Write(w, msg) if err != nil { - panic(err) - } - if n != rn { - panic(fmt.Sprintf("n[%d]!=rn[%d]", n, rn)) + MustNil(t, err) } - time.Sleep(time.Millisecond) + Equal(t, n, rn) } runtime.ReadMemStats(&end) alloc := end.TotalAlloc - start.TotalAlloc limit := uint64(4 * 1024 * 1024) - if alloc > limit { - panic(fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit)) - } + Assert(t, alloc <= limit, fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit)) } // TestSetTCPNoDelay is used to verify the connection initialization set the TCP_NODELAY correctly @@ -438,7 +433,7 @@ func TestConnectionUntil(t *testing.T) { rconn, wconn := &connection{}, &connection{} rconn.init(&netFD{fd: r}, nil) wconn.init(&netFD{fd: w}, nil) - loopSize := 100000 + loopSize := 10000 msg := make([]byte, 1002) msg[500], msg[1001] = '\n', '\n' @@ -466,37 +461,48 @@ func TestConnectionUntil(t *testing.T) { func TestBookSizeLargerThanMaxSize(t *testing.T) { r, w := GetSysFdPairs() - var rconn, wconn = &connection{}, &connection{} - rconn.init(&netFD{fd: r}, nil) - wconn.init(&netFD{fd: w}, nil) + rconn, wconn := &connection{}, &connection{} + err := rconn.init(&netFD{fd: r}, nil) + MustNil(t, err) + err = wconn.init(&netFD{fd: w}, nil) + MustNil(t, err) - var length = 25 - dataCollection := make([][]byte, length) - for i := 0; i < length; i++ { - dataCollection[i] = make([]byte, 2<<i) - for j := 0; j < 2<<i; j++ { - dataCollection[i][j] = byte(rand.Intn(256)) + // prepare data + maxSize := 1024 * 1024 * 128 + origin := make([][]byte, 0) + for size := maxSize; size > 0; size = size >> 1 { + ch := 'a' + byte(size%26) + origin = append(origin, make([]byte, size)) + for i := 0; i < size; i++ { + origin[len(origin)-1][i] = ch } } + // read var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - for i := 0; i < length; i++ { - buf, err := rconn.Reader().Next(2 << i) + idx := 0 + for size := maxSize; size > 0; size = size >> 1 { + buf, err := rconn.Reader().Next(size) MustNil(t, err) - Equal(t, string(buf), string(dataCollection[i])) - rconn.Reader().Release() + Equal(t, string(buf), string(origin[idx])) + err = rconn.Reader().Release() + MustNil(t, err) + idx++ } }() - for i := 0; i < length; i++ { - n, err := wconn.Write(dataCollection[i]) + + // write + for i := 0; i < len(origin); i++ { + n, err := wconn.Write(origin[i]) MustNil(t, err) - Equal(t, n, 2<<i) + Equal(t, n, len(origin[i])) } wg.Wait() rconn.Close() + wconn.Close() } func TestConnDetach(t *testing.T) { @@ -504,6 +510,8 @@ func TestConnDetach(t *testing.T) { ln, err := createTestListener("tcp", address) MustNil(t, err) + // accept => read => write + var wg sync.WaitGroup go func() { for { conn, err := ln.Accept() @@ -513,35 +521,33 @@ func TestConnDetach(t *testing.T) { if conn == nil { continue } + wg.Add(1) go func() { + defer wg.Done() buf := make([]byte, 1024) // slow read - for { - _, err := conn.Read(buf) - if err != nil { - return - } - time.Sleep(100 * time.Millisecond) - _, err = conn.Write(buf) - if err != nil { - return - } + _, err := conn.Read(buf) + if err != nil { + return + } + time.Sleep(10 * time.Millisecond) + _, err = conn.Write(buf) + if err != nil { + return } }() } }() + // dial => detach => write => read c, err := DialConnection("tcp", address, time.Second) MustNil(t, err) - conn := c.(*TCPConnection) - err = conn.Detach() MustNil(t, err) f := os.NewFile(uintptr(conn.fd), "netpoll-connection") defer f.Close() - gonetconn, err := net.FileConn(f) MustNil(t, err) buf := make([]byte, 1024) @@ -552,13 +558,14 @@ func TestConnDetach(t *testing.T) { err = gonetconn.Close() MustNil(t, err) - err = ln.Close() MustNil(t, err) + err = c.Close() + MustNil(t, err) + wg.Wait() } func TestParallelShortConnection(t *testing.T) { - t.Skip("TODO: it's not stable now, need fix CI") address := getTestAddress() ln, err := createTestListener("tcp", address) MustNil(t, err) @@ -599,11 +606,8 @@ func TestParallelShortConnection(t *testing.T) { } wg.Wait() - count := 100 - for count > 0 && atomic.LoadInt64(&received) < int64(totalSize) { - t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize) - time.Sleep(time.Millisecond * 100) - count-- + for atomic.LoadInt64(&received) < int64(totalSize) { + runtime.Gosched() } Equal(t, atomic.LoadInt64(&received), int64(totalSize)) } @@ -626,7 +630,7 @@ func TestConnectionServerClose(t *testing.T) { var wg sync.WaitGroup el, err := NewEventLoop( func(ctx context.Context, connection Connection) error { - t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr()) + //t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr()) defer wg.Done() buf, err := connection.Reader().Next(len(PONG)) // pong Equal(t, string(buf), PONG) @@ -649,14 +653,14 @@ func TestConnectionServerClose(t *testing.T) { err = connection.Writer().Flush() MustNil(t, err) connection.AddCloseCallback(func(connection Connection) error { - t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr()) + //t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr()) wg.Done() return nil }) return ctx }), WithOnPrepare(func(connection Connection) context.Context { - t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr()) + //t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr()) defer wg.Done() return context.WithValue(context.Background(), "prepare", "true") }), @@ -697,13 +701,12 @@ func TestConnectionServerClose(t *testing.T) { err = conn.SetOnRequest(clientOnRequest) MustNil(t, err) conn.AddCloseCallback(func(connection Connection) error { - t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr()) + //t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr()) defer wg.Done() return nil }) }() } - //time.Sleep(time.Second) wg.Wait() } diff --git a/netpoll_config.go b/netpoll_config.go index 85c05925..dda65a35 100644 --- a/netpoll_config.go +++ b/netpoll_config.go @@ -17,12 +17,14 @@ package netpoll import ( "context" "io" + "time" ) // global config var ( - defaultLinkBufferSize = pagesize - featureAlwaysNoCopyRead = false + defaultLinkBufferSize = pagesize + defaultGracefulShutdownCheckInterval = time.Second + featureAlwaysNoCopyRead = false ) // Config expose some tuning parameters to control the internal behaviors of netpoll. diff --git a/netpoll_server.go b/netpoll_server.go index 78f26b59..ace92ba6 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -63,7 +63,7 @@ func (s *server) Close(ctx context.Context) error { s.operator.Control(PollDetach) s.ln.Close() - var ticker = time.NewTicker(time.Second) + var ticker = time.NewTicker(defaultGracefulShutdownCheckInterval) defer ticker.Stop() var hasConn bool for { diff --git a/netpoll_unix_test.go b/netpoll_unix_test.go index 4156a0b8..70638c57 100644 --- a/netpoll_unix_test.go +++ b/netpoll_unix_test.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "math/rand" "os" "runtime" "sync" @@ -65,6 +64,15 @@ func Assert(t *testing.T, cond bool, val ...interface{}) { } } +func TestMain(m *testing.M) { + // defaultGracefulShutdownCheckInterval will affect shutdown function running time, + // so for speed up tests, we change it to 10ms here + oldGracefulShutdownCheckInterval := defaultGracefulShutdownCheckInterval + defaultGracefulShutdownCheckInterval = time.Millisecond * 10 + m.Run() + defaultGracefulShutdownCheckInterval = oldGracefulShutdownCheckInterval +} + var testPort int32 = 10000 // getTestAddress return a unique port for every tests, so all tests will not share a same listerner @@ -270,40 +278,43 @@ func TestGracefulExit(t *testing.T) { MustNil(t, err) // exit with processing connections + trigger := make(chan struct{}) var eventLoop2 = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { - time.Sleep(10 * time.Second) + <-trigger return nil }) for i := 0; i < 10; i++ { - if i%2 == 0 { - var conn, err = DialConnection(network, address, time.Second) - MustNil(t, err) - _, err = conn.Write(make([]byte, 16)) - MustNil(t, err) - } + // connect success + var conn, err = DialConnection(network, address, time.Second) + MustNil(t, err) + _, err = conn.Write(make([]byte, 16)) + MustNil(t, err) } - var ctx2, cancel2 = context.WithTimeout(context.Background(), 5*time.Second) + // shutdown timeout + var ctx2, cancel2 = context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel2() err = eventLoop2.Shutdown(ctx2) MustTrue(t, err != nil) Equal(t, err.Error(), ctx2.Err().Error()) + // shutdown success + close(trigger) + err = eventLoop2.Shutdown(ctx2) + MustTrue(t, err == nil) - // exit with some processing connections + // exit with read connections + size := 16 var eventLoop3 = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { - time.Sleep(time.Duration(rand.Intn(3)) * time.Second) - if l := connection.Reader().Len(); l > 0 { - var _, err = connection.Reader().Next(l) - MustNil(t, err) - } + _, err := connection.Reader().Next(size) + MustNil(t, err) return nil }) for i := 0; i < 10; i++ { var conn, err = DialConnection(network, address, time.Second) MustNil(t, err) if i%2 == 0 { - _, err = conn.Write(make([]byte, 16)) + _, err := conn.Write(make([]byte, size)) MustNil(t, err) } } @@ -409,15 +420,12 @@ func TestCloseConnWhenOnConnect(t *testing.T) { func TestServerReadAndClose(t *testing.T) { var network, address = "tcp", getTestAddress() var sendMsg = []byte("hello") - var closed int32 var loop = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { _, err := connection.Reader().Next(len(sendMsg)) MustNil(t, err) - err = connection.Close() MustNil(t, err) - atomic.AddInt32(&closed, 1) return nil }, ) @@ -429,14 +437,13 @@ func TestServerReadAndClose(t *testing.T) { err = conn.Writer().Flush() MustNil(t, err) - for atomic.LoadInt32(&closed) == 0 { + for conn.IsActive() { runtime.Gosched() // wait for poller close connection } - time.Sleep(time.Millisecond * 50) _, err = conn.Writer().WriteBinary(sendMsg) MustNil(t, err) err = conn.Writer().Flush() - MustTrue(t, errors.Is(err, ErrConnClosed)) + Assert(t, errors.Is(err, ErrConnClosed), err) err = loop.Shutdown(context.Background()) MustNil(t, err) diff --git a/poll_manager_test.go b/poll_manager_test.go index 63559051..c5648a76 100644 --- a/poll_manager_test.go +++ b/poll_manager_test.go @@ -21,14 +21,15 @@ import ( "runtime" "sync" "testing" - "time" ) func TestPollManager(t *testing.T) { r, w := GetSysFdPairs() var rconn, wconn = &connection{}, &connection{} - rconn.init(&netFD{fd: r}, nil) - wconn.init(&netFD{fd: w}, nil) + err := rconn.init(&netFD{fd: r}, nil) + MustNil(t, err) + err = wconn.init(&netFD{fd: w}, nil) + MustNil(t, err) var msg = []byte("hello world") n, err := wconn.Write(msg) @@ -41,9 +42,9 @@ func TestPollManager(t *testing.T) { err = wconn.Close() MustNil(t, err) - time.Sleep(10 * time.Millisecond) - MustTrue(t, !rconn.IsActive()) - MustTrue(t, !wconn.IsActive()) + for rconn.IsActive() || wconn.IsActive() { + runtime.Gosched() + } } func TestPollManagerReset(t *testing.T) { @@ -79,10 +80,7 @@ func TestPollManagerSetNumLoops(t *testing.T) { wg.Add(1) go func() { poll := pm.Pick() - newGs := runtime.NumGoroutine() - t.Logf("old=%d, new=%d", oldGs, newGs) Assert(t, poll != nil) - Assert(t, newGs-oldGs == 100) Assert(t, len(pm.polls) == 100) wg.Done() <-finish // hold goroutines