Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix application qps quota stalls. #14859

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@
* - broker added or removed from cluster
*/
public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {

// Maximum 'disabled' value for app quota. If actual value is equal or less than this, it is considered as
// disabled, otherwise it's enabled. This is a side effect of rate limiter accepting only positive values.
private static final double MAX_DISABLED_APP_QUOTA = 0.0d;
// standard value meaning - no app quota limit set
private static final double DISABLED_APP_QUOTA = -1;

private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
Expand Down Expand Up @@ -130,9 +137,9 @@ private void initializeApplicationQpsQuotas() {

String appName = entry.getKey();
double appQpsQuota =
entry.getValue() != null && entry.getValue() != -1.0d ? entry.getValue() : _defaultQpsQuotaForApplication;
entry.getValue() != null ? entry.getValue() : _defaultQpsQuotaForApplication;

if (appQpsQuota < 0) {
if (isDisabled(appQpsQuota)) {
buildEmptyOrResetApplicationRateLimiter(appName);
continue;
}
Expand All @@ -144,8 +151,14 @@ private void initializeApplicationQpsQuotas() {
new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), numOnlineBrokers, appQpsQuota, -1);
_applicationRateLimiterMap.put(appName, queryQuotaEntity);
}
}

private static boolean isEnabled(double appQpsQuota) {
return appQpsQuota > MAX_DISABLED_APP_QUOTA;
}

return;
private static boolean isDisabled(double appQpsQuota) {
return appQpsQuota <= MAX_DISABLED_APP_QUOTA;
}

@Override
Expand Down Expand Up @@ -348,19 +361,45 @@ private synchronized void createOrUpdateDatabaseRateLimiter(List<String> databas
}

public synchronized void createOrUpdateApplicationRateLimiter(String applicationName) {
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), DISABLED_APP_QUOTA);
}

public synchronized void createOrUpdateApplicationRateLimiter(String applicationName, double newQps) {
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), newQps);
}

// Caller method need not worry about getting lock on _applicationRateLimiterMap
// as this method will do idempotent updates to the application rate limiters
private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames) {
createOrUpdateApplicationRateLimiter(applicationNames, DISABLED_APP_QUOTA);
}

/**
* Caller method need not worry about getting lock on _applicationRateLimiterMap
* as this method will do idempotent updates to the application rate limiters
* @param applicationNames application names for which to update the rate limiter
* @param newQps - if > 0, fixed value to use for rate limiter(s), otherwise value is fetched from ZK.
*/
private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames, double newQps) {
ExternalView brokerResource = getBrokerResource();
Map<String, Double> quotas = null;
if (applicationNames.size() > 0 && !isEnabled(newQps)) {
quotas = ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
}

for (String appName : applicationNames) {
double qpsQuota = getEffectiveQueryQuotaOnApplication(appName);
if (qpsQuota < 0) {
double qpsQuota;
if (isEnabled(newQps)) {
qpsQuota = newQps;
} else if (quotas != null && quotas.get(appName) != null) {
qpsQuota = quotas.get(appName);
} else {
qpsQuota = _defaultQpsQuotaForApplication;
}

if (isDisabled(qpsQuota)) {
buildEmptyOrResetApplicationRateLimiter(appName);
continue;
}

int numOnlineBrokers = getNumOnlineBrokers(brokerResource);
double perBrokerQpsQuota = qpsQuota / numOnlineBrokers;
QueryQuotaEntity oldEntity = _applicationRateLimiterMap.get(appName);
Expand Down Expand Up @@ -436,22 +475,6 @@ private double getEffectiveQueryQuotaOnDatabase(String databaseName) {
return _defaultQpsQuotaForDatabase;
}

/**
* Utility to get the effective query quota being imposed on an application. It is computed based on the default quota
* set at cluster config.
*
* @param applicationName application name to get the query quota on.
* @return effective query quota limit being applied
*/
private double getEffectiveQueryQuotaOnApplication(String applicationName) {
Map<String, Double> quotas =
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
if (quotas != null && quotas.get(applicationName) != null && quotas.get(applicationName) != -1.0d) {
return quotas.get(applicationName);
}
return _defaultQpsQuotaForApplication;
}

/**
* Creates a new database rate limiter. Will not update the database rate limiter if it already exists.
* @param databaseName database name for which rate limiter needs to be created
Expand All @@ -472,7 +495,7 @@ public void createApplicationRateLimiter(String applicationName) {
if (_applicationRateLimiterMap.containsKey(applicationName)) {
return;
}
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
createOrUpdateApplicationRateLimiter(applicationName);
}

/**
Expand Down Expand Up @@ -579,10 +602,12 @@ public boolean acquireApplication(String applicationName) {
}
QueryQuotaEntity queryQuota = _applicationRateLimiterMap.get(applicationName);
if (queryQuota == null) {
if (getDefaultQueryQuotaForApplication() < 0) {
// do not create a new rate limiter because that could lead to OOM if client floods us with many unique app names
if (isDisabled(_defaultQpsQuotaForApplication)) {
return true;
} else {
createOrUpdateApplicationRateLimiter(applicationName);
// create limiter without querying ZK
createOrUpdateApplicationRateLimiter(applicationName, _defaultQpsQuotaForApplication);
queryQuota = _applicationRateLimiterMap.get(applicationName);
}
}
Expand Down Expand Up @@ -809,9 +834,12 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke
if (quota.getNumOnlineBrokers() != onlineBrokerCount) {
quota.setNumOnlineBrokers(onlineBrokerCount);
}
if (quota.getOverallRate() > 0) {
if (isEnabled(quota.getOverallRate())) {
double qpsQuota = quota.getOverallRate() / onlineBrokerCount;
quota.setRateLimiter(RateLimiter.create(qpsQuota));
// dividing small qps value by broker's count can result in 0 and blow up in rate limiter
if (isEnabled(qpsQuota)) {
quota.setRateLimiter(RateLimiter.create(qpsQuota));
}
}
}

Expand All @@ -820,9 +848,8 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke
}
_lastKnownBrokerResourceVersion.set(currentVersionNumber);
long endTime = System.currentTimeMillis();
LOGGER
.info("Processed query quota change in {}ms, {} out of {} query quota configs rebuilt.", (endTime - startTime),
numRebuilt, _rateLimiterMap.size());
LOGGER.info("Processed query quota change in {}ms, {} out of {} query quota configs rebuilt.",
(endTime - startTime), numRebuilt, _rateLimiterMap.size());
}

/**
Expand Down Expand Up @@ -857,11 +884,16 @@ private double getDefaultQueryQuotaForDatabase() {

private double getDefaultQueryQuotaForApplication() {
HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_helixManager.getClusterName()).build();
return Double.parseDouble(helixAdmin.getConfig(configScope,
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
.forCluster(_helixManager.getClusterName()).build();
String value = helixAdmin.getConfig(configScope,
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1"));
.get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
if (value != null) {
return Double.parseDouble(value);
} else {
return DISABLED_APP_QUOTA;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public void testWhenOnlyDefaultAppQuotaIsSetItAffectsAllApplications()
}

@Test
public void tesCreateAndUpdateAppRateLimiterChangesRateLimiterMap() {
public void testCreateAndUpdateAppRateLimiterChangesRateLimiterMap() {
Map<String, Double> apps = new HashMap<>();
apps.put("app1", null);
apps.put("app2", 1d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class PinotApplicationQuotaRestletResource {
PinotHelixResourceManager _pinotHelixResourceManager;

/**
* API to get application quota configs. Will return null if application quotas are not defined
* API to get application quota configs. Will return empty map if application quotas are not defined at all.
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -88,7 +88,7 @@ public Map<String, Double> getApplicationQuotas(@Context HttpHeaders httpHeaders
}

/**
* API to get application quota configs. Will return null if application quotas are not defined
* API to get application quota config. Will return null if application quotas is not defined.
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -102,17 +102,19 @@ public Double getApplicationQuota(@Context HttpHeaders httpHeaders, @PathParam("
return quotas.get(appName);
}

HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_pinotHelixResourceManager.getHelixClusterName()).build();
HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
.forCluster(_pinotHelixResourceManager.getHelixClusterName())
.build();

HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
String defaultQuota =
helixAdmin.getConfig(scope, Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null);
.get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
return defaultQuota != null ? Double.parseDouble(defaultQuota) : null;
}

/**
* API to update the quota configs for application
* API to update the quota config for application.
*/
@POST
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,35 @@ public void testDefaultDatabaseQueryQuotaOverride()
public void testDefaultApplicationQueryQuotaOverride()
throws Exception {
addAppQueryQuotaToClusterConfig(25);

// override lower than default quota
setQueryQuotaForApplication(10);
testQueryRate(10);

// override higher than default quota
setQueryQuotaForApplication(27);
testQueryRate(27);

// disable
setQueryQuotaForApplication(-1);
verifyQuotaUpdate(Long.MAX_VALUE);
runQueries(50, false);

// verify that default still applies to other application names
runQueries(20, false, "other");
//increase the qps and some of the queries should be throttled.
runQueries(50, true, "other");
}

@Test
public void testDisabledDefaultApplicationQueryQuotaOverride()
throws Exception {
addAppQueryQuotaToClusterConfig(-1);

verifyQuotaUpdate(Long.MAX_VALUE);
runQueries(10, false);

// override default quota
setQueryQuotaForApplication(40);
testQueryRate(40);
}
Expand Down Expand Up @@ -264,48 +289,58 @@ private static void sleep(long deadline, double iterationsLeft) {
}
}

private void runQueries(int qps, boolean shouldFail) {
runQueries(qps, shouldFail, "default");
}

// try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis
// is not comparable to sleepMillis, else the actual qps would end up being much lower than required qps
private void runQueries(int qps, boolean shouldFail) {
private void runQueries(int qps, boolean shouldFail, String applicationName) {
int failCount = 0;
boolean isLastFail = false;
long deadline = System.currentTimeMillis() + 1000;

String query = "SET applicationName='" + applicationName + "'; SELECT COUNT(*) FROM " + getTableName();

for (int i = 0; i < qps; i++) {
sleep(deadline, qps - i);
ResultSetGroup resultSetGroup =
_pinotConnection.execute("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName());
ResultSetGroup resultSetGroup = _pinotConnection.execute(query);
for (PinotClientException exception : resultSetGroup.getExceptions()) {
if (exception.getMessage().contains("QuotaExceededError")) {
failCount++;
isLastFail = i == qps - 1;
break;
}
}
}

if (shouldFail) {
assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps);
Assert.assertNotEquals(failCount, 0, "Expected nonzero failures for qps: " + qps + " isLastFail: " + isLastFail);
} else {
Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " + qps);
Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " + qps + " isLastFail: " + isLastFail);
}
}

private static volatile float _quota;
private static volatile double _quota;
private static volatile String _quotaSource;

private void verifyQuotaUpdate(float quotaQps) {
private void verifyQuotaUpdate(double quotaQps) {
try {
TestUtils.waitForCondition(aVoid -> {
try {
float tableQuota = Float.parseFloat(sendGetRequest(
String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE", _brokerHostPort, getTableName())));
double tableQuota = Double.parseDouble(sendGetRequest(
"http://" + _brokerHostPort + "/debug/tables/queryQuota/" + getTableName() + "_OFFLINE"));
double dbQuota = Double.parseDouble(
sendGetRequest("http://" + _brokerHostPort + "/debug/databases/queryQuota/default"));
double appQuota = Double.parseDouble(
sendGetRequest("http://" + _brokerHostPort + "/debug/applicationQuotas/default"));

tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
float dbQuota = Float.parseFloat(
sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default", _brokerHostPort)));
float appQuota = Float.parseFloat(
sendGetRequest(String.format("http://%s/debug/applicationQuotas/default", _brokerHostPort)));
dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
appQuota = appQuota == 0 ? Long.MAX_VALUE : appQuota;
float actualQuota = Math.min(Math.min(tableQuota, dbQuota), appQuota);

double actualQuota = Math.min(Math.min(tableQuota, dbQuota), appQuota);

_quota = actualQuota;
if (_quota == dbQuota) {
_quotaSource = "database";
Expand All @@ -314,12 +349,13 @@ private void verifyQuotaUpdate(float quotaQps) {
} else {
_quotaSource = "application";
}
return quotaQps == actualQuota || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == Long.MAX_VALUE
return Math.abs(quotaQps - actualQuota) < 0.01
|| (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == Long.MAX_VALUE
&& appQuota == Long.MAX_VALUE);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, 5000, "Failed to reflect query quota on rate limiter in 5s.");
}, 10000, "Failed to reflect query quota on rate limiter in 5s.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not strictly necessary, so I'll revert it.

} catch (AssertionError ae) {
throw new AssertionError(
ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae);
Expand Down
Loading