From d1c0a595a350d5c1faa726f6b762ecc8e01706f9 Mon Sep 17 00:00:00 2001 From: dvovk Date: Tue, 3 Dec 2024 08:41:40 +0000 Subject: [PATCH] save --- peer.go | 83 +++++++++++++++++++++++----------------- request-strategy/peer.go | 47 +++++++++++++++++++++++ requesting.go | 2 +- torrent.go | 1 + webseed-peer.go | 35 ++++++++++++++--- webseed/client.go | 6 +++ 6 files changed, 133 insertions(+), 41 deletions(-) diff --git a/peer.go b/peer.go index a2cbe311a7..6e9ec49d7c 100644 --- a/peer.go +++ b/peer.go @@ -134,6 +134,37 @@ func (p *Peer) Torrent() *Torrent { return p.t } +func LOGDBG(msg string, source string) { + //create current time string with format [12-03|07:55:09.819] + currentTime := time.Now().Format("01-02|15:04:05.000") + + fmt.Printf("[%s] %s - source:%s\n", currentTime, msg, source) +} + +func (p *Peer) AddRequestCancelled(r RequestIndex, source string) bool { + result := p.requestState.Cancelled.CheckedAdd(r) + if(result) { + p.requestState.CancelCounter.Store(int32(p.requestState.CancelCounter.Load()) + 1) + LOGDBG(fmt.Sprintf("Added to cancel request:%v-%s\n", r, p.RemoteAddr.String() + "/" + p.t.info.Name), source) + } else { + LOGDBG(fmt.Sprintf("Already in cancelled request:%v-%s\n", r, p.RemoteAddr.String() + "/" + p.t.info.Name), source) + } + + return result +} + +func (p *Peer) RemoveRequestCancelled(r RequestIndex, source string) bool { + result := p.requestState.Cancelled.CheckedRemove(r) + if(result) { + p.requestState.CancelCounter.Store(int32(p.requestState.CancelCounter.Load()) - 1) + LOGDBG(fmt.Sprintf("Request removed request:%v-%s", r, p.RemoteAddr.String() + "/" + p.t.info.Name), source) + } else { + LOGDBG(fmt.Sprintf("Request already in removed request:%v-%s", r, p.RemoteAddr.String() + "/" + p.t.info.Name), source) + } + + return result +} + func (p *Peer) initRequestState() { p.mu.Lock() defer p.mu.Unlock() @@ -571,13 +602,7 @@ func (cn *Peer) request(r RequestIndex, maxRequests int, lock bool, lockTorrent // this is required in case this is a re-request of a previously // cancelled request - we need to clear the cancelled flag //cn.requestState.Cancelled.Remove(r) - removed := cn.requestState.Cancelled.CheckedRemove(r) - if removed { - cn.requestState.CancelCounter.Store(int32(cn.requestState.CancelCounter.Load()) - 1) - cn.logger.Levelf(log.Debug, "1 request %d removed by peer remote adderss %s ", r, cn.RemoteAddr.String() + "" + cn.t.info.Name) - } else { - cn.logger.Levelf(log.Debug, "1 request %d NOT removed by peer remote adderss %s : %s", r, cn.RemoteAddr.String() + "" + cn.t.info.Name) - } + _ = cn.RemoveRequestCancelled(r, "peer.go->Peer.request") if cn.validReceiveChunks == nil { cn.validReceiveChunks = make(map[RequestIndex]int) @@ -614,16 +639,18 @@ func (me *Peer) cancel(r RequestIndex, lock bool, lockTorrent bool) { if !me.deleteRequest(r, false, false) { panic(fmt.Sprintf("request %d not existing: should have been guarded", r)) } + + LOGDBG(fmt.Sprintf("start-Cancel request:%v-%s\n", r, me.RemoteAddr.String() + "/" + me.t.info.Name), "peer.go->Peer.cancel") if me._cancel(r, false, false) { // Record that we expect to get a cancel ack. - added := me.requestState.Cancelled.CheckedAdd(r) + added := me.AddRequestCancelled(r, "peer.go->Peer.cancel") if !added { panic(fmt.Sprintf("request %d: already cancelled for hash: %s", r, me.t.InfoHash())) - } else { - me.logger.Levelf(log.Debug, "Add to cancel %d peer remote adderss: %s", r, me.RemoteAddr.String() + "" + me.t.info.Name) - me.requestState.CancelCounter.Store(int32(me.requestState.CancelCounter.Load()) + 1) - } + } } + + LOGDBG(fmt.Sprintf("end-Cancel request:%v-%s\n", r, me.RemoteAddr.String() + "/" + me.t.info.Name), "peer.go->Peer.cancel") + me.decPeakRequests(false) }() } @@ -744,15 +771,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { if c.deleteRequest(r, false, false) { c.decPeakRequests(false) } else { - - removed := c.requestState.Cancelled.CheckedRemove(r) - if removed { - c.requestState.CancelCounter.Store(int32(c.requestState.CancelCounter.Load()) - 1) - c.logger.Levelf(log.Debug, "3 request %d removed by peer remote adderss %s ", r, c.RemoteAddr.String() + "" + c.t.info.Name) - } else { - c.logger.Levelf(log.Debug, "3 request %d NOT removed by peer remote adderss %s ", r, c.RemoteAddr.String() + "" + c.t.info.Name) - } - + removed := c.RemoveRequestCancelled(r, "peer.go->Peer.remoteRejectedRequest") if !removed { return false } @@ -859,16 +878,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { defer c.mu.Unlock() // Request has been satisfied. - - removed := c.requestState.Cancelled.CheckedRemove(req) - if removed { - c.requestState.CancelCounter.Store(int32(c.requestState.CancelCounter.Load()) - 1) - c.logger.Levelf(log.Debug, "2 request %d removed by peer remote adderss %s ", req, c.RemoteAddr.String() + "" + c.t.info.Name) - } else { - c.logger.Levelf(log.Debug, "2 request %d NOT removed by peer remote adderss %s ", req, c.RemoteAddr.String() + "" + c.t.info.Name) - } - - if c.deleteRequest(req, false, false) || removed { + if c.deleteRequest(req, false, false) || c.RemoveRequestCancelled(req, "peer.go->Peer.receiveChunk") { intended = true if !c.peerChoking { c._chunksReceivedWhileExpecting++ @@ -881,7 +891,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // Do we actually want this chunk? if t.haveChunk(ppReq, false) { // panic(fmt.Sprintf("%+v", ppReq)) - c.logger.Levelf(log.Debug, "haveChunk %d peer remote adderss %s: %s", req, c.RemoteAddr.String() + "" + c.t.info.Name) + LOGDBG(fmt.Sprintf("haveChunk request:%v-%s\n", req, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.receiveChunk") chunksReceived.Add("redundant", 1) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) return nil, false, nil @@ -914,14 +924,15 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // Cancel pending requests for this chunk from *other* peers. if p := t.requestingPeer(req, false); p != nil { - p.logger.Levelf(log.Debug, "Cancel pending request %d peer remote adderss %s: %s", req, p.RemoteAddr.String() + "" + p.t.info.Name) + LOGDBG(fmt.Sprintf("start-Cancel pending request:%v-%s\n", req, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.receiveChunk") if p == c { panic("should not be pending request from conn that just received it") } + LOGDBG(fmt.Sprintf("end-Cancel pending request:%v-%s\n", req, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.receiveChunk") p.cancel(req, true, false) if p.isLowOnRequests(true, false) { p.updateRequests("Peer.receiveChunk", false) - p.logger.Levelf(log.Debug, "peer low on requests, remote adderss %s: %s", req, p.RemoteAddr.String() + "" + p.t.info.Name) + LOGDBG(fmt.Sprintf("peer low on requests request:%v-%s\n", req, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.receiveChunk") } } @@ -1044,7 +1055,7 @@ func (c *Peer) deleteRequest(r RequestIndex, lock bool, lockTorrent bool) bool { } removed := c.requestState.Requests.CheckedRemove(r) - c.logger.Levelf(log.Debug, "1request %d removed by peer remote adderss %s - succesfully: %s", r, c.RemoteAddr.String() + "" + c.t.info.Name, removed) + if !removed { return false } @@ -1113,6 +1124,7 @@ func (c *Peer) cancelAllRequests(lock bool, lockTorrent bool) { } c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool { + LOGDBG(fmt.Sprintf("cancelAllRequests request:%v-%s\n", x, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.cancelAllRequests") c.cancel(x, false, false) return true }) @@ -1208,6 +1220,7 @@ func (p *Peer) decPeakRequests(lock bool) { defer p.mu.RUnlock() } p.peakRequests-- + p.logger.Levelf(log.Debug, "decPeakRequests %s", p.RemoteAddr.String() + "/" + p.t.info.Name) } func (p *Peer) recordBlockForSmartBan(req RequestIndex, blockData []byte) { diff --git a/request-strategy/peer.go b/request-strategy/peer.go index 2d9af1b993..34a9e8fb10 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -35,3 +35,50 @@ type PeerRequests interface { Bitmap() *typedRoaring.Bitmap[RequestIndex] } + +/*type SyncBitmap struct { + mu sync.Mutex + bitmap typedRoaring.Bitmap[RequestIndex] +} + +func (sb *SyncBitmap) GetCardinality() uint64 { + sb.mu.Lock() + defer sb.mu.Unlock() + return sb.bitmap.GetCardinality() +} + +func (sb *SyncBitmap) Contains(r RequestIndex) bool { + sb.mu.Lock() + defer sb.mu.Unlock() + return sb.bitmap.Contains(r) +} + +func (sb *SyncBitmap) Remove(r RequestIndex) { + sb.mu.Lock() + defer sb.mu.Unlock() + sb.bitmap.Remove(r) +} + +func (sb *SyncBitmap) CheckedAdd(r RequestIndex) bool{ + sb.mu.Lock() + defer sb.mu.Unlock() + return sb.bitmap.CheckedAdd(r) +} + +func (sb *SyncBitmap) CheckedRemove(r RequestIndex) bool{ + sb.mu.Lock() + defer sb.mu.Unlock() + return sb.bitmap.CheckedRemove(r) +} + +func (sb *SyncBitmap) Clone() typedRoaring.Bitmap[RequestIndex] { + sb.mu.Lock() + defer sb.mu.Unlock() + return sb.bitmap.Clone() +} + +func (sb *SyncBitmap) IsEmpty() bool { + sb.mu.Lock() + defer sb.mu.Unlock() + return sb.bitmap.IsEmpty() +}*/ \ No newline at end of file diff --git a/requesting.go b/requesting.go index f2eaa80481..b38ee74929 100644 --- a/requesting.go +++ b/requesting.go @@ -264,7 +264,7 @@ func (p *Peer) getDesiredRequestState(debug bool, lock bool, lockTorrent bool) ( if !cancelled.IsEmpty() && cancelled.Contains(r) { // Can't re-request while awaiting acknowledgement. - p.logger.Levelf(log.Debug, "cancelled.Contains %d peer remote adderss %s", r, p.RemoteAddr.String() + "" + p.t.info.Name) + LOGDBG(fmt.Sprintf("cancelled.Contains request:%v-%s\n", r, p.RemoteAddr.String() + "/" + p.t.info.Name), "requesting.go->getDesiredRequestState") awaitingCancelCount++ return } diff --git a/torrent.go b/torrent.go index 47a097ebad..efe6d99708 100644 --- a/torrent.go +++ b/torrent.go @@ -3446,6 +3446,7 @@ func (t *Torrent) cancelRequest(r RequestIndex, updateRequests, lock bool) *Peer p := t.requestingPeer(r, false) if p != nil { + LOGDBG(fmt.Sprintf("cancelling request:%v-%s\n", r, p.RemoteAddr.String() + "/" + p.t.info.Name), "torrent.go->cancelRequest") p.cancel(r, true, false) if updateRequests && p.isLowOnRequests(true, lock) { diff --git a/webseed-peer.go b/webseed-peer.go index 2bdbcc54c2..f35b4abd88 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -90,9 +90,22 @@ func (ws *webseedPeer) _cancel(r RequestIndex, lock bool, lockTorrent bool) bool } active, ok = ws.activeRequests[req] - return + LOGDBG(fmt.Sprintf("ws.activeRequests[req] active:%v, ok:%v, request:%v-%s\n", active, ok, r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel") + return }(); ok { - active.Cancel() + if lock { + ws.peer.mu.RLock() + defer ws.peer.mu.RUnlock() + } + + if active.Context().Err() == nil { + LOGDBG(fmt.Sprintf("Ctx alive cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel") + active.Cancel() + } else { + LOGDBG(fmt.Sprintf("start-Ctx dead cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel") + ws.peer.RemoveRequestCancelled(r, "Ctx dead cancel") + LOGDBG(fmt.Sprintf("end-Ctx dead cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel") + } // The requester is running and will handle the result. return true } @@ -377,11 +390,17 @@ func (ws *webseedPeer) logProgress(label string, lockTorrent bool) { persisting := ws.persisting.Load() activeCount := len(ws.activeRequests) - ws.peer.logger.Levelf(log.Debug, "%s %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,p=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) lastUpdate=%s cancelCounter=%d", + LOGDBG(fmt.Sprintf("%s %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,p=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) lastUpdate=%s cancelCounter=%d\n", + label, ws.processedRequests, pendingRequests, desiredRequests, nominalMaxRequests, + activeCount-int(receiving)-int(persisting), receiving, persisting, ws.maxActiveRequests, ws.waiting, + t.numQueuedForHash(false), t.activePieceHashes.Load(), t.hashing.Load(), len(t.hashResults), + t.numPiecesCompleted(false), t.NumPieces(), time.Since(ws.peer.lastRequestUpdate), ws.peer.GetCancelCount()), "webseed-peer.go->webseedPeer.logProgress") + + /*ws.peer.logger.Levelf(log.Debug, "%s %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,p=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) lastUpdate=%s cancelCounter=%d", label, ws.processedRequests, pendingRequests, desiredRequests, nominalMaxRequests, activeCount-int(receiving)-int(persisting), receiving, persisting, ws.maxActiveRequests, ws.waiting, t.numQueuedForHash(false), t.activePieceHashes.Load(), t.hashing.Load(), len(t.hashResults), - t.numPiecesCompleted(false), t.NumPieces(), time.Since(ws.peer.lastRequestUpdate), ws.peer.GetCancelCount()) + t.numPiecesCompleted(false), t.NumPieces(), time.Since(ws.peer.lastRequestUpdate), ws.peer.GetCancelCount())*/ } func requestUpdate(ws *webseedPeer) { @@ -578,6 +597,8 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re err = result.Ctx.Err() } + reqIdx := ws.peer.t.requestIndexFromRequest(r, true) + if err != nil { switch { case errors.Is(err, context.Canceled): @@ -595,20 +616,24 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re } } - if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r, true)) { + LOGDBG(fmt.Sprintf("strat-remoteRejectedRequest request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler") + if !ws.peer.remoteRejectedRequest(reqIdx) { err = fmt.Errorf(`received invalid reject "%w", for request %v`, err, r) ws.peer.logger.Levelf(log.Debug, "%v", err) } + LOGDBG(fmt.Sprintf("end-remoteRejectedRequest request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler") return err } + LOGDBG(fmt.Sprintf("strat-receiveChunk request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler") err = ws.peer.receiveChunk(&pp.Message{ Type: pp.Piece, Index: r.Index, Begin: r.Begin, Piece: piece, }) + LOGDBG(fmt.Sprintf("end-receiveChunk request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler") if err != nil { panic(err) diff --git a/webseed/client.go b/webseed/client.go index 4e9781d6e3..46ac49a397 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -30,6 +30,7 @@ type requestPart struct { } type Request struct { + ctx context.Context cancel func() Result chan RequestResult readers []io.Reader @@ -39,6 +40,10 @@ func (r Request) Cancel() { r.cancel() } +func (r Request) Context() context.Context { + return r.ctx +} + type Client struct { HttpClient *http.Client Url string @@ -116,6 +121,7 @@ func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter panic("request out of file bounds") } req := Request{ + ctx: ctx, cancel: cancel, Result: make(chan RequestResult, 1), }