From 3a587fa885b9fb2e44905c52aaff6078887578d8 Mon Sep 17 00:00:00 2001 From: Qing Liu Date: Thu, 28 Nov 2024 06:16:15 +0000 Subject: [PATCH] splice: refactor pipe pool Sadly we could not use go's internal package, most of the refactor is from linux_splice.go inside go's internal package. Also removed unused APIs, we respect most of the APIs. --- splice/copy.go | 4 +- splice/copy_test.go | 4 +- splice/pair.go | 4 ++ splice/pair_darwin.go | 2 +- splice/pair_linux.go | 2 +- splice/pair_windows.go | 2 +- splice/pool.go | 103 ++++++++++----------------------------- splice/splice.go | 19 -------- splice/splice_darwin.go | 13 +++-- splice/splice_linux.go | 24 +++++++-- splice/splice_windows.go | 9 ++-- 11 files changed, 69 insertions(+), 117 deletions(-) diff --git a/splice/copy.go b/splice/copy.go index e50ca0d12..ebe44527f 100644 --- a/splice/copy.go +++ b/splice/copy.go @@ -54,11 +54,11 @@ func CopyFile(dstName string, srcName string, mode int) error { } func CopyFds(dst *os.File, src *os.File) (err error) { - p, err := splicePool.get() + p, err := Get() if p != nil { p.Grow(256 * 1024) _, err := SpliceCopy(dst, src, p) - splicePool.done(p) + Done(p) return err } else { _, err = io.Copy(dst, src) diff --git a/splice/copy_test.go b/splice/copy_test.go index 1492a2a21..8bd842931 100644 --- a/splice/copy_test.go +++ b/splice/copy_test.go @@ -56,15 +56,13 @@ func TestSpliceCopy(t *testing.T) { if maxPipeSize%4096 != 0 || maxPipeSize < 4096 { t.Error("pipe size should be page size multiple", maxPipeSize) } - pool := newSplicePairPool() - p, err := pool.get() + p, err := Get() if p != nil { p.MaxGrow() t.Logf("Splice size %d", p.size) SpliceCopy(dst, src, p) dst.Close() src.Close() - p.Close() } else { t.Error("Could not open splice: ", err) } diff --git a/splice/pair.go b/splice/pair.go index 1fe822e8c..dc52a6fb4 100644 --- a/splice/pair.go +++ b/splice/pair.go @@ -11,6 +11,10 @@ import ( type Pair struct { r, w int size int + + // We want to use a finalizer, so ensure that the size is + // large enough to not use the tiny allocator. + _ [12]byte } func (p *Pair) MaxGrow() { diff --git a/splice/pair_darwin.go b/splice/pair_darwin.go index 81c1f091b..0e0a8b385 100644 --- a/splice/pair_darwin.go +++ b/splice/pair_darwin.go @@ -20,7 +20,7 @@ func (p *Pair) discard() { panic("not implemented") } -func (p *Pair) Close() error { +func (p *Pair) close() error { panic("not implemented") } diff --git a/splice/pair_linux.go b/splice/pair_linux.go index 26df205ab..2dc6781fb 100644 --- a/splice/pair_linux.go +++ b/splice/pair_linux.go @@ -54,7 +54,7 @@ func (p *Pair) discard() { } } -func (p *Pair) Close() error { +func (p *Pair) close() error { err1 := syscall.Close(p.r) err2 := syscall.Close(p.w) if err1 != nil { diff --git a/splice/pair_windows.go b/splice/pair_windows.go index 81c1f091b..0e0a8b385 100644 --- a/splice/pair_windows.go +++ b/splice/pair_windows.go @@ -20,7 +20,7 @@ func (p *Pair) discard() { panic("not implemented") } -func (p *Pair) Close() error { +func (p *Pair) close() error { panic("not implemented") } diff --git a/splice/pool.go b/splice/pool.go index 93598959e..6bec3f741 100644 --- a/splice/pool.go +++ b/splice/pool.go @@ -5,10 +5,25 @@ package splice import ( + "fmt" + "runtime" "sync" ) -var splicePool *pairPool +var splicePool = sync.Pool{ + New: newPoolPipe, +} + +func newPoolPipe() interface{} { + // Discard the error which occurred during the creation of pipe buffer, + // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback. + p := newPipe() + if p == nil { + return nil + } + runtime.SetFinalizer(p, destroyPipe) + return p +} type pairPool struct { sync.Mutex @@ -16,90 +31,22 @@ type pairPool struct { usedCount int } -func ClearSplicePool() { - splicePool.clear() -} - func Get() (*Pair, error) { - return splicePool.get() -} - -func Total() int { - return splicePool.total() -} - -func Used() int { - return splicePool.used() + p := splicePool.Get() + if p == nil { + return nil, fmt.Errorf("create pipe failed") + } + return p.(*Pair), nil } // Done returns the pipe pair to pool. func Done(p *Pair) { - splicePool.done(p) + p.discard() + splicePool.Put(p) } // Closes and discards pipe pair. func Drop(p *Pair) { - splicePool.drop(p) -} - -func newSplicePairPool() *pairPool { - return &pairPool{} -} - -func (pp *pairPool) clear() { - pp.Lock() - for _, p := range pp.unused { - p.Close() - } - pp.unused = pp.unused[:0] - pp.Unlock() -} - -func (pp *pairPool) used() (n int) { - pp.Lock() - n = pp.usedCount - pp.Unlock() - - return n -} - -func (pp *pairPool) total() int { - pp.Lock() - n := pp.usedCount + len(pp.unused) - pp.Unlock() - return n -} - -func (pp *pairPool) drop(p *Pair) { - p.Close() - pp.Lock() - pp.usedCount-- - pp.Unlock() -} - -func (pp *pairPool) get() (p *Pair, err error) { - pp.Lock() - defer pp.Unlock() - - pp.usedCount++ - l := len(pp.unused) - if l > 0 { - p := pp.unused[l-1] - pp.unused = pp.unused[:l-1] - return p, nil - } - - return newSplicePair() -} - -func (pp *pairPool) done(p *Pair) { - p.discard() - pp.Lock() - pp.usedCount-- - pp.unused = append(pp.unused, p) - pp.Unlock() -} - -func init() { - splicePool = newSplicePairPool() + runtime.SetFinalizer(p, nil) + destroyPipe(p) } diff --git a/splice/splice.go b/splice/splice.go index 9a635c491..09b9c61ec 100644 --- a/splice/splice.go +++ b/splice/splice.go @@ -66,22 +66,3 @@ func init() { const F_SETPIPE_SZ = 1031 const F_GETPIPE_SZ = 1032 - -func newSplicePair() (p *Pair, err error) { - p = &Pair{} - p.r, p.w, err = osPipe() - if err != nil { - return nil, err - } - var errNo syscall.Errno - p.size, errNo = fcntl(uintptr(p.r), F_GETPIPE_SZ, 0) - if errNo == syscall.EINVAL { - p.size = DefaultPipeSize - return p, nil - } - if errNo != 0 { - p.Close() - return nil, fmt.Errorf("fcntl getsize: %v", errNo) - } - return p, nil -} diff --git a/splice/splice_darwin.go b/splice/splice_darwin.go index a2c29f4cf..584849315 100644 --- a/splice/splice_darwin.go +++ b/splice/splice_darwin.go @@ -1,10 +1,17 @@ package splice import ( - "fmt" "syscall" ) +func newPipe() *Pair { + return nil +} + +func destroyPipe(p *Pair) { + panic("not implemented") +} + // copy & paste from syscall. func fcntl(fd uintptr, cmd int, arg int) (val int, errno syscall.Errno) { r0, _, e1 := syscall.Syscall(syscall.SYS_FCNTL, fd, uintptr(cmd), uintptr(arg)) @@ -12,7 +19,3 @@ func fcntl(fd uintptr, cmd int, arg int) (val int, errno syscall.Errno) { errno = syscall.Errno(e1) return } - -func osPipe() (int, int, error) { - return 0, 0, fmt.Errorf("not implemented") -} diff --git a/splice/splice_linux.go b/splice/splice_linux.go index 64a94f6fd..a2e6a8c51 100644 --- a/splice/splice_linux.go +++ b/splice/splice_linux.go @@ -1,6 +1,9 @@ package splice -import "syscall" +import ( + "log" + "syscall" +) // copy & paste from syscall. func fcntl(fd uintptr, cmd int, arg int) (val int, errno syscall.Errno) { @@ -10,8 +13,21 @@ func fcntl(fd uintptr, cmd int, arg int) (val int, errno syscall.Errno) { return } -func osPipe() (int, int, error) { +func newPipe() *Pair { var fds [2]int - err := syscall.Pipe2(fds[:], syscall.O_NONBLOCK) - return fds[0], fds[1], err + var err error + err = syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK) + if err != nil { + log.Printf("Warning: create pipe failed: %v\n", err) + return nil + } + fcntl(uintptr(fds[0]), syscall.F_SETPIPE_SZ, maxPipeSize) + return &Pair{r: fds[0], w: fds[1], size: maxPipeSize} +} + +func destroyPipe(p *Pair) { + err := p.close() + if err != nil { + log.Printf("close pipe failed: %v\n", err) + } } diff --git a/splice/splice_windows.go b/splice/splice_windows.go index 4f0b57696..4e3b75e99 100644 --- a/splice/splice_windows.go +++ b/splice/splice_windows.go @@ -1,12 +1,15 @@ package splice import ( - "fmt" "syscall" ) -func osPipe() (int, int, error) { - return 0, 0, fmt.Errorf("not implemented") +func newPipe() *Pair { + return nil +} + +func destroyPipe(p *Pair) { + panic("not implemented") } func fcntl(fd uintptr, cmd int, arg int) (val int, errno syscall.Errno) {