From 1f51ebf1530aa3b620fbc41196111a32de35fb52 Mon Sep 17 00:00:00 2001 From: Chris Slater <6004777+electricsam@users.noreply.github.com> Date: Tue, 24 Sep 2024 12:29:38 -0600 Subject: [PATCH] CAMEL-21256: use most recent sha when startingSha=last, prevent CommitConsumer.poll() logic from executing until after CommitConsume.doStart() method completes. --- .../github/consumer/CommitConsumer.java | 144 +++++++++--------- .../consumer/CommitConsumerLastTest.java | 101 ++++++++++++ 2 files changed, 175 insertions(+), 70 deletions(-) create mode 100644 components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java index 0aa19ffe099d3..c03068c9ab29e 100644 --- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java +++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java @@ -43,9 +43,10 @@ public class CommitConsumer extends AbstractGitHubConsumer { // keep a chunk of the last 100 hashes, so we can filter out duplicates private final Queue commitHashes = new LinkedBlockingQueue<>(CAPACITY); private volatile String lastSha; + private boolean started = false; public CommitConsumer(GitHubEndpoint endpoint, Processor processor, String branchName, - String startingSha) throws Exception { + String startingSha) throws Exception { super(endpoint, processor); this.branchName = branchName; this.startingSha = startingSha; @@ -74,98 +75,101 @@ protected void doInit() throws Exception { @Override protected void doStart() throws Exception { - super.doStart(); + synchronized (this) { + super.doStart(); - // ensure we start from clean - commitHashes.clear(); - lastSha = null; + // ensure we start from clean + commitHashes.clear(); + lastSha = null; - if (startingSha.equals("last")) { - LOG.info("Indexing current commits on: {}/{}@{}", getEndpoint().getRepoOwner(), getEndpoint().getRepoName(), + if (startingSha.equals("last")) { + LOG.info("Indexing current commits on: {}/{}@{}", getEndpoint().getRepoOwner(), getEndpoint().getRepoName(), branchName); - List commits = commitService.getCommits(getRepository(), branchName, null); - for (RepositoryCommit commit : commits) { - String sha = commit.getSha(); - if (!commitHashes.contains(sha)) { - // make room when adding new elements - while (commitHashes.size() > CAPACITY - 1) { - commitHashes.remove(); - } - commitHashes.add(sha); + List commits = commitService.getCommits(getRepository(), branchName, null); + if (!commits.isEmpty()) { + lastSha = commits.get(0).getSha(); } + LOG.info("Starting from last sha: {}", lastSha); + } else if (!startingSha.equals("beginning")) { + lastSha = startingSha; + LOG.info("Starting from sha: {}", lastSha); + } else { + LOG.info("Starting from beginning"); } - if (!commitHashes.isEmpty()) { - lastSha = commitHashes.peek(); - } - LOG.info("Starting from last sha: {}", lastSha); - } else if (!startingSha.equals("beginning")) { - lastSha = startingSha; - LOG.info("Starting from sha: {}", lastSha); - } else { - LOG.info("Starting from beginning"); + started = true; } } @Override protected void doStop() throws Exception { - super.doStop(); + synchronized (this) { + super.doStop(); - commitHashes.clear(); - lastSha = null; + commitHashes.clear(); + lastSha = null; + started = false; + } } @Override protected int poll() throws Exception { - List commits = commitService.getCommits(getRepository(), branchName, null); - - // clip the list after the last sha - if (lastSha != null) { - int pos = -1; - for (int i = 0; i < commits.size(); i++) { - RepositoryCommit commit = commits.get(i); - if (lastSha.equals(commit.getSha())) { - pos = i; - break; - } - } - if (pos != -1) { - commits = commits.subList(0, pos); + synchronized (this) { + + if (!started) { + return 0; } - } - // In the end, we want tags oldest to newest. - ArrayDeque newCommits = new ArrayDeque<>(); - for (RepositoryCommit commit : commits) { - String sha = commit.getSha(); - if (!commitHashes.contains(sha)) { - newCommits.push(commit); - // make room when adding new elements - while (commitHashes.size() > CAPACITY - 1) { - commitHashes.remove(); + List commits = commitService.getCommits(getRepository(), branchName, null); + + // clip the list after the last sha + if (lastSha != null) { + int pos = -1; + for (int i = 0; i < commits.size(); i++) { + RepositoryCommit commit = commits.get(i); + if (lastSha.equals(commit.getSha())) { + pos = i; + break; + } + } + if (pos != -1) { + commits = commits.subList(0, pos); } - commitHashes.add(sha); } - } - int counter = 0; - while (!newCommits.isEmpty()) { - RepositoryCommit newCommit = newCommits.pop(); - lastSha = newCommit.getSha(); - Exchange e = createExchange(true); - if (newCommit.getAuthor() != null) { - e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, newCommit.getAuthor().getName()); + // In the end, we want tags oldest to newest. + ArrayDeque newCommits = new ArrayDeque<>(); + for (RepositoryCommit commit : commits) { + String sha = commit.getSha(); + if (!commitHashes.contains(sha)) { + newCommits.push(commit); + // make room when adding new elements + while (commitHashes.size() > CAPACITY - 1) { + commitHashes.remove(); + } + commitHashes.add(sha); + } } - if (newCommit.getCommitter() != null) { - e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER, newCommit.getCommitter().getName()); + + int counter = 0; + while (!newCommits.isEmpty()) { + RepositoryCommit newCommit = newCommits.pop(); + lastSha = newCommit.getSha(); + Exchange e = createExchange(true); + if (newCommit.getAuthor() != null) { + e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, newCommit.getAuthor().getName()); + } + if (newCommit.getCommitter() != null) { + e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER, newCommit.getCommitter().getName()); + } + e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA, newCommit.getSha()); + e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL, newCommit.getUrl()); + e.getMessage().setBody(newCommit.getCommit().getMessage()); + getProcessor().process(e); + counter++; } - e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA, newCommit.getSha()); - e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL, newCommit.getUrl()); - e.getMessage().setBody(newCommit.getCommit().getMessage()); - getProcessor().process(e); - counter++; + LOG.debug("Last sha: {}", lastSha); + return counter; } - LOG.debug("Last sha: {}", lastSha); - return counter; } } diff --git a/components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java b/components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java new file mode 100644 index 0000000000000..d7d5320c5aae7 --- /dev/null +++ b/components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.github.consumer; + + +import org.apache.camel.BindToRegistry; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.github.GitHubComponentTestBase; +import org.apache.camel.component.github.GitHubConstants; +import org.apache.camel.support.DefaultScheduledPollConsumerScheduler; +import org.junit.jupiter.api.Test; + +public class CommitConsumerLastTest extends GitHubComponentTestBase { + + @BindToRegistry("myScheduler") + private final MyScheduler scheduler = createScheduler(); + + private MyScheduler createScheduler() { + MyScheduler scheduler = new MyScheduler(); + scheduler.setDelay(100); + scheduler.setInitialDelay(0); + return scheduler; + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + from("github://commit/master?startingSha=last&repoOwner=anotherguy&repoName=somerepo&scheduler=#myScheduler") + .routeId("foo").noAutoStartup() + .process(new GitHubCommitProcessor()) + .to(mockResultEndpoint); + } + }; + } + + @Test + public void commitConsumerLongHistoryLastShaTest() throws Exception { + for (int i = 0; i < 2000; i++) { + commitService.addRepositoryCommit("existing commit " + i); + } + + mockResultEndpoint.setAssertPeriod(500); + mockResultEndpoint.expectedBodiesReceived("new commit 1", "new commit 2"); + + context.getRouteController().startAllRoutes(); + + commitService.addRepositoryCommit("new commit 1"); + commitService.addRepositoryCommit("new commit 2"); + + mockResultEndpoint.assertIsSatisfied(); + } + + public class GitHubCommitProcessor implements Processor { + @Override + public void process(Exchange exchange) { + String author = exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, String.class); + String sha = exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_SHA, String.class); + if (log.isDebugEnabled()) { + System.out.println(sha); + log.debug("Got commit with author: {}: SHA {}", author, sha); + } + } + } + + private static final class MyScheduler extends DefaultScheduledPollConsumerScheduler { + + @Override + public void startScheduler() { + super.startScheduler(); + try { + /* + adding a delay to the CommitConsumer.doStart() method to force the CommitConsumer.poll() + method to be called before the CommitConsumer.doStart() finishes which could leave the + lastSha variable null + */ + Thread.sleep(200); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } +}