Skip to content

Commit

Permalink
YARN-11745: Fix TimSort contract violation in PriorityQueueComparator…
Browse files Browse the repository at this point in the history
… Class (#7278)
  • Loading branch information
Hean-Chhinling authored Jan 20, 2025
1 parent 06d36f5 commit 9bf5e38
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ public static int compare(double relativeAssigned1, double relativeAssigned2,
/**
* Comparator that both looks at priority and utilization
*/
final private class PriorityQueueComparator
final public class PriorityQueueComparator
implements Comparator<PriorityQueueResourcesForSorting> {

final private String partition;

private PriorityQueueComparator(String partition) {
public PriorityQueueComparator(String partition) {
this.partition = partition;
}

Expand Down Expand Up @@ -164,7 +164,7 @@ private int compare(PriorityQueueResourcesForSorting q1Sort,
q1Sort.configuredMinResource;
Resource minEffRes2 =
q2Sort.configuredMinResource;
if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals(
if (!minEffRes1.equals(Resources.none()) || !minEffRes2.equals(
Resources.none())) {
return minEffRes2.compareTo(minEffRes1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Collections;

import java.util.concurrent.ThreadLocalRandom;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
Expand Down Expand Up @@ -309,6 +311,58 @@ public void testComparatorDoesNotValidateGeneralContract() {
assertDoesNotThrow(() -> policy.getAssignmentIterator(partition));
}

@Test
public void testComparatorClassDoesNotViolateTimSortContract() {
String partition = "testPartition";

List<PriorityUtilizationQueueOrderingPolicy.
PriorityQueueResourcesForSorting> queues = new ArrayList<>();
for (int i = 0; i < 1000; i++) { // 1000 queues to have enough queues so the exception occur
queues.add(createMockPriorityQueueResourcesForSorting(partition));
}

Collections.shuffle(queues);
// java.lang.IllegalArgumentException: Comparison method violates its general contract!
assertDoesNotThrow(() -> queues.sort(new PriorityUtilizationQueueOrderingPolicy(true)
.new PriorityQueueComparator(partition)));

}

private PriorityUtilizationQueueOrderingPolicy.
PriorityQueueResourcesForSorting createMockPriorityQueueResourcesForSorting(
String partition) {
QueueResourceQuotas resourceQuotas = randomResourceQuotas(partition);

boolean isZeroResource = ThreadLocalRandom.current().nextBoolean();
if (isZeroResource) {
resourceQuotas.setConfiguredMinResource(partition, Resource.newInstance(0, 0));
}

QueueCapacities mockQueueCapacities = mock(QueueCapacities.class);
when(mockQueueCapacities.getAbsoluteUsedCapacity(partition))
.thenReturn(4.2f); // could be any specific number, so that there are equal values
when(mockQueueCapacities.getUsedCapacity(partition))
.thenReturn(1.0f); // could be any specific number, so that there are equal values
when(mockQueueCapacities.getAbsoluteCapacity(partition))
.thenReturn(6.2f); // could be any specific number, so that there are equal values

CSQueue mockQueue = mock(CSQueue.class);
when(mockQueue.getQueueCapacities())
.thenReturn(mockQueueCapacities);
when(mockQueue.getPriority())
.thenReturn(Priority.newInstance(7)); // could be any specific number,
// so that there are equal values
when(mockQueue.getAccessibleNodeLabels())
.thenReturn(Collections.singleton(partition));
when(mockQueue.getQueueResourceQuotas())
.thenReturn(resourceQuotas);

return new PriorityUtilizationQueueOrderingPolicy.PriorityQueueResourcesForSorting(
mockQueue, partition
);

}

private QueueCapacities randomQueueCapacities(String partition) {
QueueCapacities qc = new QueueCapacities(false);
qc.setAbsoluteCapacity(partition, (float) randFloat(0.0d, 100.0d));
Expand Down

0 comments on commit 9bf5e38

Please sign in to comment.