Skip to content

Commit

Permalink
Subscription: reduce the length of the stringified subscription event…
Browse files Browse the repository at this point in the history
… & do not print warn logs for eagerly pollable events (#13405)
  • Loading branch information
VGalaxies authored Sep 5, 2024
1 parent beabc19 commit 717c2a7
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,12 @@ public void executePrefetchInternal() {
}

// nack pollable event and re-enqueue it to prefetchingQueue
if (ev.pollable()) {
if (ev.eagerlyPollable()) {
ev.nack(); // now pollable (the nack operation here is actually unnecessary)
enqueueEventToPrefetchingQueue(ev);
// no need to log warn for eagerly pollable event
return null; // remove this entry
} else if (ev.pollable()) {
ev.nack(); // now pollable
enqueueEventToPrefetchingQueue(ev);
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public void recordLastPolledTimestamp() {
} while (!lastPolledTimestamp.compareAndSet(currentTimestamp, newTimestamp));
}

/**
* @return {@code true} if this event is pollable, including eagerly pollable (by active nack) and
* lazily pollable (by inactive recycle); For events that have already been committed, they
* are not pollable.
*/
public boolean pollable() {
if (isCommitted()) {
return false;
Expand All @@ -200,6 +205,17 @@ public boolean pollable() {
return canRecycle();
}

/**
* @return {@code true} if this event is eagerly pollable; For events that have already been
* committed, they are not pollable.
*/
public boolean eagerlyPollable() {
if (isCommitted()) {
return false;
}
return lastPolledTimestamp.get() == INVALID_TIMESTAMP;
}

private boolean canRecycle() {
// Recycle events that may not be able to be committed, i.e., those that have been polled but
// not committed within a certain period of time.
Expand Down Expand Up @@ -403,9 +419,7 @@ public String getFileName() {

@Override
public String toString() {
return "SubscriptionEvent{pipeEvents="
+ pipeEvents.toString()
+ ", responses="
return "SubscriptionEvent{responses="
+ Arrays.toString(responses)
+ ", responses' byte buffer size="
+ Arrays.stream(byteBuffers)
Expand All @@ -423,6 +437,8 @@ public String toString() {
+ lastPolledTimestamp
+ ", committedTimestamp="
+ committedTimestamp
+ ", pipeEvents="
+ pipeEvents
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,24 @@ public String toString() {
@Override
protected Map<String, String> coreReportMessage() {
final Map<String, String> coreReportMessage = super.coreReportMessage();
coreReportMessage.put(
"enrichedEvents",
enrichedEvents.stream()
.map(EnrichedEvent::coreReportMessage)
.collect(Collectors.toList())
.toString());
coreReportMessage.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4));
coreReportMessage.put("size of tablets", String.valueOf(tablets.size()));
coreReportMessage.put("firstEventProcessingTime", String.valueOf(firstEventProcessingTime));
coreReportMessage.put("totalBufferSize", String.valueOf(totalBufferSize));
return coreReportMessage;
}

private static String formatEnrichedEvents(
final List<EnrichedEvent> enrichedEvents, final int threshold) {
final List<String> eventMessageList =
enrichedEvents.stream()
.limit(threshold)
.map(EnrichedEvent::coreReportMessage)
.collect(Collectors.toList());
if (eventMessageList.size() > threshold) {
eventMessageList.add(
String.format("omit the remaining %s event(s)...", eventMessageList.size() - threshold));
}
return eventMessageList.toString();
}
}

0 comments on commit 717c2a7

Please sign in to comment.