Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sbose2k21 committed Oct 4, 2024
1 parent be26d07 commit e1fe2ba
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void start(final Buffer<T> buffer) {

if (sourceConfig.hasHealthCheckService()) {
logger.info("{} source health check is enabled", sourceName);
sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build());
sb.service(getHttpHealthCheckPath(), HealthCheckService.builder().longPolling(0).build());
}

server = sb.build();
Expand Down Expand Up @@ -191,4 +191,7 @@ public void stop() {

public abstract BaseHttpService getHttpService(int bufferTimeoutInMillis, Buffer<T> buffer, PluginMetrics pluginMetrics);

public String getHttpHealthCheckPath() {
return HTTP_HEALTH_CHECK_PATH;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ public class MultiLineJsonCodecTest {
private final HttpData serializedRequestBadEmptyNewLines = HttpData.ofUtf8("\n\n\n\n\n\n\n \n");
private final HttpData serializedRequestBadInvalidJson = HttpData.ofUtf8("{\"text\":");

private final MultiLineJsonCodec multiLineJsonCodec = new MultiLineJsonCodec();
private MultiLineJsonCodec createObjectUnderTest() {
return new MultiLineJsonCodec();
}

@Test
public void testParseSuccess() throws IOException {
// When
MultiLineJsonCodec multiLineJsonCodec = createObjectUnderTest();
List<Map<String, Object>> res = multiLineJsonCodec.parse(serializedRequest);

// Then
Expand All @@ -55,9 +58,9 @@ public void testParseSuccess() throws IOException {
@Test
public void testParseSuccess2() throws IOException {
// When
MultiLineJsonCodec multiLineJsonCodec = createObjectUnderTest();
List<Map<String, Object>> res = multiLineJsonCodec.parse(serializedRequestMultipleRows);


// Then
assertEquals(10, res.size());

Expand All @@ -75,6 +78,7 @@ public void testParseSuccess2() throws IOException {

@Test
public void testParseFailure() {
MultiLineJsonCodec multiLineJsonCodec = createObjectUnderTest();
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBad));
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmpty));
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmptyNewLines));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
@DataPrepperPlugin(name = "opensearch_api", pluginType = Source.class, pluginConfigurationType = OpenSearchAPISourceConfig.class)
public class OpenSearchAPISource extends BaseHttpSource<Record<Event>> {
private static final String SOURCE_NAME = "OpenSearch API";
private static final String HTTP_HEALTH_CHECK_PATH = "/";

@DataPrepperPluginConstructor
public OpenSearchAPISource(final OpenSearchAPISourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
Expand All @@ -32,4 +33,9 @@ public OpenSearchAPISource(final OpenSearchAPISourceConfig sourceConfig, final P
public BaseHttpService getHttpService(final int bufferWriteTimeoutInMillis, final Buffer<Record<Event>> buffer, final PluginMetrics pluginMetrics) {
return new OpenSearchAPIService(bufferWriteTimeoutInMillis, buffer, pluginMetrics);
}

@Override
public String getHttpHealthCheckPath() {
return HTTP_HEALTH_CHECK_PATH;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
public class OpenSearchAPISourceConfig extends BaseHttpServerConfig {

static final String DEFAULT_ENDPOINT_URI = "/";
static final int DEFAULT_PORT = 9202;
static final int DEFAULT_PORT = 9200;

@Override
public int getDefaultPort() {
Expand Down
Loading

0 comments on commit e1fe2ba

Please sign in to comment.