Skip to content

Commit

Permalink
Merge branch 'apache:master' into master-support-storing-non-encodabl…
Browse files Browse the repository at this point in the history
…e-values-in-another-column
  • Loading branch information
jackluo923 authored Nov 17, 2024
2 parents 03edc2a + 07962b0 commit 518d89a
Show file tree
Hide file tree
Showing 452 changed files with 15,438 additions and 5,578 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/pinot_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ jobs:
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
run: .github/workflows/scripts/pr-tests/.pinot_tests_unit.sh
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5
continue-on-error: true
timeout-minutes: 5
with:
Expand Down Expand Up @@ -232,7 +232,7 @@ jobs:
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
run: .github/workflows/scripts/pr-tests/.pinot_tests_integration.sh
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5
continue-on-error: true
timeout-minutes: 5
with:
Expand All @@ -259,7 +259,7 @@ jobs:
run: .github/workflows/scripts/pr-tests/.pinot_tests_custom_integration.sh
- name: Upload coverage to Codecov
if : ${{ matrix.testset == 1 }}
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5
continue-on-error: true
timeout-minutes: 5
with:
Expand Down
3 changes: 1 addition & 2 deletions config/codestyle-intellij.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@
</codeStyleSettings>
<codeStyleSettings language="JAVA">
<option name="RIGHT_MARGIN" value="120"/>
<option name="KEEP_LINE_BREAKS" value="false"/>
<option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
<option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
<option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="0"/>
Expand All @@ -206,7 +205,7 @@
<option name="THROWS_LIST_WRAP" value="1"/>
<option name="EXTENDS_KEYWORD_WRAP" value="1"/>
<option name="THROWS_KEYWORD_WRAP" value="2"/>
<option name="METHOD_CALL_CHAIN_WRAP" value="1"/>
<option name="METHOD_CALL_CHAIN_WRAP" value="5"/>
<option name="BINARY_OPERATION_WRAP" value="1"/>
<option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true"/>
<option name="TERNARY_OPERATION_WRAP" value="1"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ rules:
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ServerMetrics\", name=\"pinot\\.server\\.realtimeConsumptionExceptions\"><>(\\w+)"
name: "pinot_server_realtime_consumptionExceptions_$1"
cache: true
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ServerMetrics\", name=\"pinot\\.server\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+)\\.(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsFiltered|realtimeRowsConsumed|realtimeRowsFetched|streamConsumerCreateExceptions)\"><>(\\w+)"
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ServerMetrics\", name=\"pinot\\.server\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+)\\.(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsFiltered|realtimeRowsConsumed|realtimeRowsFetched|streamConsumerCreateExceptions|realtimeRowsSanitized)\"><>(\\w+)"
name: "pinot_server_$7_$8"
cache: true
labels:
Expand Down Expand Up @@ -132,6 +132,18 @@ rules:
labels:
database: "$2"
table: "$1$3"
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ServerMetrics\", name=\"pinot\\.server\\.luceneIndexingDelayMs\\.(([^.]+)\\.)?([^.]*)\"><>(\\w+)"
name: "pinot_server_luceneIndexingDelayMs_$4"
cache: true
labels:
database: "$2"
table: "$1$3"
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ServerMetrics\", name=\"pinot\\.server\\.luceneIndexingDelayDocs\\.(([^.]+)\\.)?([^.]*)\"><>(\\w+)"
name: "pinot_server_luceneIndexingDelayDocs_$4"
cache: true
labels:
database: "$2"
table: "$1$3"
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ServerMetrics\", name=\"pinot\\.server\\.numResizes\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\"><>(\\w+)"
name: "pinot_server_numResizes_$5"
cache: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants.ChangeType;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.utils.PinotAppConfigs;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
Expand Down Expand Up @@ -259,6 +261,7 @@ public void start()
throws Exception {
LOGGER.info("Starting Pinot broker (Version: {})", PinotVersion.VERSION);
LOGGER.info("Broker configs: {}", new PinotAppConfigs(getConfig()).toJSONString());
long startTimeMs = System.currentTimeMillis();
_isStarting = true;
Utils.logVersions();

Expand Down Expand Up @@ -435,6 +438,8 @@ public void start()
registerServiceStatusHandler();

_isStarting = false;
_brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
LOGGER.info("Finish starting Pinot broker");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
if (realtimeTableName == null) {
realtimeTableConfig = null;
}

HandlerContext handlerContext = getHandlerContext(offlineTableConfig, realtimeTableConfig);
if (handlerContext._disableGroovy) {
rejectGroovyQuery(serverPinotQuery);
Expand Down Expand Up @@ -620,9 +621,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable = null;
List<String> unavailableSegments = new ArrayList<>();
int numPrunedSegmentsTotal = 0;
boolean offlineTableDisabled = false;
boolean realtimeTableDisabled = false;
List<ProcessingException> exceptions = new ArrayList<>();
if (offlineBrokerRequest != null) {
offlineTableDisabled = _routingManager.isTableDisabled(offlineTableName);
// NOTE: Routing table might be null if table is just removed
RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId);
RoutingTable routingTable = null;
if (!offlineTableDisabled) {
routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId);
}
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
Expand All @@ -638,8 +646,12 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}
if (realtimeBrokerRequest != null) {
realtimeTableDisabled = _routingManager.isTableDisabled(realtimeTableName);
// NOTE: Routing table might be null if table is just removed
RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
RoutingTable routingTable = null;
if (!realtimeTableDisabled) {
routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
}
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
Expand All @@ -654,10 +666,25 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
realtimeBrokerRequest = null;
}
}

if (offlineTableDisabled || realtimeTableDisabled) {
String errorMessage = null;
if (((realtimeTableConfig != null && offlineTableConfig != null) && (offlineTableDisabled
&& realtimeTableDisabled)) || (offlineTableConfig == null && realtimeTableDisabled) || (
realtimeTableConfig == null && offlineTableDisabled)) {
requestContext.setErrorCode(QueryException.TABLE_IS_DISABLED_ERROR_CODE);
return BrokerResponseNative.TABLE_IS_DISABLED;
} else if ((realtimeTableConfig != null && offlineTableConfig != null) && realtimeTableDisabled) {
errorMessage = "Realtime table is disabled in hybrid table";
} else if ((realtimeTableConfig != null && offlineTableConfig != null) && offlineTableDisabled) {
errorMessage = "Offline table is disabled in hybrid table";
}
exceptions.add(QueryException.getException(QueryException.TABLE_IS_DISABLED_ERROR, errorMessage));
}

int numUnavailableSegments = unavailableSegments.size();
requestContext.setNumUnavailableSegments(numUnavailableSegments);

List<ProcessingException> exceptions = new ArrayList<>();
if (numUnavailableSegments > 0) {
String errorMessage;
if (numUnavailableSegments > MAX_UNAVAILABLE_SEGMENTS_TO_PRINT_IN_QUERY_EXCEPTION) {
Expand Down Expand Up @@ -792,6 +819,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
}
brokerResponse.setTablesQueried(Set.of(rawTableName));

for (ProcessingException exception : exceptions) {
brokerResponse.addException(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.ExceptionUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.query.QueryEnvironment;
Expand Down Expand Up @@ -92,7 +93,10 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_workerManager = new WorkerManager(hostname, port, _routingManager);
_queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config));
_queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config), config.getProperty(
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? TlsUtils.extractTlsConfig(config,
CommonConstants.Broker.BROKER_TLS_PREFIX) : null);
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}", hostname, port, _brokerId, _brokerTimeoutMs,
_queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit());
Expand Down Expand Up @@ -246,6 +250,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO

BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
brokerResponse.setResultTable(queryResults.getResultTable());
brokerResponse.setTablesQueried(tableNames);
// TODO: Add servers queried/responded stats
brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs());

Expand Down Expand Up @@ -297,11 +302,8 @@ private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponse,
List<MultiStageQueryStats.StageStats.Closed> queryStats, DispatchableSubPlan dispatchableSubPlan) {
try {
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
List<PlanNode> planNodes = new ArrayList<>(stagePlans.size());
for (DispatchablePlanFragment stagePlan : stagePlans) {
planNodes.add(stagePlan.getPlanFragment().getFragmentRoot());
}
MultiStageStatsTreeBuilder treeBuilder = new MultiStageStatsTreeBuilder(planNodes, queryStats);

MultiStageStatsTreeBuilder treeBuilder = new MultiStageStatsTreeBuilder(stagePlans, queryStats);
brokerResponse.setStageStats(treeBuilder.jsonStatsByStage(0));
for (MultiStageQueryStats.StageStats.Closed stageStats : queryStats) {
if (stageStats != null) { // for example pipeline breaker may not have stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public void shutDown() {
@Override
public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
RequestContext requestContext) {
requestContext.setBrokerId(_brokerId);
requestContext.setRequestId(_requestIdGenerator.get());
RangeTimeSeriesRequest timeSeriesRequest = null;
try {
timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ public synchronized void buildRouting(String tableNameWithType) {
RoutingEntry routingEntry =
new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector,
segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher,
timeBoundaryManager, partitionMetadataManager, queryTimeoutMs);
timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, !idealState.isEnabled());
if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
LOGGER.info("Built routing for table: {}", tableNameWithType);
} else {
Expand Down Expand Up @@ -603,6 +603,20 @@ public boolean routingExists(String tableNameWithType) {
return _routingEntryMap.containsKey(tableNameWithType);
}

/**
* Returns whether the given table is enabled
* @param tableNameWithType Table name with type
* @return Whether the given table is enabled
*/
public boolean isTableDisabled(String tableNameWithType) {
RoutingEntry routingEntry = _routingEntryMap.getOrDefault(tableNameWithType, null);
if (routingEntry == null) {
return false;
} else {
return routingEntry.isDisabled();
}
}

/**
* Returns the routing table (a map from server instance to list of segments hosted by the server, and a list of
* unavailable segments) based on the broker request, or {@code null} if the routing does not exist.
Expand Down Expand Up @@ -729,11 +743,14 @@ private static class RoutingEntry {
// Time boundary manager is only available for the offline part of the hybrid table
transient TimeBoundaryManager _timeBoundaryManager;

transient boolean _disabled;

RoutingEntry(String tableNameWithType, String idealStatePath, String externalViewPath,
SegmentPreSelector segmentPreSelector, SegmentSelector segmentSelector, List<SegmentPruner> segmentPruners,
InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, int lastUpdateExternalViewVersion,
SegmentZkMetadataFetcher segmentZkMetadataFetcher, @Nullable TimeBoundaryManager timeBoundaryManager,
@Nullable SegmentPartitionMetadataManager partitionMetadataManager, @Nullable Long queryTimeoutMs) {
@Nullable SegmentPartitionMetadataManager partitionMetadataManager, @Nullable Long queryTimeoutMs,
boolean disabled) {
_tableNameWithType = tableNameWithType;
_idealStatePath = idealStatePath;
_externalViewPath = externalViewPath;
Expand All @@ -747,6 +764,7 @@ private static class RoutingEntry {
_partitionMetadataManager = partitionMetadataManager;
_queryTimeoutMs = queryTimeoutMs;
_segmentZkMetadataFetcher = segmentZkMetadataFetcher;
_disabled = disabled;
}

String getTableNameWithType() {
Expand Down Expand Up @@ -779,6 +797,10 @@ Long getQueryTimeoutMs() {
return _queryTimeoutMs;
}

boolean isDisabled() {
return _disabled;
}

// NOTE: The change gets applied in sequence, and before change applied to all components, there could be some
// inconsistency between components, which is fine because the inconsistency only exists for the newly changed
// segments and only lasts for a very short time.
Expand All @@ -793,6 +815,7 @@ void onAssignmentChange(IdealState idealState, ExternalView externalView) {
}
_lastUpdateIdealStateVersion = idealState.getStat().getVersion();
_lastUpdateExternalViewVersion = externalView.getStat().getVersion();
_disabled = !idealState.isEnabled();
}

void onInstancesChange(Set<String> enabledInstances, List<String> changedInstances) {
Expand Down
16 changes: 16 additions & 0 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@
<value>false</value> <!-- This will disable all default listeners -->
</property>
</properties>
<!--Disabling tests as Pinot currently uses Yammer and these tests fail for DropwizardMetricsFactory-->
<excludes>
<exclude>**/DropwizardBrokerPrometheusMetricsTest.java</exclude>
<exclude>**/DropwizardServerPrometheusMetricsTest.java</exclude>
<exclude>**/DropwizardMinionPrometheusMetricsTest.java</exclude>
<exclude>**/DropwizardControllerPrometheusMetricsTest.java</exclude>
</excludes>
</configuration>
</plugin>

Expand Down Expand Up @@ -107,6 +114,10 @@
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.prometheus.jmx</groupId>
<artifactId>jmx_prometheus_javaagent</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-segment-spi</artifactId>
Expand Down Expand Up @@ -269,6 +280,11 @@
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-dropwizard</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.sourceforge.fmpp</groupId>
<artifactId>fmpp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.collections;

import java.util.AbstractList;


/**
* An immutable list like the one returned by {@link java.util.Collections#nCopies(int, Object)}, but with two values
* (that are not interleaved) instead of a single one. Useful for avoiding unnecessary allocations.
*/
public class DualValueList<E> extends AbstractList<E> {

private final E _firstValue;
private final E _secondValue;
private final int _firstCount;
private final int _totalSize;

public DualValueList(E firstValue, int firstCount, E secondValue, int secondCount) {
_firstValue = firstValue;
_firstCount = firstCount;
_secondValue = secondValue;
_totalSize = firstCount + secondCount;
}

@Override
public E get(int index) {
if (index < 0 || index >= _totalSize) {
throw new IndexOutOfBoundsException(index);
}
return index < _firstCount ? _firstValue : _secondValue;
}

@Override
public int size() {
return _totalSize;
}
}
Loading

0 comments on commit 518d89a

Please sign in to comment.