Skip to content

Commit

Permalink
[marketplace] Availability improvements (#535)
Browse files Browse the repository at this point in the history
## Problem
When Availabilities are created, the amount of bytes in the Availability are reserved in the repo, so those bytes on disk cannot be written to otherwise. When a request for storage is received by a node, if a previously created Availability is matched, an attempt will be made to fill a slot in the request (more accurately, the request's slots are added to the SlotQueue, and eventually those slots will be processed). During download, bytes that were reserved for the Availability were released (as they were written to disk). To prevent more bytes from being released than were reserved in the Availability, the Availability was marked as used during the download, so that no other requests would match the Availability, and therefore no new downloads (and byte releases) would begin. The unfortunate downside to this, is that the number of Availabilities a node has determines the download concurrency capacity. If, for example, a node creates a single Availability that covers all available disk space the operator is willing to use, that single Availability would mean that only one download could occur at a time, meaning the node could potentially miss out on storage opportunities.

## Solution
To alleviate the concurrency issue, each time a slot is processed, a Reservation is created, which takes size (aka reserved bytes) away from the Availability and stores them in the Reservation object. This can be done as many times as needed as long as there are enough bytes remaining in the Availability. Therefore, concurrent downloads are no longer limited by the number of Availabilities. Instead, they would more likely be limited to the SlotQueue's `maxWorkers`.

From a database design perspective, an Availability has zero or more Reservations.

Reservations are persisted in the RepoStore's metadata, along with Availabilities. The metadata store key path for Reservations is ` meta / sales / reservations / <availabilityId> / <reservationId>`, while Availabilities are stored one level up, eg `meta / sales / reservations / <availabilityId> `, allowing all Reservations for an Availability to be queried (this is not currently needed, but may be useful when work to restore Availability size is implemented, more on this later).

### Lifecycle
When a reservation is created, its size is deducted from the Availability, and when a reservation is deleted, any remaining size (bytes not written to disk) is returned to the Availability. If the request finishes, is cancelled (expired), or an error occurs, the Reservation is deleted (and any undownloaded bytes returned to the Availability). In addition, when the Sales module starts, any Reservations that are not actively being used in a filled slot, are deleted.

Having a Reservation persisted until after a storage request is completed, will allow for the originally set Availability size to be reclaimed once a request contract has been completed. This is a feature that is yet to be implemented, however the work in this PR is a step in the direction towards enabling this.

### Unknowns
Reservation size is determined by the `StorageAsk.slotSize`. If during download, more bytes than `slotSize` are attempted to be downloaded than this, then the Reservation update will fail, and the state machine will move to a `SaleErrored` state, deleting the Reservation. This will likely prevent the slot from being filled.

### Notes
Based on #514
  • Loading branch information
emizzle authored Sep 29, 2023
1 parent 2f1c778 commit 570a1f7
Show file tree
Hide file tree
Showing 23 changed files with 837 additions and 483 deletions.
8 changes: 7 additions & 1 deletion codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,13 @@ proc requestStorage*(
let request = StorageRequest(
ask: StorageAsk(
slots: nodes + tolerance,
slotSize: (encoded.blockSize.int * encoded.steps).u256,
# TODO: Specify slot-specific size (as below) once dispersal is
# implemented. The current implementation downloads the entire dataset, so
# it is currently set to be the size of the entire dataset. This is
# because the slotSize is used to determine the amount of bytes to reserve
# in a Reservations
# TODO: slotSize: (encoded.blockSize.int * encoded.steps).u256,
slotSize: (encoded.blockSize.int * encoded.blocks.len).u256,
duration: duration,
proofProbability: proofProbability,
reward: reward,
Expand Down
25 changes: 13 additions & 12 deletions codex/rest/api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,10 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Sales unavailable")

without unused =? (await contracts.sales.context.reservations.unused), err:
without avails =? (await contracts.sales.context.reservations.all(Availability)), err:
return RestApiResponse.error(Http500, err.msg)

let json = %unused
let json = %avails
return RestApiResponse.response($json, contentType="application/json")

router.rawApi(
Expand All @@ -365,20 +365,21 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
return RestApiResponse.error(Http400, error.msg)

let reservations = contracts.sales.context.reservations
# assign id to availability via init
let availability = Availability.init(restAv.size,
restAv.duration,
restAv.minPrice,
restAv.maxCollateral)

if not reservations.hasAvailable(availability.size.truncate(uint)):
if not reservations.hasAvailable(restAv.size.truncate(uint)):
return RestApiResponse.error(Http422, "Not enough storage quota")

if err =? (await reservations.reserve(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)
without availability =? (
await reservations.createAvailability(
restAv.size,
restAv.duration,
restAv.minPrice,
restAv.maxCollateral)
), error:
return RestApiResponse.error(Http500, error.msg)

let json = %availability
return RestApiResponse.response($json, contentType="application/json")
return RestApiResponse.response(availability.toJson,
contentType="application/json")

router.api(
MethodGet,
Expand Down
102 changes: 86 additions & 16 deletions codex/sales.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import std/sequtils
import std/sugar
import std/tables
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/chronicles
import pkg/datastore
Expand Down Expand Up @@ -101,8 +101,49 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
if sales.running:
sales.agents.keepItIf(it != agent)

proc filled(sales: Sales,
processing: Future[void]) =
proc cleanUp(sales: Sales,
agent: SalesAgent,
processing: Future[void]) {.async.} =

let data = agent.data

trace "cleaning up sales agent",
requestId = data.requestId,
slotIndex = data.slotIndex,
reservationId = data.reservation.?id |? ReservationId.default,
availabilityId = data.reservation.?availabilityId |? AvailabilityId.default

# TODO: return bytes that were used in the request back to the availability
# as well, which will require removing the bytes from disk (perhaps via
# setting blockTTL to -1 and then running block maintainer?)

# delete reservation and return reservation bytes back to the availability
if reservation =? data.reservation and
deleteErr =? (await sales.context.reservations.deleteReservation(
reservation.id,
reservation.availabilityId
)).errorOption:
error "failure deleting reservation",
error = deleteErr.msg,
reservationId = reservation.id,
availabilityId = reservation.availabilityId

await sales.remove(agent)

# signal back to the slot queue to cycle a worker
if not processing.isNil and not processing.finished():
processing.complete()

proc filled(
sales: Sales,
request: StorageRequest,
slotIndex: UInt256,
processing: Future[void]) =

if onSale =? sales.context.onSale:
onSale(request, slotIndex)

# signal back to the slot queue to cycle a worker
if not processing.isNil and not processing.finished():
processing.complete()

Expand All @@ -117,15 +158,39 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
none StorageRequest
)

agent.context.onCleanUp = proc {.async.} =
await sales.remove(agent)
agent.onCleanUp = proc {.async.} =
await sales.cleanUp(agent, done)

agent.context.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(done)
agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(request, slotIndex, done)

agent.start(SalePreparing())
sales.agents.add agent

proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} =
let reservations = sales.context.reservations
without reservs =? await reservations.all(Reservation):
info "no unused reservations found for deletion"

let unused = reservs.filter(r => (
let slotId = slotId(r.requestId, r.slotIndex)
not activeSlots.any(slot => slot.id == slotId)
))
info "found unused reservations for deletion", unused = unused.len

for reservation in unused:

logScope:
reservationId = reservation.id
availabilityId = reservation.availabilityId

if err =? (await reservations.deleteReservation(
reservation.id, reservation.availabilityId
)).errorOption:
error "failed to delete unused reservation", error = err.msg
else:
trace "deleted unused reservation"

proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
let market = sales.context.market
let slotIds = await market.mySlots()
Expand All @@ -139,21 +204,26 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
return slots

proc load*(sales: Sales) {.async.} =
let slots = await sales.mySlots()
let activeSlots = await sales.mySlots()

await sales.deleteInactiveReservations(activeSlots)

for slot in slots:
for slot in activeSlots:
let agent = newSalesAgent(
sales.context,
slot.request.id,
slot.slotIndex,
some slot.request)

agent.context.onCleanUp = proc {.async.} = await sales.remove(agent)
agent.onCleanUp = proc {.async.} =
let done = newFuture[void]("onCleanUp_Dummy")
await sales.cleanUp(agent, done)
await done # completed in sales.cleanUp

agent.start(SaleUnknown())
sales.agents.add agent

proc onReservationAdded(sales: Sales, availability: Availability) {.async.} =
proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
## Query last 256 blocks for new requests, adding them to the queue. `push`
## checks for availability before adding to the queue. If processed, the
## sales agent will check if the slot is free.
Expand All @@ -162,9 +232,9 @@ proc onReservationAdded(sales: Sales, availability: Availability) {.async.} =
let queue = context.slotQueue

logScope:
topics = "marketplace sales onReservationAdded callback"
topics = "marketplace sales onAvailabilityAdded callback"

trace "reservation added, querying past storage requests to add to queue"
trace "availability added, querying past storage requests to add to queue"

try:
let events = await market.queryPastStorageRequests(256)
Expand Down Expand Up @@ -384,10 +454,10 @@ proc startSlotQueue(sales: Sales) {.async.} =

asyncSpawn slotQueue.start()

reservations.onReservationAdded =
proc(availability: Availability) {.async.} =
await sales.onReservationAdded(availability)
proc onAvailabilityAdded(availability: Availability) {.async.} =
await sales.onAvailabilityAdded(availability)

reservations.onAvailabilityAdded = onAvailabilityAdded

proc subscribe(sales: Sales) {.async.} =
await sales.subscribeRequested()
Expand Down
Loading

0 comments on commit 570a1f7

Please sign in to comment.