Skip to content

Commit

Permalink
Merge branch 'master' into timmartin/set-up-follow-redirects-for-asyn…
Browse files Browse the repository at this point in the history
…c-http-client
  • Loading branch information
crioux-stripe authored Sep 12, 2024
2 parents fb499f9 + 99ddbe9 commit 1eca63a
Show file tree
Hide file tree
Showing 57 changed files with 480 additions and 121 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/nebula-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ jobs:
java: [ 8 ]
name: CI with Java ${{ matrix.java }}
steps:
- name: Setup Git
run: |
git config --global user.name "Mantis OSS Maintainers"
git config --global user.email "[email protected]"
- uses: actions/checkout@v1
- name: Setup jdk
uses: actions/setup-java@v1
Expand Down Expand Up @@ -55,7 +59,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Upload
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: Event File
path: ${{ github.event_path }}
4 changes: 4 additions & 0 deletions .github/workflows/nebula-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Setup Git
run: |
git config --global user.name "Mantis OSS Maintainers"
git config --global user.email "[email protected]"
- uses: actions/checkout@v2
- name: Setup jdk
uses: actions/setup-java@v2
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/nebula-snapshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ jobs:
environment:
name: Integrate Pull Request # Our protected environment variable
steps:
- name: Setup Git
run: |
git config --global user.name "Mantis OSS Maintainers"
git config --global user.email "[email protected]"
- name: Checkout PR
uses: actions/checkout@v3
with:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/push-docker-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ jobs:
packages: write

steps:
- name: Setup Git
run: |
git config --global user.name "Mantis OSS Maintainers"
git config --global user.email "[email protected]"
- name: Checkout PR
uses: actions/checkout@v3
- name: Setup jdk
Expand Down
20 changes: 10 additions & 10 deletions baseline.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* limitations under the License.
*/

allprojects {
apply plugin: 'com.palantir.baseline-idea'
}
// allprojects {
// apply plugin: 'com.palantir.baseline-idea'
// }

subprojects {
// Currently, if any subproject applies the blanket Baseline plugin, it forces the Baseline plugin
// to be applied to ALL projects. And we are not prepared to address all of the build errors that
// occur as a result at this time.
// subprojects {
// // Currently, if any subproject applies the blanket Baseline plugin, it forces the Baseline plugin
// // to be applied to ALL projects. And we are not prepared to address all of the build errors that
// // occur as a result at this time.

apply plugin: 'com.palantir.baseline-exact-dependencies'
apply plugin: 'com.palantir.baseline-format'
}
// apply plugin: 'com.palantir.baseline-exact-dependencies'
// apply plugin: 'com.palantir.baseline-format'
// }
16 changes: 8 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ buildscript {
maven { url 'https://artifacts-oss.netflix.net/maven-oss-releases' }
}
dependencies {
classpath 'com.netflix.nebula:gradle-netflixoss-project-plugin:10.6.0'
classpath 'com.netflix.nebula:gradle-netflixoss-project-plugin:11.5.0'
classpath 'com.netflix.nebula:nebula-dependency-recommender:11.+'
classpath 'io.mantisrx:mantis-gradle-plugin:1.2.+'
classpath "io.freefair.gradle:lombok-plugin:5.3.3.3"
classpath 'io.mantisrx:mantis-gradle-plugin:1.2.7'
classpath "io.freefair.gradle:lombok-plugin:6.+"
classpath 'eu.appsatori:gradle-fatjar-plugin:0.3'
classpath("gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0")
classpath("com.github.johnrengelman:shadow:8.1.1")
classpath 'gradle.plugin.org.inferred:gradle-processors:3.3.0'
classpath 'com.palantir.baseline:gradle-baseline-java:4.0.0'
// todo: baseline is disabled due to no working version on java 8
// classpath 'com.palantir.baseline:gradle-baseline-java:4.+'
classpath 'com.bmuschko:gradle-docker-plugin:6.7.0'
classpath "com.palantir.gradle.gitversion:gradle-git-version:3.0.0"
}
Expand Down Expand Up @@ -104,9 +105,8 @@ project.snapshot.configure { finalizedBy printAllReleasedArtifacts }
subprojects {
apply plugin: 'java-library'

// Apply lombok plugin and disabled the default config file generation.
// Apply lombok plugin.
apply plugin: "io.freefair.lombok"
generateLombokConfig.enabled = false
lombok {
version = "1.18.20"
}
Expand Down Expand Up @@ -186,4 +186,4 @@ subprojects {
}
}

apply from: file('baseline.gradle')
// apply from: file('baseline.gradle')
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
org.gradle.parallel=true
org.gradle.caching=false
org.gradle.configureondemand=true
org.gradle.jvmargs=-Xmx1G "-XX:MaxMetaspaceSize=384m"
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ public MantisClient(MasterClientWrapper clientWrapper, boolean disablePingFilter
this.clientWrapper = clientWrapper;
}

public MantisClient(HighAvailabilityServices haServices) {
haServices.awaitRunning();
clientWrapper = new MasterClientWrapper(haServices.getMasterClientApi());
this.disablePingFiltering = false;
}

public MantisClient(MasterClientWrapper clientWrapper) {
this(clientWrapper, false);
}
Expand Down
15 changes: 10 additions & 5 deletions mantis-client/src/main/java/io/mantisrx/client/MantisSSEJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.master.client.ConditionalRetry;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.NoSuchJobException;
import io.reactivx.mantis.operators.DropOperator;
import java.io.Closeable;
Expand Down Expand Up @@ -179,10 +180,6 @@ public static class Builder {
private Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver = null;
private long dataRecvTimeoutSecs = 5;

public Builder(Properties properties) {
this(new MantisClient(properties));
}

public Builder() {
Properties properties = new Properties();
properties.setProperty("mantis.zookeeper.connectionTimeMs", "1000");
Expand All @@ -191,10 +188,18 @@ public Builder() {
properties.setProperty("mantis.zookeeper.connectString", System.getenv("mantis.zookeeper.connectString"));
properties.setProperty("mantis.zookeeper.root", System.getenv("mantis.zookeeper.root"));
properties.setProperty("mantis.zookeeper.leader.announcement.path",
System.getenv("mantis.zookeeper.leader.announcement.path"));
System.getenv("mantis.zookeeper.leader.announcement.path"));
mantisClient = new MantisClient(properties);
}

public Builder(HighAvailabilityServices haServices) {
this(new MantisClient(haServices));
}

public Builder(Properties properties) {
this(new MantisClient(properties));
}

public Builder(MantisClient mantisClient) {
this.mantisClient = mantisClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
* limitations under the License.
*/

apply plugin: 'mantis'

ext {
gsonVersion = '2.8.+'
}
Expand All @@ -24,6 +22,7 @@ dependencies {
implementation project(":mantis-runtime")
implementation project(":mantis-client")
implementation project(":mantis-control-plane:mantis-control-plane-core")
implementation project(":mantis-control-plane:mantis-control-plane-client")
implementation project(":mantis-publish:mantis-publish-core")

implementation "com.google.code.gson:gson:$gsonVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observer;
Expand Down Expand Up @@ -59,12 +61,17 @@ public class MantisSourceJobConnector {
private static final String ZK_ROOT = "mantis.zookeeper.root";
private static final String ZK_LEADER_PATH = "mantis.zookeeper.leader.announcement.path";

public MantisSourceJobConnector(Properties props) {
this.props = props;
public MantisSourceJobConnector(boolean configureDefaults) {
if (configureDefaults) {
props = defaultProperties();
} else {
props = null;
}
}

public MantisSourceJobConnector() {
props = new Properties();
// todo(kmg-stripe): Can we remove this? It seems it is only used by main in this class for testing.
private static Properties defaultProperties() {
Properties props = new Properties();

final String defaultZkConnect = "127.0.0.1:2181";
final String defaultZkRoot = "/mantis/master";
Expand Down Expand Up @@ -99,6 +106,7 @@ public MantisSourceJobConnector() {
}

LOGGER.info("Mantis Zk settings used for Source Job connector: connectString {} root {} path {}", connectString, zookeeperRoot, zookeeperLeaderAnnouncementPath);
return props;
}

@Deprecated
Expand Down Expand Up @@ -130,7 +138,8 @@ public MantisSSEJob connectToJob(
String jobName,
SinkParameters params,
Observer<SinkConnectionsStatus> sinkObserver) {
return new MantisSSEJob.Builder(props)
MantisSSEJob.Builder builder = props != null ? new MantisSSEJob.Builder(props) : new MantisSSEJob.Builder(HighAvailabilityServicesUtil.get());
return builder
.name(jobName)
.sinkConnectionsStatusObserver(sinkObserver)
.onConnectionReset(throwable -> LOGGER.error("Reconnecting due to error: " + throwable.getMessage()))
Expand Down Expand Up @@ -163,7 +172,7 @@ public static void main(String[] args) {
Args.parse(MantisSourceJobConnector.class, args);

final CountDownLatch latch = new CountDownLatch(20);
MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector();
MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector(true);
MantisSSEJob job = sourceJobConnector.connectToJob("TestSourceJob", params);
Subscription subscription = job.connectAndGetObservable()
.doOnNext(o -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@
public class MantisSourceJobConnectorFactory {

public static MantisSourceJobConnector getConnector() {
return new MantisSourceJobConnector();
return new MantisSourceJobConnector(false);
}
}
2 changes: 0 additions & 2 deletions mantis-connectors/mantis-connector-kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
* limitations under the License.
*/

apply plugin: 'mantis'

ext {
archaiusVersion = '2.3.+'
spectatorVersion = '0.82.+'
Expand Down
7 changes: 7 additions & 0 deletions mantis-connectors/mantis-connector-publish/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,10 @@ dependencies {
test {
useJUnitPlatform()
}

tasks.named('compileJava') {
dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar')
}
tasks.named('delombok') {
dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar')
}
7 changes: 7 additions & 0 deletions mantis-control-plane/mantis-control-plane-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,10 @@ dependencies {
testImplementation libraries.spectatorApi
testImplementation(testFixtures(project(":mantis-common")))
}

tasks.named('compileJava') {
dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar')
}
tasks.named('delombok') {
dependsOn project(':mantis-control-plane:mantis-control-plane-core').tasks.named('jar')
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.mantisrx.server.master.client;

import com.mantisrx.common.utils.Services;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
Expand Down Expand Up @@ -82,6 +83,17 @@ public static HighAvailabilityServices createHAServices(CoreConfiguration config
return HAServiceInstanceRef.get();
}

// This getter is used in situations where the context does not know the core configuration. For example, this
// is used to create a MantisClient when configuring a JobSource, where a job instance does not know how Mantis
// is configured.
// Note that in this context, the agent should have configured HighAvailabilityServices.
public static HighAvailabilityServices get() {
if (HAServiceInstanceRef.get() == null) {
throw new RuntimeException("HighAvailabilityServices have not been initialized");
}
return HAServiceInstanceRef.get();
}

private static class LocalHighAvailabilityServices extends AbstractIdleService implements HighAvailabilityServices {
private final MasterMonitor masterMonitor;
private final CoreConfiguration configuration;
Expand Down Expand Up @@ -131,6 +143,7 @@ private static class HighAvailabilityServicesImpl extends AbstractIdleService im
private final MasterMonitor masterMonitor;
private final Counter resourceLeaderChangeCounter;
private final Counter resourceLeaderAlreadyRegisteredCounter;
private final Counter resourceLeaderIsEmptyCounter;
private final AtomicInteger rmConnections = new AtomicInteger(0);
private final CoreConfiguration configuration;

Expand All @@ -152,9 +165,11 @@ public HighAvailabilityServicesImpl(CoreConfiguration configuration) {
.name(metricsGroup)
.addCounter("resourceLeaderChangeCounter")
.addCounter("resourceLeaderAlreadyRegisteredCounter")
.addCounter("resourceLeaderIsEmptyCounter")
.build());
resourceLeaderChangeCounter = metrics.getCounter("resourceLeaderChangeCounter");
resourceLeaderAlreadyRegisteredCounter = metrics.getCounter("resourceLeaderAlreadyRegisteredCounter");
resourceLeaderIsEmptyCounter = metrics.getCounter("resourceLeaderIsEmptyCounter");

}

Expand Down Expand Up @@ -209,7 +224,12 @@ public void register(ResourceLeaderChangeListener<ResourceClusterGateway> change
.subscribe(nextDescription -> {
log.info("nextDescription={}", nextDescription);

if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) {
// We do not want to update if the master is set to null. This is usually due to a newly
// initialized master monitor.
if (nextDescription.equals(MasterDescription.MASTER_NULL)) {
resourceLeaderIsEmptyCounter.increment();
return;
} else if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) {
resourceLeaderAlreadyRegisteredCounter.increment();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,11 @@ public Observable<JobSchedulingInfo> schedulingChanges(final String jobId) {
JobIdNotFoundException notFoundException = new JobIdNotFoundException(jobId);
retryObject.setErrorRef(notFoundException);
return Observable.error(notFoundException);
} else if (HttpResponseStatus.BAD_REQUEST.equals(response.getStatus())) {
logger.error("GET assignmentresults bad request: {}", response.getStatus());
Exception ex = new Exception(response.getStatus().reasonPhrase());
retryObject.setErrorRef(ex);
return Observable.error(ex);
} else if (!HttpResponseStatus.OK.equals(response.getStatus())) {
logger.error("GET assignmentresults failed: {}", response.getStatus());
return Observable.error(new Exception(response.getStatus().reasonPhrase()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ task copyLibs(type: Copy) {
}

jar.dependsOn copyLibs
compileTestFixturesJava.dependsOn copyLibs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class ExecuteStageRequest implements Serializable {
@Nullable
private final String nameOfJobProviderClass;
private final String user;
private final String jobVersion;

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -102,7 +103,8 @@ public ExecuteStageRequest(
@JsonProperty("minRuntimeSecs") long minRuntimeSecs,
@JsonProperty("workerPorts") WorkerPorts workerPorts,
@JsonProperty("nameOfJobProviderClass") Optional<String> nameOfJobProviderClass,
@JsonProperty("user") String user) {
@JsonProperty("user") String user,
@JsonProperty("jobVersion") String jobVersion) {
this.jobName = jobName;
this.jobId = jobId;
this.workerIndex = workerIndex;
Expand All @@ -128,6 +130,7 @@ public ExecuteStageRequest(
this.subscriptionTimeoutSecs = subscriptionTimeoutSecs;
this.minRuntimeSecs = minRuntimeSecs;
this.workerPorts = workerPorts;
this.jobVersion = jobVersion;
}

public boolean getHasJobMaster() {
Expand Down
Loading

0 comments on commit 1eca63a

Please sign in to comment.