Skip to content

Commit

Permalink
SeekableStreamSupervisor: Coalesce adjacent RunNotices. (#58)
Browse files Browse the repository at this point in the history
The idea is that if multiple notices come in around the same time due
to rapid task status changes, we only need to execute one of them.

(cherry picked from commit 5147b2d)

Co-authored-by: Gian Merlino <[email protected]>
  • Loading branch information
harinirajendran and gianm committed Feb 22, 2022
1 parent 27a31d4 commit 86028ea
Showing 1 changed file with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -282,6 +282,15 @@ private interface Notice
String getType();

void handle() throws ExecutionException, InterruptedException, TimeoutException;

/**
* Whether this notice can also handle the work of another notice. Used to coalesce notices and avoid
* redundant work.
*/
default boolean canAlsoHandle(Notice otherNotice)
{
return false;
}
}

private static class StatsFromTaskResult
Expand Down Expand Up @@ -338,6 +347,12 @@ public String getType()
{
return TYPE;
}

@Override
public boolean canAlsoHandle(Notice otherNotice)
{
return otherNotice.getType().equals(TYPE);
}
}

// change taskCount without resubmitting.
Expand Down Expand Up @@ -653,7 +668,7 @@ public String getType()
private final ScheduledExecutorService scheduledExec;
private final ScheduledExecutorService reportingExec;
private final ListeningExecutorService workerExec;
private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
private final BlockingDeque<Notice> notices = new LinkedBlockingDeque<>();
private final Object stopLock = new Object();
private final Object stateChangeLock = new Object();
private final ReentrantLock recordSupplierLock = new ReentrantLock();
Expand Down Expand Up @@ -947,6 +962,12 @@ public void tryInit()
continue;
}

// Coalesce notices.
Notice nextNotice;
while ((nextNotice = notices.peek()) != null && notice.canAlsoHandle(nextNotice)) {
notices.removeFirst();
}

try {
Instant handleNoticeStartTime = Instant.now();
notice.handle();
Expand Down

0 comments on commit 86028ea

Please sign in to comment.