Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jira source plugin #5125

Merged
merged 100 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 90 commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
57b8ae4
removed the name saas in the file names and package names
san81 Oct 28, 2024
9a7e8fb
moved these two classes
san81 Oct 28, 2024
1e4abc2
adding jira source code
san81 Oct 28, 2024
ad57856
master merge
san81 Oct 28, 2024
f512f68
test classes
san81 Oct 28, 2024
4677ddc
jira client test
Galactus22625 Oct 28, 2024
8c8a269
current jira tests
Galactus22625 Oct 28, 2024
ad18c94
map name changes
Galactus22625 Oct 28, 2024
6539bb9
change package name
Galactus22625 Oct 28, 2024
d18b212
some cleanup
Galactus22625 Oct 28, 2024
9da83ae
address comments
Galactus22625 Oct 28, 2024
14a9328
address more comments
Galactus22625 Oct 29, 2024
deb8198
test cleanup
Galactus22625 Oct 29, 2024
6199581
remove import
Galactus22625 Oct 29, 2024
2b41090
more comments
Galactus22625 Oct 29, 2024
053a7d3
source config test specific
Galactus22625 Oct 29, 2024
702ad64
Jira Iterator Tests
Galactus22625 Oct 29, 2024
7fff2a2
Jira Config Helper Test and JIraServiceTest bug fix
Galactus22625 Oct 29, 2024
2679c8e
Merge branch 'opensearch-project:main' into jira-source
san81 Oct 29, 2024
13e3fc6
JiraItemInfoTests
Galactus22625 Oct 29, 2024
1542658
clean up jira service tests
Galactus22625 Oct 29, 2024
25ca17b
jira package tests
Galactus22625 Oct 29, 2024
00fb4ee
cleanup
Galactus22625 Oct 29, 2024
ea456e4
removed unirest references and Gson references
san81 Oct 30, 2024
c29d656
new metric to track the search results found
san81 Oct 30, 2024
8fa8cee
removed the need for spring retryable. Handling the retry part with i…
san81 Oct 30, 2024
0ad89b2
removing spring retryable dependency
san81 Oct 30, 2024
f1c265f
retry attempt sleep added
san81 Oct 30, 2024
844d20c
removed unused model object
san81 Oct 30, 2024
6ad0ac1
Jira source config object validation annotations added. Need to write…
san81 Oct 30, 2024
86778af
moved config helper to util package
san81 Oct 30, 2024
391fe37
doc strings
san81 Oct 30, 2024
cf805ad
removiong jsonTypeBean tests and refereneces
Galactus22625 Oct 30, 2024
a067769
Merge branch 'jira-source' into jira-tests
Galactus22625 Oct 30, 2024
95a1ffd
fix changes
Galactus22625 Oct 30, 2024
df6eeca
JiraService and Jira Source Tests Incomplete
Galactus22625 Oct 30, 2024
205ffc1
fix merge issues
Galactus22625 Oct 30, 2024
2cee7bc
Merge pull request #9 from Galactus22625/jira-tests
san81 Oct 30, 2024
dd90582
Merge branch 'opensearch-project:main' into jira-source
san81 Oct 30, 2024
b323f0a
fixed some of the failing tests
san81 Oct 30, 2024
17202dc
a couple of log lines added
san81 Oct 30, 2024
e848c3a
code formatting
san81 Oct 30, 2024
6cd8279
public to private
san81 Oct 30, 2024
249c0c7
removed unwanted headers
san81 Oct 30, 2024
9db74e2
missing import
san81 Oct 30, 2024
e644fe3
expire time attribute added to OAuth handling
san81 Oct 31, 2024
2bf5e0e
Fixed issue_type status and projects filters and added associated tes…
Galactus22625 Oct 31, 2024
782ea38
Merge branch 'jira-source' into Yaml-Filters
Galactus22625 Oct 31, 2024
97b55c7
additional test cases
san81 Oct 31, 2024
d4e0cdb
Merge pull request #10 from Galactus22625/Yaml-Filters
san81 Oct 31, 2024
f6dd991
Merge branch 'opensearch-project:main' into jira-source
san81 Oct 31, 2024
3a615e6
addition tests
san81 Oct 31, 2024
e3ac8bc
additional test coverage
san81 Oct 31, 2024
e18a2c1
cleaned up JiraOauthConfig file
san81 Oct 31, 2024
cf2684f
Merge pull request #2 from san81/jira-source
Galactus22625 Oct 31, 2024
a0c854b
Merge branch 'opensearch-project:main' into jira-source
san81 Oct 31, 2024
ab615d5
addressing review comments and simplifying the exception handling
san81 Oct 31, 2024
1dd613f
Merge remote-tracking branch 'origin/jira-source' into jira-source
san81 Oct 31, 2024
cc7ddec
Merge pull request #3 from san81/jira-source
Galactus22625 Oct 31, 2024
658bbf5
partial
Galactus22625 Oct 31, 2024
28aaa06
fix merge issues
Galactus22625 Oct 31, 2024
9d698e8
moved the wait block out of the catch block
san81 Oct 31, 2024
a8d8dcc
update
Galactus22625 Oct 31, 2024
1553022
Merge branch 'Yaml-Filters' into jira-source
Galactus22625 Oct 31, 2024
06d64ef
Renewal logic adjusted
san81 Nov 1, 2024
6e2cc23
Merge remote-tracking branch 'origin/jira-source' into jira-source
san81 Nov 1, 2024
58dd7e7
Merge branch 'opensearch-project:main' into jira-source
san81 Nov 1, 2024
d45dfc5
fixes related to source config properties change
san81 Nov 1, 2024
4f5bf1f
removed future handling for loop based operations
san81 Nov 1, 2024
d33c493
additional test cases
san81 Nov 1, 2024
e3843d5
addressing review comments
san81 Nov 1, 2024
8386377
Jira Service Test coverage
Galactus22625 Nov 1, 2024
a719188
jirasourceconfigTest comments
Galactus22625 Nov 1, 2024
e760d3a
Merge pull request #11 from Galactus22625/jira-source
san81 Nov 1, 2024
4270988
JiraSourceTests
Galactus22625 Nov 1, 2024
6d4e3cf
JiraItemInfo coverage
Galactus22625 Nov 1, 2024
1600e9c
jira service branch coverage
Galactus22625 Nov 1, 2024
3e5d83a
branch coverage jira service
Galactus22625 Nov 1, 2024
d9c555d
move add Items to queue logic into JiraItemInfo
Galactus22625 Nov 1, 2024
db0d26e
introduced RestClient and moved rest template interactions to there. …
san81 Nov 1, 2024
d3c3a33
backingoff for any kind of exception.
san81 Nov 1, 2024
1590886
restructured constants file
san81 Nov 1, 2024
bea45c8
fixing regex and adding date time formatter
Galactus22625 Nov 1, 2024
6e70427
Merge branch 'jira-source' into jira-source
Galactus22625 Nov 1, 2024
13f56d2
Revert "Jira source"
Galactus22625 Nov 1, 2024
a8c3218
Merge pull request #13 from san81/revert-12-jira-source
Galactus22625 Nov 1, 2024
3c15223
re add changes and fix issues
Galactus22625 Nov 1, 2024
51c24c1
unneeded comment
Galactus22625 Nov 1, 2024
80f6e19
Merge pull request #16 from Galactus22625/fix-revert-merge-issue
san81 Nov 1, 2024
7f1dfae
using issue bean methods to simplify the logic
san81 Nov 1, 2024
9a7b3ad
issuebean branches test coverage
Galactus22625 Nov 1, 2024
21a7c3a
one more additional test
san81 Nov 2, 2024
9336a1b
added issue bean tests and fixed return bug in issue bean
Galactus22625 Nov 2, 2024
f564a98
get updated test
Galactus22625 Nov 2, 2024
545bdf1
diffferent numbers for created and updated
Galactus22625 Nov 2, 2024
58675e7
remove sysouts
san81 Nov 3, 2024
8237a45
Merge pull request #17 from Galactus22625/jira-source-m
san81 Nov 3, 2024
1b59110
improved test coverage
san81 Nov 3, 2024
816f5b8
simplified crawler logic
san81 Nov 3, 2024
b5945a2
making it to 100% coverage
san81 Nov 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@

# Metrics

### Counter

- `issuesRequested`: measures total number of issue Requests sent.

### Timer

- `requestProcessDuration`: measures latency of requests processed by the jira source plugin.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies {
implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4'

implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
package org.opensearch.dataprepper.plugins.source.jira;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT;

/**
* This class represents a Jira client.
Expand All @@ -21,25 +39,91 @@
public class JiraClient implements CrawlerClient {

private static final Logger log = LoggerFactory.getLogger(JiraClient.class);
private final JiraService service;
private final JiraIterator jiraIterator;
private final ExecutorService executorService;
private final CrawlerSourceConfig configuration;
private final int bufferWriteTimeoutInSeconds = 10;
private ObjectMapper objectMapper = new ObjectMapper();
private Instant lastPollTime;

public JiraClient() {
public JiraClient(JiraService service,
JiraIterator jiraIterator,
PluginExecutorServiceProvider executorServiceProvider,
JiraSourceConfig sourceConfig) {
this.service = service;
this.jiraIterator = jiraIterator;
this.executorService = executorServiceProvider.get();
this.configuration = sourceConfig;
}


@Override
public Iterator<ItemInfo> listItems() {
return null;
jiraIterator.initialize(lastPollTime);
return jiraIterator;
}

@Override
public void setLastPollTime(Instant lastPollTime) {
log.info("Setting the lastPollTime: {}", lastPollTime);
log.trace("Setting the lastPollTime: {}", lastPollTime);
this.lastPollTime = lastPollTime;
}

@VisibleForTesting
void injectObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@Override
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, CrawlerSourceConfig configuration) {
log.info("Logic for executing the partitions");
public void executePartition(SaasWorkerProgressState state,
Buffer<Record<Event>> buffer,
CrawlerSourceConfig configuration) {
log.trace("Executing the partition: {} with {} ticket(s)",
state.getKeyAttributes(), state.getItemIds().size());
List<String> itemIds = state.getItemIds();
Map<String, Object> keyAttributes = state.getKeyAttributes();
String project = (String) keyAttributes.get(PROJECT);
Instant eventTime = state.getExportStartTime();
List<ItemInfo> itemInfos = new ArrayList<>();
for (String itemId : itemIds) {
if (itemId == null) {
continue;
}
ItemInfo itemInfo = JiraItemInfo.builder()
.withItemId(itemId)
.withId(itemId)
.withProject(project)
.withEventTime(eventTime)
.withMetadata(keyAttributes).build();
itemInfos.add(itemInfo);
}

String eventType = EventType.DOCUMENT.toString();
List<Record<Event>> recordsToWrite = itemInfos
.parallelStream()
.map(t -> (Supplier<String>) (() -> service.getIssue(t.getId())))
.map(supplier -> supplyAsync(supplier, this.executorService))
.map(CompletableFuture::join)
.map(ticketJson -> {
try {
return objectMapper.readValue(ticketJson, new TypeReference<>() {
});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.map(t -> (Event) JacksonEvent.builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are moving toward EventFactory. Please fix this in a follow-on PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok 👍

.withEventType(eventType)
.withData(t)
.build())
.map(event -> new Record<>(event))
.collect(Collectors.toList());

try {
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
} catch (Exception e) {
throw new RuntimeException(e);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package org.opensearch.dataprepper.plugins.source.jira;

import lombok.Getter;
import lombok.Setter;
import org.opensearch.dataprepper.plugins.source.jira.models.IssueBean;
import org.opensearch.dataprepper.plugins.source.jira.utils.Constants;
import org.opensearch.dataprepper.plugins.source.jira.utils.JiraContentType;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.opensearch.dataprepper.plugins.source.jira.JiraService.CONTENT_TYPE;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.CREATED;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.ISSUE_KEY;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT_KEY;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT_NAME;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.UPDATED;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants._ISSUE;

@Setter
@Getter
public class JiraItemInfo implements ItemInfo {
private String project;
private String issueType;
private String id;
private String itemId;
private Map<String, Object> metadata;
private Instant eventTime;

public JiraItemInfo(String id,
String itemId,
String project,
String issueType,
Map<String, Object> metadata,
Instant eventTime
) {
this.id = id;
this.project = project;
this.issueType = issueType;
this.itemId = itemId;
this.metadata = metadata;
this.eventTime = eventTime;
}

public static JiraItemInfoBuilder builder() {
return new JiraItemInfoBuilder();
}

@Override
public String getPartitionKey() {
return project + "|" + issueType + "|" + UUID.randomUUID();
}

@Override
public String getId() {
return id;
}

@Override
public Map<String, Object> getKeyAttributes() {
return Map.of(Constants.PROJECT, project);
}

@Override
public Instant getLastModifiedAt() {
long updatedAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.UPDATED, "0"));
long createdAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.CREATED, "0"));
return createdAtMillis > updatedAtMillis ?
Instant.ofEpochMilli(createdAtMillis) : Instant.ofEpochMilli(updatedAtMillis);
}

public static class JiraItemInfoBuilder {
private Map<String, Object> metadata;
private Instant eventTime;
private String id;
private String itemId;
private String project;
private String issueType;

public JiraItemInfoBuilder() {
}

public JiraItemInfo build() {
return new JiraItemInfo(id, itemId, project, issueType, metadata, eventTime);
}

public JiraItemInfoBuilder withMetadata(Map<String, Object> metadata) {
this.metadata = metadata;
return this;
}

public JiraItemInfoBuilder withEventTime(Instant eventTime) {
this.eventTime = eventTime;
return this;
}

public JiraItemInfoBuilder withItemId(String itemId) {
this.itemId = itemId;
return this;
}

public JiraItemInfoBuilder withId(String id) {
this.id = id;
return this;
}

public JiraItemInfoBuilder withProject(String project) {
this.project = project;
return this;
}

public JiraItemInfoBuilder withIssueType(String issueType) {
this.issueType = issueType;
return this;
}

public JiraItemInfoBuilder withIssueBean(IssueBean issue) {
Map<String, Object> issueMetadata = new HashMap<>();
issueMetadata.put(PROJECT_KEY, issue.getProject());
issueMetadata.put(PROJECT_NAME, issue.getProjectName());
issueMetadata.put(CREATED, issue.getCreatedTimeMillis());
issueMetadata.put(UPDATED, issue.getUpdatedTimeMillis());
issueMetadata.put(ISSUE_KEY, issue.getKey());
issueMetadata.put(CONTENT_TYPE, JiraContentType.ISSUE.getType());

this.project = issue.getProject();
this.id = issue.getKey();
this.issueType = JiraContentType.ISSUE.getType();
this.itemId = _ISSUE + issueMetadata.get(PROJECT_KEY) + "-" + issue.getKey();
this.metadata = issueMetadata;
return this;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.opensearch.dataprepper.plugins.source.jira;


import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

@Named
public class JiraIterator implements Iterator<ItemInfo> {

private static final int HAS_NEXT_TIMEOUT = 60;
private static final Logger log = LoggerFactory.getLogger(JiraIterator.class);
private final JiraSourceConfig sourceConfig;
private final JiraService service;
private final ExecutorService crawlerTaskExecutor;
private final List<Future<Boolean>> futureList = new ArrayList<>();
private Queue<ItemInfo> itemInfoQueue;
private Instant lastPollTime;
private boolean firstTime = true;

public JiraIterator(final JiraService service,
PluginExecutorServiceProvider executorServiceProvider,
JiraSourceConfig sourceConfig) {
this.service = service;
this.crawlerTaskExecutor = executorServiceProvider.get();
this.sourceConfig = sourceConfig;
}

@Override
public boolean hasNext() {
if (firstTime) {
log.info("Crawling has been started");
startCrawlerThreads();
firstTime = false;
}
int timeout = HAS_NEXT_TIMEOUT;
while (isCrawlerRunning()
&& itemInfoQueue.isEmpty()
&& (timeout != 0)) {
try {
log.trace("Waiting for crawling queue to be filled for next 2 seconds.");
Thread.sleep(2000);
timeout--;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this "timeout" or "number of retries"? Looks like number of retries to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not retries. It is waiting for the Crawler thread to complete

} catch (InterruptedException e) {
log.error("An exception has occurred while checking for next document in crawling queue.");
Thread.currentThread().interrupt();
}
}

return !this.itemInfoQueue.isEmpty();
}

private boolean isCrawlerRunning() {
boolean isRunning = false;
if (!futureList.isEmpty()) {
for (Future<Boolean> future : futureList) {
if (!future.isDone()) {
isRunning = true;
break;
}
}
}
return isRunning;
}


private void startCrawlerThreads() {
futureList.add(crawlerTaskExecutor.submit(
() -> service.getJiraEntities(sourceConfig, lastPollTime, itemInfoQueue), false));
}

@Override
public ItemInfo next() {
return this.itemInfoQueue.remove();
}

/**
* Initialize.
*
* @param jiraChangeLogToken the jira change log token
*/
public void initialize(Instant jiraChangeLogToken) {
this.itemInfoQueue = new ConcurrentLinkedQueue<>();
this.lastPollTime = jiraChangeLogToken;
this.firstTime = true;
}

}
Loading
Loading