Skip to content

Commit

Permalink
Fixes to Source Job Connector and Master Monitor Logging
Browse files Browse the repository at this point in the history
1. All source job connectors have ZK config hard-coded due to parameter-less
MantisSourceJobConnectorFactory.  Configuring a MantisClient in this context
doesn't make sense, since we can reference the configured HighAvailabilityServices

2. DynamoDBMasterMonitor was erroneuously logging parse errors when reading
leadership info from the DB.

3. HighAvailabilityServicesImpl was setting the leader state to NULL, which lead
to ResourceClusterGatewayClient constructing an invalid URI.  Updating to not
update the ResourceClusterGatewayClient when master is NULL.
  • Loading branch information
kmg-stripe committed Aug 15, 2024
1 parent a293400 commit 1bda05c
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ public MantisClient(MasterClientWrapper clientWrapper, boolean disablePingFilter
this.clientWrapper = clientWrapper;
}

public MantisClient(HighAvailabilityServices haServices) {
haServices.awaitRunning();
clientWrapper = new MasterClientWrapper(haServices.getMasterClientApi());
this.disablePingFiltering = false;
}

public MantisClient(MasterClientWrapper clientWrapper) {
this(clientWrapper, false);
}
Expand Down
15 changes: 10 additions & 5 deletions mantis-client/src/main/java/io/mantisrx/client/MantisSSEJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.master.client.ConditionalRetry;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.NoSuchJobException;
import io.reactivx.mantis.operators.DropOperator;
import java.io.Closeable;
Expand Down Expand Up @@ -179,10 +180,6 @@ public static class Builder {
private Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver = null;
private long dataRecvTimeoutSecs = 5;

public Builder(Properties properties) {
this(new MantisClient(properties));
}

public Builder() {
Properties properties = new Properties();
properties.setProperty("mantis.zookeeper.connectionTimeMs", "1000");
Expand All @@ -191,10 +188,18 @@ public Builder() {
properties.setProperty("mantis.zookeeper.connectString", System.getenv("mantis.zookeeper.connectString"));
properties.setProperty("mantis.zookeeper.root", System.getenv("mantis.zookeeper.root"));
properties.setProperty("mantis.zookeeper.leader.announcement.path",
System.getenv("mantis.zookeeper.leader.announcement.path"));
System.getenv("mantis.zookeeper.leader.announcement.path"));
mantisClient = new MantisClient(properties);
}

public Builder(HighAvailabilityServices haServices) {
this(new MantisClient(haServices));
}

public Builder(Properties properties) {
this(new MantisClient(properties));
}

public Builder(MantisClient mantisClient) {
this.mantisClient = mantisClient;
}
Expand Down
1 change: 1 addition & 0 deletions mantis-connectors/mantis-connector-job-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
implementation project(":mantis-runtime")
implementation project(":mantis-client")
implementation project(":mantis-control-plane:mantis-control-plane-core")
implementation project(":mantis-control-plane:mantis-control-plane-client")
implementation project(":mantis-publish:mantis-publish-core")

implementation "com.google.code.gson:gson:$gsonVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observer;
Expand Down Expand Up @@ -59,12 +61,17 @@ public class MantisSourceJobConnector {
private static final String ZK_ROOT = "mantis.zookeeper.root";
private static final String ZK_LEADER_PATH = "mantis.zookeeper.leader.announcement.path";

public MantisSourceJobConnector(Properties props) {
this.props = props;
public MantisSourceJobConnector(boolean configureDefaults) {
if (configureDefaults) {
props = defaultProperties();
} else {
props = null;
}
}

public MantisSourceJobConnector() {
props = new Properties();
// todo(kmg-stripe): Can we remove this? It seems it is only used by main in this class for testing.
private static Properties defaultProperties() {
Properties props = new Properties();

final String defaultZkConnect = "127.0.0.1:2181";
final String defaultZkRoot = "/mantis/master";
Expand Down Expand Up @@ -99,6 +106,7 @@ public MantisSourceJobConnector() {
}

LOGGER.info("Mantis Zk settings used for Source Job connector: connectString {} root {} path {}", connectString, zookeeperRoot, zookeeperLeaderAnnouncementPath);
return props;
}

@Deprecated
Expand Down Expand Up @@ -130,7 +138,8 @@ public MantisSSEJob connectToJob(
String jobName,
SinkParameters params,
Observer<SinkConnectionsStatus> sinkObserver) {
return new MantisSSEJob.Builder(props)
MantisSSEJob.Builder builder = props != null ? new MantisSSEJob.Builder(props) : new MantisSSEJob.Builder(HighAvailabilityServicesUtil.get());
return builder
.name(jobName)
.sinkConnectionsStatusObserver(sinkObserver)
.onConnectionReset(throwable -> LOGGER.error("Reconnecting due to error: " + throwable.getMessage()))
Expand Down Expand Up @@ -163,7 +172,7 @@ public static void main(String[] args) {
Args.parse(MantisSourceJobConnector.class, args);

final CountDownLatch latch = new CountDownLatch(20);
MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector();
MantisSourceJobConnector sourceJobConnector = new MantisSourceJobConnector(true);
MantisSSEJob job = sourceJobConnector.connectToJob("TestSourceJob", params);
Subscription subscription = job.connectAndGetObservable()
.doOnNext(o -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@
public class MantisSourceJobConnectorFactory {

public static MantisSourceJobConnector getConnector() {
return new MantisSourceJobConnector();
return new MantisSourceJobConnector(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.mantisrx.server.master.client;

import com.mantisrx.common.utils.Services;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
Expand Down Expand Up @@ -82,6 +83,17 @@ public static HighAvailabilityServices createHAServices(CoreConfiguration config
return HAServiceInstanceRef.get();
}

// This getter is used in situations where the context does not know the core configuration. For example, this
// is used to create a MantisClient when configuring a JobSource, where a job instance does not know how Mantis
// is configured.
// Note that in this context, the agent should have configured HighAvailabilityServices.
public static HighAvailabilityServices get() {
if (HAServiceInstanceRef.get() == null) {
throw new RuntimeException("HighAvailabilityServices have not been initialized");
}
return HAServiceInstanceRef.get();
}

private static class LocalHighAvailabilityServices extends AbstractIdleService implements HighAvailabilityServices {
private final MasterMonitor masterMonitor;
private final CoreConfiguration configuration;
Expand Down Expand Up @@ -131,6 +143,7 @@ private static class HighAvailabilityServicesImpl extends AbstractIdleService im
private final MasterMonitor masterMonitor;
private final Counter resourceLeaderChangeCounter;
private final Counter resourceLeaderAlreadyRegisteredCounter;
private final Counter resourceLeaderIsEmptyCounter;
private final AtomicInteger rmConnections = new AtomicInteger(0);
private final CoreConfiguration configuration;

Expand All @@ -152,9 +165,11 @@ public HighAvailabilityServicesImpl(CoreConfiguration configuration) {
.name(metricsGroup)
.addCounter("resourceLeaderChangeCounter")
.addCounter("resourceLeaderAlreadyRegisteredCounter")
.addCounter("resourceLeaderIsEmptyCounter")
.build());
resourceLeaderChangeCounter = metrics.getCounter("resourceLeaderChangeCounter");
resourceLeaderAlreadyRegisteredCounter = metrics.getCounter("resourceLeaderAlreadyRegisteredCounter");
resourceLeaderIsEmptyCounter = metrics.getCounter("resourceLeaderIsEmptyCounter");

}

Expand Down Expand Up @@ -209,7 +224,12 @@ public void register(ResourceLeaderChangeListener<ResourceClusterGateway> change
.subscribe(nextDescription -> {
log.info("nextDescription={}", nextDescription);

if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) {
// We do not want to update if the master is set to null. This is usually due to a newly
// initialized master monitor.
if (nextDescription.equals(MasterDescription.MASTER_NULL)) {
resourceLeaderIsEmptyCounter.increment();
return;
} else if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) {
resourceLeaderAlreadyRegisteredCounter.increment();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class MasterDescription {
private final long createTime;
private final int consolePort;

public static final MasterDescription MASTER_NULL =
new MasterDescription("NONE", "localhost", -1, -1, -1, "uri://", -1, -1L);

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
public MasterDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@
import rx.Observable;
import rx.subjects.BehaviorSubject;


@Slf4j
public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor {

private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class);

public static final MasterDescription MASTER_NULL =
new MasterDescription("NONE", "localhost", -1, -1, -1, "uri://", -1, -1L);

private final ThreadFactory monitorThreadFactory = r -> {
Thread thread = new Thread(r);
thread.setName("dynamodb-monitor-" + System.currentTimeMillis());
Expand Down Expand Up @@ -92,7 +92,7 @@ public DynamoDBMasterMonitor(
String partitionKey,
Duration pollInterval,
Duration gracefulShutdown) {
masterSubject = BehaviorSubject.create(MASTER_NULL);
masterSubject = BehaviorSubject.create(MasterDescription.MASTER_NULL);
this.lockClient = lockClient;
this.partitionKey = partitionKey;
this.pollInterval = pollInterval;
Expand Down Expand Up @@ -147,8 +147,6 @@ private void getCurrentLeader() {
if (optionalLock.isPresent()) {
final LockItem lock = optionalLock.get();
nextDescription = lock.getData().map(this::bytesToMaster).orElse(null);
logger.warn("failed to decode leader bytes");
this.lockDecodeFailedCounter.increment();
} else {
nextDescription = null;
logger.warn("no leader found");
Expand All @@ -163,8 +161,8 @@ private void getCurrentLeader() {
}

private void updateLeader(@Nullable MasterDescription nextDescription) {
final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL);
final MasterDescription next = (nextDescription == null) ? MASTER_NULL : nextDescription;
final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
final MasterDescription next = (nextDescription == null) ? MasterDescription.MASTER_NULL : nextDescription;
if (!prev.equals(next)) {
logger.info("leader changer information previous {} and next {}", prev.getHostname(), next.getHostname());
masterSubject.onNext(next);
Expand All @@ -183,8 +181,9 @@ private MasterDescription bytesToMaster(ByteBuffer data) {
return jsonMapper.readValue(bytes, MasterDescription.class);
} catch (IOException e) {
logger.error("unable to parse master description bytes: {}", data, e);
this.lockDecodeFailedCounter.increment();
}
return MASTER_NULL;
return MasterDescription.MASTER_NULL;
}

@Override
Expand All @@ -201,6 +200,6 @@ public Observable<MasterDescription> getMasterObservable() {
@Override
@Nullable
public MasterDescription getLatestMaster() {
return Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL);
return Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void highAvailabilityServices() throws InterruptedException, IOException

// We can, depending on timing, sometimes get a MASTER_NULL value which is safe to ignore.
MasterDescription[] actualLeaders = testSubscriber.getOnNextEvents().stream()
.filter(md -> md != DynamoDBMasterMonitor.MASTER_NULL)
.filter(md -> md != MasterDescription.MASTER_NULL)
.collect(Collectors.toList())
.toArray(new MasterDescription[]{});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.mantisrx.extensions.dynamodb;

import static io.mantisrx.extensions.dynamodb.DynamoDBMasterMonitor.MASTER_NULL;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.*;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept
TestSubscriber<MasterDescription> testSubscriber = new TestSubscriber<>();
m.getMasterObservable().subscribe(testSubscriber);
m.start();
assertEquals(MASTER_NULL, m.getLatestMaster());
assertEquals(MasterDescription.MASTER_NULL, m.getLatestMaster());
lockSupport.takeLock(lockKey, otherMaster);
await()
.atLeast(DynamoDBLockSupportRule.heartbeatDuration)
Expand All @@ -93,7 +92,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept
.pollDelay(DynamoDBLockSupportRule.heartbeatDuration)
.atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2))
.untilAsserted(() -> assertEquals(m.getLatestMaster(), thatMaster));
testSubscriber.assertValues(MASTER_NULL, otherMaster, thatMaster);
testSubscriber.assertValues(MasterDescription.MASTER_NULL, otherMaster, thatMaster);
m.shutdown();
}

Expand Down Expand Up @@ -134,7 +133,7 @@ public void monitorDoesNotReturnNull() throws IOException, InterruptedException
.atLeast(DynamoDBLockSupportRule.heartbeatDuration)
.pollDelay(DynamoDBLockSupportRule.heartbeatDuration)
.atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2))
.untilAsserted(() -> assertEquals(MASTER_NULL, m.getLatestMaster()));
.untilAsserted(() -> assertEquals(MasterDescription.MASTER_NULL, m.getLatestMaster()));
lockSupport.releaseLock(lockKey);

m.shutdown();
Expand Down

0 comments on commit 1bda05c

Please sign in to comment.