From 8d70fa5c4f7be8fbb5269f35bcdcbb3b544d2f10 Mon Sep 17 00:00:00 2001 From: chiruvol Date: Tue, 10 Nov 2020 12:13:37 -0800 Subject: [PATCH 1/3] Fail query if one source fails --- .../query/IncompleteStateException.java | 21 +++++++++ .../opentsdb/query/hacluster/HACluster.java | 45 ++++++++++++++++--- .../query/hacluster/HAClusterConfig.java | 21 ++++++++- .../query/hacluster/HAClusterFactory.java | 10 +++++ 4 files changed, 90 insertions(+), 7 deletions(-) create mode 100644 core/src/main/java/net/opentsdb/query/IncompleteStateException.java diff --git a/core/src/main/java/net/opentsdb/query/IncompleteStateException.java b/core/src/main/java/net/opentsdb/query/IncompleteStateException.java new file mode 100644 index 0000000000..bc696ba1cc --- /dev/null +++ b/core/src/main/java/net/opentsdb/query/IncompleteStateException.java @@ -0,0 +1,21 @@ +package net.opentsdb.query; + +public class IncompleteStateException extends RuntimeException { + + public IncompleteStateException() { + super(); + } + + public IncompleteStateException(String message) { + super(message); + } + + public IncompleteStateException(String message, Throwable t) { + super(message, t); + } + + public IncompleteStateException(Throwable t) { + super(t); + } + +} diff --git a/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java b/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java index 13bd874c25..a595f6e9e5 100644 --- a/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java +++ b/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import net.opentsdb.query.IncompleteStateException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,6 +210,28 @@ public void onNext(final QueryResult next) { } } } + } else { + //Error from downstream + + + final String log; + if (config.getDataSources().size() == 1 || + config.getDataSources().get(0).equals(id)) { + log = "Primary errored out: " + + (next.error() == null ? "" : next.error()); + } else { + log = "Secondary errored out: " + + (next.error() == null ? "" : next.error()); + } + + if(config.failOnAnyError()) { + if(next.exception() != null) { + onError(new IncompleteStateException(log, next.exception())); + } else { + onError(new IncompleteStateException(log)); + } + } + } } @@ -269,13 +292,23 @@ public void run(final Timeout ignored) throws Exception { // Already sent upstream, lost the race. return; } - if (is_primary) { - context.queryContext().logWarn(HACluster.this, - "Primary timed out after: " + config.getPrimaryTimeout()); + + final String log; + + if(is_primary) { + log = "Primary timed out after: " + config.getPrimaryTimeout(); } else { - context.queryContext().logWarn(HACluster.this, - "Secondary timed out after: " + config.getSecondaryTimeout()); + log = "Secondary timed out after: " + config.getSecondaryTimeout(); + } + + if(config.failOnAnyError()) { + //Error out + onError(new IncompleteStateException(log)); } + + context.queryContext().logWarn(HACluster.this, + log); + QueryResult good_result = null; for (final QueryResult result : results.values()) { @@ -306,7 +339,7 @@ public void run(final Timeout ignored) throws Exception { } } - + void complete(final boolean timed_out) { if (!timed_out && !completed.compareAndSet(false, true)) { LOG.warn("HA cluster node was trying to mark as complete but has " diff --git a/core/src/main/java/net/opentsdb/query/hacluster/HAClusterConfig.java b/core/src/main/java/net/opentsdb/query/hacluster/HAClusterConfig.java index 8a92b22672..83679db2e2 100644 --- a/core/src/main/java/net/opentsdb/query/hacluster/HAClusterConfig.java +++ b/core/src/main/java/net/opentsdb/query/hacluster/HAClusterConfig.java @@ -68,6 +68,11 @@ public class HAClusterConfig extends BaseTimeSeriesDataSourceConfig< * returns first. */ private final String primary_timeout; + /** + * Fails if any of the sources timeout/error out + */ + private final String fail_on_any_error; + /** A hash that calculates and stores the hash code once. */ private int hash; @@ -86,7 +91,7 @@ protected HAClusterConfig(final Builder builder) { merge_aggregator = builder.mergeAggregator; secondary_timeout = builder.secondaryTimeout; primary_timeout = builder.primaryTimeout; - + fail_on_any_error = builder.failOnAnyError; // validate the timeouts if (!Strings.isNullOrEmpty(secondary_timeout)) { DateTime.parseDuration(secondary_timeout); @@ -118,6 +123,12 @@ public String getSecondaryTimeout() { return secondary_timeout; } + /**@return */ + public boolean failOnAnyError() { + //TODO: Avoid parsing Boolean each time. + return Boolean.parseBoolean(fail_on_any_error); + } + /** @return An optional timeout for the primary when a secondary * responds first. */ public String getPrimaryTimeout() { @@ -230,6 +241,7 @@ public Builder toBuilder() { .setMergeAggregator(merge_aggregator) .setSecondaryTimeout(secondary_timeout) .setPrimaryTimeout(primary_timeout) + .setFailOnAnyError(fail_on_any_error) .setHasBeenSetup(has_been_setup); if (!data_sources.isEmpty()) { builder.setDataSources(Lists.newArrayList(data_sources)); @@ -260,6 +272,8 @@ public static class Builder extends BaseTimeSeriesDataSourceConfig.Builder< private String secondaryTimeout; @JsonProperty private String primaryTimeout; + @JsonProperty + private String failOnAnyError; Builder() { setType("HAClusterConfig"); @@ -326,6 +340,11 @@ public Builder setPrimaryTimeout(final String primary_timeout) { return this; } + public Builder setFailOnAnyError(final String fail_on_any_error) { + failOnAnyError = fail_on_any_error; + return this; + } + public List dataSources() { return dataSources == null ? Collections.emptyList() : dataSources; } diff --git a/core/src/main/java/net/opentsdb/query/hacluster/HAClusterFactory.java b/core/src/main/java/net/opentsdb/query/hacluster/HAClusterFactory.java index ec91b7c856..85bd641fac 100644 --- a/core/src/main/java/net/opentsdb/query/hacluster/HAClusterFactory.java +++ b/core/src/main/java/net/opentsdb/query/hacluster/HAClusterFactory.java @@ -89,6 +89,7 @@ public class HAClusterFactory extends BaseQueryNodeFactory< public static final String AGGREGATOR_KEY = "hacluster.default.aggregator"; public static final String PRIMARY_KEY = "hacluster.default.timeout.primary"; public static final String SECONDARY_KEY = "hacluster.default.timeout.secondary"; + public static final String FAIL_ON_ANY_ERROR = "hacluster.fail.on.any.error"; /** The default sources updated on config callback. */ protected final List default_sources; @@ -193,6 +194,10 @@ public void setupGraph(final QueryPipelineContext context, builder.setSecondaryTimeout(tsdb.getConfig().getString( getConfigKey(SECONDARY_KEY))); } + if(Strings.isNullOrEmpty(cluster_config.getFail_on_any_error())) { + builder.setFailOnAnyError(tsdb.getConfig().getString( + getConfigKey(FAIL_ON_ANY_ERROR))); + } if (cluster_config.getDataSources().isEmpty() && cluster_config.getDataSourceConfigs().isEmpty()) { @@ -728,6 +733,11 @@ void registerConfigs(final TSDB tsdb) { "A duration defining how long to wait for a secondary source " + "the primary source responds first."); } + if(!tsdb.getConfig().hasProperty(getConfigKey(FAIL_ON_ANY_ERROR))) { + tsdb.getConfig().register(getConfigKey(FAIL_ON_ANY_ERROR), "false", true, + "Whether to fail the entire request if, any one of the listed sources " + + "fails to return a valid result."); + } tsdb.getConfig().bind(getConfigKey(SOURCES_KEY), new SettingsCallback()); } From eb52ed1b4085957a690a5485e0b7c5da6fc998a8 Mon Sep 17 00:00:00 2001 From: chiruvol Date: Tue, 10 Nov 2020 14:00:19 -0800 Subject: [PATCH 2/3] return on error --- core/src/main/java/net/opentsdb/query/hacluster/HACluster.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java b/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java index a595f6e9e5..186829f163 100644 --- a/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java +++ b/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java @@ -304,6 +304,7 @@ public void run(final Timeout ignored) throws Exception { if(config.failOnAnyError()) { //Error out onError(new IncompleteStateException(log)); + return; } context.queryContext().logWarn(HACluster.this, From 42a4a3a5e72a262814abf4c87d5f0fea964419d8 Mon Sep 17 00:00:00 2001 From: chiruvol Date: Wed, 11 Nov 2020 10:42:48 -0800 Subject: [PATCH 3/3] fix method name --- .../main/java/net/opentsdb/query/hacluster/HACluster.java | 4 ++-- .../java/net/opentsdb/query/hacluster/HAClusterConfig.java | 5 ++--- .../java/net/opentsdb/query/hacluster/HAClusterFactory.java | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java b/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java index 186829f163..71548d6f6c 100644 --- a/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java +++ b/core/src/main/java/net/opentsdb/query/hacluster/HACluster.java @@ -224,7 +224,7 @@ public void onNext(final QueryResult next) { (next.error() == null ? "" : next.error()); } - if(config.failOnAnyError()) { + if(Boolean.parseBoolean(config.failOnAnyError())) { if(next.exception() != null) { onError(new IncompleteStateException(log, next.exception())); } else { @@ -301,7 +301,7 @@ public void run(final Timeout ignored) throws Exception { log = "Secondary timed out after: " + config.getSecondaryTimeout(); } - if(config.failOnAnyError()) { + if(Boolean.parseBoolean(config.failOnAnyError())) { //Error out onError(new IncompleteStateException(log)); return; diff --git a/core/src/main/java/net/opentsdb/query/hacluster/HAClusterConfig.java b/core/src/main/java/net/opentsdb/query/hacluster/HAClusterConfig.java index 83679db2e2..d4d83c879f 100644 --- a/core/src/main/java/net/opentsdb/query/hacluster/HAClusterConfig.java +++ b/core/src/main/java/net/opentsdb/query/hacluster/HAClusterConfig.java @@ -124,9 +124,8 @@ public String getSecondaryTimeout() { } /**@return */ - public boolean failOnAnyError() { - //TODO: Avoid parsing Boolean each time. - return Boolean.parseBoolean(fail_on_any_error); + public String failOnAnyError() { + return fail_on_any_error; } /** @return An optional timeout for the primary when a secondary diff --git a/core/src/main/java/net/opentsdb/query/hacluster/HAClusterFactory.java b/core/src/main/java/net/opentsdb/query/hacluster/HAClusterFactory.java index 85bd641fac..0276f87442 100644 --- a/core/src/main/java/net/opentsdb/query/hacluster/HAClusterFactory.java +++ b/core/src/main/java/net/opentsdb/query/hacluster/HAClusterFactory.java @@ -194,7 +194,7 @@ public void setupGraph(final QueryPipelineContext context, builder.setSecondaryTimeout(tsdb.getConfig().getString( getConfigKey(SECONDARY_KEY))); } - if(Strings.isNullOrEmpty(cluster_config.getFail_on_any_error())) { + if(Strings.isNullOrEmpty(cluster_config.failOnAnyError())) { builder.setFailOnAnyError(tsdb.getConfig().getString( getConfigKey(FAIL_ON_ANY_ERROR))); }