From dc8eb743fa351437fe5135f1a6e014f61d14f187 Mon Sep 17 00:00:00 2001
From: Alexey Bedonik
Date: Fri, 14 Apr 2023 13:43:34 +0200
Subject: [PATCH] feat: replace multiple update requests with single requests.

---
 .../com/netflix/spinnaker/q/sql/SqlQueue.kt | 33 ++-----------------
 1 file changed, 3 insertions(+), 30 deletions(-)

diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
index f16cb74b696..6d91ab6f204 100644
--- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
+++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
@@ -239,25 +239,11 @@ class SqlQueue(
      * Selects the primary key ulid's of up to ([maxMessages] * 3) ready and unlocked messages,
      * sorted by delivery time.
      *
-     * To minimize lock contention, this is a non-locking read. The id's returned may be
-     * locked or removed by another instance before we can acquire them. We read more id's
-     * than [maxMessages] and shuffle them to decrease the likelihood that multiple instances
-     * polling concurrently are all competing for the oldest ready messages when many more
-     * than [maxMessages] are read.
-     *
      * Candidate rows are locked via an autocommit update query by primary key that will
      * only modify unlocked rows. When (candidates > maxMessages), a sliding window is used
      * to traverse the shuffled candidates, sized to (maxMessages - changed) with up-to 3
      * attempts (and update queries) to grab [maxMessages].
      *
-     * I.e. if maxMessage == 5 and
-     * candidates == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].shuffle() == [9, 3, 7, 1, 10, 8, 5, 2, 6, 4]
-     *
-     * - pass1: attempts to claim [9, 3, 7, 1, 10], locks 3 messages
-     * - pass2: attempts to claim [8, 5], locks 1 message
-     * - pass3: attempt to claim [2], succeeds but if not, there are no further attempts
-     * - proceeds to process 5 messages locked via 3 update queries.
-     *
      * This makes a trade-off between grabbing the maximum number of ready messages per poll cycle
      * vs. minimizing [poll] runtime which is also critical to throughput. In testing a scenario
      * with up-to 100k ready messages and 7 orca/keiko-sql instances with [fillExecutorEachCycle]
@@ -269,7 +255,7 @@ class SqlQueue(
      * [MaxAttemptsAttribute] has been set to a positive integer. Otherwise,
      * [AttemptsAttribute] is unused.
      */
-    var candidates = jooq.select(idField)
+    val candidates = jooq.select(idField)
       .from(queueTable)
       .where(deliveryField.le(now), lockedField.eq("0"))
       .orderBy(deliveryField.asc())
@@ -281,23 +267,10 @@ class SqlQueue(
       return
     }
 
-    // Ordering is essential to prevent Deadlock in PostgreSQL datasource.
-    candidates = candidates.sorted()
-
-    var position = 0
-    var passes = 0
-    while (changed < maxMessages && position < candidates.size && passes < 3) {
-      passes++
-      val sliceNext = min(maxMessages - 1 - changed, candidates.size - 1 - position)
-      val ids = candidates.slice(IntRange(position, position + sliceNext))
-      when (sliceNext) {
-        0 -> position++
-        else -> position += sliceNext
-      }
-
+    candidates.parallelStream().forEach {
       changed += jooq.update(queueTable)
         .set(lockedField, "$lockId:$now")
-        .where(idField.`in`(*ids.toTypedArray()), lockedField.eq("0"))
+        .where(idField.eq(it), lockedField.eq("0"))
         .execute()
     }
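
For illustration only (not part of the diff above): a minimal standalone Kotlin sketch of the per-id
locking strategy this patch switches to. The table and column names (keiko_queue, id, locked,
delivery) and the claimReadyMessages helper are hypothetical stand-ins for SqlQueue's queueTable,
idField, lockedField and deliveryField, and the AtomicInteger counter is a choice made for this
sketch because parallelStream().forEach may invoke the lambda on several worker threads.

import org.jooq.DSLContext
import org.jooq.impl.DSL
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins for SqlQueue's queueTable / idField / lockedField / deliveryField.
private val queueTable = DSL.table("keiko_queue")
private val idField = DSL.field("id", String::class.java)
private val lockedField = DSL.field("locked", String::class.java)
private val deliveryField = DSL.field("delivery", Long::class.javaObjectType)

// Reads up to [limit] ready, unlocked candidate ids, then claims each one with its own
// single-row UPDATE guarded by locked = '0', so a row that another instance locked in the
// meantime is simply skipped (execute() returns 0 for it). Returns the number of rows locked.
fun claimReadyMessages(jooq: DSLContext, lockId: String, now: Long, limit: Int): Int {
  val candidates = jooq.select(idField)
    .from(queueTable)
    .where(deliveryField.le(now), lockedField.eq("0"))
    .orderBy(deliveryField.asc())
    .limit(limit)
    .fetchInto(String::class.java)

  // AtomicInteger rather than a plain var: parallelStream().forEach may run the lambda
  // concurrently on ForkJoinPool worker threads (a sketch-only choice).
  val changed = AtomicInteger(0)
  candidates.parallelStream().forEach { id ->
    changed.addAndGet(
      jooq.update(queueTable)
        .set(lockedField, "$lockId:$now")
        .where(idField.eq(id), lockedField.eq("0"))
        .execute()
    )
  }
  return changed.get()
}

Compared with the removed sliding-window passes, each UPDATE here touches exactly one primary key,
so there is no multi-row IN (...) predicate whose lock ordering matters; the trade-off is one
round-trip per candidate instead of at most three batched update queries.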