Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Nov 3, 2023
1 parent e8455cf commit 740cb4b
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,5 @@ public interface AcknowledgementSet {
* @param progressCheckInterval frequency of invocation of progress check callback
* @since 2.6
*/
public void addProgressCheck(final Consumer<Double> progressCheckCallback, final Duration progressCheckInterval);
public void addProgressCheck(final Consumer<ProgressCheck> progressCheckCallback, final Duration progressCheckInterval);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
Expand All @@ -29,7 +30,7 @@
public class DefaultAcknowledgementSet implements AcknowledgementSet {
private static final Logger LOG = LoggerFactory.getLogger(DefaultAcknowledgementSet.class);
private final Consumer<Boolean> callback;
private Consumer<Double> progressCheckCallback;
private Consumer<ProgressCheck> progressCheckCallback;
private final Instant expiryTime;
private final ScheduledExecutorService scheduledExecutor;
// This lock protects all the non-final members
Expand Down Expand Up @@ -59,7 +60,7 @@ public DefaultAcknowledgementSet(final ScheduledExecutorService scheduledExecuto
lock = new ReentrantLock(true);
}

public void addProgressCheck(final Consumer<Double> progressCheckCallback, final Duration progressCheckInterval) {
public void addProgressCheck(final Consumer<ProgressCheck> progressCheckCallback, final Duration progressCheckInterval) {
this.progressCheckCallback = progressCheckCallback;
this.progressCheckFuture = scheduledExecutor.scheduleAtFixedRate(this::checkProgress, 0L, progressCheckInterval.toMillis(), TimeUnit.MILLISECONDS);
}
Expand All @@ -69,7 +70,7 @@ public void checkProgress() {
int numberOfEventsPending = pendingAcknowledgments.size();
lock.unlock();
if (progressCheckCallback != null) {
progressCheckCallback.accept((double)numberOfEventsPending/totalEventsAdded.get());
progressCheckCallback.accept(new DefaultProgressCheck((double)numberOfEventsPending/totalEventsAdded.get()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void testWithProgressCheckCallbacks() {
lenient().when(event6.getEventHandle()).thenReturn(eventHandle6);

AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofMillis(10000));
acknowledgementSet2.addProgressCheck((ratio) -> {currentRatio = ratio;}, Duration.ofSeconds(1));
acknowledgementSet2.addProgressCheck((progressCheck) -> {currentRatio = progressCheck.getRatio();}, Duration.ofSeconds(1));
acknowledgementSet2.add(event3);
acknowledgementSet2.add(event4);
acknowledgementSet2.add(event5);
Expand Down Expand Up @@ -186,7 +186,7 @@ void testWithProgressCheckCallbacks_AcksExpire() {
lenient().when(event6.getEventHandle()).thenReturn(eventHandle6);

AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, Duration.ofSeconds(10));
acknowledgementSet2.addProgressCheck((ratio) -> {currentRatio = ratio;}, Duration.ofSeconds(1));
acknowledgementSet2.addProgressCheck((progressCheck) -> {currentRatio = progressCheck.getRatio();}, Duration.ofSeconds(1));
acknowledgementSet2.add(event3);
acknowledgementSet2.add(event4);
acknowledgementSet2.add(event5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception {
}
);
defaultAcknowledgementSet.addProgressCheck(
(ratio) -> {
currentRatio = ratio;
(progressCheck) -> {
currentRatio = progressCheck.getRatio();
},
Duration.ofSeconds(1)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ void processSqsMessages_should_return_at_least_one_message_with_acks_with_callba
this.notify();
}
try {
Thread.sleep(4000);
Thread.sleep(2000);
} catch (Exception e){}

return null;
Expand Down Expand Up @@ -339,7 +339,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks(final int numberO
events.add(event);
}
try {
Thread.sleep(4000);
Thread.sleep(2000);
} catch (Exception e) {}
return null;
}).when(s3Service).addS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SqsOptions {
@JsonProperty("extend_visibility_timeout")
private Boolean extendVisibilityTimeout = DEFAULT_EXTEND_VISIBILITY_TIMEOUT;

@JsonProperty("max_visibility_timeout_extesion")
@JsonProperty("max_visibility_timeout_extension")
@DurationMin(seconds = 30)
@DurationMax(seconds = 3600)
private Duration maxVisibilityTimeoutExtension = DEFAULT_MAX_VISIBILITY_TIMEOUT_EXTENSION;
Expand Down

0 comments on commit 740cb4b

Please sign in to comment.