Skip to content

Commit

Permalink
addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Oct 25, 2024
1 parent 7b5cfcd commit ec98d4c
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'javax.inject:javax.inject:1'
implementation("org.springframework:spring-web:${libs.versions.spring.get()}")
implementation 'org.springframework.retry:spring-retry:1.3.4'

implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
plugins {
id 'java-library'
}

group = 'org.opensearch.dataprepper.plugins.source.source_crawler'

Expand Down Expand Up @@ -28,7 +25,3 @@ dependencies {
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
annotationProcessor 'org.projectlombok:lombok:1.18.30'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
@Named
public class SaasPluginExecutorServiceProvider {
private static final Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class);
public static final int DEFAULT_THREAD_COUNT = 50;
private static final int DEFAULT_THREAD_COUNT = 50;
private final ExecutorService executorService;

public SaasPluginExecutorServiceProvider() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,46 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.model;


import lombok.Getter;
import lombok.NonNull;

import java.time.Instant;
import java.util.Map;


@Getter
public abstract class ItemInfo {

public interface ItemInfo {

/**
* Use this field to store primary item of a repository. Primary item of a repository is something
* which can be fetched/queried/obtained from repository just using its item ID.
* which can be fetched/queried/obtained from source service just using its item ID.
*/
@NonNull
String itemId;
String getItemId();

/**
* Use this field to store items metadata. Item metadata can be any information other than item
* contents itself which can be used to apply regex filtering, change data capture etc. general
* assumption here is that fetching metadata should be faster than fetching entire Item
*/
Map<String, Object> metadata;
Map<String, Object> getMetadata();

/**
* Process your change log events serially (preferably in a single thread) and ensure that you are
* applying monotonously increasing timeStamp. If you don't do that, then SDK could miss latest
* updates as it processes events out of order and it relies on this member to decide which change
* log events to keep and which ones to discard.
*/
Instant eventTime;
Instant getEventTime();

public ItemInfo(String itemId) {
this.itemId = itemId;
}
String getPartitionKey();

public ItemInfo(@NonNull String itemId, Map<String, Object> metadata, Instant eventTime) {
this.itemId = itemId;
this.metadata = metadata;
this.eventTime = eventTime;
}
/**
* Service specific Unique Id of this Item.
* @return String value indicating unique id of this item.
*/
String getId();

public abstract String getPartitionKey();
public abstract String getId();
public abstract Map<String, String> getKeyAttributes();
/**
* Key attributes related to this Item.
*
* @return A map of key attributes of this Item.
*/
Map<String, String> getKeyAttributes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;

import java.lang.reflect.Field;
import java.time.Instant;
Expand Down Expand Up @@ -157,27 +158,6 @@ private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessExcep
return (int) maxItemsPerPageField.get(null);
}

private static class TestItemInfo extends ItemInfo {
public TestItemInfo(String itemId, Map<String, Object> metadata, Instant eventTime) {
super(itemId, metadata, eventTime);

}
@Override
public String getPartitionKey() {
return getItemId();
}

@Override
public String getId() {
return getItemId();
}

@Override
public Map<String, String> getKeyAttributes() {
return new HashMap<>();
}
}

private ItemInfo createTestItemInfo(String id, String created, String updated) {
Map<String, Object> metadata = new HashMap<>();
if (created != null) metadata.put(Crawler.CREATED, created);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
executorService.submit(leaderScheduler);

//Wait for more than a minute as the default while loop wait time in leader scheduler is 1 minute
Thread.sleep(70000);
Thread.sleep(100);
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, times(2)).crawl(Instant.ofEpochMilli(0L), coordinator);
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ void testDefaultValues() throws JsonProcessingException {
void testInitializedValues() throws JsonProcessingException {
String state = "{\"last_poll_time\":1729391235717, \"initialized\": true}";
LeaderProgressState leaderProgressState = objectMapper.readValue(state, LeaderProgressState.class);
assert leaderProgressState.getLastPollTime() == Instant.ofEpochMilli(1729391235717L);
assert Instant.ofEpochMilli(1729391235717L).equals(leaderProgressState.getLastPollTime());
assert leaderProgressState.isInitialized();
}
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,13 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.model;

import lombok.NonNull;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.Map;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertThrows;

public class ItemInfoTest {

static class TestItemInfo extends ItemInfo {

public TestItemInfo(@NonNull String itemId, Map<String, Object> metadata, Instant eventTime) {
super(itemId, metadata, eventTime);
}

public TestItemInfo(String itemId) {
super(itemId);
}

@Override
public String getPartitionKey() {
return "partitionKey";
}

@Override
public String getId() {
return "id";
}

@Override
public Map<String, String> getKeyAttributes() {
return Map.of();
}
}

@Test
void testItemInfoSimpleConstructor() {
String itemId = UUID.randomUUID().toString();
Expand All @@ -48,16 +19,6 @@ void testItemInfoSimpleConstructor() {
assert itemInfo.getKeyAttributes().isEmpty();
}

@Test
void testItemInfoWithNullValues() {
assertThrows(NullPointerException.class, () -> new TestItemInfo(null, null, null));
assertThrows(NullPointerException.class, () -> new TestItemInfo(null, null, Instant.ofEpochMilli(1234L)),
"itemId should not be null");
assertThrows(NullPointerException.class, () -> new TestItemInfo("itemid", null, null),
"eventTime should not be null");

}

@Test
void testItemInfo() {
String itemId = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.model;

import java.time.Instant;
import java.util.Map;

public class TestItemInfo implements ItemInfo {

String itemId;
Map<String, Object> metadata;
Instant eventTime;

public TestItemInfo(String itemId, Map<String, Object> metadata, Instant eventTime) {
this.itemId = itemId;
this.metadata = metadata;
this.eventTime = eventTime;
}

public TestItemInfo(String itemId) {
this.itemId = itemId;
}

@Override
public String getItemId() {
return itemId;
}

@Override
public Map<String, Object> getMetadata() {
return this.metadata;
}

@Override
public Instant getEventTime() {
return this.eventTime;
}

@Override
public String getPartitionKey() {
return "partitionKey";
}

@Override
public String getId() {
return "id";
}

@Override
public Map<String, String> getKeyAttributes() {
return Map.of();
}
}

0 comments on commit ec98d4c

Please sign in to comment.