From 3431246d57e0bc34074e3a8ac1d52f6ff1ac3559 Mon Sep 17 00:00:00 2001 From: crioux-stripe <115596126+crioux-stripe@users.noreply.github.com> Date: Wed, 17 Jul 2024 11:07:28 -0700 Subject: [PATCH 01/15] DynamoDB Leader Monitor: On Next Should Always Provide MasterDescription (#692) * DynamoDB Leader Monitor: On Next Should Always Provide MasterDescription --------- Co-authored-by: Kevin Greenan --- .../dynamodb/DynamoDBMasterMonitor.java | 14 +++--- .../dynamodb/DynamoDBClientSingletonTest.java | 13 +++++- .../dynamodb/DynamoDBMasterMonitorTest.java | 43 +++++++++++++++---- 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java index f71df6f9f..fcb0ccec6 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java @@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; @@ -43,7 +42,7 @@ public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class); - private static final MasterDescription MASTER_NULL = + public static final MasterDescription MASTER_NULL = new MasterDescription("NONE", "localhost", -1, -1, -1, "uri://", -1, -1L); private final ThreadFactory monitorThreadFactory = r -> { Thread thread = new Thread(r); @@ -66,7 +65,6 @@ public class 
DynamoDBMasterMonitor extends BaseService implements MasterMonitor private final Duration gracefulShutdown; private final BehaviorSubject masterSubject; - private final AtomicReference latestMaster = new AtomicReference<>(); private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance(); @@ -74,7 +72,7 @@ public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor * Creates a MasterMonitor backed by DynamoDB. This should be used if you are using a {@link DynamoDBLeaderElector} */ public DynamoDBMasterMonitor() { - masterSubject = BehaviorSubject.create(); + masterSubject = BehaviorSubject.create(MASTER_NULL); final DynamoDBConfig conf = DynamoDBClientSingleton.getDynamoDBConf(); pollInterval = Duration.parse(conf.getDynamoDBLeaderHeartbeatDuration()); gracefulShutdown = Duration.parse(conf.getDynamoDBMonitorGracefulShutdownDuration()); @@ -138,13 +136,11 @@ private void getCurrentLeader() { } private void updateLeader(@Nullable MasterDescription nextDescription) { - final MasterDescription previousDescription = latestMaster.getAndSet(nextDescription); + final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL); final MasterDescription next = (nextDescription == null) ? MASTER_NULL : nextDescription; - final MasterDescription prev = - (previousDescription == null) ? 
MASTER_NULL : previousDescription; if (!prev.equals(next)) { logger.info("leader changer information previous {} and next {}", prev.getHostname(), next.getHostname()); - masterSubject.onNext(nextDescription); + masterSubject.onNext(next); } } @@ -178,6 +174,6 @@ public Observable getMasterObservable() { @Override @Nullable public MasterDescription getLatestMaster() { - return latestMaster.get(); + return Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL); } } diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBClientSingletonTest.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBClientSingletonTest.java index 952d49865..192d92a51 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBClientSingletonTest.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBClientSingletonTest.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -128,7 +129,17 @@ public void highAvailabilityServices() throws InterruptedException, IOException .atMost(Duration.ofSeconds(3L)) .pollDelay(Duration.ofSeconds(1L)) .untilAsserted(() -> assertEquals(leaders[2], monitor.getLatestMaster())); - testSubscriber.assertValues(leaders); + + // We can, depending on timing, sometimes get a MASTER_NULL value which is safe to ignore. 
+ MasterDescription[] actualLeaders = testSubscriber.getOnNextEvents().stream() + .filter(md -> md != DynamoDBMasterMonitor.MASTER_NULL) + .collect(Collectors.toList()) + .toArray(new MasterDescription[]{}); + + assertEquals(leaders.length, actualLeaders.length); + assertEquals(leaders[0], actualLeaders[0]); + assertEquals(leaders[1], actualLeaders[1]); + assertEquals(leaders[2], actualLeaders[2]); monitor.shutdown(); dynamoDb.createKVTable(table); diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java index 65b9bca25..64e1b72f2 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java @@ -16,8 +16,7 @@ package io.mantisrx.extensions.dynamodb; import static org.awaitility.Awaitility.await; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.*; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -28,15 +27,12 @@ import java.io.IOException; import java.time.Duration; import java.util.Optional; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import rx.Observable; import rx.observers.TestSubscriber; @RunWith(MockitoJUnitRunner.class) @@ -82,7 +78,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept TestSubscriber testSubscriber = new 
TestSubscriber<>(); m.getMasterObservable().subscribe(testSubscriber); m.start(); - assertNull(m.getLatestMaster()); + assertEquals(m.getLatestMaster(), DynamoDBMasterMonitor.MASTER_NULL); lockSupport.takeLock(lockKey, otherMaster); await() .atLeast(DynamoDBLockSupportRule.heartbeatDuration) @@ -117,4 +113,35 @@ public void runShutdown() throws IOException { verify(mockLockClient, times(1)).close(); } + + @Test + public void monitorDoesNotReturnNull() throws IOException, InterruptedException { + final String lockKey = "mantis-leader"; + final DynamoDBMasterMonitor m = new DynamoDBMasterMonitor( + lockSupport.getLockClient(), + lockKey, + DynamoDBLockSupportRule.heartbeatDuration, + GRACEFUL + ); + TestSubscriber testSubscriber = new TestSubscriber<>(); + m.getMasterObservable().subscribe(testSubscriber); + m.start(); + + // Write Null + lockSupport.takeLock(lockKey, null); + await() + .atLeast(DynamoDBLockSupportRule.heartbeatDuration) + .pollDelay(DynamoDBLockSupportRule.heartbeatDuration) + .atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2)) + .untilAsserted(() -> assertEquals(DynamoDBMasterMonitor.MASTER_NULL, m.getLatestMaster())); + lockSupport.releaseLock(lockKey); + + m.shutdown(); + + testSubscriber.assertNoTerminalEvent(); + testSubscriber.assertNotCompleted(); + testSubscriber.assertNoErrors(); + Observable.from(testSubscriber.getOnNextEvents()) + .forEach(Assert::assertNotNull); + } } From c0d03706fd35014a5b235f2193354ff1309e3d5e Mon Sep 17 00:00:00 2001 From: sarahwada-stripe <139389030+sarahwada-stripe@users.noreply.github.com> Date: Wed, 17 Jul 2024 12:36:26 -0700 Subject: [PATCH 02/15] add jobVersion metadata to ExecuteStageRequest (#689) * add jobVersion metadata to ExecuteStageRequest --- .../java/io/mantisrx/server/core/ExecuteStageRequest.java | 5 ++++- .../java/io/mantisrx/server/core/domain/JobMetadata.java | 3 +++ .../io/mantisrx/server/core/ExecuteStageRequestTest.java | 8 ++++++-- 
.../io/mantisrx/server/core/domain/JobMetadataTest.java | 2 +- .../java/io/mantisrx/master/jobcluster/job/JobActor.java | 1 + .../server/master/ExecuteStageRequestFactory.java | 3 ++- .../com/netflix/mantis/master/scheduler/TestHelpers.java | 1 + .../mantisrx/master/jobcluster/job/JobTestLifecycle.java | 2 +- .../main/java/io/mantisrx/runtime/loader/RuntimeTask.java | 1 + .../server/agent/RuntimeTaskImplExecutorTest.java | 3 ++- 10 files changed, 22 insertions(+), 7 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/ExecuteStageRequest.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/ExecuteStageRequest.java index ef696d107..349e6ca3a 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/ExecuteStageRequest.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/ExecuteStageRequest.java @@ -80,6 +80,7 @@ public class ExecuteStageRequest implements Serializable { @Nullable private final String nameOfJobProviderClass; private final String user; + private final String jobVersion; @JsonCreator @JsonIgnoreProperties(ignoreUnknown = true) @@ -102,7 +103,8 @@ public ExecuteStageRequest( @JsonProperty("minRuntimeSecs") long minRuntimeSecs, @JsonProperty("workerPorts") WorkerPorts workerPorts, @JsonProperty("nameOfJobProviderClass") Optional nameOfJobProviderClass, - @JsonProperty("user") String user) { + @JsonProperty("user") String user, + @JsonProperty("jobVersion") String jobVersion) { this.jobName = jobName; this.jobId = jobId; this.workerIndex = workerIndex; @@ -128,6 +130,7 @@ public ExecuteStageRequest( this.subscriptionTimeoutSecs = subscriptionTimeoutSecs; this.minRuntimeSecs = minRuntimeSecs; this.workerPorts = workerPorts; + this.jobVersion = jobVersion; } public boolean getHasJobMaster() { diff --git 
a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/domain/JobMetadata.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/domain/JobMetadata.java index b4aa67eed..dcb86a527 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/domain/JobMetadata.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/domain/JobMetadata.java @@ -27,6 +27,7 @@ public class JobMetadata { private final String jobId; private final URL jobJarUrl; + private final String jobVersion; private final int totalStages; private final String user; private final SchedulingInfo schedulingInfo; @@ -37,6 +38,7 @@ public class JobMetadata { public JobMetadata(final String jobId, final URL jobJarUrl, + final String jobVersion, final int totalStages, final String user, final SchedulingInfo schedulingInfo, @@ -46,6 +48,7 @@ public JobMetadata(final String jobId, final long minRuntimeSecs) { this.jobId = jobId; this.jobJarUrl = jobJarUrl; + this.jobVersion = jobVersion; this.totalStages = totalStages; this.user = user; this.schedulingInfo = schedulingInfo; diff --git a/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/ExecuteStageRequestTest.java b/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/ExecuteStageRequestTest.java index 2eb1f22e2..81b5bd85d 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/ExecuteStageRequestTest.java +++ b/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/ExecuteStageRequestTest.java @@ -55,7 +55,8 @@ public void setup() throws Exception { 1L, new WorkerPorts(2, 3, 4, 5, 6), java.util.Optional.of("className"), - "user1"); + "user1", + "111"); example2 = new ExecuteStageRequest("jobName", "jobId-0", 0, 1, new URL("http://datamesh/whatever"), 1, 1, @@ -70,7 +71,8 @@ 
public void setup() throws Exception { 1L, new WorkerPorts(2, 3, 4, 5, 6), java.util.Optional.empty(), - "user1"); + "user1", + "111"); } @Test @@ -148,6 +150,7 @@ public void testIfExecuteStageRequestIsSerializableAndDeserializableFromJackson( " },\n" + " \"nameOfJobProviderClass\": \"className\",\n" + " \"user\": \"user1\",\n" + + " \"jobVersion\": \"111\",\n" + " \"hasJobMaster\": false,\n" + " \"jobId\": \"jobId-0\",\n" + " \"workerId\":\n" + @@ -231,6 +234,7 @@ public void testIfExecuteStageRequestIsSerializableAndDeserializableFromJacksonW " },\n" + " \"nameOfJobProviderClass\": null,\n" + " \"user\": \"user1\",\n" + + " \"jobVersion\": \"111\",\n" + " \"hasJobMaster\": false,\n" + " \"jobId\": \"jobId-0\",\n" + " \"workerId\":\n" + diff --git a/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/domain/JobMetadataTest.java b/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/domain/JobMetadataTest.java index ce245323c..979db516e 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/domain/JobMetadataTest.java +++ b/mantis-control-plane/mantis-control-plane-core/src/test/java/io/mantisrx/server/core/domain/JobMetadataTest.java @@ -34,7 +34,7 @@ public void testGetJobArtifact() throws Exception { Lists.newArrayList(), Lists.newArrayList()).build(); JobMetadata jobMetadata = new JobMetadata( - "testId", new URL("http://artifact.zip"),1,"testUser",schedulingInfo, Lists.newArrayList(),0,10, 0); + "testId", new URL("http://artifact.zip"), "111", 1,"testUser",schedulingInfo, Lists.newArrayList(),0,10, 0); assertEquals(jobMetadata.getJobArtifact(), ArtifactID.of("artifact.zip")); } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java index 8fee1981f..0f77e9cf3 100644 --- 
a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java @@ -1630,6 +1630,7 @@ private ScheduleRequest createSchedulingRequest( JobMetadata jobMetadata = new JobMetadata( mantisJobMetaData.getJobId().getId(), mantisJobMetaData.getJobJarUrl(), + mantisJobMetaData.getJobDefinition().getVersion(), mantisJobMetaData.getTotalStages(), mantisJobMetaData.getUser(), mantisJobMetaData.getSchedulingInfo(), diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/ExecuteStageRequestFactory.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/ExecuteStageRequestFactory.java index 6b4ded76d..960e41e60 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/ExecuteStageRequestFactory.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/ExecuteStageRequestFactory.java @@ -49,6 +49,7 @@ public ExecuteStageRequest of( scheduleRequest.getJobMetadata().getMinRuntimeSecs() - (System.currentTimeMillis() - scheduleRequest.getJobMetadata().getMinRuntimeSecs()), matchedTaskExecutorInfo.getWorkerPorts(), Optional.empty(), - scheduleRequest.getJobMetadata().getUser()); + scheduleRequest.getJobMetadata().getUser(), + scheduleRequest.getJobMetadata().getJobVersion()); } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/com/netflix/mantis/master/scheduler/TestHelpers.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/com/netflix/mantis/master/scheduler/TestHelpers.java index 5379d4862..5982d1fad 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/com/netflix/mantis/master/scheduler/TestHelpers.java +++ 
b/mantis-control-plane/mantis-control-plane-server/src/test/java/com/netflix/mantis/master/scheduler/TestHelpers.java @@ -62,6 +62,7 @@ public static ScheduleRequest createFakeScheduleRequest(final WorkerId workerId, stageNum, new JobMetadata(mantisJobMetadata.getJobId().getId(), mantisJobMetadata.getJobJarUrl(), + mantisJobMetadata.getJobDefinition().getVersion(), mantisJobMetadata.getTotalStages(), mantisJobMetadata.getUser(), mantisJobMetadata.getSchedulingInfo(), diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java index 1aaba8e12..588299c8f 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java @@ -305,7 +305,7 @@ public void testJobSubmitPerpetual() { verify(schedulerMock,times(1)).scheduleWorkers(any()); JobMetadata jobMetadata = new JobMetadata(jobId, new URL("http://myart" + - ""),1,"njoshi",schedInfo,Lists.newArrayList(),0,10, 0); + ""),"111", 1,"njoshi",schedInfo,Lists.newArrayList(),0,10, 0); ScheduleRequest scheduleRequest = new ScheduleRequest( workerId, 1, jobMetadata,MantisJobDurationType.Perpetual, SchedulingConstraints.of(machineDefinition),0); BatchScheduleRequest expectedRequest = new BatchScheduleRequest(Collections.singletonList(scheduleRequest)); diff --git a/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/RuntimeTask.java b/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/RuntimeTask.java index ee49dead0..d41557d12 100644 --- a/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/RuntimeTask.java +++ b/mantis-runtime-loader/src/main/java/io/mantisrx/runtime/loader/RuntimeTask.java @@ -33,4 +33,5 @@ void 
initialize( UserCodeClassLoader userCodeClassLoader); String getWorkerId(); + } diff --git a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java index e02e9f318..9829ed0ad 100644 --- a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java +++ b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java @@ -218,7 +218,8 @@ public void testTaskExecutorEndToEndWithASingleStageJobByLoadingFromClassLoader( 1L, new WorkerPorts(2, 3, 4, 5, 6), Optional.of(SineFunctionJobProvider.class.getName()), - "user")), Time.seconds(1)); + "user", + "111")), Time.seconds(1)); wait.get(); Assert.assertTrue(startedSignal.await(5, TimeUnit.SECONDS)); Subscription subscription = HttpSources.source(HttpClientFactories.sseClientFactory(), From dd5a89e0fe614e116d341a17554a0979d627d370 Mon Sep 17 00:00:00 2001 From: rfradkin-stripe Date: Fri, 19 Jul 2024 16:36:52 -0400 Subject: [PATCH 03/15] Rename LeadershipManagerZkImpl to LeadershipManagerImpl (#694) --- ...dershipManagerZkImpl.java => LeadershipManagerImpl.java} | 6 +++--- .../src/main/java/io/mantisrx/server/master/MasterMain.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) rename mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/{LeadershipManagerZkImpl.java => LeadershipManagerImpl.java} (96%) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/LeadershipManagerZkImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/LeadershipManagerImpl.java similarity index 96% rename from mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/LeadershipManagerZkImpl.java rename to 
mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/LeadershipManagerImpl.java index ff0efbf03..41926e109 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/LeadershipManagerZkImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/LeadershipManagerImpl.java @@ -34,9 +34,9 @@ import rx.functions.Func0; -public class LeadershipManagerZkImpl implements ILeadershipManager { +public class LeadershipManagerImpl implements ILeadershipManager { - private static final Logger logger = LoggerFactory.getLogger(LeadershipManagerZkImpl.class); + private static final Logger logger = LoggerFactory.getLogger(LeadershipManagerImpl.class); private final Gauge isLeaderGauge; private final Gauge isLeaderReadyGauge; private final AtomicBoolean firstTimeLeaderMode = new AtomicBoolean(false); @@ -46,7 +46,7 @@ public class LeadershipManagerZkImpl implements ILeadershipManager { private volatile boolean isReady = false; private volatile Instant becameLeaderAt; - public LeadershipManagerZkImpl(final MasterConfiguration config, + public LeadershipManagerImpl(final MasterConfiguration config, final ServiceLifecycle serviceLifecycle) { this.config = config; this.serviceLifecycle = serviceLifecycle; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java index 71c8b3a1f..0f2bd6b35 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java @@ -119,7 +119,7 @@ public MasterMain( try { ConfigurationProvider.initialize(configFactory); this.config = ConfigurationProvider.getConfig(); - leadershipManager = new 
LeadershipManagerZkImpl(config, mantisServices); + leadershipManager = new LeadershipManagerImpl(config, mantisServices); Thread t = new Thread(this::shutdown); t.setDaemon(true); From c7c82244f33fe79fc43f2ae1e900f358f7b7a825 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Fri, 19 Jul 2024 16:04:37 -0700 Subject: [PATCH 04/15] Fix maven missing deps: upgrade nebula + gradle (#693) * upgrade nebula+gradle * fix git config on workflows * fix git setup * fix typo * gradle plugin rc * rc3 * fix plugin * fix server-agent build --- .github/workflows/nebula-ci.yml | 4 ++++ .github/workflows/nebula-publish.yml | 4 ++++ .github/workflows/nebula-snapshot.yml | 4 ++++ .github/workflows/push-docker-image.yml | 4 ++++ baseline.gradle | 20 +++++++++---------- build.gradle | 16 +++++++-------- gradle/wrapper/gradle-wrapper.properties | 2 +- .../mantis-connector-job/build.gradle | 2 -- .../mantis-connector-kafka/build.gradle | 2 -- .../mantis-connector-publish/build.gradle | 7 +++++++ .../mantis-control-plane-client/build.gradle | 7 +++++++ .../mantis-control-plane-core/build.gradle | 1 + .../build.gradle | 7 +++++++ .../mantis-control-plane-server/build.gradle | 7 +++++++ .../build.gradle | 7 +++++++ .../mantis-publish-netty/build.gradle | 1 - .../mantis-server-agent/build.gradle | 1 + mantis-shaded/build.gradle | 1 - 18 files changed, 72 insertions(+), 25 deletions(-) diff --git a/.github/workflows/nebula-ci.yml b/.github/workflows/nebula-ci.yml index 4c430cb7c..aa9616e5e 100644 --- a/.github/workflows/nebula-ci.yml +++ b/.github/workflows/nebula-ci.yml @@ -16,6 +16,10 @@ jobs: java: [ 8 ] name: CI with Java ${{ matrix.java }} steps: + - name: Setup Git + run: | + git config --global user.name "Mantis OSS Maintainers" + git config --global user.email "mantis-oss-dev@googlegroups.com" - uses: actions/checkout@v1 - name: Setup jdk uses: actions/setup-java@v1 diff --git a/.github/workflows/nebula-publish.yml 
b/.github/workflows/nebula-publish.yml index 0985a4256..0c1cfcb4c 100644 --- a/.github/workflows/nebula-publish.yml +++ b/.github/workflows/nebula-publish.yml @@ -12,6 +12,10 @@ jobs: build: runs-on: ubuntu-latest steps: + - name: Setup Git + run: | + git config --global user.name "Mantis OSS Maintainers" + git config --global user.email "mantis-oss-dev@googlegroups.com" - uses: actions/checkout@v2 - name: Setup jdk uses: actions/setup-java@v2 diff --git a/.github/workflows/nebula-snapshot.yml b/.github/workflows/nebula-snapshot.yml index 6ecc6c47c..1afd917a0 100644 --- a/.github/workflows/nebula-snapshot.yml +++ b/.github/workflows/nebula-snapshot.yml @@ -15,6 +15,10 @@ jobs: environment: name: Integrate Pull Request # Our protected environment variable steps: + - name: Setup Git + run: | + git config --global user.name "Mantis OSS Maintainers" + git config --global user.email "mantis-oss-dev@googlegroups.com" - name: Checkout PR uses: actions/checkout@v3 with: diff --git a/.github/workflows/push-docker-image.yml b/.github/workflows/push-docker-image.yml index 96b6c3fc3..9d6801be6 100644 --- a/.github/workflows/push-docker-image.yml +++ b/.github/workflows/push-docker-image.yml @@ -39,6 +39,10 @@ jobs: packages: write steps: + - name: Setup Git + run: | + git config --global user.name "Mantis OSS Maintainers" + git config --global user.email "mantis-oss-dev@googlegroups.com" - name: Checkout PR uses: actions/checkout@v3 - name: Setup jdk diff --git a/baseline.gradle b/baseline.gradle index 26d9eb005..27de3c874 100644 --- a/baseline.gradle +++ b/baseline.gradle @@ -14,15 +14,15 @@ * limitations under the License. */ -allprojects { - apply plugin: 'com.palantir.baseline-idea' -} +// allprojects { +// apply plugin: 'com.palantir.baseline-idea' +// } -subprojects { - // Currently, if any subproject applies the blanket Baseline plugin, it forces the Baseline plugin - // to be applied to ALL projects. 
And we are not prepared to address all of the build errors that - // occur as a result at this time. +// subprojects { +// // Currently, if any subproject applies the blanket Baseline plugin, it forces the Baseline plugin +// // to be applied to ALL projects. And we are not prepared to address all of the build errors that +// // occur as a result at this time. - apply plugin: 'com.palantir.baseline-exact-dependencies' - apply plugin: 'com.palantir.baseline-format' -} +// apply plugin: 'com.palantir.baseline-exact-dependencies' +// apply plugin: 'com.palantir.baseline-format' +// } diff --git a/build.gradle b/build.gradle index fb4324e6c..26b565902 100644 --- a/build.gradle +++ b/build.gradle @@ -22,16 +22,17 @@ buildscript { url "https://plugins.gradle.org/m2/" } maven { url 'https://artifacts-oss.netflix.net/maven-oss-releases' } + maven { url 'https://artifacts-oss.netflix.net/maven-oss-candidates' } } dependencies { - classpath 'com.netflix.nebula:gradle-netflixoss-project-plugin:10.6.0' + classpath 'com.netflix.nebula:gradle-netflixoss-project-plugin:11.5.0' classpath 'com.netflix.nebula:nebula-dependency-recommender:11.+' - classpath 'io.mantisrx:mantis-gradle-plugin:1.2.+' - classpath "io.freefair.gradle:lombok-plugin:5.3.3.3" + classpath 'io.mantisrx:mantis-gradle-plugin:1.2.7-rc.4' + classpath "io.freefair.gradle:lombok-plugin:6.+" classpath 'eu.appsatori:gradle-fatjar-plugin:0.3' - classpath("gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0") + classpath("com.github.johnrengelman:shadow:8.1.1") classpath 'gradle.plugin.org.inferred:gradle-processors:3.3.0' - classpath 'com.palantir.baseline:gradle-baseline-java:4.0.0' + // classpath 'com.palantir.baseline:gradle-baseline-java:4.+' classpath 'com.bmuschko:gradle-docker-plugin:6.7.0' classpath "com.palantir.gradle.gitversion:gradle-git-version:3.0.0" } @@ -104,9 +105,8 @@ project.snapshot.configure { finalizedBy printAllReleasedArtifacts } subprojects { apply plugin: 'java-library' - // Apply 
lombok plugin and disabled the default config file generation. + // Apply lombok plugin. apply plugin: "io.freefair.lombok" - generateLombokConfig.enabled = false lombok { version = "1.18.20" } @@ -186,4 +186,4 @@ subprojects { } } -apply from: file('baseline.gradle') +// apply from: file('baseline.gradle') diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 8049c684f..0d1842103 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.5-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/mantis-connectors/mantis-connector-job/build.gradle b/mantis-connectors/mantis-connector-job/build.gradle index 5cea170dd..a6cded0ae 100644 --- a/mantis-connectors/mantis-connector-job/build.gradle +++ b/mantis-connectors/mantis-connector-job/build.gradle @@ -14,8 +14,6 @@ * limitations under the License. */ -apply plugin: 'mantis' - ext { gsonVersion = '2.8.+' } diff --git a/mantis-connectors/mantis-connector-kafka/build.gradle b/mantis-connectors/mantis-connector-kafka/build.gradle index 375bd8be6..58d623b50 100644 --- a/mantis-connectors/mantis-connector-kafka/build.gradle +++ b/mantis-connectors/mantis-connector-kafka/build.gradle @@ -14,8 +14,6 @@ * limitations under the License. 
*/ -apply plugin: 'mantis' - ext { archaiusVersion = '2.3.+' spectatorVersion = '0.82.+' diff --git a/mantis-connectors/mantis-connector-publish/build.gradle b/mantis-connectors/mantis-connector-publish/build.gradle index 782b352f6..34a5469eb 100644 --- a/mantis-connectors/mantis-connector-publish/build.gradle +++ b/mantis-connectors/mantis-connector-publish/build.gradle @@ -27,3 +27,10 @@ dependencies { test { useJUnitPlatform() } + +tasks.named('compileJava') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} +tasks.named('delombok') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} diff --git a/mantis-control-plane/mantis-control-plane-client/build.gradle b/mantis-control-plane/mantis-control-plane-client/build.gradle index bb93e33ff..fb4764d44 100644 --- a/mantis-control-plane/mantis-control-plane-client/build.gradle +++ b/mantis-control-plane/mantis-control-plane-client/build.gradle @@ -26,3 +26,10 @@ dependencies { testImplementation libraries.spectatorApi testImplementation(testFixtures(project(":mantis-common"))) } + +tasks.named('compileJava') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} +tasks.named('delombok') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} diff --git a/mantis-control-plane/mantis-control-plane-core/build.gradle b/mantis-control-plane/mantis-control-plane-core/build.gradle index 41c7dcc52..87cb1d6e7 100644 --- a/mantis-control-plane/mantis-control-plane-core/build.gradle +++ b/mantis-control-plane/mantis-control-plane-core/build.gradle @@ -59,3 +59,4 @@ task copyLibs(type: Copy) { } jar.dependsOn copyLibs +compileTestFixturesJava.dependsOn copyLibs diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/build.gradle b/mantis-control-plane/mantis-control-plane-dynamodb/build.gradle index a881d6398..4c2808d5f 100644 --- 
a/mantis-control-plane/mantis-control-plane-dynamodb/build.gradle +++ b/mantis-control-plane/mantis-control-plane-dynamodb/build.gradle @@ -68,3 +68,10 @@ test { maxRetries = 1 } } + +tasks.named('compileJava') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} +tasks.named('delombok') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} diff --git a/mantis-control-plane/mantis-control-plane-server/build.gradle b/mantis-control-plane/mantis-control-plane-server/build.gradle index 10f467c79..a2e2ab2a2 100644 --- a/mantis-control-plane/mantis-control-plane-server/build.gradle +++ b/mantis-control-plane/mantis-control-plane-server/build.gradle @@ -120,3 +120,10 @@ test { maxRetries = 1 } } + +tasks.named('compileJava') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} +tasks.named('delombok') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} diff --git a/mantis-control-plane/mantis-control-plane-store/mantis-control-plane-store-dynamodb/build.gradle b/mantis-control-plane/mantis-control-plane-store/mantis-control-plane-store-dynamodb/build.gradle index e324329d3..16d55999a 100644 --- a/mantis-control-plane/mantis-control-plane-store/mantis-control-plane-store-dynamodb/build.gradle +++ b/mantis-control-plane/mantis-control-plane-store/mantis-control-plane-store-dynamodb/build.gradle @@ -65,3 +65,10 @@ test { maxRetries = 1 } } + +tasks.named('compileJava') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} +tasks.named('delombok') { + dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar') +} diff --git a/mantis-publish/mantis-publish-netty/build.gradle b/mantis-publish/mantis-publish-netty/build.gradle index cd6d7420e..e346a7018 100644 --- a/mantis-publish/mantis-publish-netty/build.gradle +++ 
b/mantis-publish/mantis-publish-netty/build.gradle @@ -40,7 +40,6 @@ test { } shadowJar { - classifier = null relocate('com.fasterxml', 'io.mantisrx.shaded.com.fasterxml') relocate('io.netty', 'io.mantisrx.shaded.io.netty') relocate('META-INF/native/libnetty', 'META-INF/native/libio_mantisrx_shaded_netty') diff --git a/mantis-server/mantis-server-agent/build.gradle b/mantis-server/mantis-server-agent/build.gradle index 245062235..22898c28d 100644 --- a/mantis-server/mantis-server-agent/build.gradle +++ b/mantis-server/mantis-server-agent/build.gradle @@ -87,5 +87,6 @@ docker { dockerSyncBuildContext.dependsOn(pushServerJob.getTasksByName("mantisZipArtifact", false)) dockerSyncBuildContext.dependsOn(mantisExamplesSineFunctionMantisZipArtifact) dockerSyncBuildContext.dependsOn(installDist) +dockerSyncBuildContext.dependsOn(project.tasks.processTestResources) mainClassName = "io.mantisrx.server.agent.AgentV2Main" diff --git a/mantis-shaded/build.gradle b/mantis-shaded/build.gradle index 083df3651..3e6eee034 100644 --- a/mantis-shaded/build.gradle +++ b/mantis-shaded/build.gradle @@ -71,7 +71,6 @@ dependencies { } shadowJar { - classifier = null configurations = [project.configurations.shaded] exclude 'META-INF/LICENSE' From a61cf1dd6a4a4b536cb52fac4b18770be9acce75 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Sat, 20 Jul 2024 00:25:43 -0700 Subject: [PATCH 05/15] Rename mantis-connector-job to mantis-connector-job-source (#695) * rename connector-job * Update build.gradle Co-authored-by: hmitnflx <100323213+hmitnflx@users.noreply.github.com> --------- Co-authored-by: hmitnflx <100323213+hmitnflx@users.noreply.github.com> --- build.gradle | 4 ++-- .../build.gradle | 0 .../dependencies.lock | 0 .../io/mantisrx/connector/job/core/AbstractJobSource.java | 0 .../mantisrx/connector/job/core/AbstractSourceJobSource.java | 0 .../job/core/DefaultSinkConnectionStatusObserver.java | 0 
.../mantisrx/connector/job/core/MantisSourceJobConnector.java | 0 .../connector/job/core/MantisSourceJobConnectorFactory.java | 0 .../connector/job/core/MultiSinkConnectionStatusObserver.java | 0 .../connector/job/core/SinkConnectionStatusObserver.java | 0 .../main/java/io/mantisrx/connector/job/source/JobSource.java | 0 .../mantis-examples-jobconnector-sample/build.gradle | 2 +- settings.gradle | 2 +- 13 files changed, 4 insertions(+), 4 deletions(-) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/build.gradle (100%) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/dependencies.lock (100%) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/src/main/java/io/mantisrx/connector/job/core/AbstractJobSource.java (100%) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/src/main/java/io/mantisrx/connector/job/core/AbstractSourceJobSource.java (100%) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/src/main/java/io/mantisrx/connector/job/core/DefaultSinkConnectionStatusObserver.java (100%) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnector.java (100%) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnectorFactory.java (100%) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/src/main/java/io/mantisrx/connector/job/core/MultiSinkConnectionStatusObserver.java (100%) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/src/main/java/io/mantisrx/connector/job/core/SinkConnectionStatusObserver.java (100%) rename mantis-connectors/{mantis-connector-job => mantis-connector-job-source}/src/main/java/io/mantisrx/connector/job/source/JobSource.java (100%) diff --git a/build.gradle b/build.gradle 
index 26b565902..858c9f102 100644 --- a/build.gradle +++ b/build.gradle @@ -22,16 +22,16 @@ buildscript { url "https://plugins.gradle.org/m2/" } maven { url 'https://artifacts-oss.netflix.net/maven-oss-releases' } - maven { url 'https://artifacts-oss.netflix.net/maven-oss-candidates' } } dependencies { classpath 'com.netflix.nebula:gradle-netflixoss-project-plugin:11.5.0' classpath 'com.netflix.nebula:nebula-dependency-recommender:11.+' - classpath 'io.mantisrx:mantis-gradle-plugin:1.2.7-rc.4' + classpath 'io.mantisrx:mantis-gradle-plugin:1.2.7' classpath "io.freefair.gradle:lombok-plugin:6.+" classpath 'eu.appsatori:gradle-fatjar-plugin:0.3' classpath("com.github.johnrengelman:shadow:8.1.1") classpath 'gradle.plugin.org.inferred:gradle-processors:3.3.0' + // todo: baseline is disabled due to no working version on java 8 // classpath 'com.palantir.baseline:gradle-baseline-java:4.+' classpath 'com.bmuschko:gradle-docker-plugin:6.7.0' classpath "com.palantir.gradle.gitversion:gradle-git-version:3.0.0" diff --git a/mantis-connectors/mantis-connector-job/build.gradle b/mantis-connectors/mantis-connector-job-source/build.gradle similarity index 100% rename from mantis-connectors/mantis-connector-job/build.gradle rename to mantis-connectors/mantis-connector-job-source/build.gradle diff --git a/mantis-connectors/mantis-connector-job/dependencies.lock b/mantis-connectors/mantis-connector-job-source/dependencies.lock similarity index 100% rename from mantis-connectors/mantis-connector-job/dependencies.lock rename to mantis-connectors/mantis-connector-job-source/dependencies.lock diff --git a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/AbstractJobSource.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/AbstractJobSource.java similarity index 100% rename from mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/AbstractJobSource.java rename to 
mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/AbstractJobSource.java diff --git a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/AbstractSourceJobSource.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/AbstractSourceJobSource.java similarity index 100% rename from mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/AbstractSourceJobSource.java rename to mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/AbstractSourceJobSource.java diff --git a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/DefaultSinkConnectionStatusObserver.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/DefaultSinkConnectionStatusObserver.java similarity index 100% rename from mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/DefaultSinkConnectionStatusObserver.java rename to mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/DefaultSinkConnectionStatusObserver.java diff --git a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnector.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnector.java similarity index 100% rename from mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnector.java rename to mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnector.java diff --git a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnectorFactory.java 
b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnectorFactory.java similarity index 100% rename from mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnectorFactory.java rename to mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnectorFactory.java diff --git a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/MultiSinkConnectionStatusObserver.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MultiSinkConnectionStatusObserver.java similarity index 100% rename from mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/MultiSinkConnectionStatusObserver.java rename to mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MultiSinkConnectionStatusObserver.java diff --git a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/SinkConnectionStatusObserver.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/SinkConnectionStatusObserver.java similarity index 100% rename from mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/core/SinkConnectionStatusObserver.java rename to mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/SinkConnectionStatusObserver.java diff --git a/mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/source/JobSource.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/source/JobSource.java similarity index 100% rename from mantis-connectors/mantis-connector-job/src/main/java/io/mantisrx/connector/job/source/JobSource.java rename to 
mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/source/JobSource.java diff --git a/mantis-examples/mantis-examples-jobconnector-sample/build.gradle b/mantis-examples/mantis-examples-jobconnector-sample/build.gradle index 7ce3fd451..607910e49 100644 --- a/mantis-examples/mantis-examples-jobconnector-sample/build.gradle +++ b/mantis-examples/mantis-examples-jobconnector-sample/build.gradle @@ -17,7 +17,7 @@ dependencies { api libraries.mantisShaded implementation project(':mantis-runtime') - implementation project(':mantis-connectors:mantis-connector-job') + implementation project(':mantis-connectors:mantis-connector-job-source') implementation libraries.slf4jApi implementation libraries.slf4jLog4j12 diff --git a/settings.gradle b/settings.gradle index 3820f96fc..752591de9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,7 +25,7 @@ include 'mantis-common' include 'mantis-common-serde' include 'mantis-connectors:mantis-connector-iceberg' -include 'mantis-connectors:mantis-connector-job' +include 'mantis-connectors:mantis-connector-job-source' include 'mantis-connectors:mantis-connector-kafka' include 'mantis-connectors:mantis-connector-publish' From d265bfb6f40d979fdcf3fb5b64b355f4449aeea9 Mon Sep 17 00:00:00 2001 From: crioux-stripe <115596126+crioux-stripe@users.noreply.github.com> Date: Tue, 30 Jul 2024 10:57:10 -0700 Subject: [PATCH 06/15] DynamoDBMasterMonitor: Don't publish MASTER_NULL on Dynamo failure (#696) --- gradle.properties | 1 + .../dynamodb/DynamoDBMasterMonitor.java | 43 +++++++++++++++---- .../dynamodb/DynamoDBMasterMonitorTest.java | 7 +-- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/gradle.properties b/gradle.properties index a23ccd6ea..ecbb0efc8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,4 @@ org.gradle.parallel=true org.gradle.caching=false org.gradle.configureondemand=true +org.gradle.jvmargs=-Xmx1G "-XX:MaxMetaspaceSize=384m" diff --git 
a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java index fcb0ccec6..90733883f 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java @@ -17,6 +17,9 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; import com.amazonaws.services.dynamodbv2.LockItem; +import io.mantisrx.common.metrics.Counter; +import io.mantisrx.common.metrics.Metrics; +import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.server.core.BaseService; import io.mantisrx.server.core.json.DefaultObjectMapper; import io.mantisrx.server.core.master.MasterDescription; @@ -68,16 +71,20 @@ public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance(); + private final Metrics metrics; + + private final Counter noLockPresentCounter; + private final Counter lockDecodeFailedCounter; + private final Counter nullNextLeaderCounter; + /** * Creates a MasterMonitor backed by DynamoDB. 
This should be used if you are using a {@link DynamoDBLeaderElector} */ public DynamoDBMasterMonitor() { - masterSubject = BehaviorSubject.create(MASTER_NULL); - final DynamoDBConfig conf = DynamoDBClientSingleton.getDynamoDBConf(); - pollInterval = Duration.parse(conf.getDynamoDBLeaderHeartbeatDuration()); - gracefulShutdown = Duration.parse(conf.getDynamoDBMonitorGracefulShutdownDuration()); - lockClient = DynamoDBClientSingleton.getLockClient(); - partitionKey = DynamoDBClientSingleton.getPartitionKey(); + this(DynamoDBClientSingleton.getLockClient(), + DynamoDBClientSingleton.getPartitionKey(), + Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBLeaderHeartbeatDuration()), + Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBMonitorGracefulShutdownDuration())); } public DynamoDBMasterMonitor( @@ -85,11 +92,23 @@ public DynamoDBMasterMonitor( String partitionKey, Duration pollInterval, Duration gracefulShutdown) { - masterSubject = BehaviorSubject.create(); + masterSubject = BehaviorSubject.create(MASTER_NULL); this.lockClient = lockClient; this.partitionKey = partitionKey; this.pollInterval = pollInterval; this.gracefulShutdown = gracefulShutdown; + + Metrics m = new Metrics.Builder() + .id("DynamoDBMasterMonitor") + .addCounter("no_lock_present") + .addCounter("lock_decode_failed") + .addCounter("null_next_leader") + .build(); + this.metrics = MetricsRegistry.getInstance().registerAndGet(m); + + this.noLockPresentCounter = metrics.getCounter("no_lock_present"); + this.lockDecodeFailedCounter = metrics.getCounter("lock_decode_failed"); + this.nullNextLeaderCounter = metrics.getCounter("null_next_leader"); } @Override @@ -128,11 +147,19 @@ private void getCurrentLeader() { if (optionalLock.isPresent()) { final LockItem lock = optionalLock.get(); nextDescription = lock.getData().map(this::bytesToMaster).orElse(null); + logger.warn("failed to decode leader bytes"); + this.lockDecodeFailedCounter.increment(); } else { 
nextDescription = null; logger.warn("no leader found"); + this.noLockPresentCounter.increment(); + } + + if (nextDescription != null) { + updateLeader(nextDescription); + } else { + this.nullNextLeaderCounter.increment(); } - updateLeader(nextDescription); } private void updateLeader(@Nullable MasterDescription nextDescription) { diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java index 64e1b72f2..9b15ad094 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java @@ -15,6 +15,7 @@ */ package io.mantisrx.extensions.dynamodb; +import static io.mantisrx.extensions.dynamodb.DynamoDBMasterMonitor.MASTER_NULL; import static org.awaitility.Awaitility.await; import static org.junit.Assert.*; import static org.mockito.Mockito.times; @@ -78,7 +79,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept TestSubscriber testSubscriber = new TestSubscriber<>(); m.getMasterObservable().subscribe(testSubscriber); m.start(); - assertEquals(m.getLatestMaster(), DynamoDBMasterMonitor.MASTER_NULL); + assertEquals(MASTER_NULL, m.getLatestMaster()); lockSupport.takeLock(lockKey, otherMaster); await() .atLeast(DynamoDBLockSupportRule.heartbeatDuration) @@ -92,7 +93,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept .pollDelay(DynamoDBLockSupportRule.heartbeatDuration) .atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2)) .untilAsserted(() -> assertEquals(m.getLatestMaster(), thatMaster)); - testSubscriber.assertValues(otherMaster, thatMaster); + 
testSubscriber.assertValues(MASTER_NULL, otherMaster, thatMaster); m.shutdown(); } @@ -133,7 +134,7 @@ public void monitorDoesNotReturnNull() throws IOException, InterruptedException .atLeast(DynamoDBLockSupportRule.heartbeatDuration) .pollDelay(DynamoDBLockSupportRule.heartbeatDuration) .atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2)) - .untilAsserted(() -> assertEquals(DynamoDBMasterMonitor.MASTER_NULL, m.getLatestMaster())); + .untilAsserted(() -> assertEquals(MASTER_NULL, m.getLatestMaster())); lockSupport.releaseLock(lockKey); m.shutdown(); From ef3ad829e2988f26aa33ed89615f48f4756a5bf6 Mon Sep 17 00:00:00 2001 From: kmg-stripe Date: Tue, 30 Jul 2024 11:13:42 -0700 Subject: [PATCH 07/15] Add Pagination to DynamoDB Query Calls (#697) * Add Pagination to DynamoDB Query Calls Also setting explicit pagination limit. * Remove Unnecessary newlines * Refactor pagination loops --- .../extensions/dynamodb/DynamoDBStore.java | 115 +++++++++++++----- .../dynamodb/DynamoDBStoreTest.java | 32 +++++ 2 files changed, 115 insertions(+), 32 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBStore.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBStore.java index 629671a49..74c9c9fb1 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBStore.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBStore.java @@ -29,6 +29,17 @@ @Slf4j public class DynamoDBStore implements IKeyValueStore { + // Helper class to track pagination with DynamoDB queries + // lastEventuatedKey is used as a pagination cursor + private class DynamoPaginationResult { + public T result; + public Map lastEvaluatedKey; + + DynamoPaginationResult(T result, Map lastEvaluatedKey) { + this.result = result; + 
this.lastEvaluatedKey = lastEvaluatedKey; + } + } public static final String PK = "PK"; public static final String SK = "SK"; @@ -48,6 +59,7 @@ public class DynamoDBStore implements IKeyValueStore { private static final String MPK_E = "#MPK"; private static final int MAX_ITEMS = 25; + public static final int QUERY_LIMIT = 100; private final String mantisTable; private final DynamoDbClient client; @@ -65,6 +77,32 @@ public DynamoDBStore(DynamoDbClient client, String tableName ) { this.client = client; this.mantisTable = tableName; } + + private DynamoPaginationResult> _getAllPartitionKeys(String tableName, Map lastEvaluatedKey) { + Map expressionAttributesNames = new HashMap<>(); + expressionAttributesNames.put(PK_E, PK); + expressionAttributesNames.put(MPK_E, PARTITION_KEY); + Map expressionAttributeValues = new HashMap<>(); + expressionAttributeValues.put(PK_V, AttributeValue.builder().s(tableName).build()); + + QueryRequest.Builder builder = QueryRequest.builder() + .tableName(this.mantisTable) + .keyConditionExpression(String.format("%s = %s", PK_E, PK_V)) + .expressionAttributeNames(expressionAttributesNames) + .expressionAttributeValues(expressionAttributeValues) + .projectionExpression(MPK_E).limit(QUERY_LIMIT); + + if(lastEvaluatedKey != null) { + builder = builder.exclusiveStartKey(lastEvaluatedKey); + } + final QueryRequest request = builder.build(); + + final QueryResponse response = this.client.query(request); + final Map pks = new HashMap<>(); + response.items().forEach(v -> pks.put(v.get(PARTITION_KEY).s(), "")); + return new DynamoPaginationResult<>(new ArrayList<>(pks.keySet()), response.lastEvaluatedKey()); + } + /** * Gets all partition keys from the table. 
* This could be beneficial to call instead of getAllRows @@ -78,26 +116,45 @@ public DynamoDBStore(DynamoDbClient client, String tableName ) { * @return list of all partition keys */ @Override - public List getAllPartitionKeys(String tableName) throws IOException { + public List getAllPartitionKeys(String tableName) { + final List results = new ArrayList<>(); + Map lastEvaluatedKey = null; + + while(true) { + DynamoPaginationResult> result = this._getAllPartitionKeys(tableName, lastEvaluatedKey); + results.addAll(result.result); + if (!result.lastEvaluatedKey.isEmpty()) { + log.info("partial result for all partition keys query, left off at partitionKey={} of table={}", result.lastEvaluatedKey.get("SK").s(), tableName); + lastEvaluatedKey = result.lastEvaluatedKey; + } else { + break; + } + } + log.info("found {} items when querying for all partition keys in table={}", results.size(), tableName); + return results; + } + + private DynamoPaginationResult>> _getAll(String tableName, String partitionKey, Map lastEvaluatedKey) { Map expressionAttributesNames = new HashMap<>(); expressionAttributesNames.put(PK_E, PK); - expressionAttributesNames.put(MPK_E, PARTITION_KEY); + expressionAttributesNames.put(SK_E, SK); Map expressionAttributeValues = new HashMap<>(); expressionAttributeValues.put(PK_V, AttributeValue.builder().s(tableName).build()); + expressionAttributeValues.put(SK_V, AttributeValue.builder().s(String.format("%s#", partitionKey)).build()); - final QueryRequest request = QueryRequest.builder() - .tableName(this.mantisTable) - .keyConditionExpression(String.format("%s = %s", PK_E, PK_V)) - .expressionAttributeNames(expressionAttributesNames) - .expressionAttributeValues(expressionAttributeValues) - .projectionExpression(MPK_E) - .build(); + QueryRequest.Builder builder = QueryRequest.builder() + .tableName(this.mantisTable) + .keyConditionExpression(String.format("%s = %s and begins_with(%s, %s)", PK_E, PK_V, SK_E, SK_V)) + 
.expressionAttributeNames(expressionAttributesNames) + .expressionAttributeValues(expressionAttributeValues).limit(QUERY_LIMIT); + + if(lastEvaluatedKey != null) { + builder = builder.exclusiveStartKey(lastEvaluatedKey); + } + final QueryRequest request = builder.build(); - log.info("querying for all partition keys in table {}", tableName); final QueryResponse response = this.client.query(request); - final Map pks = new HashMap<>(); - response.items().forEach(v -> pks.put(v.get(PARTITION_KEY).s(), "")); - return new ArrayList<>(pks.keySet()); + return new DynamoPaginationResult<>(response.items(), response.lastEvaluatedKey()); } /** @@ -109,26 +166,20 @@ public List getAllPartitionKeys(String tableName) throws IOException { */ @Override public Map getAll(String tableName, String partitionKey) throws IOException { - - Map expressionAttributesNames = new HashMap<>(); - expressionAttributesNames.put(PK_E, PK); - expressionAttributesNames.put(SK_E, SK); - Map expressionAttributeValues = new HashMap<>(); - expressionAttributeValues.put(PK_V, AttributeValue.builder().s(tableName).build()); - expressionAttributeValues.put(SK_V, AttributeValue.builder().s(String.format("%s#", partitionKey)).build()); - - final QueryRequest request = QueryRequest.builder() - .tableName(this.mantisTable) - .keyConditionExpression(String.format("%s = %s and begins_with(%s, %s)", PK_E, PK_V, SK_E, SK_V)) - .expressionAttributeNames(expressionAttributesNames) - .expressionAttributeValues(expressionAttributeValues) - .build(); - - log.info("querying for all items in partition {} in table {}", partitionKey, tableName); - final QueryResponse response = this.client.query(request); final Map items = new HashMap<>(); - response.items() - .forEach(v -> items.put(v.get(SECONDARY_KEY).s(), v.get(DATA_KEY).s())); + Map lastEvaluatedKey = null; + while(true) { + DynamoPaginationResult>> result = this._getAll(tableName, partitionKey, lastEvaluatedKey); + result.result.forEach(v -> 
items.put(v.get(SECONDARY_KEY).s(), v.get(DATA_KEY).s())); + if (!result.lastEvaluatedKey.isEmpty()) { + log.info("partial result for get all query, left off at SK={} of table={}", result.lastEvaluatedKey.get("SK").s(), tableName); + lastEvaluatedKey = result.lastEvaluatedKey; + } else { + break; + } + } + + log.info("found {} items when querying for all items in partition {} in table {}", items.size(), partitionKey, tableName); return items; } diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBStoreTest.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBStoreTest.java index f0f998e40..03cadc0fe 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBStoreTest.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBStoreTest.java @@ -159,8 +159,40 @@ public void testInsertAndGetAllMoreThan25() throws Exception { assertEquals(skData1.size(), itemsPK1.size()); final Map itemsPK3 = store.getAll(table, pks.get(2)); assertEquals(skData3.size(), itemsPK3.size()); + } + @Test + public void testInsertAndGetAllMoreThanLimit() throws Exception { + IKeyValueStore store = new DynamoDBStore(client, table); + int numRows = DynamoDBStore.QUERY_LIMIT * 2 + 1; + final List pks = makePKs(1); + final Map skData1 = new HashMap<>(); + for (int i = 0; i < numRows; i++) { + skData1.put(String.valueOf(i), V1); + } + assertTrue(store.upsertAll(table, pks.get(0), skData1)); + final Map itemsPK1 = store.getAll(table, pks.get(0)); + assertEquals(skData1.size(), itemsPK1.size()); } + + @Test + public void testUpsertAndGetAllPkMoreThanLimit() throws Exception { + IKeyValueStore store = new DynamoDBStore(client, table); + int numRows = DynamoDBStore.QUERY_LIMIT * 2 + 1; + final List pks = new ArrayList<>(); + for (int i = 0; i < numRows; i++) { + 
pks.add(UUID.randomUUID().toString()); + } + Collections.sort(pks); + final Map skData1 = new HashMap<>(); + for(int i=0; i< numRows; i++) { + store.upsert(table, pks.get(i), String.valueOf(i), V1); + } + final List allPKs = store.getAllPartitionKeys(table); + Collections.sort(allPKs); + assertEquals(pks,allPKs); + } + private List makePKs(int num) { final List pks = new ArrayList<>(); for(int i = 0; i<3; i++) { From a2934008e3976209673c5d1239fb2d5ea4924f91 Mon Sep 17 00:00:00 2001 From: kmg-stripe Date: Wed, 31 Jul 2024 16:10:22 -0700 Subject: [PATCH 08/15] Fix WorkersByJobId: Filtered Workers Not Empty Check (#699) --- .../persistence/KeyValueBasedPersistenceProvider.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java index 4e48f7a29..41a6dcd35 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java @@ -386,9 +386,11 @@ private Map> getAllWorkersByJobId(fin }) .filter(Objects::nonNull) .collect(Collectors.toList()); - workersByJobId - .computeIfAbsent(workers.get(0).getJobId(), k -> Lists.newArrayList()) - .addAll(workers); + if(!workers.isEmpty()) { + workersByJobId + .computeIfAbsent(workers.get(0).getJobId(), k -> Lists.newArrayList()) + .addAll(workers); + } } return workersByJobId; } From 87388d35a23939658d14b058f5ce1e0c3736ebb9 Mon Sep 17 00:00:00 2001 From: sundargates Date: Wed, 7 Aug 2024 16:26:24 -0700 Subject: [PATCH 09/15] Adding some debug log to mantis scheduling to understand what is slow (#701) --- 
.../java/io/mantisrx/master/events/WorkerMetricsCollector.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/events/WorkerMetricsCollector.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/events/WorkerMetricsCollector.java index 5412aec09..67d0e8fc1 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/events/WorkerMetricsCollector.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/events/WorkerMetricsCollector.java @@ -134,6 +134,8 @@ public void process(WorkerStatusEvent workerStatusEvent) { // do nothing; This is the initial state break; case Launched: + log.debug("Worker {} launched with scheduling time: {}", + workerId, Math.max(0L, workerStatusEvent.getTimestamp() - metadata.getAcceptedAt())); // this represents the scheduling time workerMetrics.reportSchedulingDuration( Math.max(0L, workerStatusEvent.getTimestamp() - metadata.getAcceptedAt())); From 25895095963a8bfe566ec8583e8da2a68282644b Mon Sep 17 00:00:00 2001 From: fdc-ntflx <103213338+fdc-ntflx@users.noreply.github.com> Date: Wed, 14 Aug 2024 14:20:27 -0400 Subject: [PATCH 10/15] Add debug logs on scheduling logic (#702) --- .../ExecutorStateManagerImpl.java | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java index fc80491c2..02489a5d0 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java @@ 
-338,9 +338,9 @@ private Optional> findBestFitFor(TaskExec return Optional.empty(); } - log.info("Applying assignment request: {} to constraints {}.", request, bestFitTeGroupKey); + log.info("Applying assignment request: {} to best fit TE group {}.", request, bestFitTeGroupKey); if (!this.executorsByGroup.containsKey(bestFitTeGroupKey.get())) { - log.warn("No available TE found for constraints: {}, request: {}", bestFitTeGroupKey.get(), request); + log.warn("No available TE found for best fit TE group: {}, request: {}", bestFitTeGroupKey.get(), request); return Optional.empty(); } @@ -508,6 +508,13 @@ private Optional> findTaskExecutorsFor(Ta return taskExecutors; } else { log.warn("Not enough available TEs found for scheduling constraints {}, request: {}", schedulingConstraints, request); + if (taskExecutors.isPresent()) { + log.debug("Found {} Task Executors: {} for request: {} with constraints: {}", + taskExecutors.get().size(), taskExecutors.get(), request, schedulingConstraints); + } else { + log.warn("No suitable Task Executors found for request: {} with constraints: {}", + request, schedulingConstraints); + } // If there are not enough workers with the given spec then add the request the pending ones if (!isJobIdAlreadyPending && request.getAllocationRequests().size() > 2) { @@ -581,8 +588,10 @@ private Optional findBestGroupBySizeNameMatch(SchedulingCo */ private Optional findBestGroupByFitnessCalculator(SchedulingConstraints requestedConstraints) { log.info("Falling back to find best group by fitness calculator for constraints: {}", requestedConstraints); + log.debug("All present executor groups: {}", executorsByGroup.keySet()); - return executorsByGroup.keySet() + // Filter and sort Task Executor Group Keys based on fitness score + final List> groupFitnessList = executorsByGroup.keySet() .stream() // Filter out if both sizeName exist and are different (ie. 
small vs large) .filter(taskExecutorGroupKey -> { @@ -618,8 +627,18 @@ private Optional findBestGroupByFitnessCalculator(Scheduli return Comparator.nullsLast(Comparator.reverseOrder()).compare(generation1, generation2); } }) - .map(AbstractMap.SimpleEntry::getKey) - .findFirst(); + .collect(Collectors.toList()); + + if (groupFitnessList.isEmpty()) { + log.debug("No suitable Task Executor Groups found for constraints: {}", requestedConstraints); + } else { + log.debug("Fitness calculation results for the Task Executor Groups:"); + for (Map.Entry entry : groupFitnessList) { + log.debug("TaskExecutorGroupKey: {}, Fitness Score: {}", entry.getKey(), entry.getValue()); + } + } + + return groupFitnessList.stream().map(Map.Entry::getKey).findFirst(); } /** From 46e8c23c52137028ef80e953f3ab671f5c16c202 Mon Sep 17 00:00:00 2001 From: kmg-stripe Date: Wed, 21 Aug 2024 07:25:12 -0700 Subject: [PATCH 11/15] Fixes to Source Job Connector and Master Monitor Logging (#703) 1. All source job connectors have ZK config hard-coded due to parameter-less MantisSourceJobConnectorFactory. Configuring a MantisClient in this context doesn't make sense, since we can reference the configured HighAvailabilityServices 2. DynamoDBMasterMonitor was erroneously logging parse errors when reading leadership info from the DB. 3. HighAvailabilityServicesImpl was setting the leader state to NULL, which led to ResourceClusterGatewayClient constructing an invalid URI. Updating to not update the ResourceClusterGatewayClient when master is NULL. 
--- .../java/io/mantisrx/client/MantisClient.java | 6 +++++ .../java/io/mantisrx/client/MantisSSEJob.java | 15 ++++++++----- .../mantis-connector-job-source/build.gradle | 1 + .../job/core/MantisSourceJobConnector.java | 21 +++++++++++++----- .../core/MantisSourceJobConnectorFactory.java | 2 +- .../client/HighAvailabilityServicesUtil.java | 22 ++++++++++++++++++- .../server/core/master/MasterDescription.java | 3 +++ .../dynamodb/DynamoDBMasterMonitor.java | 17 +++++++------- .../dynamodb/DynamoDBClientSingletonTest.java | 2 +- .../dynamodb/DynamoDBMasterMonitorTest.java | 7 +++--- 10 files changed, 69 insertions(+), 27 deletions(-) diff --git a/mantis-client/src/main/java/io/mantisrx/client/MantisClient.java b/mantis-client/src/main/java/io/mantisrx/client/MantisClient.java index a61a2f220..79e75183a 100644 --- a/mantis-client/src/main/java/io/mantisrx/client/MantisClient.java +++ b/mantis-client/src/main/java/io/mantisrx/client/MantisClient.java @@ -113,6 +113,12 @@ public MantisClient(MasterClientWrapper clientWrapper, boolean disablePingFilter this.clientWrapper = clientWrapper; } + public MantisClient(HighAvailabilityServices haServices) { + haServices.awaitRunning(); + clientWrapper = new MasterClientWrapper(haServices.getMasterClientApi()); + this.disablePingFiltering = false; + } + public MantisClient(MasterClientWrapper clientWrapper) { this(clientWrapper, false); } diff --git a/mantis-client/src/main/java/io/mantisrx/client/MantisSSEJob.java b/mantis-client/src/main/java/io/mantisrx/client/MantisSSEJob.java index 93ad5ef04..a000b7d00 100644 --- a/mantis-client/src/main/java/io/mantisrx/client/MantisSSEJob.java +++ b/mantis-client/src/main/java/io/mantisrx/client/MantisSSEJob.java @@ -23,6 +23,7 @@ import io.mantisrx.runtime.parameter.Parameter; import io.mantisrx.runtime.parameter.SinkParameters; import io.mantisrx.server.master.client.ConditionalRetry; +import io.mantisrx.server.master.client.HighAvailabilityServices; import 
io.mantisrx.server.master.client.NoSuchJobException; import io.reactivx.mantis.operators.DropOperator; import java.io.Closeable; @@ -179,10 +180,6 @@ public static class Builder { private Observer sinkConnectionsStatusObserver = null; private long dataRecvTimeoutSecs = 5; - public Builder(Properties properties) { - this(new MantisClient(properties)); - } - public Builder() { Properties properties = new Properties(); properties.setProperty("mantis.zookeeper.connectionTimeMs", "1000"); @@ -191,10 +188,18 @@ public Builder() { properties.setProperty("mantis.zookeeper.connectString", System.getenv("mantis.zookeeper.connectString")); properties.setProperty("mantis.zookeeper.root", System.getenv("mantis.zookeeper.root")); properties.setProperty("mantis.zookeeper.leader.announcement.path", - System.getenv("mantis.zookeeper.leader.announcement.path")); + System.getenv("mantis.zookeeper.leader.announcement.path")); mantisClient = new MantisClient(properties); } + public Builder(HighAvailabilityServices haServices) { + this(new MantisClient(haServices)); + } + + public Builder(Properties properties) { + this(new MantisClient(properties)); + } + public Builder(MantisClient mantisClient) { this.mantisClient = mantisClient; } diff --git a/mantis-connectors/mantis-connector-job-source/build.gradle b/mantis-connectors/mantis-connector-job-source/build.gradle index a6cded0ae..efa9ff985 100644 --- a/mantis-connectors/mantis-connector-job-source/build.gradle +++ b/mantis-connectors/mantis-connector-job-source/build.gradle @@ -22,6 +22,7 @@ dependencies { implementation project(":mantis-runtime") implementation project(":mantis-client") implementation project(":mantis-control-plane:mantis-control-plane-core") + implementation project(":mantis-control-plane:mantis-control-plane-client") implementation project(":mantis-publish:mantis-publish-core") implementation "com.google.code.gson:gson:$gsonVersion" diff --git 
a/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnector.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnector.java index b73c43c7d..0a0a76868 100644 --- a/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnector.java +++ b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnector.java @@ -27,6 +27,8 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + +import io.mantisrx.server.master.client.HighAvailabilityServicesUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observer; @@ -59,12 +61,17 @@ public class MantisSourceJobConnector { private static final String ZK_ROOT = "mantis.zookeeper.root"; private static final String ZK_LEADER_PATH = "mantis.zookeeper.leader.announcement.path"; - public MantisSourceJobConnector(Properties props) { - this.props = props; + public MantisSourceJobConnector(boolean configureDefaults) { + if (configureDefaults) { + props = defaultProperties(); + } else { + props = null; + } } - public MantisSourceJobConnector() { - props = new Properties(); + // todo(kmg-stripe): Can we remove this? It seems it is only used by main in this class for testing. 
+ private static Properties defaultProperties() { + Properties props = new Properties(); final String defaultZkConnect = "127.0.0.1:2181"; final String defaultZkRoot = "/mantis/master"; @@ -99,6 +106,7 @@ public MantisSourceJobConnector() { } LOGGER.info("Mantis Zk settings used for Source Job connector: connectString {} root {} path {}", connectString, zookeeperRoot, zookeeperLeaderAnnouncementPath); + return props; } @Deprecated @@ -130,7 +138,8 @@ public MantisSSEJob connectToJob( String jobName, SinkParameters params, Observer sinkObserver) { - return new MantisSSEJob.Builder(props) + MantisSSEJob.Builder builder = props != null ? new MantisSSEJob.Builder(props) : new MantisSSEJob.Builder(HighAvailabilityServicesUtil.get()); + return builder .name(jobName) .sinkConnectionsStatusObserver(sinkObserver) .onConnectionReset(throwable -> LOGGER.error("Reconnecting due to error: " + throwable.getMessage())) @@ -163,7 +172,7 @@ public static void main(String[] args) { Args.parse(MantisSourceJobConnector.class, args); final CountDownLatch latch = new CountDownLatch(20); - MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector(); + MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector(true); MantisSSEJob job = sourceJobConnector.connectToJob("TestSourceJob", params); Subscription subscription = job.connectAndGetObservable() .doOnNext(o -> { diff --git a/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnectorFactory.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnectorFactory.java index 3f5ea975c..6a84d895b 100644 --- a/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnectorFactory.java +++ b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/core/MantisSourceJobConnectorFactory.java @@ -19,6 +19,6 @@ public 
class MantisSourceJobConnectorFactory { public static MantisSourceJobConnector getConnector() { - return new MantisSourceJobConnector(); + return new MantisSourceJobConnector(false); } } diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HighAvailabilityServicesUtil.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HighAvailabilityServicesUtil.java index 21adda463..609df82db 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HighAvailabilityServicesUtil.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HighAvailabilityServicesUtil.java @@ -15,6 +15,7 @@ */ package io.mantisrx.server.master.client; +import com.mantisrx.common.utils.Services; import io.mantisrx.common.metrics.Counter; import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.MetricsRegistry; @@ -82,6 +83,17 @@ public static HighAvailabilityServices createHAServices(CoreConfiguration config return HAServiceInstanceRef.get(); } + // This getter is used in situations where the context does not know the core configuration. For example, this + // is used to create a MantisClient when configuring a JobSource, where a job instance does not know how Mantis + // is configured. + // Note that in this context, the agent should have configured HighAvailabilityServices. 
+ public static HighAvailabilityServices get() { + if (HAServiceInstanceRef.get() == null) { + throw new RuntimeException("HighAvailabilityServices have not been initialized"); + } + return HAServiceInstanceRef.get(); + } + private static class LocalHighAvailabilityServices extends AbstractIdleService implements HighAvailabilityServices { private final MasterMonitor masterMonitor; private final CoreConfiguration configuration; @@ -131,6 +143,7 @@ private static class HighAvailabilityServicesImpl extends AbstractIdleService im private final MasterMonitor masterMonitor; private final Counter resourceLeaderChangeCounter; private final Counter resourceLeaderAlreadyRegisteredCounter; + private final Counter resourceLeaderIsEmptyCounter; private final AtomicInteger rmConnections = new AtomicInteger(0); private final CoreConfiguration configuration; @@ -152,9 +165,11 @@ public HighAvailabilityServicesImpl(CoreConfiguration configuration) { .name(metricsGroup) .addCounter("resourceLeaderChangeCounter") .addCounter("resourceLeaderAlreadyRegisteredCounter") + .addCounter("resourceLeaderIsEmptyCounter") .build()); resourceLeaderChangeCounter = metrics.getCounter("resourceLeaderChangeCounter"); resourceLeaderAlreadyRegisteredCounter = metrics.getCounter("resourceLeaderAlreadyRegisteredCounter"); + resourceLeaderIsEmptyCounter = metrics.getCounter("resourceLeaderIsEmptyCounter"); } @@ -209,7 +224,12 @@ public void register(ResourceLeaderChangeListener change .subscribe(nextDescription -> { log.info("nextDescription={}", nextDescription); - if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) { + // We do not want to update if the master is set to null. This is usually due to a newly + // initialized master monitor. 
+ if (nextDescription.equals(MasterDescription.MASTER_NULL)) { + resourceLeaderIsEmptyCounter.increment(); + return; + } else if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) { resourceLeaderAlreadyRegisteredCounter.increment(); return; } diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/master/MasterDescription.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/master/MasterDescription.java index 8114a8dcb..bcfdd55bf 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/master/MasterDescription.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/master/MasterDescription.java @@ -49,6 +49,9 @@ public class MasterDescription { private final long createTime; private final int consolePort; + public static final MasterDescription MASTER_NULL = + new MasterDescription("NONE", "localhost", -1, -1, -1, "uri://", -1, -1L); + @JsonCreator @JsonIgnoreProperties(ignoreUnknown = true) public MasterDescription( diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java index 90733883f..a1be73355 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java @@ -40,13 +40,13 @@ import rx.Observable; import rx.subjects.BehaviorSubject; + @Slf4j public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor { private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class); - public static final 
MasterDescription MASTER_NULL = - new MasterDescription("NONE", "localhost", -1, -1, -1, "uri://", -1, -1L); + private final ThreadFactory monitorThreadFactory = r -> { Thread thread = new Thread(r); thread.setName("dynamodb-monitor-" + System.currentTimeMillis()); @@ -92,7 +92,7 @@ public DynamoDBMasterMonitor( String partitionKey, Duration pollInterval, Duration gracefulShutdown) { - masterSubject = BehaviorSubject.create(MASTER_NULL); + masterSubject = BehaviorSubject.create(MasterDescription.MASTER_NULL); this.lockClient = lockClient; this.partitionKey = partitionKey; this.pollInterval = pollInterval; @@ -147,8 +147,6 @@ private void getCurrentLeader() { if (optionalLock.isPresent()) { final LockItem lock = optionalLock.get(); nextDescription = lock.getData().map(this::bytesToMaster).orElse(null); - logger.warn("failed to decode leader bytes"); - this.lockDecodeFailedCounter.increment(); } else { nextDescription = null; logger.warn("no leader found"); @@ -163,8 +161,8 @@ private void getCurrentLeader() { } private void updateLeader(@Nullable MasterDescription nextDescription) { - final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL); - final MasterDescription next = (nextDescription == null) ? MASTER_NULL : nextDescription; + final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL); + final MasterDescription next = (nextDescription == null) ? 
MasterDescription.MASTER_NULL : nextDescription; if (!prev.equals(next)) { logger.info("leader changer information previous {} and next {}", prev.getHostname(), next.getHostname()); masterSubject.onNext(next); @@ -183,8 +181,9 @@ private MasterDescription bytesToMaster(ByteBuffer data) { return jsonMapper.readValue(bytes, MasterDescription.class); } catch (IOException e) { logger.error("unable to parse master description bytes: {}", data, e); + this.lockDecodeFailedCounter.increment(); } - return MASTER_NULL; + return MasterDescription.MASTER_NULL; } @Override @@ -201,6 +200,6 @@ public Observable getMasterObservable() { @Override @Nullable public MasterDescription getLatestMaster() { - return Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL); + return Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL); } } diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBClientSingletonTest.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBClientSingletonTest.java index 192d92a51..fdee4315c 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBClientSingletonTest.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBClientSingletonTest.java @@ -132,7 +132,7 @@ public void highAvailabilityServices() throws InterruptedException, IOException // We can, depending on timing, sometimes get a MASTER_NULL value which is safe to ignore. 
MasterDescription[] actualLeaders = testSubscriber.getOnNextEvents().stream() - .filter(md -> md != DynamoDBMasterMonitor.MASTER_NULL) + .filter(md -> md != MasterDescription.MASTER_NULL) .collect(Collectors.toList()) .toArray(new MasterDescription[]{}); diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java index 9b15ad094..5c525a07a 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java @@ -15,7 +15,6 @@ */ package io.mantisrx.extensions.dynamodb; -import static io.mantisrx.extensions.dynamodb.DynamoDBMasterMonitor.MASTER_NULL; import static org.awaitility.Awaitility.await; import static org.junit.Assert.*; import static org.mockito.Mockito.times; @@ -79,7 +78,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept TestSubscriber testSubscriber = new TestSubscriber<>(); m.getMasterObservable().subscribe(testSubscriber); m.start(); - assertEquals(MASTER_NULL, m.getLatestMaster()); + assertEquals(MasterDescription.MASTER_NULL, m.getLatestMaster()); lockSupport.takeLock(lockKey, otherMaster); await() .atLeast(DynamoDBLockSupportRule.heartbeatDuration) @@ -93,7 +92,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept .pollDelay(DynamoDBLockSupportRule.heartbeatDuration) .atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2)) .untilAsserted(() -> assertEquals(m.getLatestMaster(), thatMaster)); - testSubscriber.assertValues(MASTER_NULL, otherMaster, thatMaster); + testSubscriber.assertValues(MasterDescription.MASTER_NULL, otherMaster, thatMaster); 
m.shutdown(); } @@ -134,7 +133,7 @@ public void monitorDoesNotReturnNull() throws IOException, InterruptedException .atLeast(DynamoDBLockSupportRule.heartbeatDuration) .pollDelay(DynamoDBLockSupportRule.heartbeatDuration) .atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2)) - .untilAsserted(() -> assertEquals(MASTER_NULL, m.getLatestMaster())); + .untilAsserted(() -> assertEquals(MasterDescription.MASTER_NULL, m.getLatestMaster())); lockSupport.releaseLock(lockKey); m.shutdown(); From fa55cb13cc74c4e59439f822c3eec244c6ef9ca7 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Thu, 5 Sep 2024 17:26:23 -0700 Subject: [PATCH 12/15] support fallback blobstore (#707) --- .../io/mantisrx/server/agent/BlobStore.java | 48 +++++++++++++++++++ .../mantisrx/server/agent/BlobStoreTest.java | 33 +++++++++++++ 2 files changed, 81 insertions(+) diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/BlobStore.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/BlobStore.java index c077f4fe5..bb50be8f4 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/BlobStore.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/BlobStore.java @@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import net.lingala.zip4j.ZipFile; import org.apache.commons.io.FileUtils; import org.apache.commons.io.FilenameUtils; @@ -43,6 +44,10 @@ default BlobStore withPrefix(URI prefixUri) { return new PrefixedBlobStore(prefixUri, this); } + default BlobStore withFallbackStore(BlobStore fallbackStore) { + return new FallbackEnabledBlobStore(this, fallbackStore); + } + /** * blob store that when downloading zip files, also unpacks them and returns the unpacked file/directory to the caller. 
* @@ -67,6 +72,23 @@ static BlobStore forHadoopFileSystem(URI clusterStoragePath, File localStoreDir) .withThreadSafeBlobStore(); } + static BlobStore forHadoopFileSystem(URI clusterStoragePath, URI fallbackStoragePath, File localStoreDir) + throws Exception { + final org.apache.hadoop.fs.FileSystem fileSystem = + FileSystemInitializer.create(clusterStoragePath); + + final org.apache.hadoop.fs.FileSystem fallbackFileSystem = + FileSystemInitializer.create(fallbackStoragePath); + + return + new HadoopFileSystemBlobStore(fileSystem, localStoreDir) + .withPrefix(clusterStoragePath) + .withFallbackStore( + new HadoopFileSystemBlobStore(fallbackFileSystem, localStoreDir).withPrefix(fallbackStoragePath)) + .withZipCapabilities() + .withThreadSafeBlobStore(); + } + @RequiredArgsConstructor(access = AccessLevel.PACKAGE) class PrefixedBlobStore implements BlobStore { private final URI rootUri; @@ -84,6 +106,32 @@ public void close() throws IOException { } } + @RequiredArgsConstructor(access = AccessLevel.PACKAGE) + @Slf4j + class FallbackEnabledBlobStore implements BlobStore { + private final BlobStore blobStore; + private final BlobStore fallbackBlobStore; + + @Override + public File get(URI blobUrl) throws IOException { + try + { + return blobStore.get(blobUrl); + } + catch (IOException e) { + log.error("Get blob error, fallback to next blobstore", e); + } + + return fallbackBlobStore.get(blobUrl); + } + + @Override + public void close() throws IOException { + blobStore.close(); + fallbackBlobStore.close(); + } + } + @RequiredArgsConstructor(access = AccessLevel.PACKAGE) class ZipHandlingBlobStore implements BlobStore { diff --git a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/BlobStoreTest.java b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/BlobStoreTest.java index fc7cb7d7e..e1e1371d6 100644 --- a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/BlobStoreTest.java +++ 
b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/BlobStoreTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import java.io.File; +import java.io.IOException; import java.net.URI; import org.junit.Test; import org.mockito.Matchers; @@ -46,4 +47,36 @@ public void testPrefixedBlobStore() throws Exception { "https://mantisrx.region.prod.io.net/mantis-artifacts/sananthanarayanan-mantis-jobs-sine-function-thin-0.1.0.zip")); verify(blobStore, times(2)).get(Matchers.eq(expectedUri)); } + + @Test + public void testFallbackBlobStore() throws Exception { + final BlobStore blobStore = mock(BlobStore.class); + final BlobStore fallbackBlobStore = mock(BlobStore.class); + final File file = mock(File.class); + when(fallbackBlobStore.get(any())).thenReturn(file); + when(blobStore.get(any())).thenThrow(new IOException("mock io err")).thenReturn(file); + + final BlobStore prefixedBlobStore = + new BlobStore.PrefixedBlobStore(new URI("s3://mantisrx.s3.store/mantis/jobs/"), blobStore); + + final BlobStore fallbackPrefixedBlobStore = + new BlobStore.PrefixedBlobStore(new URI("s3://mantisrx.s3.store.fallback/mantis/jobs/"), fallbackBlobStore); + + final BlobStore fallbackEnabledBlobStore = prefixedBlobStore.withFallbackStore(fallbackPrefixedBlobStore); + fallbackEnabledBlobStore.get(new URI("http://sananthanarayanan-mantis-jobs-sine-function-thin-0.1.0.zip")); + + final URI expectedUri = + new URI("s3://mantisrx.s3.store/mantis/jobs/sananthanarayanan-mantis-jobs-sine-function-thin-0.1.0.zip"); + verify(blobStore, times(1)).get(Matchers.eq(expectedUri)); + + final URI expectedFallbackUri = + new URI("s3://mantisrx.s3.store.fallback/mantis/jobs/sananthanarayanan-mantis-jobs-sine-function-thin-0.1" + + ".0.zip"); + verify(fallbackBlobStore, times(1)).get(Matchers.eq(expectedFallbackUri)); + + // test non-fallback path + fallbackEnabledBlobStore.get(new URI("http://sananthanarayanan-mantis-jobs-sine-function-thin-0.1.0.zip")); + verify(blobStore, 
times(2)).get(Matchers.eq(expectedUri)); + verify(fallbackBlobStore, times(1)).get(Matchers.eq(expectedFallbackUri)); + } } From 1e29767d3517cd5763bf8c76cf707beb2e2c2760 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Mon, 9 Sep 2024 10:20:12 -0700 Subject: [PATCH 13/15] fix blobstore fallback (#710) --- .../src/main/java/io/mantisrx/server/agent/BlobStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/BlobStore.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/BlobStore.java index bb50be8f4..d75bb02f0 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/BlobStore.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/BlobStore.java @@ -118,7 +118,7 @@ public File get(URI blobUrl) throws IOException { { return blobStore.get(blobUrl); } - catch (IOException e) { + catch (Exception e) { log.error("Get blob error, fallback to next blobstore", e); } From fe788560d33144c2a7498f9f2c94b0d53e5625c1 Mon Sep 17 00:00:00 2001 From: crioux-stripe <115596126+crioux-stripe@users.noreply.github.com> Date: Wed, 11 Sep 2024 10:29:57 -0700 Subject: [PATCH 14/15] MantisMasterClientApi: Handle HTTP 400 on GET assignmentresults (#711) --- .../mantisrx/server/master/client/MantisMasterClientApi.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java index 6aecc8889..7b9876dc0 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java +++ 
b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java @@ -715,6 +715,11 @@ public Observable schedulingChanges(final String jobId) { JobIdNotFoundException notFoundException = new JobIdNotFoundException(jobId); retryObject.setErrorRef(notFoundException); return Observable.error(notFoundException); + } else if (HttpResponseStatus.BAD_REQUEST.equals(response.getStatus())) { + logger.error("GET assignmentresults bad request: {}", response.getStatus()); + Exception ex = new Exception(response.getStatus().reasonPhrase()); + retryObject.setErrorRef(ex); + return Observable.error(ex); } else if (!HttpResponseStatus.OK.equals(response.getStatus())) { logger.error("GET assignmentresults failed: {}", response.getStatus()); return Observable.error(new Exception(response.getStatus().reasonPhrase())); From 99ddbe99a3ed344296ee9eefa8b3a93553998c7d Mon Sep 17 00:00:00 2001 From: crioux-stripe <115596126+crioux-stripe@users.noreply.github.com> Date: Wed, 11 Sep 2024 14:35:15 -0700 Subject: [PATCH 15/15] Migrate CI upload-artifact action from v2 to v3 (#712) --- .github/workflows/nebula-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/nebula-ci.yml b/.github/workflows/nebula-ci.yml index aa9616e5e..b8889389d 100644 --- a/.github/workflows/nebula-ci.yml +++ b/.github/workflows/nebula-ci.yml @@ -59,7 +59,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Upload - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: Event File path: ${{ github.event_path }}