Skip to content

Commit

Permalink
fix: discard connections if out of fd
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Feb 22, 2024
1 parent 7ba622b commit a1c0872
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 27 deletions.
10 changes: 9 additions & 1 deletion net_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) {
// tcp
var fd, sa, err = syscall.Accept(ln.fd)
if err != nil {
if err == syscall.EAGAIN {
/* https://man7.org/linux/man-pages/man2/accept.2.html
EAGAIN or EWOULDBLOCK
The socket is marked nonblocking and no connections are
present to be accepted. POSIX.1-2001 and POSIX.1-2008
allow either error to be returned for this case, and do
not require these constants to have the same value, so a
portable application should check for both possibilities.
*/
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
return nil, nil
}
return nil, err
Expand Down
107 changes: 81 additions & 26 deletions netpoll_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@ package netpoll
import (
"context"
"errors"
"os"
"strings"
"sync"
"syscall"
"time"
)

// newServer wrap listener into server, quit will be invoked when server exit.
func newServer(ln Listener, opts *options, onQuit func(err error)) *server {
f, _ := os.Open("/dev/null")
return &server{
ln: ln,
opts: opts,
onQuit: onQuit,
ln: ln,
opts: opts,
onQuit: onQuit,
urgentfd: f,
}
}

Expand All @@ -40,6 +44,7 @@ type server struct {
opts *options
onQuit func(err error)
connections sync.Map // key=fd, value=connection
urgentfd *os.File
}

// Run this server.
Expand Down Expand Up @@ -92,39 +97,89 @@ func (s *server) Close(ctx context.Context) error {
func (s *server) OnRead(p Poll) error {
// accept socket
conn, err := s.ln.Accept()
if err != nil {
// shut down
if strings.Contains(err.Error(), "closed") {
s.operator.Control(PollDetach)
s.onQuit(err)
return err
if err == nil {
if conn != nil {
s.onAccept(conn.(Conn))
}
logger.Println("NETPOLL: accept conn failed:", err.Error())
return err
}
if conn == nil {
// EAGAIN | EWOULDBLOCK if conn and err both nil
return nil
}
logger.Println("NETPOLL: accept conn failed:", err.Error(), isOutOfFdErr(err))

// delay accept when too many open files
if false && isOutOfFdErr(err) {
// since we use Epoll LT, we have to detach listener fd from epoll first
// and re-register it when accept successfully or there is no available connection
cerr := s.operator.Control(PollDetach)
if cerr != nil {
logger.Println("NETPOLL: detach listener fd failed:", cerr.Error())
}
go func() {
println("create")
defer println("finish")
retryTimes := []time.Duration{0, 10, 50, 100, 200, 500, 1000} // ms
retryTimeIndex := 0
for {
if retryTimeIndex > 0 {
time.Sleep(retryTimes[retryTimeIndex] * time.Millisecond)
}
conn, err := s.ln.Accept()
println(conn, err)
if err == nil {
if conn == nil {
// recovery accept poll loop
s.operator.Control(PollReadable)
println("err and conn == nil")
return
}
s.onAccept(conn.(Conn))
logger.Println("NETPOLL: re-accept conn success:", conn.RemoteAddr())
retryTimeIndex = 0
continue
}
if retryTimeIndex+1 < len(retryTimes) {
retryTimeIndex++
}
logger.Printf("NETPOLL: re-accept conn failed, err=[%s] and next retrytime=%dms", err.Error(), retryTimes[retryTimeIndex])
}
}()
}

// shut down
if strings.Contains(err.Error(), "closed") {
s.operator.Control(PollDetach)
s.onQuit(err)
return err
}

return err
}

// OnHup implements FDOperator.
func (s *server) OnHup(p Poll) error {
s.onQuit(errors.New("listener close"))
return nil
}

func (s *server) onAccept(conn Conn) {
// store & register connection
var connection = &connection{}
connection.init(conn.(Conn), s.opts)
if !connection.IsActive() {
return nil
var nconn = new(connection)
nconn.init(conn, s.opts)
if !nconn.IsActive() {
return
}
var fd = conn.(Conn).Fd()
connection.AddCloseCallback(func(connection Connection) error {
var fd = conn.Fd()
nconn.AddCloseCallback(func(connection Connection) error {
s.connections.Delete(fd)
return nil
})
s.connections.Store(fd, connection)
s.connections.Store(fd, nconn)

// trigger onConnect asynchronously
connection.onConnect()
return nil
nconn.onConnect()
}

// OnHup implements FDOperator.
func (s *server) OnHup(p Poll) error {
s.onQuit(errors.New("listener close"))
return nil
func isOutOfFdErr(err error) bool {
se, ok := err.(syscall.Errno)
return ok && (se == syscall.EMFILE || se == syscall.ENFILE)
}
86 changes: 86 additions & 0 deletions netpoll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
package netpoll

import (
"bufio"
"context"
"errors"
"fmt"
"math/rand"
"os/exec"
"runtime"
"strconv"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
)
Expand Down Expand Up @@ -507,6 +512,62 @@ func TestClientWriteAndClose(t *testing.T) {
MustNil(t, err)
}

func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) {
//t.Skip("Only test for debug purpose")

var originalRlimit syscall.Rlimit
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &originalRlimit)
MustNil(t, err)
t.Logf("Original RLimit: %v", originalRlimit)

var connections = 100
rlimit := syscall.Rlimit{Cur: uint64(connections), Max: originalRlimit.Max}
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rlimit)
MustNil(t, err)
err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit)
MustNil(t, err)
t.Logf("New RLimit: %v", rlimit)
defer func() { // reset
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &originalRlimit)
MustNil(t, err)
}()

var network, address = "tcp", ":18888"
var connected int32
var loop = newTestEventLoop(network, address,
func(ctx context.Context, connection Connection) error {
buf, err := connection.Reader().Next(connection.Reader().Len())
connection.Writer().WriteBinary(buf)
connection.Writer().Flush()
return err
},
WithOnConnect(func(ctx context.Context, connection Connection) context.Context {
atomic.AddInt32(&connected, 1)
t.Logf("Conn[%s] accpeted", connection.RemoteAddr())
go func() {
time.Sleep(time.Second * 100)
connection.Close()
}()
return ctx
}),
WithOnDisconnect(func(ctx context.Context, connection Connection) {
t.Logf("Conn[%s] disconnected", connection.RemoteAddr())
}),
)

go func() {
err = createTestConnection("localhost", "18888", connections, 10*time.Second)
MustNil(t, err)
}()
for atomic.LoadInt32(&connected) < int32(connections) {
t.Logf("connected=%d", atomic.LoadInt32(&connected))
time.Sleep(time.Millisecond * 500)
}

err = loop.Shutdown(context.Background())
MustNil(t, err)
}

func createTestListener(network, address string) (Listener, error) {
for {
ln, err := CreateListener(network, address)
Expand All @@ -517,6 +578,31 @@ func createTestListener(network, address string) (Listener, error) {
}
}

func createTestConnection(ip string, port string, conns int, duration time.Duration) error {
sconns := strconv.Itoa(conns)
sduration := strconv.Itoa(int(duration.Seconds()))
cmd := exec.Command("./test_conns.sh", ip, port, sconns, sduration)

stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
defer stdout.Close()
go func() {
reader := bufio.NewReader(stdout)
for {
line, err := reader.ReadString('\n')
if err != nil {
break
}
fmt.Print(line)
_ = line
}
}()

return cmd.Run()
}

func newTestEventLoop(network, address string, onRequest OnRequest, opts ...Option) EventLoop {
ln, err := createTestListener(network, address)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions poll_default_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (p *defaultPoll) Wait() error {
if operator.OnRead != nil {
// for non-connection
operator.OnRead(p)
println("operator OnRead", operator.FD, n)
} else {
// only for connection
var bs = operator.Inputs(barriers[i].bs)
Expand Down
13 changes: 13 additions & 0 deletions test_conns.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

ip="$1"
port="$2"
conns="$3"
timeout="$4"

for i in $(seq 1 $conns);
do
nc -v -d -w $timeout $ip $port < /dev/null &
done

wait

0 comments on commit a1c0872

Please sign in to comment.