Skip to content

Commit

Permalink
CAMEL-21256: use most recent sha when startingSha=last, prevent Commi…
Browse files Browse the repository at this point in the history
…tConsumer.poll() logic from executing until after CommitConsume.doStart() method completes.
  • Loading branch information
electricsam committed Sep 24, 2024
1 parent 2f01892 commit 1f51ebf
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ public class CommitConsumer extends AbstractGitHubConsumer {
// keep a chunk of the last 100 hashes, so we can filter out duplicates
private final Queue<String> commitHashes = new LinkedBlockingQueue<>(CAPACITY);
private volatile String lastSha;
private boolean started = false;

public CommitConsumer(GitHubEndpoint endpoint, Processor processor, String branchName,
String startingSha) throws Exception {
String startingSha) throws Exception {
super(endpoint, processor);
this.branchName = branchName;
this.startingSha = startingSha;
Expand Down Expand Up @@ -74,98 +75,101 @@ protected void doInit() throws Exception {

@Override
protected void doStart() throws Exception {
super.doStart();
synchronized (this) {
super.doStart();

// ensure we start from clean
commitHashes.clear();
lastSha = null;
// ensure we start from clean
commitHashes.clear();
lastSha = null;

if (startingSha.equals("last")) {
LOG.info("Indexing current commits on: {}/{}@{}", getEndpoint().getRepoOwner(), getEndpoint().getRepoName(),
if (startingSha.equals("last")) {
LOG.info("Indexing current commits on: {}/{}@{}", getEndpoint().getRepoOwner(), getEndpoint().getRepoName(),
branchName);
List<RepositoryCommit> commits = commitService.getCommits(getRepository(), branchName, null);
for (RepositoryCommit commit : commits) {
String sha = commit.getSha();
if (!commitHashes.contains(sha)) {
// make room when adding new elements
while (commitHashes.size() > CAPACITY - 1) {
commitHashes.remove();
}
commitHashes.add(sha);
List<RepositoryCommit> commits = commitService.getCommits(getRepository(), branchName, null);
if (!commits.isEmpty()) {
lastSha = commits.get(0).getSha();
}
LOG.info("Starting from last sha: {}", lastSha);
} else if (!startingSha.equals("beginning")) {
lastSha = startingSha;
LOG.info("Starting from sha: {}", lastSha);
} else {
LOG.info("Starting from beginning");
}
if (!commitHashes.isEmpty()) {
lastSha = commitHashes.peek();
}
LOG.info("Starting from last sha: {}", lastSha);
} else if (!startingSha.equals("beginning")) {
lastSha = startingSha;
LOG.info("Starting from sha: {}", lastSha);
} else {
LOG.info("Starting from beginning");
started = true;
}
}

@Override
protected void doStop() throws Exception {
super.doStop();
synchronized (this) {
super.doStop();

commitHashes.clear();
lastSha = null;
commitHashes.clear();
lastSha = null;
started = false;
}
}

@Override
protected int poll() throws Exception {
List<RepositoryCommit> commits = commitService.getCommits(getRepository(), branchName, null);

// clip the list after the last sha
if (lastSha != null) {
int pos = -1;
for (int i = 0; i < commits.size(); i++) {
RepositoryCommit commit = commits.get(i);
if (lastSha.equals(commit.getSha())) {
pos = i;
break;
}
}
if (pos != -1) {
commits = commits.subList(0, pos);
synchronized (this) {

if (!started) {
return 0;
}
}

// In the end, we want tags oldest to newest.
ArrayDeque<RepositoryCommit> newCommits = new ArrayDeque<>();
for (RepositoryCommit commit : commits) {
String sha = commit.getSha();
if (!commitHashes.contains(sha)) {
newCommits.push(commit);
// make room when adding new elements
while (commitHashes.size() > CAPACITY - 1) {
commitHashes.remove();
List<RepositoryCommit> commits = commitService.getCommits(getRepository(), branchName, null);

// clip the list after the last sha
if (lastSha != null) {
int pos = -1;
for (int i = 0; i < commits.size(); i++) {
RepositoryCommit commit = commits.get(i);
if (lastSha.equals(commit.getSha())) {
pos = i;
break;
}
}
if (pos != -1) {
commits = commits.subList(0, pos);
}
commitHashes.add(sha);
}
}

int counter = 0;
while (!newCommits.isEmpty()) {
RepositoryCommit newCommit = newCommits.pop();
lastSha = newCommit.getSha();
Exchange e = createExchange(true);
if (newCommit.getAuthor() != null) {
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, newCommit.getAuthor().getName());
// In the end, we want tags oldest to newest.
ArrayDeque<RepositoryCommit> newCommits = new ArrayDeque<>();
for (RepositoryCommit commit : commits) {
String sha = commit.getSha();
if (!commitHashes.contains(sha)) {
newCommits.push(commit);
// make room when adding new elements
while (commitHashes.size() > CAPACITY - 1) {
commitHashes.remove();
}
commitHashes.add(sha);
}
}
if (newCommit.getCommitter() != null) {
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER, newCommit.getCommitter().getName());

int counter = 0;
while (!newCommits.isEmpty()) {
RepositoryCommit newCommit = newCommits.pop();
lastSha = newCommit.getSha();
Exchange e = createExchange(true);
if (newCommit.getAuthor() != null) {
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, newCommit.getAuthor().getName());
}
if (newCommit.getCommitter() != null) {
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER, newCommit.getCommitter().getName());
}
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA, newCommit.getSha());
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL, newCommit.getUrl());
e.getMessage().setBody(newCommit.getCommit().getMessage());
getProcessor().process(e);
counter++;
}
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA, newCommit.getSha());
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL, newCommit.getUrl());
e.getMessage().setBody(newCommit.getCommit().getMessage());
getProcessor().process(e);
counter++;
LOG.debug("Last sha: {}", lastSha);
return counter;
}
LOG.debug("Last sha: {}", lastSha);
return counter;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.github.consumer;


import org.apache.camel.BindToRegistry;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.github.GitHubComponentTestBase;
import org.apache.camel.component.github.GitHubConstants;
import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
import org.junit.jupiter.api.Test;

public class CommitConsumerLastTest extends GitHubComponentTestBase {

@BindToRegistry("myScheduler")
private final MyScheduler scheduler = createScheduler();

private MyScheduler createScheduler() {
MyScheduler scheduler = new MyScheduler();
scheduler.setDelay(100);
scheduler.setInitialDelay(0);
return scheduler;
}

@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {

@Override
public void configure() {
from("github://commit/master?startingSha=last&repoOwner=anotherguy&repoName=somerepo&scheduler=#myScheduler")
.routeId("foo").noAutoStartup()
.process(new GitHubCommitProcessor())
.to(mockResultEndpoint);
}
};
}

@Test
public void commitConsumerLongHistoryLastShaTest() throws Exception {
for (int i = 0; i < 2000; i++) {
commitService.addRepositoryCommit("existing commit " + i);
}

mockResultEndpoint.setAssertPeriod(500);
mockResultEndpoint.expectedBodiesReceived("new commit 1", "new commit 2");

context.getRouteController().startAllRoutes();

commitService.addRepositoryCommit("new commit 1");
commitService.addRepositoryCommit("new commit 2");

mockResultEndpoint.assertIsSatisfied();
}

public class GitHubCommitProcessor implements Processor {
@Override
public void process(Exchange exchange) {
String author = exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, String.class);
String sha = exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_SHA, String.class);
if (log.isDebugEnabled()) {
System.out.println(sha);
log.debug("Got commit with author: {}: SHA {}", author, sha);
}
}
}

private static final class MyScheduler extends DefaultScheduledPollConsumerScheduler {

@Override
public void startScheduler() {
super.startScheduler();
try {
/*
adding a delay to the CommitConsumer.doStart() method to force the CommitConsumer.poll()
method to be called before the CommitConsumer.doStart() finishes which could leave the
lastSha variable null
*/
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit 1f51ebf

Please sign in to comment.