diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java index a220bc53a55d..477a8f02f0da 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java @@ -45,7 +45,6 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; @@ -56,6 +55,7 @@ import org.apache.pinot.core.auth.ManualAuthorization; import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; @@ -157,11 +157,11 @@ public Map>> getRoutingTable( @ApiResponse(code = 404, message = "Routing not found"), @ApiResponse(code = 500, message = "Internal server error") }) - public Map, List>>> getRoutingTableWithOptionalSegments( + public Map> getRoutingTableWithOptionalSegments( @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, @Context HttpHeaders headers) { tableName = DatabaseUtils.translateTableName(tableName, headers); - Map, List>>> result = new TreeMap<>(); + Map> result = new TreeMap<>(); getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType, routingTable.getServerInstanceToSegmentsMap())); if (!result.isEmpty()) { @@ -192,9 +192,9 @@ private void getRoutingTable(String tableName, BiConsumer } private static Map> removeOptionalSegments( - Map, List>> serverInstanceToSegmentsMap) { + Map serverInstanceToSegmentsMap) { Map> ret = new HashMap<>(); - serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getLeft())); + serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getSegments())); return ret; } @@ -231,7 +231,7 @@ public Map> getRoutingTableForQuery( @ApiResponse(code = 404, message = "Routing not found"), @ApiResponse(code = 500, message = "Internal server error") }) - public Map, List>> getRoutingTableForQueryWithOptionalSegments( + public Map getRoutingTableForQueryWithOptionalSegments( @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query, @Context HttpHeaders httpHeaders) { BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 7f41132a5a3b..21e5dc72e816 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -85,6 +85,7 @@ import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.util.GapfillUtils; @@ -617,8 +618,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // Calculate routing table for the query // TODO: Modify RoutingManager interface to directly take PinotQuery long routingStartTimeNs = System.nanoTime(); - Map, List>> offlineRoutingTable = null; - Map, List>> realtimeRoutingTable = null; + Map offlineRoutingTable = null; + Map realtimeRoutingTable = null; List unavailableSegments = new ArrayList<>(); int numPrunedSegmentsTotal = 0; boolean offlineTableDisabled = false; @@ -633,7 +634,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map, List>> serverInstanceToSegmentsMap = + Map serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap(); if (!serverInstanceToSegmentsMap.isEmpty()) { offlineRoutingTable = serverInstanceToSegmentsMap; @@ -654,7 +655,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map, List>> serverInstanceToSegmentsMap = + Map serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap(); if (!serverInstanceToSegmentsMap.isEmpty()) { realtimeRoutingTable = serverInstanceToSegmentsMap; @@ -1874,9 +1875,9 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t */ protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception; @@ -1906,8 +1907,8 @@ private static class QueryServers { final String _query; final Set _servers = new HashSet<>(); - QueryServers(String query, @Nullable Map, List>> offlineRoutingTable, - @Nullable Map, List>> realtimeRoutingTable) { + QueryServers(String query, @Nullable Map offlineRoutingTable, + @Nullable Map realtimeRoutingTable) { _query = query; if (offlineRoutingTable != null) { _servers.addAll(offlineRoutingTable.keySet()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 0484476a410b..d6c2f3aaccc4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; @@ -38,6 +37,7 @@ import org.apache.pinot.common.utils.grpc.GrpcQueryClient; import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; import org.apache.pinot.core.query.reduce.StreamingReduceService; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; @@ -76,9 +76,9 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { // TODO: Support failure detection @@ -106,12 +106,12 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques * Query pinot server for data table. */ private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest, - Map, List>> routingTable, + Map routingTable, Map> responseMap, boolean trace) { - for (Map.Entry, List>> routingEntry : routingTable.entrySet()) { + for (Map.Entry routingEntry : routingTable.entrySet()) { ServerInstance serverInstance = routingEntry.getKey(); // TODO: support optional segments for GrpcQueryServer. - List segments = routingEntry.getValue().getLeft(); + List segments = routingEntry.getValue().getSegments(); String serverHost = serverInstance.getHostname(); int port = serverInstance.getGrpcPort(); // TODO: enable throttling on per host bases. diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 7ad9268b0d04..6e8ecff9a5a5 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.failuredetector.FailureDetector; import org.apache.pinot.broker.failuredetector.FailureDetectorFactory; @@ -43,6 +42,7 @@ import org.apache.pinot.common.response.broker.QueryProcessingException; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.query.reduce.BrokerReduceService; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.AsyncQueryResponse; import org.apache.pinot.core.transport.QueryResponse; import org.apache.pinot.core.transport.QueryRouter; @@ -99,9 +99,9 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index b6f82e070521..e324f0ece3c4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -61,6 +61,7 @@ import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; @@ -635,15 +636,15 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId) selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments()); } - private Map, List>> getServerInstanceToSegmentsMap(String tableNameWithType, + private Map getServerInstanceToSegmentsMap(String tableNameWithType, InstanceSelector.SelectionResult selectionResult) { - Map, List>> merged = new HashMap<>(); + Map merged = new HashMap<>(); for (Map.Entry entry : selectionResult.getSegmentToInstanceMap().entrySet()) { ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); if (serverInstance != null) { - Pair, List> pair = - merged.computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), new ArrayList<>())); - pair.getLeft().add(entry.getKey()); + ServerRouteInfo serverRouteInfoInfo = + merged.computeIfAbsent(serverInstance, k -> new ServerRouteInfo(new ArrayList<>(), new ArrayList<>())); + serverRouteInfoInfo.getSegments().add(entry.getKey()); } else { // Should not happen in normal case unless encountered unexpected exception when updating routing entries _brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L); @@ -652,12 +653,12 @@ private Map, List>> getServerInstanceT for (Map.Entry entry : selectionResult.getOptionalSegmentToInstanceMap().entrySet()) { ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); if (serverInstance != null) { - Pair, List> pair = merged.get(serverInstance); + ServerRouteInfo serverRouteInfo = merged.get(serverInstance); // Skip servers that don't have non-optional segments, so that servers always get some non-optional segments // to process, to be backward compatible. // TODO: allow servers only with optional segments - if (pair != null) { - pair.getRight().add(entry.getKey()); + if (serverRouteInfo != null) { + serverRouteInfo.getOptionalSegments().add(entry.getKey()); } } // TODO: Report missing server metrics when we allow servers only with optional segments. diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index a24b8ac79e9c..eb0d77a17b0d 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -172,7 +172,7 @@ public void testResourceAndTagAssignment() RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest, 0); assertNotNull(routingTable); assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS); - assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getLeft().size(), + assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getSegments().size(), NUM_OFFLINE_SEGMENTS); assertTrue(routingTable.getUnavailableSegments().isEmpty()); @@ -182,7 +182,7 @@ public void testResourceAndTagAssignment() TestUtils.waitForCondition(aVoid -> routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap().values().iterator().next() - .getLeft().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, + .getSegments().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment " + "into the routing table"); // Add a new table with different broker tenant diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java index df4b1b6bf8ca..3952b414f07b 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import javax.annotation.Nullable; -import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.broker.broker.AllowAllAccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; @@ -37,6 +36,7 @@ import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TenantConfig; @@ -167,8 +167,8 @@ public void testCancelQuery() { when(routingManager.routingExists(tableName)).thenReturn(true); when(routingManager.getQueryTimeoutMs(tableName)).thenReturn(10000L); RoutingTable rt = mock(RoutingTable.class); - when(rt.getServerInstanceToSegmentsMap()).thenReturn( - Map.of(new ServerInstance(new InstanceConfig("server01_9000")), Pair.of(List.of("segment01"), List.of()))); + when(rt.getServerInstanceToSegmentsMap()).thenReturn(Map.of(new ServerInstance(new InstanceConfig("server01_9000")), + new ServerRouteInfo(List.of("segment01"), List.of()))); when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt); QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class); when(queryQuotaManager.acquire(anyString())).thenReturn(true); @@ -194,9 +194,9 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { testRequestId[0] = requestId; diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala index 753b2c379494..603f25fc0565 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala @@ -18,13 +18,13 @@ */ package org.apache.pinot.connector.spark.common.reader -import org.apache.commons.lang3.tuple.Pair import org.apache.helix.model.InstanceConfig import org.apache.pinot.common.datatable.DataTable import org.apache.pinot.common.metrics.BrokerMetrics import org.apache.pinot.common.request.BrokerRequest import org.apache.pinot.connector.spark.common.partition.PinotSplit import org.apache.pinot.connector.spark.common.{Logging, PinotDataSourceReadOptions, PinotException} +import org.apache.pinot.core.routing.ServerRouteInfo import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance} import org.apache.pinot.spi.config.table.TableType @@ -32,7 +32,7 @@ import org.apache.pinot.spi.env.PinotConfiguration import org.apache.pinot.spi.metrics.PinotMetricUtils import org.apache.pinot.sql.parsers.CalciteSqlCompiler -import java.util.{Collections, List => JList, Map => JMap} +import java.util.{Map => JMap} import scala.collection.JavaConverters._ /** @@ -93,7 +93,7 @@ private[reader] class PinotServerDataFetcher( dataTables.filter(_.getNumberOfRows > 0) } - private def createRoutingTableForRequest(): JMap[ServerInstance, Pair[JList[String], JList[String]]] = { + private def createRoutingTableForRequest(): JMap[ServerInstance, ServerRouteInfo] = { val nullZkId: String = null val instanceConfig = new InstanceConfig(nullZkId) instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost) @@ -101,15 +101,15 @@ private[reader] class PinotServerDataFetcher( // TODO: support netty-sec val serverInstance = new ServerInstance(instanceConfig) Map( - serverInstance -> Pair.of(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava) + serverInstance -> new ServerRouteInfo(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava) ).asJava } private def submitRequestToPinotServer( offlineBrokerRequest: BrokerRequest, - offlineRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]], + offlineRoutingTable: JMap[ServerInstance, ServerRouteInfo], realtimeBrokerRequest: BrokerRequest, - realtimeRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]]): AsyncQueryResponse = { + realtimeRoutingTable: JMap[ServerInstance, ServerRouteInfo]): AsyncQueryResponse = { logInfo(s"Sending request to ${pinotSplit.serverAndSegments.toString}") queryRouter.submitQuery( partitionId, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java index ccc6aedb81d1..5a7407805d6d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.transport.ServerInstance; @@ -29,18 +28,18 @@ public class RoutingTable { // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results. - private final Map, List/*optional segments*/>> _serverInstanceToSegmentsMap; + private final Map _serverInstanceToSegmentsMap; private final List _unavailableSegments; private final int _numPrunedSegments; - public RoutingTable(Map, List>> serverInstanceToSegmentsMap, + public RoutingTable(Map serverInstanceToSegmentsMap, List unavailableSegments, int numPrunedSegments) { _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap; _unavailableSegments = unavailableSegments; _numPrunedSegments = numPrunedSegments; } - public Map, List>> getServerInstanceToSegmentsMap() { + public Map getServerInstanceToSegmentsMap() { return _serverInstanceToSegmentsMap; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java new file mode 100644 index 000000000000..cd2b52053bee --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.routing; + +import java.util.List; + +/** + * Class representing the route information for a server. + * It contains the list of segments and optional segments assigned to the server. + */ +public class ServerRouteInfo { + private final List _segments; + private final List _optionalSegments; + + /** + * Constructor for ServerRouteInfo. + * + * @param segments List of segments assigned to the server. + * @param optionalSegments List of optional segments assigned to the server. + */ + public ServerRouteInfo(List segments, List optionalSegments) { + _segments = segments; + _optionalSegments = optionalSegments; + } + + /** + * Gets the list of segments assigned to the server. + * + * @return List of segments. + */ + public List getSegments() { + return _segments; + } + + /** + * Gets the list of optional segments assigned to the server. + * + * @return List of optional segments. + */ + public List getOptionalSegments() { + return _optionalSegments; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index 2b7561186524..d0177e86b0f4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -19,14 +19,12 @@ package org.apache.pinot.core.transport; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.config.NettyConfig; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.datatable.DataTable; @@ -36,6 +34,7 @@ import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; @@ -89,9 +88,9 @@ public QueryRouter(String brokerId, BrokerMetrics brokerMetrics, @Nullable Netty public AsyncQueryResponse submitQuery(long requestId, String rawTableName, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs) { + @Nullable Map realtimeRoutingTable, long timeoutMs) { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; // can prefer but not require TLS until all servers guaranteed to be on TLS @@ -104,7 +103,7 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, Map requestMap = new HashMap<>(); if (offlineBrokerRequest != null) { assert offlineRoutingTable != null; - for (Map.Entry, List>> entry : offlineRoutingTable.entrySet()) { + for (Map.Entry entry : offlineRoutingTable.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(TableType.OFFLINE, preferTls); InstanceRequest instanceRequest = getInstanceRequest(requestId, offlineBrokerRequest, entry.getValue()); @@ -113,7 +112,7 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, } if (realtimeBrokerRequest != null) { assert realtimeRoutingTable != null; - for (Map.Entry, List>> entry : realtimeRoutingTable.entrySet()) { + for (Map.Entry entry : realtimeRoutingTable.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls); InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue()); @@ -213,7 +212,7 @@ void markQueryDone(long requestId) { } private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, - Pair, List> segments) { + ServerRouteInfo segments) { InstanceRequest instanceRequest = new InstanceRequest(); instanceRequest.setRequestId(requestId); instanceRequest.setQuery(brokerRequest); @@ -221,13 +220,13 @@ private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerR if (queryOptions != null) { instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE))); } - instanceRequest.setSearchSegments(segments.getLeft()); + instanceRequest.setSearchSegments(segments.getSegments()); instanceRequest.setBrokerId(_brokerId); - if (CollectionUtils.isNotEmpty(segments.getRight())) { + if (CollectionUtils.isNotEmpty(segments.getOptionalSegments())) { // Don't set this field, i.e. leave it as null, if there is no optional segment at all, to be more backward // compatible, as there are places like in multi-stage query engine where this field is not set today when // creating the InstanceRequest. - instanceRequest.setOptionalSegments(segments.getRight()); + instanceRequest.setOptionalSegments(segments.getOptionalSegments()); } return instanceRequest; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 1b32149d064d..da9c98205684 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -22,9 +22,7 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.exception.QueryException; @@ -36,6 +34,7 @@ import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.scheduler.QueryScheduler; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.server.access.AccessControl; import org.apache.pinot.spi.config.table.TableType; @@ -66,8 +65,9 @@ public class QueryRoutingTest { SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME, ServerInstance.RoutingType.NETTY); private static final BrokerRequest BROKER_REQUEST = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable"); - private static final Map, List>> ROUTING_TABLE = - Collections.singletonMap(SERVER_INSTANCE, Pair.of(Collections.emptyList(), Collections.emptyList())); + private static final Map ROUTING_TABLE = + Collections.singletonMap(SERVER_INSTANCE, + new ServerRouteInfo(Collections.emptyList(), Collections.emptyList())); private QueryRouter _queryRouter; private ServerRoutingStatsManager _serverRoutingStatsManager; @@ -481,9 +481,9 @@ public void testSkipUnavailableServer() serverInstance1.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); ServerRoutingInstance serverRoutingInstance2 = serverInstance2.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); - Map, List>> routingTable = - Map.of(serverInstance1, Pair.of(Collections.emptyList(), Collections.emptyList()), serverInstance2, - Pair.of(Collections.emptyList(), Collections.emptyList())); + Map routingTable = + Map.of(serverInstance1, new ServerRouteInfo(Collections.emptyList(), Collections.emptyList()), + serverInstance2, new ServerRouteInfo(Collections.emptyList(), Collections.emptyList())); long requestId = 123; DataSchema dataSchema = diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 7556993876a6..b4ef55b5458d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -31,12 +31,12 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.rules.ImmutableTableOptions; import org.apache.pinot.calcite.rel.rules.TableOptions; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; @@ -332,12 +332,13 @@ private void assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata String tableType = routingEntry.getKey(); RoutingTable routingTable = routingEntry.getValue(); // for each server instance, attach all table types and their associated segment list. - Map, List>> segmentsMap = routingTable.getServerInstanceToSegmentsMap(); - for (Map.Entry, List>> serverEntry : segmentsMap.entrySet()) { + Map segmentsMap = routingTable.getServerInstanceToSegmentsMap(); + for (Map.Entry serverEntry : segmentsMap.entrySet()) { Map> tableTypeToSegmentListMap = serverInstanceToSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()); // TODO: support optional segments for multi-stage engine. - Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getLeft()) == null, + Preconditions.checkState( + tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getSegments()) == null, "Entry for server {} and table type: {} already exist!", serverEntry.getKey(), tableType); } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java index d01185214077..f4ca63a32b81 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java @@ -22,18 +22,17 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.InstanceConfig; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; @@ -57,7 +56,7 @@ public class MockRoutingManagerFactory { private final Map _schemaMap; private final Set _hybridTables; private final Map _serverInstances; - private final Map, List>>> _tableServerSegmentsMap; + private final Map> _tableServerSegmentsMap; public MockRoutingManagerFactory(int... ports) { _tableNameMap = new HashMap<>(); @@ -90,7 +89,8 @@ private void registerTableNameWithType(Schema schema, String tableNameWithType) public void registerSegment(int insertToServerPort, String tableNameWithType, String segmentName) { ServerInstance serverInstance = _serverInstances.get(toHostname(insertToServerPort)); _tableServerSegmentsMap.computeIfAbsent(tableNameWithType, k -> new HashMap<>()) - .computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), null)).getLeft().add(segmentName); + .computeIfAbsent(serverInstance, k -> new ServerRouteInfo(new ArrayList<>(), null)).getSegments() + .add(segmentName); } public RoutingManager buildRoutingManager(@Nullable Map partitionInfoMap) { diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java index 3df75ce8ab93..2217f4157004 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java @@ -58,7 +58,7 @@ public void assignSegmentsToPlan(BaseTimeSeriesPlanNode planNode, TimeBuckets ti Preconditions.checkNotNull(routingTable, "Failed to get routing table for table: " + sfpNode.getTableName()); for (var entry : routingTable.getServerInstanceToSegmentsMap().entrySet()) { ServerInstance serverInstance = entry.getKey(); - List segments = entry.getValue().getLeft(); + List segments = entry.getValue().getSegments(); context.getLeafIdToSegmentsByServer().computeIfAbsent(serverInstance, (x) -> new HashMap<>()) .put(sfpNode.getId(), segments); }