Skip to content

Commit

Permalink
feat(slot-reservations): support SlotReservationsFull event
Browse files Browse the repository at this point in the history
  • Loading branch information
emizzle committed Sep 30, 2024
1 parent 3c207c6 commit 7eb126b
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 1 deletion.
11 changes: 11 additions & 0 deletions codex/contracts/market.nim
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,17 @@ method subscribeSlotFreed*(market: OnChainMarket,
let subscription = await market.contract.subscribe(SlotFreed, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription)

method subscribeSlotReservationsFull*(
market: OnChainMarket,
callback: OnSlotReservationsFull): Future[MarketSubscription] {.async.} =

proc onEvent(event: SlotReservationsFull) {.upraises:[].} =
callback(event.requestId, event.slotIndex)

convertEthersError:
let subscription = await market.contract.subscribe(SlotReservationsFull, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription)

Check warning on line 319 in codex/contracts/market.nim

View check run for this annotation

Codecov / codecov/patch

codex/contracts/market.nim#L309-L319

Added lines #L309 - L319 were not covered by tests
method subscribeFulfillment(market: OnChainMarket,
callback: OnFulfillment):
Future[MarketSubscription] {.async.} =
Expand Down
10 changes: 10 additions & 0 deletions codex/market.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type
OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].}
OnSlotFreed* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSlotReservationsFull* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises: [].}
OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises:[].}
Expand All @@ -42,6 +43,9 @@ type
SlotFreed* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
slotIndex*: UInt256
SlotReservationsFull* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
slotIndex*: UInt256
RequestFulfilled* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
RequestCancelled* = object of MarketplaceEvent
Expand Down Expand Up @@ -203,6 +207,12 @@ method subscribeSlotFreed*(market: Market,
Future[Subscription] {.base, async.} =
raiseAssert("not implemented")

method subscribeSlotReservationsFull*(
market: Market,
callback: OnSlotReservationsFull): Future[Subscription] {.base, async.} =

Check warning on line 213 in codex/market.nim

View check run for this annotation

Codecov / codecov/patch

codex/market.nim#L212-L213

Added lines #L212 - L213 were not covered by tests
raiseAssert("not implemented")

method subscribeRequestCancelled*(market: Market,
callback: OnRequestCancelled):
Future[Subscription] {.base, async.} =
Expand Down
18 changes: 18 additions & 0 deletions codex/sales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,23 @@ proc subscribeSlotFreed(sales: Sales) {.async.} =
except CatchableError as e:
error "Unable to subscribe to slot freed events", msg = e.msg

proc subscribeSlotReservationsFull(sales: Sales) {.async.} =
let context = sales.context
let market = context.market
let queue = context.slotQueue

proc onSlotReservationsFull(requestId: RequestId, slotIndex: UInt256) =
trace "reservations for slot full, removing from slot queue", requestId, slotIndex
queue.delete(requestId, slotIndex.truncate(uint16))

try:
let sub = await market.subscribeSlotReservationsFull(onSlotReservationsFull)
sales.subscriptions.add(sub)
except CancelledError as error:
raise error
except CatchableError as e:

Check warning on line 482 in codex/sales.nim

View check run for this annotation

Codecov / codecov/patch

codex/sales.nim#L481-L482

Added lines #L481 - L482 were not covered by tests
error "Unable to subscribe to slot filled events", msg = e.msg

proc startSlotQueue(sales: Sales) {.async.} =
let slotQueue = sales.context.slotQueue
let reservations = sales.context.reservations
Expand All @@ -488,6 +505,7 @@ proc subscribe(sales: Sales) {.async.} =
await sales.subscribeSlotFilled()
await sales.subscribeSlotFreed()
await sales.subscribeCancellation()
await sales.subscribeSlotReservationsFull()

proc unsubscribe(sales: Sales) {.async.} =
for sub in sales.subscriptions:
Expand Down
25 changes: 25 additions & 0 deletions tests/codex/helpers/mockmarket.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type
onFulfillment: seq[FulfillmentSubscription]
onSlotFilled: seq[SlotFilledSubscription]
onSlotFreed: seq[SlotFreedSubscription]
onSlotReservationsFull: seq[SlotReservationsFullSubscription]
onRequestCancelled: seq[RequestCancelledSubscription]
onRequestFailed: seq[RequestFailedSubscription]
onProofSubmitted: seq[ProofSubmittedSubscription]
Expand All @@ -72,6 +73,9 @@ type
SlotFreedSubscription* = ref object of Subscription
market: MockMarket
callback: OnSlotFreed
SlotReservationsFullSubscription* = ref object of Subscription
market: MockMarket
callback: OnSlotReservationsFull
RequestCancelledSubscription* = ref object of Subscription
market: MockMarket
requestId: ?RequestId
Expand Down Expand Up @@ -202,6 +206,15 @@ proc emitSlotFreed*(market: MockMarket,
for subscription in subscriptions:
subscription.callback(requestId, slotIndex)

proc emitSlotReservationsFull*(
market: MockMarket,
requestId: RequestId,
slotIndex: UInt256) =

var subscriptions = market.subscriptions.onSlotReservationsFull
for subscription in subscriptions:
subscription.callback(requestId, slotIndex)

proc emitRequestCancelled*(market: MockMarket, requestId: RequestId) =
var subscriptions = market.subscriptions.onRequestCancelled
for subscription in subscriptions:
Expand Down Expand Up @@ -389,6 +402,15 @@ method subscribeSlotFreed*(market: MockMarket,
market.subscriptions.onSlotFreed.add(subscription)
return subscription

method subscribeSlotReservationsFull*(
market: MockMarket,
callback: OnSlotReservationsFull): Future[Subscription] {.async.} =

let subscription =
SlotReservationsFullSubscription(market: market, callback: callback)
market.subscriptions.onSlotReservationsFull.add(subscription)
return subscription

method subscribeRequestCancelled*(market: MockMarket,
callback: OnRequestCancelled):
Future[Subscription] {.async.} =
Expand Down Expand Up @@ -481,3 +503,6 @@ method unsubscribe*(subscription: RequestFailedSubscription) {.async.} =

method unsubscribe*(subscription: ProofSubmittedSubscription) {.async.} =
subscription.market.subscriptions.onProofSubmitted.keepItIf(it != subscription)

method unsubscribe*(subscription: SlotReservationsFullSubscription) {.async.} =
subscription.market.subscriptions.onSlotReservationsFull.keepItIf(it != subscription)
6 changes: 6 additions & 0 deletions tests/codex/sales/testsales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ asyncchecksuite "Sales":
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))

test "removes slot index from slot queue once SlotReservationsFull emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitSlotReservationsFull(request1.id, 1.u256)
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))

test "adds slot index to slot queue once SlotFreed emitted":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
Expand Down
24 changes: 24 additions & 0 deletions tests/contracts/testMarket.nim
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,30 @@ ethersuite "On-Chain Market":
check receivedIdxs == @[slotIndex]
await subscription.unsubscribe()

test "supports slot reservations full subscriptions":
let account2 = ethProvider.getSigner(accounts[2])
let account3 = ethProvider.getSigner(accounts[3])

await market.requestStorage(request)

var receivedRequestIds: seq[RequestId] = @[]
var receivedIdxs: seq[UInt256] = @[]
proc onSlotReservationsFull(requestId: RequestId, idx: UInt256) =
receivedRequestIds.add(requestId)
receivedIdxs.add(idx)
let subscription =
await market.subscribeSlotReservationsFull(onSlotReservationsFull)

await market.reserveSlot(request.id, slotIndex)
switchAccount(account2)
await market.reserveSlot(request.id, slotIndex)
switchAccount(account3)
await market.reserveSlot(request.id, slotIndex)

check receivedRequestIds == @[request.id]
check receivedIdxs == @[slotIndex]
await subscription.unsubscribe()

test "support fulfillment subscriptions":
await market.requestStorage(request)
var receivedIds: seq[RequestId]
Expand Down
2 changes: 1 addition & 1 deletion vendor/codex-contracts-eth

0 comments on commit 7eb126b

Please sign in to comment.