Skip to content

Commit

Permalink
Added holdEvents API to processor
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Nov 1, 2024
1 parent f680631 commit 189cdf9
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ public interface Processor<InputRecord extends Record<?>, OutputRecord extends R
*/
Collection<OutputRecord> execute(Collection<InputRecord> records);

/**
* indicates if the processor holds the events or not
*/
default boolean holdsEvents() {
return false;
}


/**
* @since 1.2
* Indicates to the processor that shutdown is imminent and any data currently held by the Processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ private void doRun() {

try {
records = processor.execute(records);
if (inputEvents != null) {
// acknowledge missing events only if the processor is not holding events
if (!processor.holdsEvents() && inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,12 @@ default AggregateActionOutput concludeGroup(final AggregateActionInput aggregate
return new AggregateActionOutput(Collections.emptyList());
}

/**
* indicates if the action holds the events or not
*
*/
default boolean holdsEvents() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ public void shutdown() {

}

@Override
public boolean holdsEvents() {
return aggregateAction.holdsEvents();
}

@Override
public boolean isForLocalProcessingOnly(Event event) {
// no need to check for when condition here because it is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public RateLimiterAggregateAction(final RateLimiterAggregateActionConfig ratelim
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
if (rateLimiterMode == RateLimiterMode.DROP) {
if (!rateLimiter.tryAcquire()) {
event.getEventHandle().release(true);
return AggregateActionResponse.nullEventResponse();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}
List<Event> events = (List)groupState.getOrDefault(EVENTS_KEY, new ArrayList<>());
events.add(event);
// TODO: This event shouldn't be acknowledged by data prepper core
groupState.put(EVENTS_KEY, events);
if (condition != null && !condition.isEmpty() && expressionEvaluator.evaluateConditional(condition, event)) {
groupState.put(ERROR_STATUS_KEY, true);
Expand All @@ -78,6 +77,10 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
if (((groupState.containsKey(ERROR_STATUS_KEY) && (Boolean)groupState.get(ERROR_STATUS_KEY) == true)) || (randomInt < percent)) {
return new AggregateActionOutput((List)groupState.getOrDefault(EVENTS_KEY, List.of()));
}
List<Event> events = (List)groupState.getOrDefault(EVENTS_KEY, List.of());
for (final Event event : events) {
event.getEventHandle().release(true);
}
return new AggregateActionOutput(List.of());
}

Expand Down

0 comments on commit 189cdf9

Please sign in to comment.