From 189cdf9ab330f051cfcf26a7a65f9767053eef4d Mon Sep 17 00:00:00 2001 From: Kondaka Date: Thu, 31 Oct 2024 21:46:47 -0700 Subject: [PATCH] Added holdEvents API to processor Signed-off-by: Kondaka --- .../opensearch/dataprepper/model/processor/Processor.java | 8 ++++++++ .../dataprepper/core/pipeline/ProcessWorker.java | 3 ++- .../plugins/processor/aggregate/AggregateAction.java | 8 ++++++++ .../plugins/processor/aggregate/AggregateProcessor.java | 5 +++++ .../aggregate/actions/RateLimiterAggregateAction.java | 1 + .../aggregate/actions/TailSamplerAggregateAction.java | 5 ++++- 6 files changed, 28 insertions(+), 2 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java index 551aed3d01..f950ff4789 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java @@ -26,6 +26,14 @@ public interface Processor, OutputRecord extends R */ Collection execute(Collection records); + /** + * indicates if the processor holds the events or not + */ + default boolean holdsEvents() { + return false; + } + + /** * @since 1.2 * Indicates to the processor that shutdown is imminent and any data currently held by the Processor diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java index e313430b49..8fb314fd83 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java @@ -137,7 +137,8 @@ private void doRun() { try { records = processor.execute(records); - if (inputEvents != null) { + // acknowledge missing events only if the processor is not holding events + if (!processor.holdsEvents() && inputEvents != null) { processAcknowledgements(inputEvents, records); } } catch (final Exception e) { diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java index ae798af032..3afcf94464 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java @@ -41,4 +41,12 @@ default AggregateActionOutput concludeGroup(final AggregateActionInput aggregate return new AggregateActionOutput(Collections.emptyList()); } + /** + * indicates if the action holds the events or not + * + */ + default boolean holdsEvents() { + return false; + } + } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index 098f7e93bc..3ad393cd1f 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -189,6 +189,11 @@ public void shutdown() { } + @Override + public boolean holdsEvents() { + return aggregateAction.holdsEvents(); + } + @Override public boolean isForLocalProcessingOnly(Event event) { // no need to check for when condition here because it is diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java index 3ea0d0b8af..5f69bd5abc 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java @@ -35,6 +35,7 @@ public RateLimiterAggregateAction(final RateLimiterAggregateActionConfig ratelim public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) { if (rateLimiterMode == RateLimiterMode.DROP) { if (!rateLimiter.tryAcquire()) { + event.getEventHandle().release(true); return AggregateActionResponse.nullEventResponse(); } } else { diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java index 29cf2bd6ae..216666245d 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java @@ -62,7 +62,6 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct } List events = (List)groupState.getOrDefault(EVENTS_KEY, new ArrayList<>()); events.add(event); - // TODO: This event shouldn't be acknowledged by data prepper core groupState.put(EVENTS_KEY, events); if (condition != null && !condition.isEmpty() && expressionEvaluator.evaluateConditional(condition, event)) { groupState.put(ERROR_STATUS_KEY, true); @@ -78,6 +77,10 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA if (((groupState.containsKey(ERROR_STATUS_KEY) && (Boolean)groupState.get(ERROR_STATUS_KEY) == true)) || (randomInt < percent)) { return new AggregateActionOutput((List)groupState.getOrDefault(EVENTS_KEY, List.of())); } + List events = (List)groupState.getOrDefault(EVENTS_KEY, List.of()); + for (final Event event : events) { + event.getEventHandle().release(true); + } return new AggregateActionOutput(List.of()); }