Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail query if one source fails #2045

Open
wants to merge 3 commits into
base: 3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package net.opentsdb.query;

public class IncompleteStateException extends RuntimeException {

public IncompleteStateException() {
super();
}

public IncompleteStateException(String message) {
super(message);
}

public IncompleteStateException(String message, Throwable t) {
super(message, t);
}

public IncompleteStateException(Throwable t) {
super(t);
}

}
46 changes: 40 additions & 6 deletions core/src/main/java/net/opentsdb/query/hacluster/HACluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import net.opentsdb.query.IncompleteStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -209,6 +210,28 @@ public void onNext(final QueryResult next) {
}
}
}
} else {
//Error from downstream


final String log;
if (config.getDataSources().size() == 1 ||
config.getDataSources().get(0).equals(id)) {
log = "Primary errored out: " +
(next.error() == null ? "" : next.error());
} else {
log = "Secondary errored out: " +
(next.error() == null ? "" : next.error());
}

if(Boolean.parseBoolean(config.failOnAnyError())) {
if(next.exception() != null) {
onError(new IncompleteStateException(log, next.exception()));
} else {
onError(new IncompleteStateException(log));
}
}

}
}

Expand Down Expand Up @@ -269,13 +292,24 @@ public void run(final Timeout ignored) throws Exception {
// Already sent upstream, lost the race.
return;
}
if (is_primary) {
context.queryContext().logWarn(HACluster.this,
"Primary timed out after: " + config.getPrimaryTimeout());

final String log;

if(is_primary) {
log = "Primary timed out after: " + config.getPrimaryTimeout();
} else {
context.queryContext().logWarn(HACluster.this,
"Secondary timed out after: " + config.getSecondaryTimeout());
log = "Secondary timed out after: " + config.getSecondaryTimeout();
}

if(Boolean.parseBoolean(config.failOnAnyError())) {
//Error out
onError(new IncompleteStateException(log));
return;
}

context.queryContext().logWarn(HACluster.this,
log);


QueryResult good_result = null;
for (final QueryResult result : results.values()) {
Expand Down Expand Up @@ -306,7 +340,7 @@ public void run(final Timeout ignored) throws Exception {
}

}

void complete(final boolean timed_out) {
if (!timed_out && !completed.compareAndSet(false, true)) {
LOG.warn("HA cluster node was trying to mark as complete but has "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public class HAClusterConfig extends BaseTimeSeriesDataSourceConfig<
* returns first. */
private final String primary_timeout;

/**
* Fails if any of the sources timeout/error out
*/
private final String fail_on_any_error;

/** A hash that calculates and stores the hash code once. */
private int hash;

Expand All @@ -86,7 +91,7 @@ protected HAClusterConfig(final Builder builder) {
merge_aggregator = builder.mergeAggregator;
secondary_timeout = builder.secondaryTimeout;
primary_timeout = builder.primaryTimeout;

fail_on_any_error = builder.failOnAnyError;
// validate the timeouts
if (!Strings.isNullOrEmpty(secondary_timeout)) {
DateTime.parseDuration(secondary_timeout);
Expand Down Expand Up @@ -118,6 +123,11 @@ public String getSecondaryTimeout() {
return secondary_timeout;
}

/**@return */
public String failOnAnyError() {
return fail_on_any_error;
}

/** @return An optional timeout for the primary when a secondary
* responds first. */
public String getPrimaryTimeout() {
Expand Down Expand Up @@ -230,6 +240,7 @@ public Builder toBuilder() {
.setMergeAggregator(merge_aggregator)
.setSecondaryTimeout(secondary_timeout)
.setPrimaryTimeout(primary_timeout)
.setFailOnAnyError(fail_on_any_error)
.setHasBeenSetup(has_been_setup);
if (!data_sources.isEmpty()) {
builder.setDataSources(Lists.newArrayList(data_sources));
Expand Down Expand Up @@ -260,6 +271,8 @@ public static class Builder extends BaseTimeSeriesDataSourceConfig.Builder<
private String secondaryTimeout;
@JsonProperty
private String primaryTimeout;
@JsonProperty
private String failOnAnyError;

Builder() {
setType("HAClusterConfig");
Expand Down Expand Up @@ -326,6 +339,11 @@ public Builder setPrimaryTimeout(final String primary_timeout) {
return this;
}

public Builder setFailOnAnyError(final String fail_on_any_error) {
failOnAnyError = fail_on_any_error;
return this;
}

public List<String> dataSources() {
return dataSources == null ? Collections.emptyList() : dataSources;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class HAClusterFactory extends BaseQueryNodeFactory<
public static final String AGGREGATOR_KEY = "hacluster.default.aggregator";
public static final String PRIMARY_KEY = "hacluster.default.timeout.primary";
public static final String SECONDARY_KEY = "hacluster.default.timeout.secondary";
public static final String FAIL_ON_ANY_ERROR = "hacluster.fail.on.any.error";

/** The default sources updated on config callback. */
protected final List<String> default_sources;
Expand Down Expand Up @@ -193,6 +194,10 @@ public void setupGraph(final QueryPipelineContext context,
builder.setSecondaryTimeout(tsdb.getConfig().getString(
getConfigKey(SECONDARY_KEY)));
}
if(Strings.isNullOrEmpty(cluster_config.failOnAnyError())) {
builder.setFailOnAnyError(tsdb.getConfig().getString(
getConfigKey(FAIL_ON_ANY_ERROR)));
}

if (cluster_config.getDataSources().isEmpty() &&
cluster_config.getDataSourceConfigs().isEmpty()) {
Expand Down Expand Up @@ -728,6 +733,11 @@ void registerConfigs(final TSDB tsdb) {
"A duration defining how long to wait for a secondary source "
+ "the primary source responds first.");
}
if(!tsdb.getConfig().hasProperty(getConfigKey(FAIL_ON_ANY_ERROR))) {
tsdb.getConfig().register(getConfigKey(FAIL_ON_ANY_ERROR), "false", true,
"Whether to fail the entire request if, any one of the listed sources " +
"fails to return a valid result.");
}

tsdb.getConfig().bind(getConfigKey(SOURCES_KEY), new SettingsCallback());
}
Expand Down