Skip to content

Commit

Permalink
feat: replace multiple updates requests with a single requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
armory-abedonik committed Apr 14, 2023
1 parent c0788cb commit dc8eb74
Showing 1 changed file with 3 additions and 30 deletions.
33 changes: 3 additions & 30 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -239,25 +239,11 @@ class SqlQueue(
* Selects the primary key ulid's of up to ([maxMessages] * 3) ready and unlocked messages,
* sorted by delivery time.
*
* To minimize lock contention, this is a non-locking read. The id's returned may be
* locked or removed by another instance before we can acquire them. We read more id's
* than [maxMessages] and shuffle them to decrease the likelihood that multiple instances
* polling concurrently are all competing for the oldest ready messages when many more
* than [maxMessages] are read.
*
* Candidate rows are locked via an autocommit update query by primary key that will
* only modify unlocked rows. When (candidates > maxMessages), a sliding window is used
* to traverse the shuffled candidates, sized to (maxMessages - changed) with up-to 3
* attempts (and update queries) to grab [maxMessages].
*
* I.e. if maxMessage == 5 and
* candidates == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].shuffle() == [9, 3, 7, 1, 10, 8, 5, 2, 6, 4]
*
* - pass1: attempts to claim [9, 3, 7, 1, 10], locks 3 messages
* - pass2: attempts to claim [8, 5], locks 1 message
* - pass3: attempt to claim [2], succeeds but if not, there are no further attempts
* - proceeds to process 5 messages locked via 3 update queries.
*
* This makes a trade-off between grabbing the maximum number of ready messages per poll cycle
* vs. minimizing [poll] runtime which is also critical to throughput. In testing a scenario
* with up-to 100k ready messages and 7 orca/keiko-sql instances with [fillExecutorEachCycle]
Expand All @@ -269,7 +255,7 @@ class SqlQueue(
* [MaxAttemptsAttribute] has been set to a positive integer. Otherwise,
* [AttemptsAttribute] is unused.
*/
var candidates = jooq.select(idField)
val candidates = jooq.select(idField)
.from(queueTable)
.where(deliveryField.le(now), lockedField.eq("0"))
.orderBy(deliveryField.asc())
Expand All @@ -281,23 +267,10 @@ class SqlQueue(
return
}

// Ordering is essential to prevent Deadlock in PostgreSQL datasource.
candidates = candidates.sorted()

var position = 0
var passes = 0
while (changed < maxMessages && position < candidates.size && passes < 3) {
passes++
val sliceNext = min(maxMessages - 1 - changed, candidates.size - 1 - position)
val ids = candidates.slice(IntRange(position, position + sliceNext))
when (sliceNext) {
0 -> position++
else -> position += sliceNext
}

candidates.parallelStream().forEach {
changed += jooq.update(queueTable)
.set(lockedField, "$lockId:$now")
.where(idField.`in`(*ids.toTypedArray()), lockedField.eq("0"))
.where(idField.eq(it), lockedField.eq("0"))
.execute()
}

Expand Down

0 comments on commit dc8eb74

Please sign in to comment.