Skip to content

Commit

Permalink
Merge branch 'master' into feature/ceremony-files
Browse files Browse the repository at this point in the history
  • Loading branch information
benbierens committed Jun 20, 2024
2 parents 118707f + 1a57341 commit 5a21c48
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 93 deletions.
207 changes: 117 additions & 90 deletions codex/sales/reservations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export logutils
logScope:
topics = "sales reservations"


type
AvailabilityId* = distinct array[32, byte]
ReservationId* = distinct array[32, byte]
Expand All @@ -71,7 +72,8 @@ type
size* {.serialize.}: UInt256
requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: UInt256
Reservations* = ref object
Reservations* = ref object of RootObj
availabilityLock: AsyncLock # Lock for protecting assertions of availability's sizes when searching for matching availability
repo: RepoStore
onAvailabilityAdded: ?OnAvailabilityAdded
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
Expand All @@ -95,12 +97,22 @@ const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet

proc hash*(x: AvailabilityId): Hash {.borrow.}
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}

template withLock(lock, body) =
try:
await lock.acquire()
body
finally:
if lock.locked:
lock.release()


proc new*(T: type Reservations,
repo: RepoStore): Reservations =

T(repo: repo)
T(availabilityLock: newAsyncLock(),repo: repo)

proc init*(
_: type Availability,
Expand Down Expand Up @@ -221,20 +233,19 @@ proc updateImpl(

return success()

proc update*(
self: Reservations,
obj: Reservation): Future[?!void] {.async.} =
return await self.updateImpl(obj)

proc update*(
proc updateAvailability(
self: Reservations,
obj: Availability): Future[?!void] {.async.} =

logScope:
availabilityId = obj.id

without key =? obj.key, error:
return failure(error)

without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError:
trace "Creating new Availability"
let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added
if onAvailabilityAdded =? self.onAvailabilityAdded:
Expand All @@ -248,14 +259,14 @@ proc update*(
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = obj.id, error = e.msg
warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
return res
else:
return failure(err)

# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation"
if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
Expand All @@ -280,11 +291,21 @@ proc update*(
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = obj.id, error = e.msg
warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg

return res

proc update*(
self: Reservations,
obj: Reservation): Future[?!void] {.async.} =
return await self.updateImpl(obj)

proc update*(
self: Reservations,
obj: Availability): Future[?!void] {.async.} =
withLock(self.availabilityLock):
return await self.updateAvailability(obj)

proc delete(
self: Reservations,
key: Key): Future[?!void] {.async.} =
Expand Down Expand Up @@ -312,31 +333,32 @@ proc deleteReservation*(
without key =? key(reservationId, availabilityId), error:
return failure(error)

without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
withLock(self.availabilityLock):
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)

if reservation.size > 0.u256:
trace "returning remaining reservation bytes to availability",
size = reservation.size
if reservation.size > 0.u256:
trace "returning remaining reservation bytes to availability",
size = reservation.size

without availabilityKey =? availabilityId.key, error:
return failure(error)
without availabilityKey =? availabilityId.key, error:
return failure(error)

without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)

availability.freeSize += reservation.size
availability.freeSize += reservation.size

if updateErr =? (await self.update(availability)).errorOption:
return failure(updateErr)
if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr)

if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))

return success()
return success()

# TODO: add support for deleting availabilities
# To delete, must not have any active sales.
Expand Down Expand Up @@ -370,54 +392,57 @@ proc createAvailability*(

return success(availability)

proc createReservation*(
method createReservation*(
self: Reservations,
availabilityId: AvailabilityId,
slotSize: UInt256,
requestId: RequestId,
slotIndex: UInt256
): Future[?!Reservation] {.async.} =

trace "creating reservation", availabilityId, slotSize, requestId, slotIndex
): Future[?!Reservation] {.async, base.} =

let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error:
return failure(error)

without availabilityKey =? availabilityId.key, error:
return failure(error)
without availability =? await self.get(availabilityKey, Availability), error:
return failure(error)

without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
# Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications
if availability.freeSize < slotSize:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error)

if availability.freeSize < slotSize:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error)
trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex

if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)

# reduce availability freeSize by the slot size, which is now accounted for in
# the newly created Reservation
availability.freeSize -= slotSize
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)

# update availability with reduced size
if updateErr =? (await self.update(availability)).errorOption:
# reduce availability freeSize by the slot size, which is now accounted for in
# the newly created Reservation
availability.freeSize -= slotSize

trace "rolling back reservation creation"
# update availability with reduced size
trace "Updating availability with reduced size"
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Updating availability failed, rolling back reservation creation"

without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)

# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)

return failure(updateErr)
return failure(updateErr)

return success(reservation)
trace "Reservation succesfully created"
return success(reservation)

proc returnBytesToAvailability*(
self: Reservations,
Expand All @@ -429,47 +454,47 @@ proc returnBytesToAvailability*(
reservationId
availabilityId

withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error:
return failure(error)

without key =? key(reservationId, availabilityId), error:
return failure(error)

without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)

# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size

if bytesToBeReturned == 0:
trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
return success()
if bytesToBeReturned == 0:
trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
return success()

trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned

# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))

without availabilityKey =? availabilityId.key, error:
return failure(error)
without availabilityKey =? availabilityId.key, error:
return failure(error)

without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)

availability.freeSize += bytesToBeReturned
availability.freeSize += bytesToBeReturned

# Update availability with returned size
if updateErr =? (await self.update(availability)).errorOption:
# Update availability with returned size
if updateErr =? (await self.updateAvailability(availability)).errorOption:

trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)

return failure(updateErr)
return failure(updateErr)

return success()
return success()

proc release*(
self: Reservations,
Expand Down Expand Up @@ -621,6 +646,7 @@ proc findAvailability*(
minPrice >= availability.minPrice:

trace "availability matched",
id = availability.id,
size, availFreeSize = availability.freeSize,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,
Expand All @@ -635,6 +661,7 @@ proc findAvailability*(
return some availability

trace "availability did not match",
id = availability.id,
size, availFreeSize = availability.freeSize,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,
Expand Down
14 changes: 14 additions & 0 deletions codex/sales/states/preparing.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pkg/questionable
import pkg/questionable/results
import pkg/metrics

import ../../logutils
import ../../market
Expand All @@ -13,6 +14,8 @@ import ./ignored
import ./downloading
import ./errored

declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch")

type
SalePreparing* = ref object of ErrorHandlingState

Expand Down Expand Up @@ -78,7 +81,18 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
request.id,
data.slotIndex
), error:
trace "Creation of reservation failed"
# Race condition:
# reservations.findAvailability (line 64) is no guarantee. You can never know for certain that the reservation can be created until after you have it.
# Should createReservation fail because there's no space, we proceed to SaleIgnored.
if error of BytesOutOfBoundsError:
# Lets monitor how often this happen and if it is often we can make it more inteligent to handle it
codex_reservations_availability_mismatch.inc()
return some State(SaleIgnored())

return some State(SaleErrored(error: error))

trace "Reservation created succesfully"

data.reservation = some reservation
return some State(SaleDownloading())
Loading

0 comments on commit 5a21c48

Please sign in to comment.