Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
dvovk committed Dec 3, 2024
1 parent 7c43720 commit d1c0a59
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 41 deletions.
83 changes: 48 additions & 35 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,37 @@ func (p *Peer) Torrent() *Torrent {
return p.t
}

func LOGDBG(msg string, source string) {
//create current time string with format [12-03|07:55:09.819]
currentTime := time.Now().Format("01-02|15:04:05.000")

fmt.Printf("[%s] %s - source:%s\n", currentTime, msg, source)
}

func (p *Peer) AddRequestCancelled(r RequestIndex, source string) bool {
result := p.requestState.Cancelled.CheckedAdd(r)
if(result) {
p.requestState.CancelCounter.Store(int32(p.requestState.CancelCounter.Load()) + 1)
LOGDBG(fmt.Sprintf("Added to cancel request:%v-%s\n", r, p.RemoteAddr.String() + "/" + p.t.info.Name), source)
} else {
LOGDBG(fmt.Sprintf("Already in cancelled request:%v-%s\n", r, p.RemoteAddr.String() + "/" + p.t.info.Name), source)
}

return result
}

func (p *Peer) RemoveRequestCancelled(r RequestIndex, source string) bool {
result := p.requestState.Cancelled.CheckedRemove(r)
if(result) {
p.requestState.CancelCounter.Store(int32(p.requestState.CancelCounter.Load()) - 1)
LOGDBG(fmt.Sprintf("Request removed request:%v-%s", r, p.RemoteAddr.String() + "/" + p.t.info.Name), source)
} else {
LOGDBG(fmt.Sprintf("Request already in removed request:%v-%s", r, p.RemoteAddr.String() + "/" + p.t.info.Name), source)
}

return result
}

func (p *Peer) initRequestState() {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down Expand Up @@ -571,13 +602,7 @@ func (cn *Peer) request(r RequestIndex, maxRequests int, lock bool, lockTorrent
// this is required in case this is a re-request of a previously
// cancelled request - we need to clear the cancelled flag
//cn.requestState.Cancelled.Remove(r)
removed := cn.requestState.Cancelled.CheckedRemove(r)
if removed {
cn.requestState.CancelCounter.Store(int32(cn.requestState.CancelCounter.Load()) - 1)
cn.logger.Levelf(log.Debug, "1 request %d removed by peer remote adderss %s ", r, cn.RemoteAddr.String() + "" + cn.t.info.Name)
} else {
cn.logger.Levelf(log.Debug, "1 request %d NOT removed by peer remote adderss %s : %s", r, cn.RemoteAddr.String() + "" + cn.t.info.Name)
}
_ = cn.RemoveRequestCancelled(r, "peer.go->Peer.request")

if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[RequestIndex]int)
Expand Down Expand Up @@ -614,16 +639,18 @@ func (me *Peer) cancel(r RequestIndex, lock bool, lockTorrent bool) {
if !me.deleteRequest(r, false, false) {
panic(fmt.Sprintf("request %d not existing: should have been guarded", r))
}

LOGDBG(fmt.Sprintf("start-Cancel request:%v-%s\n", r, me.RemoteAddr.String() + "/" + me.t.info.Name), "peer.go->Peer.cancel")
if me._cancel(r, false, false) {
// Record that we expect to get a cancel ack.
added := me.requestState.Cancelled.CheckedAdd(r)
added := me.AddRequestCancelled(r, "peer.go->Peer.cancel")
if !added {
panic(fmt.Sprintf("request %d: already cancelled for hash: %s", r, me.t.InfoHash()))
} else {
me.logger.Levelf(log.Debug, "Add to cancel %d peer remote adderss: %s", r, me.RemoteAddr.String() + "" + me.t.info.Name)
me.requestState.CancelCounter.Store(int32(me.requestState.CancelCounter.Load()) + 1)
}
}
}

LOGDBG(fmt.Sprintf("end-Cancel request:%v-%s\n", r, me.RemoteAddr.String() + "/" + me.t.info.Name), "peer.go->Peer.cancel")

me.decPeakRequests(false)
}()
}
Expand Down Expand Up @@ -744,15 +771,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
if c.deleteRequest(r, false, false) {
c.decPeakRequests(false)
} else {

removed := c.requestState.Cancelled.CheckedRemove(r)
if removed {
c.requestState.CancelCounter.Store(int32(c.requestState.CancelCounter.Load()) - 1)
c.logger.Levelf(log.Debug, "3 request %d removed by peer remote adderss %s ", r, c.RemoteAddr.String() + "" + c.t.info.Name)
} else {
c.logger.Levelf(log.Debug, "3 request %d NOT removed by peer remote adderss %s ", r, c.RemoteAddr.String() + "" + c.t.info.Name)
}

removed := c.RemoveRequestCancelled(r, "peer.go->Peer.remoteRejectedRequest")
if !removed {
return false
}
Expand Down Expand Up @@ -859,16 +878,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
defer c.mu.Unlock()

// Request has been satisfied.

removed := c.requestState.Cancelled.CheckedRemove(req)
if removed {
c.requestState.CancelCounter.Store(int32(c.requestState.CancelCounter.Load()) - 1)
c.logger.Levelf(log.Debug, "2 request %d removed by peer remote adderss %s ", req, c.RemoteAddr.String() + "" + c.t.info.Name)
} else {
c.logger.Levelf(log.Debug, "2 request %d NOT removed by peer remote adderss %s ", req, c.RemoteAddr.String() + "" + c.t.info.Name)
}

if c.deleteRequest(req, false, false) || removed {
if c.deleteRequest(req, false, false) || c.RemoveRequestCancelled(req, "peer.go->Peer.receiveChunk") {
intended = true
if !c.peerChoking {
c._chunksReceivedWhileExpecting++
Expand All @@ -881,7 +891,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// Do we actually want this chunk?
if t.haveChunk(ppReq, false) {
// panic(fmt.Sprintf("%+v", ppReq))
c.logger.Levelf(log.Debug, "haveChunk %d peer remote adderss %s: %s", req, c.RemoteAddr.String() + "" + c.t.info.Name)
LOGDBG(fmt.Sprintf("haveChunk request:%v-%s\n", req, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.receiveChunk")
chunksReceived.Add("redundant", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
return nil, false, nil
Expand Down Expand Up @@ -914,14 +924,15 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {

// Cancel pending requests for this chunk from *other* peers.
if p := t.requestingPeer(req, false); p != nil {
p.logger.Levelf(log.Debug, "Cancel pending request %d peer remote adderss %s: %s", req, p.RemoteAddr.String() + "" + p.t.info.Name)
LOGDBG(fmt.Sprintf("start-Cancel pending request:%v-%s\n", req, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.receiveChunk")
if p == c {
panic("should not be pending request from conn that just received it")
}
LOGDBG(fmt.Sprintf("end-Cancel pending request:%v-%s\n", req, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.receiveChunk")
p.cancel(req, true, false)
if p.isLowOnRequests(true, false) {
p.updateRequests("Peer.receiveChunk", false)
p.logger.Levelf(log.Debug, "peer low on requests, remote adderss %s: %s", req, p.RemoteAddr.String() + "" + p.t.info.Name)
LOGDBG(fmt.Sprintf("peer low on requests request:%v-%s\n", req, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.receiveChunk")
}
}

Expand Down Expand Up @@ -1044,7 +1055,7 @@ func (c *Peer) deleteRequest(r RequestIndex, lock bool, lockTorrent bool) bool {
}

removed := c.requestState.Requests.CheckedRemove(r)
c.logger.Levelf(log.Debug, "1request %d removed by peer remote adderss %s - succesfully: %s", r, c.RemoteAddr.String() + "" + c.t.info.Name, removed)

if !removed {
return false
}
Expand Down Expand Up @@ -1113,6 +1124,7 @@ func (c *Peer) cancelAllRequests(lock bool, lockTorrent bool) {
}

c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
LOGDBG(fmt.Sprintf("cancelAllRequests request:%v-%s\n", x, c.RemoteAddr.String() + "/" + c.t.info.Name), "peer.go->Peer.cancelAllRequests")
c.cancel(x, false, false)
return true
})
Expand Down Expand Up @@ -1208,6 +1220,7 @@ func (p *Peer) decPeakRequests(lock bool) {
defer p.mu.RUnlock()
}
p.peakRequests--
p.logger.Levelf(log.Debug, "decPeakRequests %s", p.RemoteAddr.String() + "/" + p.t.info.Name)
}

func (p *Peer) recordBlockForSmartBan(req RequestIndex, blockData []byte) {
Expand Down
47 changes: 47 additions & 0 deletions request-strategy/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,50 @@ type PeerRequests interface {

Bitmap() *typedRoaring.Bitmap[RequestIndex]
}

/*type SyncBitmap struct {
mu sync.Mutex
bitmap typedRoaring.Bitmap[RequestIndex]
}
func (sb *SyncBitmap) GetCardinality() uint64 {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.bitmap.GetCardinality()
}
func (sb *SyncBitmap) Contains(r RequestIndex) bool {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.bitmap.Contains(r)
}
func (sb *SyncBitmap) Remove(r RequestIndex) {
sb.mu.Lock()
defer sb.mu.Unlock()
sb.bitmap.Remove(r)
}
func (sb *SyncBitmap) CheckedAdd(r RequestIndex) bool{
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.bitmap.CheckedAdd(r)
}
func (sb *SyncBitmap) CheckedRemove(r RequestIndex) bool{
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.bitmap.CheckedRemove(r)
}
func (sb *SyncBitmap) Clone() typedRoaring.Bitmap[RequestIndex] {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.bitmap.Clone()
}
func (sb *SyncBitmap) IsEmpty() bool {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.bitmap.IsEmpty()
}*/
2 changes: 1 addition & 1 deletion requesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (p *Peer) getDesiredRequestState(debug bool, lock bool, lockTorrent bool) (

if !cancelled.IsEmpty() && cancelled.Contains(r) {
// Can't re-request while awaiting acknowledgement.
p.logger.Levelf(log.Debug, "cancelled.Contains %d peer remote adderss %s", r, p.RemoteAddr.String() + "" + p.t.info.Name)
LOGDBG(fmt.Sprintf("cancelled.Contains request:%v-%s\n", r, p.RemoteAddr.String() + "/" + p.t.info.Name), "requesting.go->getDesiredRequestState")
awaitingCancelCount++
return
}
Expand Down
1 change: 1 addition & 0 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3446,6 +3446,7 @@ func (t *Torrent) cancelRequest(r RequestIndex, updateRequests, lock bool) *Peer

p := t.requestingPeer(r, false)
if p != nil {
LOGDBG(fmt.Sprintf("cancelling request:%v-%s\n", r, p.RemoteAddr.String() + "/" + p.t.info.Name), "torrent.go->cancelRequest")
p.cancel(r, true, false)

if updateRequests && p.isLowOnRequests(true, lock) {
Expand Down
35 changes: 30 additions & 5 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,22 @@ func (ws *webseedPeer) _cancel(r RequestIndex, lock bool, lockTorrent bool) bool
}

active, ok = ws.activeRequests[req]
return
LOGDBG(fmt.Sprintf("ws.activeRequests[req] active:%v, ok:%v, request:%v-%s\n", active, ok, r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel")
return
}(); ok {
active.Cancel()
if lock {
ws.peer.mu.RLock()
defer ws.peer.mu.RUnlock()
}

if active.Context().Err() == nil {
LOGDBG(fmt.Sprintf("Ctx alive cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel")
active.Cancel()
} else {
LOGDBG(fmt.Sprintf("start-Ctx dead cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel")
ws.peer.RemoveRequestCancelled(r, "Ctx dead cancel")
LOGDBG(fmt.Sprintf("end-Ctx dead cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel")
}
// The requester is running and will handle the result.
return true
}
Expand Down Expand Up @@ -377,11 +390,17 @@ func (ws *webseedPeer) logProgress(label string, lockTorrent bool) {
persisting := ws.persisting.Load()
activeCount := len(ws.activeRequests)

ws.peer.logger.Levelf(log.Debug, "%s %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,p=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) lastUpdate=%s cancelCounter=%d",
LOGDBG(fmt.Sprintf("%s %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,p=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) lastUpdate=%s cancelCounter=%d\n",
label, ws.processedRequests, pendingRequests, desiredRequests, nominalMaxRequests,
activeCount-int(receiving)-int(persisting), receiving, persisting, ws.maxActiveRequests, ws.waiting,
t.numQueuedForHash(false), t.activePieceHashes.Load(), t.hashing.Load(), len(t.hashResults),
t.numPiecesCompleted(false), t.NumPieces(), time.Since(ws.peer.lastRequestUpdate), ws.peer.GetCancelCount()), "webseed-peer.go->webseedPeer.logProgress")

/*ws.peer.logger.Levelf(log.Debug, "%s %d (p=%d,d=%d,n=%d) active(c=%d,r=%d,p=%d,m=%d,w=%d) hashing(q=%d,a=%d,h=%d,r=%d) complete(%d/%d) lastUpdate=%s cancelCounter=%d",
label, ws.processedRequests, pendingRequests, desiredRequests, nominalMaxRequests,
activeCount-int(receiving)-int(persisting), receiving, persisting, ws.maxActiveRequests, ws.waiting,
t.numQueuedForHash(false), t.activePieceHashes.Load(), t.hashing.Load(), len(t.hashResults),
t.numPiecesCompleted(false), t.NumPieces(), time.Since(ws.peer.lastRequestUpdate), ws.peer.GetCancelCount())
t.numPiecesCompleted(false), t.NumPieces(), time.Since(ws.peer.lastRequestUpdate), ws.peer.GetCancelCount())*/
}

func requestUpdate(ws *webseedPeer) {
Expand Down Expand Up @@ -578,6 +597,8 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
err = result.Ctx.Err()
}

reqIdx := ws.peer.t.requestIndexFromRequest(r, true)

if err != nil {
switch {
case errors.Is(err, context.Canceled):
Expand All @@ -595,20 +616,24 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
}
}

if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r, true)) {
LOGDBG(fmt.Sprintf("strat-remoteRejectedRequest request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler")
if !ws.peer.remoteRejectedRequest(reqIdx) {
err = fmt.Errorf(`received invalid reject "%w", for request %v`, err, r)
ws.peer.logger.Levelf(log.Debug, "%v", err)
}
LOGDBG(fmt.Sprintf("end-remoteRejectedRequest request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler")

return err
}

LOGDBG(fmt.Sprintf("strat-receiveChunk request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler")
err = ws.peer.receiveChunk(&pp.Message{
Type: pp.Piece,
Index: r.Index,
Begin: r.Begin,
Piece: piece,
})
LOGDBG(fmt.Sprintf("end-receiveChunk request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler")

if err != nil {
panic(err)
Expand Down
6 changes: 6 additions & 0 deletions webseed/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type requestPart struct {
}

type Request struct {
ctx context.Context
cancel func()
Result chan RequestResult
readers []io.Reader
Expand All @@ -39,6 +40,10 @@ func (r Request) Cancel() {
r.cancel()
}

func (r Request) Context() context.Context {
return r.ctx
}

type Client struct {
HttpClient *http.Client
Url string
Expand Down Expand Up @@ -116,6 +121,7 @@ func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter
panic("request out of file bounds")
}
req := Request{
ctx: ctx,
cancel: cancel,
Result: make(chan RequestResult, 1),
}
Expand Down

0 comments on commit d1c0a59

Please sign in to comment.