Skip to content

Commit

Permalink
chore: add comment on cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 2, 2023
1 parent 2d4638e commit e863164
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
29 changes: 20 additions & 9 deletions src/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ function handle ({ context, logger, batchSize = config.blocksBatchSize }) {
const normalizedWantlist = getNormalizedWantlist(wantlist, context, logger)

// Set state of processing blocks in canceled state
updateCanceledState(normalizedWantlist, context)
updateCanceledState(normalizedWantlist, context, logger)

process.nextTick(async () => {
// catch async error in libp2p connection
Expand Down Expand Up @@ -186,8 +186,20 @@ async function batchResponse ({ blocks, context, logger }) {
let message = new Message()
for (let i = 0; i < blocks.length; i++) {
const block = blocks[i]
// console.log('block key', block.key)
const canceledItem = context.canceled.get(block.key)
if (!canceledItem || canceledItem !== block.type) {
logger.info({ keyList: context.canceled.keyList }, 'check keyList')
logger.info({ key: block.key, type: block.type, wantType: block.wantType, block: block }, 'check')
logger.info({ canceled: canceledItem }, 'canceled')

if (canceledItem === block.wantType) {
const size = messageSize[block.type](block)
telemetry.increaseLabelCount('bitswap-block-success-cancel', [block.type])
telemetry.increaseLabelCount('bitswap-cancel-size', [block.type], size)

logger.info({ key: block.key }, 'delete')
context.canceled.delete(block.key)
} else {
const size = messageSize[block.type](block)

// maxMessageSize MUST BE larger than a single block info/data
Expand All @@ -198,12 +210,6 @@ async function batchResponse ({ blocks, context, logger }) {

message.push(block, size, context.protocol)
sentMetrics[block.type](block, size)
} else {
const size = messageSize[block.type](block)
telemetry.increaseLabelCount('bitswap-block-success-cancel', [block.type])
telemetry.increaseLabelCount('bitswap-cancel-size', [block.type], size)

context.canceled.delete(block.key)
}
}

Expand All @@ -217,22 +223,27 @@ async function batchResponse ({ blocks, context, logger }) {
}
}

function updateCanceledState (wantList, context) {
function updateCanceledState (wantList, context, logger) {
const { wantedBlocks, wantedHave, canceled } = wantList

// Removed previous canceled blocks
wantedBlocks.forEach(block => {
logger.info({ key: block.key }, 'remove block from canceled')
context.canceled.delete(block.key)
})

wantedHave.forEach(block => {
logger.info({ key: block.key }, 'remove wanted block from canceled')
context.canceled.delete(block.key)
})

// Add new canceled blocks
canceled.forEach(block => {
logger.info({ key: block.key }, 'add block to canceled')
context.canceled.set(block.key, block.wantType)
})

logger.info({ keyList: context.canceled.keyList }, 'check keyList')
}

// end response, close connection
Expand Down
18 changes: 15 additions & 3 deletions src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec
autoDialInterval: connectionConfig.p2p.autoDialInterval
}
})
const cancelsPerPeer = new Map()

const handlerOptions = {
maxInboundStreams: connectionConfig.handler.maxInboundStreams,
Expand Down Expand Up @@ -109,8 +110,11 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec
service.handle(protocol, async ({ connection: dial, stream }) => {
try {
const connection = new Connection(stream)
// const canceled = cancelsPerPeer.get(dial.remotePeer.toString())
const canceled = new LRU({ max: 200 })

!canceled && console.log('canceled')

const hrTime = process.hrtime()
const connectionId = hrTime[0] * 1000000000 + hrTime[1]

Expand Down Expand Up @@ -146,12 +150,13 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec
// another multiplexed stream.
connection.on('end:receive', () => {
// GC canceled LRU on finish
canceled.clear()
logger.info({}, 'end:receive')
connection.close()
})

connection.on('error', err => {
// GC canceled LRU on error
logger.info({}, 'error')
canceled.clear()
logger.error({ err, dial, stream, protocol }, 'Connection error')
})
Expand All @@ -163,20 +168,27 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec

// TODO move to networking
service.connectionManager.addEventListener('peer:connect', connection => {
cancelsPerPeer.set(
connection.detail.remotePeer.toString(),
new LRU({ max: 200 })
)
try {
telemetry.increaseCount('bitswap-connections')
telemetry.increaseGauge('bitswap-active-connections')
} catch (err) {
logger.warn({ err, remotePeer: connection.remotePeer }, 'Error while peer connecting')
logger.warn({ err, remotePeer: connection.detail.remotePeer.toString() }, 'Error while peer connecting')
}
})

// TODO move to networking
service.connectionManager.addEventListener('peer:disconnect', connection => {
cancelsPerPeer.delete(
connection.detail.remotePeer.toString()
)
try {
telemetry.decreaseGauge('bitswap-active-connections')
} catch (err) {
logger.warn({ err, remotePeer: connection.remotePeer }, 'Error while peer disconnecting')
logger.warn({ err, remotePeer: connection.detail.remotePeer.toString() }, 'Error while peer disconnecting')
}
})

Expand Down
4 changes: 2 additions & 2 deletions test/handler.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ t.test('handle', async t => {

await handle({ context: contextSpy, logger: loggerSpy, batchSize: 1 })

t.equal(connectionSpy.send.callCount, 2) // Only two sends in the 4 block messages
t.equal(connectionSpy.send.callCount, 1) // Only one sends in the 4 block messages
})

t.test('should handle a request with single batch canceling requested items', async t => {
Expand Down Expand Up @@ -356,7 +356,7 @@ t.test('handle', async t => {
])

t.equal(response.blocksInfo.length, 2)
t.equal(response.blocksData.length, 2)
t.equal(response.blocksData.length, 1)

t.ok(responseContainsInfo(response, cid7, BlockPresence.Type.Have))
t.ok(responseContainsInfo(response, cid9, BlockPresence.Type.Have))
Expand Down

0 comments on commit e863164

Please sign in to comment.