-
Notifications
You must be signed in to change notification settings - Fork 202
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix for kafka source issue #3247 (offset commit stops on deserializat…
…ion error) (#3260) (#3262) Signed-off-by: Hardeep Singh <[email protected]> (cherry picked from commit 8114ab4) Co-authored-by: Hardeep Singh <[email protected]>
- Loading branch information
1 parent
af859c2
commit a4d3636
Showing
4 changed files
with
215 additions
and
32 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
36 changes: 36 additions & 0 deletions
36
...a-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/LogRateLimiter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package org.opensearch.dataprepper.plugins.kafka.util; | ||
|
||
// Poor-man implementation of rate-limiter for logging error messages. | ||
// Todo: Use token-bucket as a generic rate-limiter. | ||
public class LogRateLimiter { | ||
public static int MILLIS_PER_SECOND = 1000; | ||
public static int MAX_LOGS_PER_SECOND = 1000; | ||
private int tokens; | ||
private long lastMs; | ||
private long replenishInterval; | ||
|
||
public LogRateLimiter(final int ratePerSecond, final long currentMs) { | ||
if (ratePerSecond < 0 || ratePerSecond > MAX_LOGS_PER_SECOND) { | ||
throw new IllegalArgumentException( | ||
String.format("Invalid arguments. ratePerSecond should be >0 and less than %s", MAX_LOGS_PER_SECOND) | ||
); | ||
} | ||
replenishInterval = MILLIS_PER_SECOND / ratePerSecond; | ||
lastMs = currentMs; | ||
tokens = 1; | ||
} | ||
|
||
public boolean isAllowed(final long currentMs) { | ||
if ((currentMs- lastMs) >= replenishInterval) { | ||
tokens = 1; | ||
lastMs = currentMs; | ||
} | ||
|
||
if (tokens == 0) { | ||
return false; | ||
} | ||
|
||
tokens--; | ||
return true; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
58 changes: 58 additions & 0 deletions
58
...ugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/LogRateLimiterTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package org.opensearch.dataprepper.plugins.kafka.util; | ||
|
||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.mockito.junit.jupiter.MockitoExtension; | ||
|
||
import static org.hamcrest.CoreMatchers.equalTo; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.junit.jupiter.api.Assertions.assertThrows; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
public class LogRateLimiterTest { | ||
|
||
@Test | ||
public void testRateLimiter() { | ||
long currentMs = System.currentTimeMillis(); | ||
LogRateLimiter objUnderTest = new LogRateLimiter(10, currentMs); | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(true)); | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(false)); | ||
currentMs += 50; | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(false)); | ||
currentMs += 50; | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(true)); | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(false)); | ||
currentMs += 876; | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(true)); | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(false)); | ||
|
||
currentMs = System.currentTimeMillis(); | ||
objUnderTest = new LogRateLimiter(2, currentMs); | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(true)); | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(false)); | ||
currentMs += 100; | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(false)); | ||
currentMs += 200; | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(false)); | ||
currentMs += 500; | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(true)); | ||
|
||
currentMs = System.nanoTime(); | ||
objUnderTest = new LogRateLimiter(1000, currentMs); | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(true)); | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(false)); | ||
currentMs += 1; | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(true)); | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(false)); | ||
currentMs += 2; | ||
assertThat(objUnderTest.isAllowed(currentMs), equalTo(true)); | ||
} | ||
|
||
@Test | ||
public void testRateLimiterInvalidArgs() { | ||
assertThrows( | ||
IllegalArgumentException.class, | ||
() -> new LogRateLimiter(1345, System.currentTimeMillis()) | ||
); | ||
} | ||
} |