diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index bb4ba0deb1ff..e23f5fb05275 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -251,7 +251,12 @@ public void executePrefetchInternal() { } // nack pollable event and re-enqueue it to prefetchingQueue - if (ev.pollable()) { + if (ev.eagerlyPollable()) { + ev.nack(); // now pollable (the nack operation here is actually unnecessary) + enqueueEventToPrefetchingQueue(ev); + // no need to log warn for eagerly pollable event + return null; // remove this entry + } else if (ev.pollable()) { ev.nack(); // now pollable enqueueEventToPrefetchingQueue(ev); LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index d3672e2e21e7..2f7902f1c6b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -190,6 +190,11 @@ public void recordLastPolledTimestamp() { } while (!lastPolledTimestamp.compareAndSet(currentTimestamp, newTimestamp)); } + /** + * @return {@code true} if this event is pollable, including eagerly pollable (by active nack) and + * lazily pollable (by inactive recycle); For events that have already been committed, they + * are not pollable. + */ public boolean pollable() { if (isCommitted()) { return false; @@ -200,6 +205,17 @@ public boolean pollable() { return canRecycle(); } + /** + * @return {@code true} if this event is eagerly pollable; For events that have already been + * committed, they are not pollable. + */ + public boolean eagerlyPollable() { + if (isCommitted()) { + return false; + } + return lastPolledTimestamp.get() == INVALID_TIMESTAMP; + } + private boolean canRecycle() { // Recycle events that may not be able to be committed, i.e., those that have been polled but // not committed within a certain period of time. @@ -403,9 +419,7 @@ public String getFileName() { @Override public String toString() { - return "SubscriptionEvent{pipeEvents=" - + pipeEvents.toString() - + ", responses=" + return "SubscriptionEvent{responses=" + Arrays.toString(responses) + ", responses' byte buffer size=" + Arrays.stream(byteBuffers) @@ -423,6 +437,8 @@ public String toString() { + lastPolledTimestamp + ", committedTimestamp=" + committedTimestamp + + ", pipeEvents=" + + pipeEvents + "}"; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index abd73e030696..54363ac8c7cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -201,15 +201,24 @@ public String toString() { @Override protected Map coreReportMessage() { final Map coreReportMessage = super.coreReportMessage(); - coreReportMessage.put( - "enrichedEvents", - enrichedEvents.stream() - .map(EnrichedEvent::coreReportMessage) - .collect(Collectors.toList()) - .toString()); + coreReportMessage.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4)); coreReportMessage.put("size of tablets", String.valueOf(tablets.size())); coreReportMessage.put("firstEventProcessingTime", String.valueOf(firstEventProcessingTime)); coreReportMessage.put("totalBufferSize", String.valueOf(totalBufferSize)); return coreReportMessage; } + + private static String formatEnrichedEvents( + final List enrichedEvents, final int threshold) { + final List eventMessageList = + enrichedEvents.stream() + .limit(threshold) + .map(EnrichedEvent::coreReportMessage) + .collect(Collectors.toList()); + if (eventMessageList.size() > threshold) { + eventMessageList.add( + String.format("omit the remaining %s event(s)...", eventMessageList.size() - threshold)); + } + return eventMessageList.toString(); + } }