From 5f97643f20b22706311e89576a31401c30a60c7b Mon Sep 17 00:00:00 2001 From: Ignas Mikalajunas Date: Mon, 12 Aug 2024 03:12:42 +0300 Subject: [PATCH] Added an integration test that validates my assumptions around the pack backed reftable implementation --- .../lt/pow/nukagit/db/dao/NukagitDfsDao.java | 14 +- .../nukagit/dfs/NukagitDfsObjDatabase.java | 5 +- .../resources/db/migration/V000__initial.sql | 2 +- .../integration/NukagitIntegrationTest.groovy | 120 ++++++++++++++++-- 4 files changed, 122 insertions(+), 19 deletions(-) diff --git a/src/main/java/lt/pow/nukagit/db/dao/NukagitDfsDao.java b/src/main/java/lt/pow/nukagit/db/dao/NukagitDfsDao.java index 8302146..2029040 100644 --- a/src/main/java/lt/pow/nukagit/db/dao/NukagitDfsDao.java +++ b/src/main/java/lt/pow/nukagit/db/dao/NukagitDfsDao.java @@ -5,10 +5,7 @@ import org.jdbi.v3.core.statement.UnableToExecuteStatementException; import org.jdbi.v3.sqlobject.customizer.Bind; import org.jdbi.v3.sqlobject.customizer.BindMethods; -import org.jdbi.v3.sqlobject.statement.BatchChunkSize; -import org.jdbi.v3.sqlobject.statement.SqlBatch; -import org.jdbi.v3.sqlobject.statement.SqlQuery; -import org.jdbi.v3.sqlobject.statement.SqlUpdate; +import org.jdbi.v3.sqlobject.statement.*; import org.jdbi.v3.sqlobject.transaction.Transaction; import java.util.List; @@ -65,8 +62,13 @@ default UUID upsertRepositoryAndGetId(String name) { @SqlUpdate("UPDATE repositories SET push_id = :pushId WHERE id = :repositoryId") void setPush(@Bind("repositoryId") UUID repositoryId, @Bind("pushId") UUID pushId); + @SqlCall("SELECT * FROM repositories WHERE id = :repositoryId FOR UPDATE") + void lockRepository(@Bind("repositoryId") UUID repositoryId); + @Transaction default void commitPack(UUID repositoryId, List desc, List replace) throws NukagitDfsPackConflictException { + // For now use select for update to order the commits + lockRepository(repositoryId); // Get last push UUID lastPush = getLastPush(repositoryId); // Create a new push @@ 
-75,11 +77,11 @@ default void commitPack(UUID repositoryId, List desc, List replace) try { // Insert new packs insertPacks(pushId, desc); + // Copy over previous commit + copyPacks(lastPush, pushId); } catch (UnableToExecuteStatementException e) { throw new NukagitDfsPackConflictException(e); } - // Copy over previous commit - copyPacks(lastPush, pushId); // Remove replaced packs deletePacks(pushId, replace); // Set the new push as the last push diff --git a/src/main/java/lt/pow/nukagit/dfs/NukagitDfsObjDatabase.java b/src/main/java/lt/pow/nukagit/dfs/NukagitDfsObjDatabase.java index ca2f767..831f595 100644 --- a/src/main/java/lt/pow/nukagit/dfs/NukagitDfsObjDatabase.java +++ b/src/main/java/lt/pow/nukagit/dfs/NukagitDfsObjDatabase.java @@ -114,7 +114,8 @@ protected void commitPackImpl( try { dfsDao.commitPack(repositoryId, newPacks, removePacks); } catch (NukagitDfsPackConflictException e) { - LOGGER.warn("commitPackImpl: encountered conflict when committing packs", e); + // Conflicts in git happen + LOGGER.info("commitPackImpl: encountered conflict when committing packs", e); throw new IOException(e); } clearCache(); @@ -323,7 +324,7 @@ public int read(ByteBuffer dst) throws IOException { ((NukagitDfsRepositoryDescription) desc.getRepositoryDescription()).getRepositoryId(), blockNumber, desc.getFileName(ext)); - int bytesToRead = Math.min(blockSize - positionInBlock, dst.remaining()); + int bytesToRead = Math.min(Math.min(blockSize - positionInBlock, dst.remaining()), blockData.length - positionInBlock); dst.put(blockData, positionInBlock, bytesToRead); totalBytesRead += bytesToRead; diff --git a/src/main/resources/db/migration/V000__initial.sql b/src/main/resources/db/migration/V000__initial.sql index f7b8089..3f8bcbf 100644 --- a/src/main/resources/db/migration/V000__initial.sql +++ b/src/main/resources/db/migration/V000__initial.sql @@ -45,4 +45,4 @@ ALTER TABLE packs GENERATED ALWAYS AS (IF(ext = 'ref', 1, NULL)) VIRTUAL; ALTER TABLE packs - ADD CONSTRAINT UNIQUE 
(max_update_index, ref_pack); \ No newline at end of file + ADD CONSTRAINT UNIQUE (push_id, max_update_index, ref_pack); \ No newline at end of file diff --git a/src/test/groovy/lt/pow/nukagit/integration/NukagitIntegrationTest.groovy b/src/test/groovy/lt/pow/nukagit/integration/NukagitIntegrationTest.groovy index 54c1301..844551b 100644 --- a/src/test/groovy/lt/pow/nukagit/integration/NukagitIntegrationTest.groovy +++ b/src/test/groovy/lt/pow/nukagit/integration/NukagitIntegrationTest.groovy @@ -20,17 +20,27 @@ import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter import org.eclipse.jgit.api.CloneCommand import org.eclipse.jgit.api.Git import org.eclipse.jgit.transport.CredentialsProvider +import org.eclipse.jgit.transport.RefSpec +import org.eclipse.jgit.transport.RemoteRefUpdate import org.eclipse.jgit.transport.SshSessionFactory +import org.eclipse.jgit.transport.URIish import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider +import org.jeasy.random.EasyRandom import org.testcontainers.containers.GenericContainer import org.testcontainers.containers.MySQLContainer import org.testcontainers.spock.Testcontainers import org.testcontainers.utility.DockerImageName +import spock.lang.FailsWith import spock.lang.Specification import spock.lang.TempDir +import org.spockframework.runtime.ConditionNotSatisfiedError import java.nio.charset.StandardCharsets import java.security.KeyPair +import java.util.concurrent.Callable +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.Future import java.util.concurrent.TimeUnit @Testcontainers @@ -48,6 +58,7 @@ class NukagitIntegrationTest extends Specification { static final String USERNAME = "testuser" var component = DaggerTestComponent.create() + var random = new EasyRandom() SshClient sshClient KeyPair keyPair @@ -139,39 +150,128 @@ class NukagitIntegrationTest extends Specification { component.sshServer().stop() } - def cloneRepository(String path) { 
+ def createRepository(String path) { repositoriesGrpcClient.createRepository(Repositories.CreateRepositoryRequest.newBuilder().setRepositoryName(path).build()) - var clonePath = new File(testDir, path.replace("/", "-")) + var randomPath = random.nextObject(String) + Git git = Git.init() + .setDirectory(new File(testDir, randomPath)) + .setInitialBranch("main") + .call() + git.remoteAdd() + .setName("origin") + .setUri(new URIish("ssh://git@localhost:${sshPort}/${path}")) + .call() + commitRandomFile(git) + git.checkout().setName("main").call() + git.push().setPushAll().call() + // This does not set HEAD, might be a bug in the server + return git + } + + def cloneRepository(String path) { + var randomPath = UUID.randomUUID().toString() + var clonePath = new File(testDir, randomPath) CloneCommand cloneCommand = Git.cloneRepository() cloneCommand.setURI("ssh://git@localhost:${sshPort}/${path}") cloneCommand.setDirectory(clonePath) + // For now set it to main explicitly, because HEAD does not exist in the remote repository + cloneCommand.setBranch("main") return cloneCommand.call() } + def commitRandomFile(Git git) { + var newFile = new File(git.repository.workTree, "test.txt") + newFile.write(random.nextObject(String.class)) + git.add().addFilepattern(".").call() + git.commit().setAuthor("test", "test@example.com").setMessage("Test Change").call() + } + def "test clone empty in-memory repo add file and push it back"() { given: + createRepository("memory/repo") var git = cloneRepository("memory/repo") when: - var newFile = new File(git.repository.directory, "test.txt") - newFile.write("Test Content") + commitRandomFile(git) then: - git.add().addFilepattern(".").call() - git.commit().setAuthor("test", "test@example.com").setMessage("Test Change").call() git.push().call() } def "test clone minio backed repo add file and push it back"() { given: + createRepository("minio/repo") var git = cloneRepository("minio/repo") when: - var newFile = new 
File(git.repository.directory, "test.txt") - newFile.write("Test Content") + commitRandomFile(git) then: - git.add().addFilepattern(".").call() - git.commit().setAuthor("test", "test@example.com").setMessage("Test Change").call() git.push().call() } + def "test pushing conflicting changes to main should fail"() { + given: + var repoName = "minio/repo" + createRepository(repoName) + var git1 = cloneRepository(repoName) + var git2 = cloneRepository(repoName) + commitRandomFile(git1) + commitRandomFile(git2) + when: + var pushResult1 = git1.push().call().asList() + var pushResult2 = git2.push().call().asList() + then: + pushResult1.size() == 1 + pushResult2.size() == 1 + pushResult1.get(0).getRemoteUpdate("refs/heads/main").getStatus() == RemoteRefUpdate.Status.OK + pushResult2.get(0).getRemoteUpdate("refs/heads/main").getStatus() == RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD + } + + @FailsWith(ConditionNotSatisfiedError) + def "test concurrent pushes to different branches should conflict and fail"() { + // For now this is a conflict causing situation, but I intend to implement + // a mysql native reftable that will handle individual branch updates + given: + createRepository("minio/repo") + + var nThreads = 10 + ArrayList repositories = [] + nThreads.times { + var git = cloneRepository("minio/repo") + commitRandomFile(git) + repositories.push(git) + } + when: + // concurrently run push + ExecutorService executorService = Executors.newFixedThreadPool(nThreads) + List> futures = [] + repositories.forEach {git -> + def closure = { + var branchName = UUID.randomUUID().toString() + var result = git.push() + .setRemote("origin") + .setRefSpecs(new RefSpec("main:${branchName}")) + .call() + .first() + .getRemoteUpdates() + .first() + .getStatus() + return result + } + Future future = executorService.submit(closure as Callable) + futures.add(future) + } + executorService.shutdown() + executorService.awaitTermination(1, TimeUnit.MINUTES) + then: + + futures.each { future 
-> + // This statement fails because push gets REJECTED_OTHER_REASON status + assert future.get() == RemoteRefUpdate.Status.OK + } + + var git = cloneRepository("minio/repo") + // This statement fails because not all branches have been pushed successfully + git.lsRemote().call().size() == nThreads + 1 + } + def sshRun(String command) { def session = sshClient.connect("git", "localhost", sshPort).verify().getSession() session.auth().verify()