Skip to content

Commit

Permalink
KAFKA-15915: Flaky ProducerIdManagerTest error injection fix (apache#…
Browse files Browse the repository at this point in the history
…15605)

testUnrecoverableErrors was flaky because the injected error either affected the next block request (the prefetching one) or was set too late to affect it.

First I tried waiting for the background thread to finish before setting the Errors.X. But then the test failed consistently, because the generateProducerId call does prefetching too: after a successful producer id generation we set the error and expected the next call to fail again with a coordinator-load-in-progress exception, but since the block had already been prefetched, the manager was able to serve us a proper producer id.

    calling generateProducerId --> no current block exists, so requesting block --> CoordinatorLoadInProgressException
    asserting exception
    calling generateProducerId again --> prefetching, requesting the next block --> giving back the producer id from the first block
    asserting received producer id
    setting error -- waiting for the background callback(s) to be finished first
    calling generateProducerId, expecting CoordinatorLoadInProgressException, but --> works like 2), just the prefetching callback is failing due to the error we set before

Note: without waiting for the background thread to complete, the error could have been set either before the 2) step's callback or after it; the test was written in a way that expected it to happen before the callback.

This was the point I realised that we need to have a queue to control the responses rather than trying to do it in the middle of the test method.

Errors can now be passed in a queue when the mock id manager is created, instead of being modified on the fly.
The queue holds Errors values that specify how the background thread (which imitates the controllerChannel) should behave for each request: either return an error or return a proper response, and call the callback accordingly.

I was able to simplify the mock id manager class as well: there is no need to override maybeRequestNextBlock when the errors are handled via a queue.

Reviewers: Igor Soarez <[email protected]>, Daniel Urban <[email protected]>, Viktor Somogyi-Vass <[email protected]>
  • Loading branch information
akatona84 authored Apr 5, 2024
1 parent 16019b0 commit 5e4c7da
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ class RPCProducerIdManager(brokerId: Int,
}


// Visible for testing
private[transaction] def maybeRequestNextBlock(): Unit = {
private def maybeRequestNextBlock(): Unit = {
val retryTimestamp = backoffDeadlineMs.get()
if (retryTimestamp == NoRetry || time.milliseconds() >= retryTimestamp) {
// Send a request only if we reached the retry deadline, or if no deadline was set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mock, when}

import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}

class ProducerIdManagerTest {

Expand All @@ -50,10 +50,9 @@ class ProducerIdManagerTest {
val brokerId: Int,
var idStart: Long,
val idLen: Int,
var error: Errors = Errors.NONE,
val errorQueue: ConcurrentLinkedQueue[Errors] = new ConcurrentLinkedQueue[Errors](),
val isErroneousBlock: Boolean = false,
val time: Time = Time.SYSTEM,
var remainingRetries: Int = 1
val time: Time = Time.SYSTEM
) extends RPCProducerIdManager(brokerId, time, () => 1, brokerToController) {

private val brokerToControllerRequestExecutor = Executors.newSingleThreadExecutor()
Expand All @@ -62,7 +61,8 @@ class ProducerIdManagerTest {
override private[transaction] def sendRequest(): Unit = {

brokerToControllerRequestExecutor.submit(() => {
if (error == Errors.NONE) {
val error = errorQueue.poll()
if (error == null || error == Errors.NONE) {
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
if (!isErroneousBlock) {
Expand All @@ -79,17 +79,6 @@ class ProducerIdManagerTest {
super.handleAllocateProducerIdsResponse(response)
capturedFailure.set(nextProducerIdBlock.get == null)
}

override private[transaction] def maybeRequestNextBlock(): Unit = {
if (error == Errors.NONE && !isErroneousBlock) {
super.maybeRequestNextBlock()
} else {
if (remainingRetries > 0) {
super.maybeRequestNextBlock()
remainingRetries -= 1
}
}
}
}

@Test
Expand Down Expand Up @@ -190,15 +179,12 @@ class ProducerIdManagerTest {
@EnumSource(value = classOf[Errors], names = Array("UNKNOWN_SERVER_ERROR", "INVALID_REQUEST"))
def testUnrecoverableErrors(error: Errors): Unit = {
val time = new MockTime()
val manager = new MockProducerIdManager(0, 0, 1, time = time)
val manager = new MockProducerIdManager(0, 0, 1, errorQueue = queue(Errors.NONE, error), time = time)

verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)

manager.error = error
time.sleep(RetryBackoffMs)
verifyFailure(manager)

manager.error = Errors.NONE
time.sleep(RetryBackoffMs)
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 1, 1), 1)
}
Expand All @@ -219,19 +205,24 @@ class ProducerIdManagerTest {
def testRetryBackoff(): Unit = {
val time = new MockTime()
val manager = new MockProducerIdManager(0, 0, 1,
error = Errors.UNKNOWN_SERVER_ERROR, time = time, remainingRetries = 2)
errorQueue = queue(Errors.UNKNOWN_SERVER_ERROR), time = time)

verifyFailure(manager)
manager.error = Errors.NONE

// We should only get a new block once retry backoff ms has passed.
assertEquals(classOf[CoordinatorLoadInProgressException], manager.generateProducerId().failed.get.getClass)
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
time.sleep(RetryBackoffMs)
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
}

/**
 * Collects the given errors, in argument order, into a fresh ConcurrentLinkedQueue.
 *
 * @param errors the errors to enqueue; may be empty, yielding an empty queue
 * @return a new queue containing `errors` in the order they were passed
 */
private def queue(errors: Errors*): ConcurrentLinkedQueue[Errors] = {
  val responses = new ConcurrentLinkedQueue[Errors]()
  for (error <- errors) {
    responses.add(error)
  }
  responses
}

private def verifyFailure(manager: MockProducerIdManager): Unit = {
assertEquals(classOf[CoordinatorLoadInProgressException], manager.generateProducerId().failed.get.getClass)
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
TestUtils.waitUntilTrue(() => {
manager synchronized {
manager.capturedFailure.get
Expand All @@ -244,12 +235,17 @@ class ProducerIdManagerTest {
expectedBlock: ProducerIdsBlock,
expectedPid: Long): Unit = {

assertEquals(classOf[CoordinatorLoadInProgressException], manager.generateProducerId().failed.get.getClass)
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
TestUtils.waitUntilTrue(() => {
val nextBlock = manager.nextProducerIdBlock.get
nextBlock != null && nextBlock.equals(expectedBlock)
}, "failed to generate block")
assertEquals(expectedPid, manager.generateProducerId().get)
}

/**
 * Asserts that the given producer id result is a failure whose exception class is exactly
 * CoordinatorLoadInProgressException.
 *
 * @param generatedProducerId the result of a producer id generation attempt
 */
private def assertCoordinatorLoadInProgressExceptionFailure(generatedProducerId: Try[Long]): Unit = {
  val failed = generatedProducerId.isFailure
  // The message supplier is lazy, so `.get` only runs when the result is unexpectedly a Success.
  assertTrue(failed, () => s"expected failure but got producerId: ${generatedProducerId.get}")
  val thrown = generatedProducerId.failed.get
  assertEquals(classOf[CoordinatorLoadInProgressException], thrown.getClass)
}
}

0 comments on commit 5e4c7da

Please sign in to comment.