Skip to content

Commit

Permalink
Merge branch 'main' into build-cluster-by-info
Browse files Browse the repository at this point in the history
  • Loading branch information
chaohengstudent committed Mar 29, 2023
2 parents 6f67000 + cfe2bce commit 5831866
Show file tree
Hide file tree
Showing 30 changed files with 432 additions and 1,928 deletions.
76 changes: 14 additions & 62 deletions app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.astraea.app.web;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
Expand All @@ -37,7 +36,6 @@
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;
Expand All @@ -51,7 +49,6 @@
import org.astraea.common.cost.HasMoveCost;
import org.astraea.common.json.TypeRef;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.collector.MetricCollector;
import org.astraea.common.metrics.collector.MetricSensor;
import org.astraea.common.metrics.collector.MetricsStore;

Expand All @@ -63,33 +60,30 @@ class BalancerHandler implements Handler, AutoCloseable {
private final Map<String, CompletionStage<Balancer.Plan>> planGenerations =
new ConcurrentHashMap<>();
private final Map<String, CompletionStage<Void>> planExecutions = new ConcurrentHashMap<>();
private final Function<Integer, Optional<Integer>> jmxPortMapper;

private final Collection<MetricSensor> sensors = new ConcurrentLinkedQueue<>();

private final MetricsStore metricsStore;

BalancerHandler(Admin admin) {
this(admin, (ignore) -> Optional.empty());
}

BalancerHandler(Admin admin, Function<Integer, Optional<Integer>> jmxPortMapper) {
BalancerHandler(Admin admin, Function<Integer, Integer> jmxPortMapper) {
this.admin = admin;
this.jmxPortMapper = jmxPortMapper;
this.balancerConsole = BalancerConsole.create(admin);
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier =
() ->
admin
.brokers()
.thenApply(
brokers ->
brokers.stream()
.collect(
Collectors.toUnmodifiableMap(
NodeInfo::id,
b -> MBeanClient.jndi(b.host(), jmxPortMapper.apply(b.id())))));
this.metricsStore =
MetricsStore.builder()
.beanExpiration(Duration.ofSeconds(1))
.localReceiver(
() ->
freshJmxAddresses().entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey,
e ->
MBeanClient.jndi(
e.getValue().getHostName(), e.getValue().getPort()))))
.sensorSupplier(
.localReceiver(clientSupplier)
.sensorsSupplier(
() ->
sensors.stream()
.collect(
Expand Down Expand Up @@ -287,48 +281,6 @@ private static List<MigrationCost> migrationCosts(Balancer.Plan solution) {
.collect(Collectors.toList());
}

private Balancer.Plan metricContext(
Collection<MetricSensor> metricSensors,
Function<Supplier<ClusterBean>, Balancer.Plan> execution) {
// TODO: use a global metric collector when we are ready to enable long-run metric sampling
// https://github.com/skiptests/astraea/pull/955#discussion_r1026491162
try (var collector =
MetricCollector.local()
.registerJmxs(freshJmxAddresses())
.addMetricSensors(metricSensors)
.interval(Duration.ofSeconds(1))
.build()) {
return execution.apply(collector::clusterBean);
}
}

// visible for test
Map<Integer, InetSocketAddress> freshJmxAddresses() {
var brokers = admin.brokers().toCompletableFuture().join();
var jmxAddresses =
brokers.stream()
.flatMap(
broker -> jmxPortMapper.apply(broker.id()).map(p -> Map.entry(broker, p)).stream())
.collect(
Collectors.toUnmodifiableMap(
e -> e.getKey().id(),
e -> InetSocketAddress.createUnresolved(e.getKey().host(), e.getValue())));

// JMX is disabled
if (jmxAddresses.size() == 0) return Map.of();

// JMX is partially enabled, forbidden this use case since it is probably a bad idea
if (brokers.size() != jmxAddresses.size())
throw new IllegalArgumentException(
"Some brokers has no JMX port specified in the web service argument: "
+ brokers.stream()
.map(NodeInfo::id)
.filter(id -> !jmxAddresses.containsKey(id))
.collect(Collectors.toUnmodifiableSet()));

return jmxAddresses;
}

// visible for test
static PostRequestWrapper parsePostRequestWrapper(
BalancerPostRequest balancerPostRequest, ClusterInfo currentClusterInfo) {
Expand Down
29 changes: 8 additions & 21 deletions app/src/main/java/org/astraea/app/web/BeanHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.astraea.app.web;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -29,9 +27,9 @@

public class BeanHandler implements Handler {
private final Admin admin;
private final Function<Integer, Optional<Integer>> jmxPorts;
private final Function<Integer, Integer> jmxPorts;

BeanHandler(Admin admin, Function<Integer, Optional<Integer>> jmxPorts) {
BeanHandler(Admin admin, Function<Integer, Integer> jmxPorts) {
this.admin = admin;
this.jmxPorts = jmxPorts;
}
Expand All @@ -43,28 +41,17 @@ public CompletionStage<Response> get(Channel channel) {
.brokers()
.thenApply(
brokers ->
brokers.stream()
.flatMap(
b ->
jmxPorts
.apply(b.id())
.map(port -> Map.entry(b.host(), MBeanClient.jndi(b.host(), port)))
.stream())
.collect(Collectors.toUnmodifiableList()))
.thenApply(
clients ->
new NodeBeans(
clients.stream()
brokers.stream()
.map(
entry -> {
try {
b -> {
try (var client =
MBeanClient.jndi(b.host(), jmxPorts.apply(b.id()))) {
return new NodeBean(
entry.getKey(),
entry.getValue().beans(builder.build()).stream()
b.host(),
client.beans(builder.build()).stream()
.map(Bean::new)
.collect(Collectors.toUnmodifiableList()));
} finally {
entry.getValue().close();
}
})
.collect(Collectors.toUnmodifiableList())));
Expand Down
15 changes: 7 additions & 8 deletions app/src/main/java/org/astraea/app/web/WebService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class WebService implements AutoCloseable {
private final HttpServer server;
private final Admin admin;

public WebService(Admin admin, int port, Function<Integer, Optional<Integer>> brokerIdToJmxPort) {
public WebService(Admin admin, int port, Function<Integer, Integer> brokerIdToJmxPort) {
this.admin = admin;
server = Utils.packException(() -> HttpServer.create(new InetSocketAddress(port), 0));
server.createContext("/topics", to(new TopicHandler(admin)));
Expand Down Expand Up @@ -65,13 +65,12 @@ public void close() {

public static void main(String[] args) throws Exception {
var arg = org.astraea.app.argument.Argument.parse(new Argument(), args);
Function<Integer, Optional<Integer>> brokerIdToPort =
id -> {
var r = Optional.ofNullable(arg.jmxPorts.get(String.valueOf(id))).map(Integer::parseInt);
if (r.isPresent()) return r;
if (arg.jmxPort > 0) return Optional.of(arg.jmxPort);
return Optional.empty();
};
Function<Integer, Integer> brokerIdToPort =
id ->
Optional.ofNullable(arg.jmxPorts.get(String.valueOf(id)))
.map(Integer::parseInt)
.orElseThrow(
() -> new IllegalArgumentException("failed to get jmx port for broker: " + id));
try (var service = new WebService(Admin.of(arg.configs()), arg.port, brokerIdToPort)) {
if (arg.ttl == null) {
System.out.println("enter ctrl + c to terminate web service");
Expand Down
54 changes: 18 additions & 36 deletions app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ static void closeService() {
void testReport() {
var topics = createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
// make sure all replicas have
admin
.clusterInfo(Set.copyOf(topics))
Expand Down Expand Up @@ -160,7 +160,7 @@ void testReport() {
void testTopics() {
var topicNames = createAndProduceTopic(5);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
// For all 5 topics, we only allow the first two topics can be altered.
// We apply this limitation to test if the BalancerHandler.TOPICS_KEY works correctly.
var allowedTopics = List.copyOf(topicNames).subList(0, 2);
Expand Down Expand Up @@ -343,7 +343,7 @@ void testBestPlan() {
void testMoveCost(String leaderLimit, String sizeLimit) {
createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
var request = new BalancerHandler.BalancerPostRequest();
request.moveCosts =
Set.of(
Expand Down Expand Up @@ -387,7 +387,7 @@ void testMoveCost(String leaderLimit, String sizeLimit) {
void testNoReport() {
var topic = Utils.randomString(10);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
admin.creator().topic(topic).numberOfPartitions(1).run().toCompletableFuture().join();
Utils.sleep(Duration.ofSeconds(1));
var post =
Expand Down Expand Up @@ -435,7 +435,7 @@ void testPut() {
// arrange
createAndProduceTopic(3, 10, (short) 2, false);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
var request = new BalancerHandler.BalancerPostRequest();
request.balancerConfig = Map.of("iteration", "100");
var progress = submitPlanGeneration(handler, request);
Expand Down Expand Up @@ -464,7 +464,7 @@ void testPut() {
void testBadPut() {
createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {

// no id offered
Assertions.assertThrows(
Expand All @@ -485,7 +485,7 @@ void testBadPut() {
void testSubmitRebalancePlanThreadSafe() {
var topic = Utils.randomString();
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
admin.creator().topic(topic).numberOfPartitions(30).run().toCompletableFuture().join();
Utils.sleep(Duration.ofSeconds(3));
admin
Expand Down Expand Up @@ -533,7 +533,7 @@ void testSubmitRebalancePlanThreadSafe() {
@Timeout(value = 60)
void testRebalanceDetectOngoing() {
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
var theTopic = Utils.randomString();
admin.creator().topic(theTopic).numberOfPartitions(1).run().toCompletableFuture().join();
try (var producer = Producer.of(SERVICE.bootstrapServers())) {
Expand Down Expand Up @@ -617,7 +617,7 @@ void testGenerationDetectOngoing() {
.thenAnswer((invoke) -> CompletableFuture.completedFuture(List.of()));
Mockito.when(admin.topicNames(Mockito.anyBoolean()))
.thenAnswer((invoke) -> CompletableFuture.completedFuture(Set.of("A", "B", "C")));
try (var handler = new BalancerHandler(admin)) {
try (var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
Mockito.when(admin.clusterInfo(Mockito.any()))
.thenAnswer((invoke) -> CompletableFuture.completedFuture(clusterHasFuture));
var task0 =
Expand Down Expand Up @@ -664,7 +664,7 @@ void testGenerationDetectOngoing() {
void testPutSanityCheck() {
var topic = createAndProduceTopic(1).iterator().next();
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
var request = new BalancerHandler.BalancerPostRequest();
request.topics = Set.of(topic);
var theProgress = submitPlanGeneration(handler, request);
Expand Down Expand Up @@ -703,7 +703,7 @@ void testPutSanityCheck() {
void testLookupRebalanceProgress() {
createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
var progress = submitPlanGeneration(handler, new BalancerPostRequest());
Assertions.assertEquals(Searched, progress.phase);

Expand Down Expand Up @@ -757,7 +757,7 @@ void testLookupRebalanceProgress() {
void testLookupBadExecutionProgress() {
createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
var post =
Assertions.assertInstanceOf(
BalancerHandler.PostPlanResponse.class,
Expand Down Expand Up @@ -811,7 +811,7 @@ void testLookupBadExecutionProgress() {
void testBadLookupRequest() {
createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
Assertions.assertEquals(
404, handler.get(Channel.ofTarget("no such plan")).toCompletableFuture().join().code());

Expand All @@ -828,7 +828,7 @@ void testBadLookupRequest() {
void testPutIdempotent() {
var topics = createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
var request = new BalancerHandler.BalancerPostRequest();
request.topics = topics;
var progress = submitPlanGeneration(handler, request);
Expand Down Expand Up @@ -861,7 +861,7 @@ void testPutIdempotent() {
void testCustomBalancer() {
var topics = createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
var balancer = SpyBalancer.class.getName();
var balancerConfig =
Map.of(
Expand Down Expand Up @@ -982,9 +982,7 @@ void testTimeout() {
createAndProduceTopic(5);
var costFunction = Collections.singleton(costWeight(TimeoutCost.class.getName(), 1));
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler =
new BalancerHandler(
admin, (ignore) -> Optional.of(SERVICE.jmxServiceURL().getPort()))) {
var handler = new BalancerHandler(admin, (ignore) -> SERVICE.jmxServiceURL().getPort())) {
var channel = httpRequest(Map.of(TIMEOUT_KEY, "10", CLUSTER_COSTS_KEY, costFunction));
var post =
(BalancerHandler.PostPlanResponse) handler.post(channel).toCompletableFuture().join();
Expand All @@ -1003,9 +1001,7 @@ void testTimeout() {
void testCostWithSensor() {
var topics = createAndProduceTopic(3);
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler =
new BalancerHandler(
admin, (ignore) -> Optional.of(SERVICE.jmxServiceURL().getPort())); ) {
var handler = new BalancerHandler(admin, (ignore) -> SERVICE.jmxServiceURL().getPort())) {
var invoked = new AtomicBoolean();
SensorAndCost.callback.set(
(clusterBean) -> {
Expand Down Expand Up @@ -1159,25 +1155,11 @@ void testChangeOrder() {
NoSuchElementException.class, () -> BalancerHandler.Change.from(Set.of(), Set.of()));
}

@Test
void testFreshJmxAddress() {
try (var admin = Admin.of(SERVICE.bootstrapServers());
var noJmx = new BalancerHandler(admin, (id) -> Optional.empty());
var withJmx = new BalancerHandler(admin, (id) -> Optional.of(5566));
var partialJmx =
new BalancerHandler(admin, (id) -> Optional.ofNullable(id != 0 ? 1000 : null))) {

Assertions.assertEquals(0, noJmx.freshJmxAddresses().size());
Assertions.assertEquals(3, withJmx.freshJmxAddresses().size());
Assertions.assertThrows(IllegalArgumentException.class, partialJmx::freshJmxAddresses);
}
}

@Test
void testExecutorConfig() {
var topic = createAndProduceTopic(1).iterator().next();
try (var admin = Admin.of(SERVICE.bootstrapServers());
var handler = new BalancerHandler(admin)) {
var handler = new BalancerHandler(admin, id -> SERVICE.jmxServiceURL().getPort())) {
var request = new BalancerHandler.BalancerPostRequest();
request.topics = Set.of(topic);
var theProgress = submitPlanGeneration(handler, request);
Expand Down
Loading

0 comments on commit 5831866

Please sign in to comment.