Add broker setting to override default implicit query response limit (#…
bziobrowski authored Jan 30, 2025
1 parent 7293122 commit 1196459
Showing 4 changed files with 214 additions and 3 deletions.
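The new setting is read once at broker startup. As a minimal sketch of wiring it up programmatically, assuming only the property key, default constant, and PinotConfiguration accessors that appear in the diff below (the standalone class is illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.Broker;

public class DefaultQueryLimitConfigSketch {
  public static void main(String[] args) {
    Map<String, Object> properties = new HashMap<>();
    // Queries that omit LIMIT get an implicit LIMIT 100; a negative value
    // disables the override, leaving PinotQuery's default of 10 in effect.
    properties.put(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT, 100);
    PinotConfiguration config = new PinotConfiguration(properties);
    int defaultQueryLimit =
        config.getProperty(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT, Broker.DEFAULT_BROKER_QUERY_LIMIT);
    System.out.println("Implicit LIMIT for open-ended queries: " + defaultQueryLimit);
  }
}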
@@ -142,6 +142,8 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequestHandler
protected final boolean _enableQueryLimitOverride;
protected final boolean _enableDistinctCountBitmapOverride;
protected final int _queryResponseLimit;
// If >= 0, overrides the implicit default limit of 10; otherwise this setting is ignored.
protected final int _defaultQueryLimit;
protected final Map<Long, QueryServers> _queriesById;
protected final boolean _enableMultistageMigrationMetric;
protected ExecutorService _multistageCompileExecutor;
@@ -160,6 +162,8 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String brokerId
_config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
_defaultQueryLimit = config.getProperty(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT,
Broker.DEFAULT_BROKER_QUERY_LIMIT);
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
@@ -171,9 +175,10 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String brokerId
_multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000);
}

LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, query log max length: {}, "
+ "query log max rate: {}, query cancellation enabled: {}", getClass().getSimpleName(), _brokerId,
_brokerTimeoutMs, _queryResponseLimit, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, "
+ "default query limit {}, query log max length: {}, query log max rate: {}, query cancellation "
+ "enabled: {}", getClass().getSimpleName(), _brokerId, _brokerTimeoutMs, _queryResponseLimit,
_defaultQueryLimit, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
enableQueryCancellation);
}

@@ -309,6 +314,10 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions
}
}

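// Fill in the broker-wide default only when the query text did not specify LIMIT;
// an explicit LIMIT always wins (see BrokerQueryLimitTest below).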
if (isDefaultQueryResponseLimitEnabled() && !pinotQuery.isSetLimit()) {
pinotQuery.setLimit(_defaultQueryLimit);
}

if (isLiteralOnlyQuery(pinotQuery)) {
LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query);
try {
@@ -881,6 +890,10 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions
}
}

private boolean isDefaultQueryResponseLimitEnabled() {
return _defaultQueryLimit > -1;
}

@VisibleForTesting
static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy,
String offlineRoutingPolicy) {
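Restated outside the handler, the override above participates only when the configured value is non-negative and the parsed query left LIMIT unset. A self-contained sketch under those assumptions (method and parameter names hypothetical):

// Hypothetical restatement of the override decision in BaseSingleStageBrokerRequestHandler above.
static int effectiveLimit(Integer explicitLimit, int defaultQueryLimit) {
  if (explicitLimit != null) {
    return explicitLimit; // an explicit LIMIT in the SQL always wins
  }
  if (defaultQueryLimit > -1) {
    return defaultQueryLimit; // broker-configured implicit limit
  }
  return 10; // PinotQuery's built-in default when the override is disabled
}

For example, with pinot.broker.default.query.limit=5 (as in the integration test below), "SELECT longCol FROM myTable" returns 5 rows, while "SELECT longCol FROM myTable LIMIT 20" still returns 20.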
@@ -87,6 +87,7 @@ public void setUp()
properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
properties.put(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, true);
properties.put(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
properties.put(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT, 1000);

_brokerStarter = new HelixBrokerStarter();
_brokerStarter.init(new PinotConfiguration(properties));
@@ -145,6 +146,7 @@ public void testClusterConfigOverride() {

// NOTE: It is disabled in cluster config, but enabled in instance config. Instance config should take precedence.
assertTrue(config.getProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false));
assertEquals(config.getProperty(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT, 1), 1000);
}

@Test
@@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;


// This test uses a separate cluster because it needs to change the broker configuration,
// which is only done once per suite.
public class BrokerQueryLimitTest extends BaseClusterIntegrationTest {

protected static final Logger LOGGER = LoggerFactory.getLogger(BrokerQueryLimitTest.class);
private static final String LONG_COLUMN = "longCol";
public static final int DEFAULT_LIMIT = 5;

@Test
public void testWhenLimitIsNOTSetExplicitlyThenDefaultLimitIsApplied()
throws Exception {
setUseMultiStageQueryEngine(false);
String query = String.format("SELECT %s FROM %s", LONG_COLUMN, getTableName());

JsonNode result = postQuery(query).get("resultTable");
JsonNode columnDataTypesNode = result.get("dataSchema").get("columnDataTypes");
assertEquals(columnDataTypesNode.get(0).textValue(), "LONG");

JsonNode rows = result.get("rows");
assertEquals(rows.size(), DEFAULT_LIMIT);

for (int rowNum = 0; rowNum < rows.size(); rowNum++) {
JsonNode row = rows.get(rowNum);
assertEquals(row.size(), 1);
assertEquals(row.get(0).asLong(), rowNum);
}
}

@Test
public void testWhenLimitISSetExplicitlyThenDefaultLimitIsNotApplied()
throws Exception {
setUseMultiStageQueryEngine(false);
String query = String.format("SELECT %s FROM %s limit 20", LONG_COLUMN, getTableName());

JsonNode result = postQuery(query).get("resultTable");
JsonNode columnDataTypesNode = result.get("dataSchema").get("columnDataTypes");
assertEquals(columnDataTypesNode.get(0).textValue(), "LONG");

JsonNode rows = result.get("rows");
assertEquals(rows.size(), 20);

for (int rowNum = 0; rowNum < rows.size(); rowNum++) {
JsonNode row = rows.get(rowNum);
assertEquals(row.size(), 1);
assertEquals(row.get(0).asLong(), rowNum);
}
}

@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT, DEFAULT_LIMIT);
}

@Override
public String getTableName() {
return DEFAULT_TABLE_NAME;
}

@Override
public Schema createSchema() {
return new Schema.SchemaBuilder().setSchemaName(getTableName())
.addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
.build();
}

public File createAvroFile()
throws Exception {
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
avroSchema.setFields(ImmutableList.of(
new org.apache.avro.Schema.Field(LONG_COLUMN, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
null, null)));

File avroFile = new File(_tempDir, "data.avro");
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
writer.create(avroSchema, avroFile);
for (int i = 0; i < getCountStarResult(); i++) {
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(LONG_COLUMN, i);
writer.append(record);
}
}
return avroFile;
}

@BeforeClass
public void setUp()
throws Exception {
LOGGER.warn("Setting up integration test class: {}", getClass().getSimpleName());
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
startZk();
LOGGER.warn("Start Kafka in the integration test class");
startKafka();
startController();
startBroker();
startServer();

if (_controllerRequestURLBuilder == null) {
_controllerRequestURLBuilder =
ControllerRequestURLBuilder.baseUrl("http://localhost:" + getControllerPort());
}
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);

File avroFile = createAvroFile();
// create offline table
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);

// create & upload segments
ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), _tarDir);

waitForAllDocsLoaded(60_000);
LOGGER.warn("Finished setting up integration test class: {}", getClass().getSimpleName());
}

@AfterClass
public void tearDown()
throws Exception {
LOGGER.warn("Tearing down integration test class: {}", getClass().getSimpleName());
dropOfflineTable(getTableName());

// Stop Kafka
LOGGER.warn("Stop Kafka in the integration test class");
stopKafka();
// Shutdown the Pinot cluster
stopServer();
stopBroker();
stopController();
stopZk();
FileUtils.deleteDirectory(_tempDir);
LOGGER.warn("Finished tearing down integration test class: {}", getClass().getSimpleName());
}

@Override
public TableConfig createOfflineTableConfig() {
return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).build();
}
}
Original file line number Diff line number Diff line change
@@ -274,7 +274,14 @@ public static class Broker {

public static final String CONFIG_OF_BROKER_QUERY_REWRITER_CLASS_NAMES = "pinot.broker.query.rewriter.class.names";
public static final String CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT = "pinot.broker.query.response.limit";
public static final String CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT =
"pinot.broker.default.query.limit";

public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE;

// A negative value disables this override; the default of 10 matches PinotQuery's
// implicit limit, so out-of-the-box behavior is unchanged.
public static final int DEFAULT_BROKER_QUERY_LIMIT = 10;

public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE;
public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND =

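One interaction worth noting: CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT above appears to act as a ceiling on any requested LIMIT, whereas the new default only fills in a missing one. A hypothetical helper making that split explicit (the rejection behavior for oversized limits is an assumption, not shown in this diff):

// Hypothetical; assumes oversized limits are rejected, while the default
// limit only applies when LIMIT was omitted from the query.
static int resolveLimit(Integer explicitLimit, int defaultQueryLimit, int queryResponseLimit) {
  int limit = (explicitLimit != null) ? explicitLimit
      : (defaultQueryLimit > -1 ? defaultQueryLimit : 10); // 10 = PinotQuery default
  if (limit > queryResponseLimit) {
    throw new IllegalArgumentException("LIMIT " + limit + " exceeds maximum allowed " + queryResponseLimit);
  }
  return limit;
}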