From dbcfaf9da11b39db3fa38b49f1efa254928902ad Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 16 Sep 2024 11:42:10 +0800 Subject: [PATCH] Interrupt long standing requests (#27) --- fuse/api.go | 5 +++++ fuse/server.go | 47 +++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/fuse/api.go b/fuse/api.go index 7db5f0d0..6486538d 100644 --- a/fuse/api.go +++ b/fuse/api.go @@ -90,6 +90,8 @@ // you care about correctness. package fuse +import "time" + // Types for users to implement. // The result of Read is an array of bytes, but for performance @@ -227,6 +229,9 @@ type MountOptions struct { // don't alloc buffer for read operation NoAllocForRead bool + + // max duration for a request + Timeout time.Duration } // RawFileSystem is an interface close to the FUSE wire protocol. diff --git a/fuse/server.go b/fuse/server.go index 98fd4d0c..1fd5587c 100644 --- a/fuse/server.go +++ b/fuse/server.go @@ -52,6 +52,7 @@ type Server struct { // maxReaders is the maximum number of goroutines reading requests maxReaders int + maxUnique uint64 // Pools for []byte buffers bufferPool @@ -382,9 +383,7 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) { } req = ms.reqPool.Get().(*request) - if ms.latencies != nil { - req.startTime = time.Now() - } + req.startTime = time.Now() gobbled := req.setInput(dest[:n]) if !gobbled { ms.readPool.Put(dest) @@ -400,6 +399,9 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) { if ms.recentUnique != nil { ms.recentUnique = append(ms.recentUnique, req.inHeader.Unique) } + if req.inHeader.Unique > ms.maxUnique { + ms.maxUnique = req.inHeader.Unique + } req.inflightIndex = len(ms.reqInflight) ms.reqInflight = append(ms.reqInflight, req) @@ -422,13 +424,14 @@ func (ms *Server) checkLostRequests() { var recentUnique []uint64 time.Sleep(time.Second * 3) for { - if len(ms.recentUnique) > 10 { - ms.reqMu.Lock() + ms.reqMu.Lock() + if len(ms.recentUnique) >= 30 { recentUnique = ms.recentUnique ms.recentUnique = nil ms.reqMu.Unlock() break } + ms.reqMu.Unlock() time.Sleep(time.Second) } @@ -444,7 +447,7 @@ func (ms *Server) checkLostRequests() { // interrupt historic ones last = recentUnique[0] - 1 var c int - for last > 0 && c < 3e6 { + for last > 0 && c < 6e6 { ms.returnInterrupted(last) last-- c++ @@ -522,6 +525,9 @@ func (ms *Server) recordStats(req *request) { // // Each filesystem operation executes in a separate goroutine. func (ms *Server) Serve() { + if ms.opts.Timeout > 0 { + go ms.checkRequestTimeout(ms.opts.Timeout) + } ms.loop(false) ms.loops.Wait() @@ -560,6 +566,35 @@ func (ms *Server) wakeupReader() { _ = cmd.Run() } +func (ms *Server) checkRequestTimeout(timeout time.Duration) { + for { + time.Sleep(time.Second) + var batch = 100 + for i := 0; ; i += batch { + now := time.Now() + ms.reqMu.Lock() + if ms.shutdown || i >= len(ms.reqInflight) { + ms.reqMu.Unlock() + break + } + for j := 0; j < batch && i+j < len(ms.reqInflight); j++ { + req := ms.reqInflight[i+j] + if req.interrupted { + unique := req.inHeader.Unique + ms.reqMu.Unlock() + ms.returnInterrupted(unique) + ms.reqMu.Lock() + } else if used := now.Sub(req.startTime); used > timeout || req.inHeader.Unique+5.5e6 < ms.maxUnique { + log.Printf("interrupt request %d after %s: %+v", req.inHeader.Unique, used, req.inHeader) + req.interrupted = true + close(req.cancel) + } + } + ms.reqMu.Unlock() + } + } +} + func (ms *Server) Shutdown() bool { log.Printf("try to restart gracefully") start := time.Now()