Skip to content

Commit

Permalink
fix: don't sendEvent if context cancelled
Browse files Browse the repository at this point in the history
Fixes: #343
  • Loading branch information
rvagg committed Jul 5, 2023
1 parent 013521e commit 1e12a93
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 20 deletions.
6 changes: 3 additions & 3 deletions pkg/retriever/graphsyncretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,20 +184,20 @@ func (pg *ProtocolGraphsync) Retrieve(
eventsSubscriber := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
switch event.Code {
case datatransfer.Open:
shared.sendEvent(events.Proposed(retrieval.Clock.Now(), retrieval.request.RetrievalID, candidate))
shared.sendEvent(ctx, events.Proposed(retrieval.Clock.Now(), retrieval.request.RetrievalID, candidate))
case datatransfer.NewVoucherResult:
lastVoucher := channelState.LastVoucherResult()
resType, err := retrievaltypes.DealResponseFromNode(lastVoucher.Voucher)
if err != nil {
return
}
if resType.Status == retrievaltypes.DealStatusAccepted {
shared.sendEvent(events.Accepted(retrieval.Clock.Now(), retrieval.request.RetrievalID, candidate))
shared.sendEvent(ctx, events.Accepted(retrieval.Clock.Now(), retrieval.request.RetrievalID, candidate))
}
case datatransfer.DataReceivedProgress:
if !receivedFirstByte {
receivedFirstByte = true
shared.sendEvent(events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, candidate, retrieval.Clock.Since(retrievalStart), multicodec.TransportGraphsyncFilecoinv1))
shared.sendEvent(ctx, events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, candidate, retrieval.Clock.Since(retrievalStart), multicodec.TransportGraphsyncFilecoinv1))
}
if lastBytesReceivedTimer != nil {
doneLk.Lock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (ph *ProtocolHttp) Retrieve(
var ttfb time.Duration
rdr := newTimeToFirstByteReader(resp.Body, func() {
ttfb = retrieval.Clock.Since(retrievalStart)
shared.sendEvent(events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, candidate, ttfb, multicodec.TransportIpfsGatewayHttp))
shared.sendEvent(ctx, events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, candidate, ttfb, multicodec.TransportIpfsGatewayHttp))
})
cfg := verifiedcar.Config{
Root: retrieval.request.Cid,
Expand Down
33 changes: 17 additions & 16 deletions pkg/retriever/parallelpeerretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ func (shared *retrievalShared) canSendResult() bool {

// sendResult will only send a result to the parent goroutine if a retrieval has
// finished (likely by a success), otherwise it will send the result
func (shared *retrievalShared) sendResult(result retrievalResult) bool {
func (shared *retrievalShared) sendResult(ctx context.Context, result retrievalResult) bool {
select {
case <-ctx.Done():
case <-shared.finishChan:
return false
case shared.resultChan <- result:
Expand All @@ -112,9 +113,9 @@ func (shared *retrievalShared) sendResult(result retrievalResult) bool {
return true
}

func (shared *retrievalShared) sendEvent(event events.EventWithProviderID) {
func (shared *retrievalShared) sendEvent(ctx context.Context, event events.EventWithProviderID) {
retrievalEvent := event.(types.RetrievalEvent)
shared.sendResult(retrievalResult{PeerID: event.ProviderId(), Event: &retrievalEvent})
shared.sendResult(ctx, retrievalResult{PeerID: event.ProviderId(), Event: &retrievalEvent})
}

func (cfg *parallelPeerRetriever) Retrieve(
Expand Down Expand Up @@ -192,8 +193,8 @@ func (retrieval *retrieval) RetrieveFromAsyncCandidates(asyncCandidates types.In
// optimistically try to wait for all routines to finish
select {
case <-finishAll:
case <-time.After(100 * time.Millisecond):
logger.Warn("Unable to successfully cancel all retrieval attempts withing 100ms")
case <-time.After(1 * time.Millisecond):
logger.Warnf("Unable to successfully cancel all %s retrieval attempts withing 100ms", retrieval.Protocol.Code().String())
}
return stats, err
}
Expand Down Expand Up @@ -297,7 +298,7 @@ func (retrieval *retrieval) runRetrievalCandidate(
var retrievalErr error
var done func()

shared.sendEvent(events.StartedRetrieval(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, candidate, retrieval.Protocol.Code()))
shared.sendEvent(ctx, events.StartedRetrieval(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, candidate, retrieval.Protocol.Code()))
connectCtx := ctx
if timeout != 0 {
var timeoutFunc func()
Expand All @@ -309,15 +310,15 @@ func (retrieval *retrieval) runRetrievalCandidate(
connectTime, err := retrieval.Protocol.Connect(connectCtx, retrieval, startTime, candidate)
if err != nil {
if ctx.Err() == nil { // not cancelled, maybe timed out though
logger.Warnf("Failed to connect to SP %s: %v", candidate.MinerPeer.ID, err)
logger.Warnf("Failed to connect to SP %s on protocol %s: %v", candidate.MinerPeer.ID, retrieval.Protocol.Code().String(), err)
retrievalErr = fmt.Errorf("%w: %v", ErrConnectFailed, err)
if err := retrieval.Session.RecordFailure(retrieval.request.RetrievalID, candidate.MinerPeer.ID); err != nil {
logger.Errorf("Error recording retrieval failure: %v", err)
logger.Errorf("Error recording retrieval failure on protocol %s: %v", retrieval.Protocol.Code().String(), err)
}
shared.sendEvent(events.FailedRetrieval(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, candidate, retrievalErr.Error()))
shared.sendEvent(ctx, events.FailedRetrieval(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, candidate, retrievalErr.Error()))
}
} else {
shared.sendEvent(events.ConnectedToProvider(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, candidate))
shared.sendEvent(ctx, events.ConnectedToProvider(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, candidate))

retrieval.Session.RecordConnectTime(candidate.MinerPeer.ID, connectTime)

Expand All @@ -332,12 +333,12 @@ func (retrieval *retrieval) runRetrievalCandidate(
if errors.Is(retrievalErr, ErrRetrievalTimedOut) {
msg = fmt.Sprintf("timeout after %s", timeout)
}
shared.sendEvent(events.FailedRetrieval(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, candidate, msg))
shared.sendEvent(ctx, events.FailedRetrieval(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, candidate, msg))
if err := retrieval.Session.RecordFailure(retrieval.request.RetrievalID, candidate.MinerPeer.ID); err != nil {
logger.Errorf("Error recording retrieval failure: %v", err)
logger.Errorf("Error recording retrieval failure for protocol %s: %v", retrieval.Protocol.Code().String(), err)
}
} else {
shared.sendEvent(events.Success(
shared.sendEvent(ctx, events.Success(
retrieval.parallelPeerRetriever.Clock.Now(),
retrieval.request.RetrievalID,
candidate,
Expand All @@ -359,13 +360,13 @@ func (retrieval *retrieval) runRetrievalCandidate(
if shared.canSendResult() {
if retrievalErr != nil {
if ctx.Err() != nil { // cancelled, don't report the error
shared.sendResult(retrievalResult{PeerID: candidate.MinerPeer.ID})
shared.sendResult(ctx, retrievalResult{PeerID: candidate.MinerPeer.ID})
} else {
// an error of some kind to report
shared.sendResult(retrievalResult{PeerID: candidate.MinerPeer.ID, Err: retrievalErr})
shared.sendResult(ctx, retrievalResult{PeerID: candidate.MinerPeer.ID, Err: retrievalErr})
}
} else { // success, we have stats and no errors
shared.sendResult(retrievalResult{PeerID: candidate.MinerPeer.ID, Stats: stats})
shared.sendResult(ctx, retrievalResult{PeerID: candidate.MinerPeer.ID, Stats: stats})
}
} // else nothing to do, we were cancelled

Expand Down

0 comments on commit 1e12a93

Please sign in to comment.