Skip to content

Commit

Permalink
[Dataflow Streaming] fix max thread time calculation (#33686)
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp authored Jan 23, 2025
1 parent da94e20 commit dc7426f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class BoundedQueueExecutor {

private final ThreadPoolExecutor executor;
private final long maximumBytesOutstanding;

Expand All @@ -54,17 +55,17 @@ public class BoundedQueueExecutor {
private long totalTimeMaxActiveThreadsUsed;

public BoundedQueueExecutor(
int maximumPoolSize,
int initialMaximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int maximumElementsOutstanding,
long maximumBytesOutstanding,
ThreadFactory threadFactory) {
this.maximumPoolSize = maximumPoolSize;
this.maximumPoolSize = initialMaximumPoolSize;
executor =
new ThreadPoolExecutor(
maximumPoolSize,
maximumPoolSize,
initialMaximumPoolSize,
initialMaximumPoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
// released (2.11.0)
@SuppressWarnings("unused")
public class BoundedQueueExecutorTest {

private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
private static final int DEFAULT_MAX_THREADS = 2;
private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
Expand Down Expand Up @@ -247,7 +248,8 @@ public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception {
}

@Test
public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated() throws Exception {
public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsIncreased()
throws Exception {
CountDownLatch processStart1 = new CountDownLatch(1);
CountDownLatch processStart2 = new CountDownLatch(1);
CountDownLatch processStart3 = new CountDownLatch(1);
Expand Down Expand Up @@ -287,6 +289,58 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated()
executor.shutdown();
}

@Test
public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced()
throws Exception {
CountDownLatch processStart1 = new CountDownLatch(1);
CountDownLatch processStop1 = new CountDownLatch(1);
CountDownLatch processStart2 = new CountDownLatch(1);
CountDownLatch processStop2 = new CountDownLatch(1);
CountDownLatch processStart3 = new CountDownLatch(1);
CountDownLatch processStop3 = new CountDownLatch(1);
Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);

// Initial state.
assertEquals(0, executor.activeCount());
assertEquals(2, executor.getMaximumPoolSize());

// m1 is accepted.
executor.execute(m1, 1);
processStart1.await();
assertEquals(1, executor.activeCount());
assertEquals(2, executor.getMaximumPoolSize());
assertEquals(0L, executor.allThreadsActiveTime());

processStop1.countDown();
while (executor.activeCount() != 0) {
// Waiting for all threads to be ended.
Thread.sleep(200);
}

// Reduce max pool size to 1
executor.setMaximumPoolSize(1, 105);

assertEquals(0, executor.activeCount());
executor.execute(m2, 1);
processStart2.await();
Thread.sleep(100);
assertEquals(1, executor.activeCount());
assertEquals(1, executor.getMaximumPoolSize());
processStop2.countDown();

while (executor.activeCount() != 0) {
// Waiting for all threads to be ended.
Thread.sleep(200);
}

// allThreadsActiveTime() should be recorded
// since when the second task was running it reached the new max pool size.
assertThat(executor.allThreadsActiveTime(), greaterThan(0L));
executor.shutdown();
}

@Test
public void testRenderSummaryHtml() {
String expectedSummaryHtml =
Expand Down

0 comments on commit dc7426f

Please sign in to comment.