Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding DynamoDB high availability features #675

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
209e2c1
refactor project
mcowgill-stripe May 15, 2024
23ae2da
preparing to master monitor
mcowgill-stripe May 17, 2024
da0337f
adding monitor tests
mcowgill-stripe May 17, 2024
8316eb7
initial passing test for leader elector
mcowgill-stripe May 17, 2024
fce8c04
initial high availability services for dynamo db
mcowgill-stripe May 22, 2024
c3e2453
changes after gradle format
mcowgill-stripe May 22, 2024
8b05cb4
refactoring design for singleton use case and test
mcowgill-stripe May 22, 2024
7380cb2
resolve race condition on global vars for tests
mcowgill-stripe May 22, 2024
6ba4453
comment change
mcowgill-stripe May 23, 2024
deb8636
reorder public/private functions
mcowgill-stripe May 23, 2024
b24b867
linting fix
mcowgill-stripe May 24, 2024
82e0b0b
revert pointless change in dynamostoretest
mcowgill-stripe May 24, 2024
c3ca083
FileBasedStore must inherit from the deprecated KeyValueStore interfa…
mcowgill-stripe May 24, 2024
5389d89
renaming core.KeyValueStore to core.IKeyValueStore
mcowgill-stripe May 24, 2024
30909b6
this is a marker commit to refactor leader elector and monitors into …
mcowgill-stripe May 28, 2024
d357c45
failing container test likely issues with serialized hack on WorkerCo…
mcowgill-stripe May 29, 2024
185156e
props cannot be final since it is an arg
mcowgill-stripe May 29, 2024
8ed304e
working test point
mcowgill-stripe May 29, 2024
bb56961
refactoring for injected leadership services
mcowgill-stripe Jun 4, 2024
e9b5549
dynamodb leadership factory
mcowgill-stripe Jun 4, 2024
fbad7ba
final mods
mcowgill-stripe Jun 4, 2024
d2437ca
add test for dynamodb factory class
mcowgill-stripe Jun 5, 2024
d365098
removing class based config injection
mcowgill-stripe Jun 11, 2024
8744c4e
Merge branch 'master' into mcowgill-add-dynamo-leader
crioux-stripe Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,5 @@ bin/
build
site

# ignore asdf files for jdk manager
.tool-versions
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.ILeaderMonitorFactory;
import io.mantisrx.server.core.master.LocalLeaderFactory;
import io.mantisrx.server.core.master.LocalMasterMonitor;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.server.core.zookeeper.CuratorService;
import io.mantisrx.server.core.master.ZookeeperLeaderMonitorFactory;
import io.mantisrx.server.core.master.ZookeeperMasterMonitor;
import io.mantisrx.server.core.utils.ConfigUtils;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGatewayClient;
Expand Down Expand Up @@ -71,7 +75,7 @@ public static HighAvailabilityServices createHAServices(CoreConfiguration config
}
else {
if (HAServiceInstanceRef.get() == null) {
HAServiceInstanceRef.compareAndSet(null, new ZkHighAvailabilityServices(configuration));
HAServiceInstanceRef.compareAndSet(null, new HighAvailabilityServicesImpl(configuration));
}
}

Expand Down Expand Up @@ -121,99 +125,104 @@ protected void startUp() throws Exception {
protected void shutDown() throws Exception {
}
}
private static class HighAvailabilityServicesImpl extends AbstractIdleService implements
HighAvailabilityServices {

private final MasterMonitor masterMonitor;
private final Counter resourceLeaderChangeCounter;
private final Counter resourceLeaderAlreadyRegisteredCounter;
private final AtomicInteger rmConnections = new AtomicInteger(0);
private final CoreConfiguration configuration;

public HighAvailabilityServicesImpl(CoreConfiguration configuration) {
this.configuration = configuration;
final ILeaderMonitorFactory factory = ConfigUtils.createInstance(configuration.getLeaderMonitorFactory(), ILeaderMonitorFactory.class);
if(factory instanceof LocalLeaderFactory) {
log.warn("using default non-local Zookeeper leader monitoring you should set: "+
"mantis.leader.monitor.factory=io.mantisrx.server.core.master.ZookeeperLeaderMonitorFactory");
masterMonitor = new ZookeeperLeaderMonitorFactory().createLeaderMonitor(configuration);
} else {
masterMonitor = factory.createLeaderMonitor(configuration);
}
// this for backward compatibility, but should be modified to "HighAvailabilityServices" in the future
final String metricsGroup = masterMonitor instanceof ZookeeperMasterMonitor ? "ZkHighAvailabilityServices" :
"HighAvailabilityServices";

final Metrics metrics = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder()
.name(metricsGroup)
.addCounter("resourceLeaderChangeCounter")
.addCounter("resourceLeaderAlreadyRegisteredCounter")
.build());
resourceLeaderChangeCounter = metrics.getCounter("resourceLeaderChangeCounter");
resourceLeaderAlreadyRegisteredCounter = metrics.getCounter("resourceLeaderAlreadyRegisteredCounter");

/**
* Zookeeper based implementation of HighAvailabilityServices that finds the various leader instances
* through metadata stored on zookeeper.
*/
private static class ZkHighAvailabilityServices extends AbstractIdleService implements
HighAvailabilityServices {

private final CuratorService curatorService;
private final Counter resourceLeaderChangeCounter;
private final Counter resourceLeaderAlreadyRegisteredCounter;
private final AtomicInteger rmConnections = new AtomicInteger(0);
private final CoreConfiguration configuration;

/**
 * Builds the service from the given configuration.
 *
 * <p>Creates a {@code CuratorService} from {@code configuration}, registers a metrics group named
 * "ZkHighAvailabilityServices" with two counters, and keeps the configuration for later use.
 *
 * @param configuration core configuration passed to the curator service and stored on this instance
 */
public ZkHighAvailabilityServices(CoreConfiguration configuration) {
curatorService = new CuratorService(configuration);
// Register both counters under a single metrics group, then look them up by name.
final Metrics metrics = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder()
.name("ZkHighAvailabilityServices")
.addCounter("resourceLeaderChangeCounter")
.addCounter("resourceLeaderAlreadyRegisteredCounter")
.build());
resourceLeaderChangeCounter = metrics.getCounter("resourceLeaderChangeCounter");
resourceLeaderAlreadyRegisteredCounter = metrics.getCounter("resourceLeaderAlreadyRegisteredCounter");
this.configuration = configuration;
}

/** Service start-up hook: starts the curator service. */
@Override
protected void startUp() throws Exception {
curatorService.start();
}

/** Service shut-down hook: shuts down the curator service. */
@Override
protected void shutDown() throws Exception {
curatorService.shutdown();
}

/**
 * Creates a new {@code MantisMasterClientApi} on every call, backed by the master monitor
 * obtained from the curator service.
 */
@Override
public MantisMasterGateway getMasterClientApi() {
return new MantisMasterClientApi(curatorService.getMasterMonitor());
}

/** Returns the master monitor exposed by the curator service. */
@Override
public MasterMonitor getMasterMonitor() {
return curatorService.getMasterMonitor();
}

@Override
public ResourceLeaderConnection<ResourceClusterGateway> connectWithResourceManager(
ClusterID clusterID) {
return new ResourceLeaderConnection<ResourceClusterGateway>() {
final MasterMonitor masterMonitor = curatorService.getMasterMonitor();
}

ResourceClusterGateway currentResourceClusterGateway =
new ResourceClusterGatewayClient(clusterID, masterMonitor.getLatestMaster(), configuration);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the file renders as a confusing unified diff, but the code here was moved, not deleted. If you find any changes here, please flag them: the intent was to relocate the code with no logic changes.

/** Service start-up hook: starts the master monitor held by this instance. */
@Override
protected void startUp() throws Exception {
masterMonitor.start();
}

final String nameFormat =
"ResourceClusterGatewayCxn (" + rmConnections.getAndIncrement() + ")-%d";
final Scheduler scheduler =
Schedulers
.from(
Executors
.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(nameFormat).build()));
/** Service shut-down hook: shuts down the master monitor held by this instance. */
@Override
protected void shutDown() throws Exception {
masterMonitor.shutdown();
}

final List<Subscription> subscriptions = new ArrayList<>();
/** Creates a new {@code MantisMasterClientApi} on every call, backed by this instance's master monitor. */
@Override
public MantisMasterGateway getMasterClientApi() {
return new MantisMasterClientApi(masterMonitor);
}

@Override
public ResourceClusterGateway getCurrent() {
return currentResourceClusterGateway;
public MasterMonitor getMasterMonitor() {
return masterMonitor;
}

@Override
public void register(ResourceLeaderChangeListener<ResourceClusterGateway> changeListener) {
Subscription subscription = masterMonitor
.getMasterObservable()
.observeOn(scheduler)
.subscribe(nextDescription -> {
log.info("nextDescription={}", nextDescription);

if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) {
resourceLeaderAlreadyRegisteredCounter.increment();
return;
public ResourceLeaderConnection<ResourceClusterGateway> connectWithResourceManager(
ClusterID clusterID) {
return new ResourceLeaderConnection<ResourceClusterGateway>() {
ResourceClusterGateway currentResourceClusterGateway =
new ResourceClusterGatewayClient(clusterID, masterMonitor.getLatestMaster(), configuration);

final String nameFormat =
"ResourceClusterGatewayCxn (" + rmConnections.getAndIncrement() + ")-%d";
final Scheduler scheduler =
Schedulers
.from(
Executors
.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(nameFormat).build()));

final List<Subscription> subscriptions = new ArrayList<>();

@Override
public ResourceClusterGateway getCurrent() {
return currentResourceClusterGateway;
}
ResourceClusterGateway previous = currentResourceClusterGateway;
currentResourceClusterGateway = new ResourceClusterGatewayClient(clusterID, nextDescription, configuration);

resourceLeaderChangeCounter.increment();
changeListener.onResourceLeaderChanged(previous, currentResourceClusterGateway);
});

subscriptions.add(subscription);
@Override
public void register(ResourceLeaderChangeListener<ResourceClusterGateway> changeListener) {
Subscription subscription = masterMonitor
.getMasterObservable()
.observeOn(scheduler)
.subscribe(nextDescription -> {
log.info("nextDescription={}", nextDescription);

if (nextDescription.equals(((ResourceClusterGatewayClient)currentResourceClusterGateway).getMasterDescription())) {
resourceLeaderAlreadyRegisteredCounter.increment();
return;
}
ResourceClusterGateway previous = currentResourceClusterGateway;
currentResourceClusterGateway = new ResourceClusterGatewayClient(clusterID, nextDescription, configuration);

resourceLeaderChangeCounter.increment();
changeListener.onResourceLeaderChanged(previous, currentResourceClusterGateway);
});

subscriptions.add(subscription);
}
};
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.server.core.zookeeper.CuratorService;
import io.mantisrx.server.core.master.ZookeeperLeaderMonitorFactory;
import io.mantisrx.server.master.client.config.StaticPropertiesConfigurationFactory;
import java.io.FileInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -56,8 +56,7 @@ public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(5);
StaticPropertiesConfigurationFactory configurationFactory = new StaticPropertiesConfigurationFactory(properties);
CoreConfiguration config = configurationFactory.getConfig();
final CuratorService curatorService = new CuratorService(config);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change might be controversial. IMO, the CuratorService (and really the CuratorFramework behind it) is Zookeeper-specific. To build the abstraction around Zookeeper, I chose to confine exposure to, and knowledge of, Curator to the ZK leadership functions.

MasterMonitor masterMonitor = curatorService.getMasterMonitor();
MasterMonitor masterMonitor = new ZookeeperLeaderMonitorFactory().createLeaderMonitor(config);
masterMonitor.getMasterObservable()
.filter(new Func1<MasterDescription, Boolean>() {
@Override
Expand All @@ -73,7 +72,7 @@ public void call(MasterDescription masterDescription) {
}
})
.subscribe();
curatorService.start();
masterMonitor.start();
try {
latch.await();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class MasterClientWrapperTest {
zkProps.put("mantis.zookeeper.leader.announcement.path", "/leader");
zkProps.put("mantis.zookeeper.root", "/mantis/master");
zkProps.put("mantis.localmode", "false");
zkProps.put("mantis.leader.monitor.factory","io.mantisrx.server.core.master.ZookeeperLeaderMonitorFactory");
zkProps.put("mantis.leader.elector.factory", "io.mantisrx.master.zk.ZookeeperLeadershipFactory");
}

MasterClientWrapper clientWrapper = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
api libraries.flinkRpcApi
api libraries.flinkCore
implementation libraries.commonsIo
implementation libraries.vavr
compileOnly libraries.jsr305
compileOnly libraries.flinkRpcImpl // Provided by copyLibs task

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,8 @@ public interface CoreConfiguration {
/** Value of the {@code mantis.asyncHttpClient.readTimeoutMs} setting; defaults to 10000. */
@Config("mantis.asyncHttpClient.readTimeoutMs")
@Default("10000")
int getAsyncHttpClientReadTimeoutMs();

/**
 * Value of the {@code mantis.leader.monitor.factory} setting: a fully qualified class name,
 * defaulting to {@code io.mantisrx.server.core.master.LocalLeaderFactory}.
 */
@Config("mantis.leader.monitor.factory")
@Default("io.mantisrx.server.core.master.LocalLeaderFactory")
String getLeaderMonitorFactory();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

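nit: consider renaming this to getLeaderMonitorFactoryName, since it returns the factory's class name (a String) rather than a factory instance.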

}
Loading
Loading