Skip to content

Commit

Permalink
addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Oct 25, 2024
1 parent 38a6890 commit 0a0a507
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasSourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
Expand All @@ -18,7 +18,7 @@
* This class represents a Jira client.
*/
@Named
public class JiraClient implements SaasClient {
public class JiraClient implements CrawlerClient {

private static final Logger log = LoggerFactory.getLogger(JiraClient.class);
private Instant lastPollTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -31,9 +30,9 @@ public class Crawler {
private final PluginMetrics pluginMetrics =
PluginMetrics.fromNames("sourceCrawler", "crawler");

private final SaasClient client;
private final CrawlerClient client;

public Crawler(SaasClient client) {
public Crawler(CrawlerClient client) {
this.client = client;
this.crawlingTime = pluginMetrics.timer("crawlingTime");
}
Expand All @@ -44,28 +43,26 @@ public Instant crawl(Instant lastPollTime,
client.setLastPollTime(lastPollTime);
Iterator<ItemInfo> itemInfoIterator = client.listItems();
log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
long updatedPollTime = 0;
Instant updatedPollTime = Instant.ofEpochMilli(0);
do {
final List<ItemInfo> itemInfoList = new ArrayList<>();
for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) {
ItemInfo nextItem = itemInfoIterator.next();
if(nextItem==null) {
//we don't expect null items, but just in case, we'll skip them
log.info("Unexpected encounter of a null item.");
continue;
}
itemInfoList.add(nextItem);
Map<String, Object> metadata = nextItem.getMetadata();
long niCreated = Long.parseLong(metadata.get(CREATED)!=null? (String)metadata.get(CREATED):"0");
long niUpdated = Long.parseLong(metadata.get(UPDATED)!=null? (String)metadata.get(UPDATED):"0");
updatedPollTime = Math.max(updatedPollTime, niCreated);
updatedPollTime = Math.max(updatedPollTime, niUpdated);
Instant lastModifiedTime = nextItem.getLastModifiedAt();
updatedPollTime = updatedPollTime.isAfter(lastModifiedTime) ? updatedPollTime : lastModifiedTime;
}
createPartition(itemInfoList, coordinator);
}while (itemInfoIterator.hasNext());
long crawlTimeMillis = System.currentTimeMillis() - startTime;
log.debug("Crawling completed in {} ms", crawlTimeMillis);
crawlingTime.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
return Instant.ofEpochMilli(updatedPollTime != 0 ? updatedPollTime : startTime);
return updatedPollTime;
}

public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, SaasSourceConfig sourceConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
import java.util.Iterator;

/**
* Interface for saas client. This interface can be implemented by different saas clients.
* Interface for Crawler client. This interface can be implemented by different saas clients.
* For example, Jira, Salesforce, ServiceNow, etc.
*/
public interface SaasClient {
public interface CrawlerClient {


/**
* This will be the main API called by crawler. This method assumes that {@link
* SaasSourceConfig} is available as a member to {@link SaasClient}, as a result of
* SaasSourceConfig} is available as a member to {@link CrawlerClient}, as a result of
* which, other scanning properties will also be available to this method
*
* @return returns an {@link Iterator} of {@link ItemInfo}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;


import lombok.Getter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
Expand Down Expand Up @@ -44,20 +43,22 @@ public abstract class SaasSourcePlugin implements Source<Record<Event>>, UsesEnh

private EnhancedSourceCoordinator coordinator;

@Getter
private final SaasSourceConfig sourceConfig;

private Buffer<Record<Event>> buffer;
private final Crawler crawler;
private final String sourcePluginName;


public SaasSourcePlugin(final PluginMetrics pluginMetrics,
public SaasSourcePlugin(final String sourcePluginName,
final PluginMetrics pluginMetrics,
final SaasSourceConfig sourceConfig,
final PluginFactory pluginFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final Crawler crawler,
final SaasPluginExecutorServiceProvider executorServiceProvider) {
log.info("Create SaaS Source Connector");
log.debug("Creating {} Source Plugin", sourcePluginName);
this.sourcePluginName = sourcePluginName;
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.pluginFactory = pluginFactory;
Expand All @@ -71,7 +72,7 @@ public SaasSourcePlugin(final PluginMetrics pluginMetrics,
@Override
public void start(Buffer<Record<Event>> buffer) {
Objects.requireNonNull(coordinator);
log.info("Starting SaaS Source Plugin... ");
log.info("Starting {} Source Plugin", sourcePluginName);
this.buffer = buffer;

boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void run() {
try {
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
} catch (final Exception e) {
LOG.error("Failed to save Leader partition state. Retrying...");
LOG.error("Failed to save Leader partition state. This process will retry.");
}
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ public class WorkerScheduler implements Runnable {

private static final Logger log = LoggerFactory.getLogger(WorkerScheduler.class);
private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;
private static final long SLEEP_DURATION_MILLIS = 10000;

private final EnhancedSourceCoordinator sourceCoordinator;

private final SaasSourceConfig sourceConfig;
private final Crawler crawler;
private final Buffer<Record<Event>> buffer;


public WorkerScheduler(Buffer<Record<Event>> buffer,
EnhancedSourceCoordinator sourceCoordinator,
SaasSourceConfig sourceConfig,
Expand All @@ -46,15 +47,16 @@ public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// Get the next available partition from the coordinator
Optional<EnhancedSourcePartition> partition = sourceCoordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE);
Optional<EnhancedSourcePartition> partition =
sourceCoordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE);
if (partition.isPresent()) {
// Process the partition (source extraction logic)
processPartition(partition.get(), buffer, sourceConfig);

} else {
log.debug("No partition available. Going to Sleep for a while ");
log.debug("No partition available. This thread will sleep for {}", SLEEP_DURATION_MILLIS);
try {
Thread.sleep(10000);
Thread.sleep(SLEEP_DURATION_MILLIS);
} catch (final InterruptedException e) {
log.info("InterruptedException occurred");
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ public interface ItemInfo {
* @return A map of key attributes of this Item.
*/
Map<String, String> getKeyAttributes();

/**
* Service specific Item's last modified time
*
* @return Instant when this item was created
*/
Instant getLastModifiedAt();
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,11 @@ public class CrawlerTest {
private Buffer<Record<Event>> buffer;

@Mock
private SaasClient client;
private CrawlerClient client;

@Mock
private SaasWorkerProgressState state;

@Mock
private ItemInfo item;

private Crawler crawler;

@BeforeEach
Expand Down Expand Up @@ -83,7 +80,7 @@ void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessExcep
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList.add(item);
itemInfoList.add(new TestItemInfo("itemId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
Expand All @@ -97,7 +94,7 @@ void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAcce
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
for (int i = 0; i < maxItemsPerPage + 1; i++) {
itemInfoList.add(item);
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
Expand All @@ -112,7 +109,7 @@ void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessEx
int maxItemsPerPage = getMaxItemsPerPage();
itemInfoList.add(null);
for (int i = 0; i < maxItemsPerPage-1; i++) {
itemInfoList.add(item);
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public class SaasSourcePluginTest {
@Mock
private AcknowledgementSetManager acknowledgementSetManager;

@Mock
SourcePartitionStoreItem mockItem;

@Mock
EnhancedSourcePartition mockPartition;

@Mock
private EnhancedSourceCoordinator sourceCoordinator;
Expand All @@ -75,7 +70,7 @@ public testSaasSourcePlugin(final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager,
final Crawler crawler,
final SaasPluginExecutorServiceProvider executorServiceProvider) {
super(pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
}
}

Expand All @@ -102,10 +97,6 @@ void areAcknowledgementsEnabledTest() {
assertFalse(saasSourcePlugin.areAcknowledgementsEnabled());
}

@Test
void saasSourceConfigGetterTest() {
assertNotNull(saasSourcePlugin.getSourceConfig());
}

@Test
void startTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

import java.time.Instant;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;


public class LeaderProgressStateTest {

Expand All @@ -15,15 +20,15 @@ public class LeaderProgressStateTest {
void testDefaultValues() throws JsonProcessingException {
String state = "{}";
LeaderProgressState leaderProgressState = objectMapper.readValue(state, LeaderProgressState.class);
assert leaderProgressState.getLastPollTime() == null;
assert !leaderProgressState.isInitialized();
assertNull(leaderProgressState.getLastPollTime());
assertFalse(leaderProgressState.isInitialized());
}

@Test
void testInitializedValues() throws JsonProcessingException {
String state = "{\"last_poll_time\":1729391235717, \"initialized\": true}";
LeaderProgressState leaderProgressState = objectMapper.readValue(state, LeaderProgressState.class);
assert Instant.ofEpochMilli(1729391235717L).equals(leaderProgressState.getLastPollTime());
assert leaderProgressState.isInitialized();
assertEquals(Instant.ofEpochMilli(1729391235717L), leaderProgressState.getLastPollTime());
assertTrue(leaderProgressState.isInitialized());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,36 @@
import java.util.Map;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ItemInfoTest {

@Test
void testItemInfoSimpleConstructor() {
String itemId = UUID.randomUUID().toString();
TestItemInfo itemInfo = new TestItemInfo(itemId);
assert itemInfo.getItemId().equals(itemId);
assert itemInfo.getMetadata() == null;
assert itemInfo.getPartitionKey().equals("partitionKey");
assert itemInfo.getId().equals("id");
assert itemInfo.getKeyAttributes().isEmpty();
assertEquals(itemId, itemInfo.getItemId());
assertNull(itemInfo.getMetadata());
assertEquals("partitionKey", itemInfo.getPartitionKey());
assertEquals("id", itemInfo.getId());
assertTrue(itemInfo.getKeyAttributes().isEmpty());
}

@Test
void testItemInfo() {
String itemId = UUID.randomUUID().toString();
TestItemInfo itemInfo = new TestItemInfo(itemId, Map.of("k1", "v1"), Instant.ofEpochMilli(1L));

assert itemInfo.getItemId().equals(itemId);
assert !itemInfo.getMetadata().isEmpty();
assert itemInfo.getMetadata().get("k1").equals("v1");
assert Instant.ofEpochMilli(1L).equals(itemInfo.getEventTime());
assert itemInfo.getPartitionKey().equals("partitionKey");
assert itemInfo.getId().equals("id");
assert itemInfo.getKeyAttributes().isEmpty();
assertEquals(itemId, itemInfo.getItemId());
assertFalse(itemInfo.getMetadata().isEmpty());
assertEquals("v1", itemInfo.getMetadata().get("k1"));
assertEquals(Instant.ofEpochMilli(1L), itemInfo.getEventTime());
assertEquals("partitionKey", itemInfo.getPartitionKey());
assertEquals("id", itemInfo.getId());
assertTrue(itemInfo.getKeyAttributes().isEmpty());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ public String getId() {
public Map<String, String> getKeyAttributes() {
return Map.of();
}

@Override
public Instant getLastModifiedAt() {
return Instant.ofEpochMilli(10);
}
}

0 comments on commit 0a0a507

Please sign in to comment.