Skip to content

Commit

Permalink
Fix slot queue push (#542)
Browse files Browse the repository at this point in the history
* [sales] remove availability check before adding to slot queue

* [sales] add missing return statement

* [tests] remove 'eventuallyCheck' helper

* [sales] remove reservations from slot queue

* [tests] rename module `eventually` -> `always`

* [sales] increase slot queue size

Because it will now also hold items for which we haven't
checked availability yet.
  • Loading branch information
markspanbroek authored Sep 4, 2023
1 parent d279eeb commit d3a22a7
Show file tree
Hide file tree
Showing 17 changed files with 73 additions and 190 deletions.
28 changes: 13 additions & 15 deletions codex/sales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func new*(_: type Sales,
market: market,
clock: clock,
reservations: reservations,
slotQueue: SlotQueue.new(reservations),
slotQueue: SlotQueue.new(),
simulateProofFailures: simulateProofFailures
),
trackedFutures: TrackedFutures.new(),
Expand Down Expand Up @@ -182,7 +182,7 @@ proc onReservationAdded(sales: Sales, availability: Availability) {.async.} =

for slots in requests:
for slot in slots:
if err =? (await queue.push(slot)).errorOption:
if err =? queue.push(slot).errorOption:
# continue on error
if err of QueueNotRunningError:
warn "cannot push items to queue, queue is not running"
Expand Down Expand Up @@ -219,21 +219,19 @@ proc onStorageRequested(sales: Sales,
warn "Too many slots, cannot add to queue"
else:
warn "Failed to create slot queue items from request", error = err.msg
return

for item in items:
# continue on failure
slotQueue.push(item)
.track(sales)
.catch(proc(err: ref CatchableError) =
if err of NoMatchingAvailabilityError:
info "slot in queue had no matching availabilities, ignoring"
elif err of SlotQueueItemExistsError:
error "Failed to push item to queue becaue it already exists"
elif err of QueueNotRunningError:
warn "Failed to push item to queue becaue queue is not running"
else:
warn "Error adding request to SlotQueue", error = err.msg
)
if err =? slotQueue.push(item).errorOption:
if err of NoMatchingAvailabilityError:
info "slot in queue had no matching availabilities, ignoring"
elif err of SlotQueueItemExistsError:
error "Failed to push item to queue becaue it already exists"
elif err of QueueNotRunningError:
warn "Failed to push item to queue becaue queue is not running"
else:
warn "Error adding request to SlotQueue", error = err.msg

proc onSlotFreed(sales: Sales,
requestId: RequestId,
Expand Down Expand Up @@ -263,7 +261,7 @@ proc onSlotFreed(sales: Sales,

found = SlotQueueItem.init(request, slotIndex.truncate(uint16))

if err =? (await queue.push(found)).errorOption:
if err =? queue.push(found).errorOption:
raise err

addSlotToQueue()
Expand Down
20 changes: 4 additions & 16 deletions codex/sales/slotqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import ./reservations
import ../errors
import ../rng
import ../utils
Expand Down Expand Up @@ -46,7 +45,6 @@ type
maxWorkers: int
onProcessSlot: ?OnProcessSlot
queue: AsyncHeapQueue[SlotQueueItem]
reservations: Reservations
running: bool
workers: AsyncQueue[SlotQueueWorker]
trackedFutures: TrackedFutures
Expand All @@ -68,7 +66,7 @@ const DefaultMaxWorkers = 3
# included in the queue if it is higher priority than any of the exisiting
# items. Older slots should be unfillable over time as other hosts fill the
# slots.
const DefaultMaxSize = 64'u16
const DefaultMaxSize = 128'u16

proc profitability(item: SlotQueueItem): UInt256 =
StorageAsk(collateral: item.collateral,
Expand Down Expand Up @@ -105,7 +103,6 @@ proc `==`*(a, b: SlotQueueItem): bool =
a.slotIndex == b.slotIndex

proc new*(_: type SlotQueue,
reservations: Reservations,
maxWorkers = DefaultMaxWorkers,
maxSize: SlotQueueSize = DefaultMaxSize): SlotQueue =

Expand All @@ -119,7 +116,6 @@ proc new*(_: type SlotQueue,
# Add 1 to always allow for an extra item to be pushed onto the queue
# temporarily. After push (and sort), the bottom-most item will be deleted
queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1),
reservations: reservations,
running: false,
trackedFutures: TrackedFutures.new()
)
Expand Down Expand Up @@ -228,7 +224,7 @@ proc populateItem*(self: SlotQueue,
)
return none SlotQueueItem

proc push*(self: SlotQueue, item: SlotQueueItem): Future[?!void] {.async.} =
proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =

trace "pushing item to queue",
requestId = item.requestId, slotIndex = item.slotIndex
Expand All @@ -237,14 +233,6 @@ proc push*(self: SlotQueue, item: SlotQueueItem): Future[?!void] {.async.} =
let err = newException(QueueNotRunningError, "queue not running")
return failure(err)

without availability =? await self.reservations.find(item.slotSize,
item.duration,
item.profitability,
item.collateral,
used = false):
let err = newException(NoMatchingAvailabilityError, "no availability")
return failure(err)

if self.contains(item):
let err = newException(SlotQueueItemExistsError, "item already exists")
return failure(err)
Expand All @@ -259,9 +247,9 @@ proc push*(self: SlotQueue, item: SlotQueueItem): Future[?!void] {.async.} =
doAssert self.queue.len <= self.queue.size - 1
return success()

proc push*(self: SlotQueue, items: seq[SlotQueueItem]): Future[?!void] {.async.} =
proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void =
for item in items:
if err =? (await self.push(item)).errorOption:
if err =? self.push(item).errorOption:
return failure(err)

return success()
Expand Down
4 changes: 2 additions & 2 deletions tests/codex/helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import pkg/codex/rng
import ./helpers/nodeutils
import ./helpers/randomchunker
import ./helpers/mockdiscovery
import ./helpers/eventually
import ./helpers/always
import ../checktest

export randomchunker, nodeutils, mockdiscovery, eventually, checktest, manifest
export randomchunker, nodeutils, mockdiscovery, always, checktest, manifest

export libp2p except setup, eventually

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,5 @@
import pkg/chronos

template eventuallyCheck*(condition: untyped, timeout = 5.seconds): bool =
proc loop: Future[bool] {.async.} =
let start = Moment.now()
while true:
if condition:
return true
if Moment.now() > (start + timeout):
return false
else:
await sleepAsync(1.millis)
await loop()

template always*(condition: untyped, timeout = 50.millis): bool =
proc loop: Future[bool] {.async.} =
let start = Moment.now()
Expand Down
4 changes: 2 additions & 2 deletions tests/codex/sales/states/testproving.nim
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ asyncchecksuite "sales state 'proving'":
market.setProofRequired(slot.id, true)
await market.advanceToNextPeriod()

check eventuallyCheck receivedIds == @[slot.id] and receivedProofs == @[proof]
check eventually receivedIds == @[slot.id] and receivedProofs == @[proof]

await future.cancelAndWait()
await subscription.unsubscribe()
Expand All @@ -77,6 +77,6 @@ asyncchecksuite "sales state 'proving'":
market.slotState[slot.id] = SlotState.Finished
await market.advanceToNextPeriod()

check eventuallyCheck future.finished
check eventually future.finished
check !(future.read()) of SalePayout

2 changes: 1 addition & 1 deletion tests/codex/sales/states/testsimulatedproving.nim
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,5 @@ asyncchecksuite "sales state 'simulated-proving'":
market.slotState[slot.id] = SlotState.Finished
await market.advanceToNextPeriod()

check eventuallyCheck future.finished
check eventually future.finished
check !(future.read()) of SalePayout
38 changes: 7 additions & 31 deletions tests/codex/sales/testsales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import pkg/codex/blocktype as bt
import pkg/codex/node
import ../helpers/mockmarket
import ../helpers/mockclock
import ../helpers/eventually
import ../helpers/always
import ../examples
import ./helpers

Expand Down Expand Up @@ -189,7 +189,10 @@ asyncchecksuite "Sales":
var request1 = StorageRequest.example
request1.ask.collateral = request.ask.collateral + 1
discard await reservations.reserve(availability)
await market.requestStorage(request)
# saturate queue
while queue.len < queue.size - 1:
await market.requestStorage(StorageRequest.example)
# send request
await market.requestStorage(request1)
await sleepAsync(5.millis) # wait for request slots to be added to queue
return request1
Expand Down Expand Up @@ -236,33 +239,6 @@ asyncchecksuite "Sales":
let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected)

test "request slots are not added to the slot queue when no availabilities exist":
var itemsProcessed: seq[SlotQueueItem] = @[]
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()

await market.requestStorage(request)
# check that request was ignored due to no matching availability
check always itemsProcessed.len == 0

test "non-matching availabilities/requests are not added to the slot queue":
var itemsProcessed: seq[SlotQueueItem] = @[]
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()

let nonMatchingAvailability = Availability.init(
size=100.u256,
duration=60.u256,
minPrice=601.u256, # too high
maxCollateral=400.u256
)
check isOk await reservations.reserve(nonMatchingAvailability)
await market.requestStorage(request)
# check that request was ignored due to no matching availability
check always itemsProcessed.len == 0

test "adds past requests to queue once availability added":
var itemsProcessed: seq[SlotQueueItem] = @[]
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
Expand All @@ -273,7 +249,7 @@ asyncchecksuite "Sales":

# now add matching availability
check isOk await reservations.reserve(availability)
check eventuallyCheck itemsProcessed.len == request.ask.slots.int
check eventually itemsProcessed.len == request.ask.slots.int

test "makes storage unavailable when downloading a matched request":
var used = false
Expand Down Expand Up @@ -395,7 +371,7 @@ asyncchecksuite "Sales":
test "fills a slot":
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventuallyCheck market.filled.len == 1
check eventually market.filled.len > 0
check market.filled[0].requestId == request.id
check market.filled[0].slotIndex < request.ask.slots.u256
check market.filled[0].proof == proof
Expand Down
1 change: 0 additions & 1 deletion tests/codex/sales/testsalesagent.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import pkg/codex/sales/states/errorhandling
import pkg/codex/proving
import ../helpers/mockmarket
import ../helpers/mockclock
import ../helpers/eventually
import ../helpers
import ../examples

Expand Down
Loading

0 comments on commit d3a22a7

Please sign in to comment.