Skip to content

Commit

Permalink
fix: delay accept new connections if out of fds
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Feb 22, 2024
1 parent 7ba622b commit a3e29f5
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 24 deletions.
10 changes: 9 additions & 1 deletion net_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) {
// tcp
var fd, sa, err = syscall.Accept(ln.fd)
if err != nil {
if err == syscall.EAGAIN {
/* https://man7.org/linux/man-pages/man2/accept.2.html
EAGAIN or EWOULDBLOCK
The socket is marked nonblocking and no connections are
present to be accepted. POSIX.1-2001 and POSIX.1-2008
allow either error to be returned for this case, and do
not require these constants to have the same value, so a
portable application should check for both possibilities.
*/
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
return nil, nil
}
return nil, err
Expand Down
95 changes: 72 additions & 23 deletions netpoll_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"strings"
"sync"
"syscall"
"time"
)

Expand Down Expand Up @@ -92,39 +93,87 @@ func (s *server) Close(ctx context.Context) error {
func (s *server) OnRead(p Poll) error {
// accept socket
conn, err := s.ln.Accept()
if err != nil {
// shut down
if strings.Contains(err.Error(), "closed") {
s.operator.Control(PollDetach)
s.onQuit(err)
return err
if err == nil {
if conn != nil {
s.onAccept(conn.(Conn))
}
logger.Println("NETPOLL: accept conn failed:", err.Error())
return err
}
if conn == nil {
// EAGAIN | EWOULDBLOCK if conn and err both nil
return nil
}
logger.Println("NETPOLL: accept conn failed:", err.Error(), isOutOfFdErr(err))

// delay accept when too many open files
if isOutOfFdErr(err) {
// since we use Epoll LT, we have to detach listener fd from epoll first
// and re-register it when accept successfully or there is no available connection
cerr := s.operator.Control(PollDetach)
if cerr != nil {
logger.Println("NETPOLL: detach listener fd failed:", cerr.Error())
}
go func() {
retryTimes := []time.Duration{0, 10, 50, 100, 200, 500, 1000} // ms
retryTimeIndex := 0
for {
if retryTimeIndex > 0 {
time.Sleep(retryTimes[retryTimeIndex] * time.Millisecond)
}
conn, err := s.ln.Accept()
println(conn, err)
if err == nil {
if conn == nil {
// recovery accept poll loop
s.operator.Control(PollReadable)
println("err and conn == nil")
return
}
s.onAccept(conn.(Conn))
logger.Println("NETPOLL: re-accept conn success:", conn.RemoteAddr())
retryTimeIndex = 0
continue
}
if retryTimeIndex+1 < len(retryTimes) {
retryTimeIndex++
}
logger.Printf("NETPOLL: re-accept conn failed, err=[%s] and next retrytime=%dms", err.Error(), retryTimes[retryTimeIndex])
}
}()
}

// shut down
if strings.Contains(err.Error(), "closed") {
s.operator.Control(PollDetach)
s.onQuit(err)
return err
}

return err
}

// OnHup implements FDOperator.
func (s *server) OnHup(p Poll) error {
s.onQuit(errors.New("listener close"))
return nil
}

func (s *server) onAccept(conn Conn) {
// store & register connection
var connection = &connection{}
connection.init(conn.(Conn), s.opts)
if !connection.IsActive() {
return nil
var nconn = new(connection)
nconn.init(conn, s.opts)
if !nconn.IsActive() {
return
}
var fd = conn.(Conn).Fd()
connection.AddCloseCallback(func(connection Connection) error {
var fd = conn.Fd()
nconn.AddCloseCallback(func(connection Connection) error {
s.connections.Delete(fd)
return nil
})
s.connections.Store(fd, connection)
s.connections.Store(fd, nconn)

// trigger onConnect asynchronously
connection.onConnect()
return nil
nconn.onConnect()
}

// OnHup implements FDOperator.
func (s *server) OnHup(p Poll) error {
s.onQuit(errors.New("listener close"))
return nil
func isOutOfFdErr(err error) bool {
se, ok := err.(syscall.Errno)
return ok && (se == syscall.EMFILE || se == syscall.ENFILE)
}
73 changes: 73 additions & 0 deletions netpoll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"context"
"errors"
"math/rand"
"os"
"runtime"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
)
Expand Down Expand Up @@ -507,6 +509,77 @@ func TestClientWriteAndClose(t *testing.T) {
MustNil(t, err)
}

func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) {
if os.Getenv("N_LOCAL") == "" {
t.Skip("Only test for debug purpose")
return
}

var originalRlimit syscall.Rlimit
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &originalRlimit)
MustNil(t, err)
t.Logf("Original RLimit: %v", originalRlimit)

rlimit := syscall.Rlimit{Cur: 32, Max: originalRlimit.Max}
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rlimit)
MustNil(t, err)
err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit)
MustNil(t, err)
t.Logf("New RLimit: %v", rlimit)
defer func() { // reset
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &originalRlimit)
MustNil(t, err)
}()

var network, address = "tcp", ":18888"
var connected int32
var loop = newTestEventLoop(network, address,
func(ctx context.Context, connection Connection) error {
buf, err := connection.Reader().Next(connection.Reader().Len())
connection.Writer().WriteBinary(buf)
connection.Writer().Flush()
return err
},
WithOnConnect(func(ctx context.Context, connection Connection) context.Context {
atomic.AddInt32(&connected, 1)
t.Logf("Conn[%s] accpeted", connection.RemoteAddr())
return ctx
}),
WithOnDisconnect(func(ctx context.Context, connection Connection) {
t.Logf("Conn[%s] disconnected", connection.RemoteAddr())
}),
)

// out of fds
files := make([]*os.File, 0)
for {
f, err := os.Open("/dev/null")
if err != nil {
Assert(t, isOutOfFdErr(errors.Unwrap(err)), err)
break
}
files = append(files, f)
}
go func() {
time.Sleep(time.Second * 10)
t.Logf("close all files")
for _, f := range files {
f.Close()
}
}()

// we should use telnet manually
var connections = 1
for atomic.LoadInt32(&connected) < int32(connections) {
t.Logf("connected=%d", atomic.LoadInt32(&connected))
time.Sleep(time.Second)
}
time.Sleep(time.Second * 10)

err = loop.Shutdown(context.Background())
MustNil(t, err)
}

func createTestListener(network, address string) (Listener, error) {
for {
ln, err := CreateListener(network, address)
Expand Down
13 changes: 13 additions & 0 deletions test_conns.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

ip="$1"
port="$2"
conns="$3"
timeout="$4"

for i in $(seq 1 $conns);
do
nc -v -w $timeout $ip $port < /dev/null &
done

wait

0 comments on commit a3e29f5

Please sign in to comment.