Skip to content

Commit

Permalink
Fix deadlock in registry when performing nested meter registration
Browse files Browse the repository at this point in the history
* Added timerPercentilesMax and timerPercentilesMin to Spectator-based registries.
  • Loading branch information
jkschneider committed Sep 14, 2017
1 parent 44ec768 commit 2651452
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class AbstractMeterRegistry implements MeterRegistry {
*/
private final List<Tag> commonTags = new ArrayList<>();

private final ConcurrentMap<Meter.Id, Meter> meterMap = new ConcurrentHashMap<>();
private final Map<Meter.Id, Meter> meterMap = new HashMap<>();

/**
* We'll use snake case as a general-purpose default for registries because it is the most
Expand Down Expand Up @@ -114,26 +114,28 @@ protected String getConventionName(Meter.Id id) {

@Override
public Meter register(Meter.Id id, Meter.Type type, Iterable<Measurement> measurements) {
return meterMap.computeIfAbsent(id, id2 -> {
id2.setType(type);
newMeter(id2, type, measurements);
return new Meter() {
@Override
public Id getId() {
return id2;
}

@Override
public Type getType() {
return type;
}

@Override
public Iterable<Measurement> measure() {
return measurements;
}
};
});
synchronized (meterMap) {
return meterMap.computeIfAbsent(id, id2 -> {
id2.setType(type);
newMeter(id2, type, measurements);
return new Meter() {
@Override
public Id getId() {
return id2;
}

@Override
public Type getType() {
return type;
}

@Override
public Iterable<Measurement> measure() {
return measurements;
}
};
});
}
}

@Override
Expand Down Expand Up @@ -278,28 +280,30 @@ public Optional<Meter> meter() {

@Override
public Collection<Meter> meters() {
return meterMap.keySet().stream()
.filter(id -> id.getName().equals(name))
.filter(id -> {
if(tags.isEmpty())
return true;
List<Tag> idTags = new ArrayList<>();
id.getTags().forEach(idTags::add);
return idTags.containsAll(tags);
})
.map(meterMap::get)
.filter(m -> {
if(valueAsserts.isEmpty())
return true;
for (Measurement measurement : m.measure()) {
if (valueAsserts.containsKey(measurement.getStatistic()) &&
Math.abs(valueAsserts.get(measurement.getStatistic()) - measurement.getValue()) > 1e-7) {
return false;
synchronized (meterMap) {
return meterMap.keySet().stream()
.filter(id -> id.getName().equals(name))
.filter(id -> {
if (tags.isEmpty())
return true;
List<Tag> idTags = new ArrayList<>();
id.getTags().forEach(idTags::add);
return idTags.containsAll(tags);
})
.map(meterMap::get)
.filter(m -> {
if (valueAsserts.isEmpty())
return true;
for (Measurement measurement : m.measure()) {
if (valueAsserts.containsKey(measurement.getStatistic()) &&
Math.abs(valueAsserts.get(measurement.getStatistic()) - measurement.getValue()) > 1e-7) {
return false;
}
}
}
return true;
})
.collect(Collectors.toList());
return true;
})
.collect(Collectors.toList());
}
}
}

Expand All @@ -314,12 +318,12 @@ public Collection<Meter> getMeters() {
}

private <M extends Meter> M registerMeterIfNecessary(Class<M> meterClass, Meter.Id id, Function<Meter.Id, Meter> builder) {
synchronized (meterMap) {
// If the id is coming down from a composite registry it will already have the common tags of the composite.
// This adds common tags of the registry within the composite.
MeterId idWithCommonTags = new MeterId(id.getName(), Tags.concat(id.getTags(), config().commonTags()),
id.getBaseUnit(), id.getDescription());
// If the id is coming down from a composite registry it will already have the common tags of the composite.
// This adds common tags of the registry within the composite.
MeterId idWithCommonTags = new MeterId(id.getName(), Tags.concat(id.getTags(), config().commonTags()),
id.getBaseUnit(), id.getDescription());

synchronized (meterMap) {
Meter m = meterMap.computeIfAbsent(idWithCommonTags, builder);
if (!meterClass.isInstance(m)) {
throw new IllegalArgumentException("There is already a registered meter of a different type with the same name");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.micrometer.core.instrument.spectator;

import com.netflix.spectator.api.RegistryConfig;

import java.time.Duration;

public interface SpectatorConf extends RegistryConfig {
/**
* Property prefix to prepend to configuration names.
*/
String prefix();

/**
* A bucket filter clamping the bucket domain of timer percentiles histograms to some max value.
* This is used to limit the number of buckets shipped to save on storage.
*/
default Duration timerPercentilesMax() {
String v = get(prefix() + ".timerPercentilesMax");
return v == null ? Duration.ofMinutes(2) : Duration.parse(v);
}

/**
* A bucket filter clamping the bucket domain of timer percentiles histograms to some min value.
* This is used to limit the number of buckets shipped to save on storage.
*/
default Duration timerPercentilesMin() {
String v = get(prefix() + ".timerPercentilesMin");
return v == null ? Duration.ofMillis(10) : Duration.parse(v);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.netflix.spectator.api.Measurement;
import com.netflix.spectator.api.Registry;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.stats.hist.Bucket;
import io.micrometer.core.instrument.stats.hist.Histogram;
import io.micrometer.core.instrument.spectator.step.StepRegistryConfig;
import io.micrometer.core.instrument.stats.hist.*;
import io.micrometer.core.instrument.stats.quantile.Quantiles;
import io.micrometer.core.instrument.util.TimeUtils;

Expand All @@ -39,10 +39,12 @@
*/
public abstract class SpectatorMeterRegistry extends AbstractMeterRegistry {
private final Registry registry;
private final SpectatorConf spectatorConf;

public SpectatorMeterRegistry(Registry registry, Clock clock) {
public SpectatorMeterRegistry(SpectatorConf config, Registry registry, Clock clock) {
super(clock);
this.registry = registry;
this.spectatorConf = config;
}

protected Collection<com.netflix.spectator.api.Tag> toSpectatorTags(Iterable<io.micrometer.core.instrument.Tag> tags) {
Expand Down Expand Up @@ -89,6 +91,17 @@ protected <T> io.micrometer.core.instrument.Gauge newGauge(Meter.Id id, T obj, T
protected Histogram<?> registerHistogramCounterIfNecessary(Meter.Id id, Histogram.Builder<?> histogramBuilder) {
if (histogramBuilder != null) {
Histogram<?> hist = histogramBuilder.create(Histogram.Summation.Normal);

if(hist instanceof PercentileHistogram || hist instanceof PercentileTimeHistogram) {
@SuppressWarnings("unchecked") Histogram<Double> percentileHist = (Histogram<Double>) hist;

double max = (double) spectatorConf.timerPercentilesMax().toNanos();
double min = (double) spectatorConf.timerPercentilesMin().toNanos();

percentileHist.filterBuckets(BucketFilter.clampMax(max));
percentileHist.filterBuckets(BucketFilter.clampMin(min));
}

for (Bucket<?> bucket : hist.getBuckets()) {
more().counter(createId(id.getName(), Tags.concat(id.getTags(), "bucket", bucket.getTagString()), null),
bucket, Bucket::getValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.micrometer.core.instrument.spectator.step;

import com.netflix.spectator.api.RegistryConfig;
import io.micrometer.core.instrument.spectator.SpectatorConf;

import java.time.Duration;

Expand All @@ -25,12 +25,7 @@
*
* @author Jon Schneider
*/
public interface StepRegistryConfig extends RegistryConfig {
/**
* Property prefix to prepend to configuration names.
*/
String prefix();

public interface StepRegistryConfig extends SpectatorConf {
/**
* Returns the step size (reporting frequency) to use. The default is 10 seconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
public abstract class StepSpectatorMeterRegistry extends SpectatorMeterRegistry {
private long stepMillis;

public StepSpectatorMeterRegistry(Registry registry, Clock clock, long stepMillis) {
super(registry, clock);
public StepSpectatorMeterRegistry(StepRegistryConfig config, Registry registry, Clock clock, long stepMillis) {
super(config, registry, clock);
this.stepMillis = stepMillis;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ class SpectatorMeterRegistryTest {
@DisplayName("quantiles are registered as a separate gauge")
@Test
void quantiles() {
SpectatorMeterRegistry registry = new SpectatorMeterRegistry(new DefaultRegistry(), Clock.SYSTEM) {};
SpectatorMeterRegistry registry = new SpectatorMeterRegistry(null, new DefaultRegistry(), Clock.SYSTEM) {};
Registry spectatorRegistry = registry.getSpectatorRegistry();

Timer timer = Timer.builder("timer")
.quantiles(GKQuantiles.quantiles(0.5, 0.95).create())
.register(registry);
.quantiles(GKQuantiles.quantiles(0.5, 0.95).create())
.register(registry);

timer.record(100, TimeUnit.MILLISECONDS);

DistributionSummary.builder("ds")
.quantiles(GKQuantiles.quantiles(0.5).create())
.register(registry);
.quantiles(GKQuantiles.quantiles(0.5).create())
.register(registry);

assertThat(spectatorRegistry).haveAtLeastOne(withNameAndQuantile("timer"));
assertThat(spectatorRegistry).haveAtLeastOne(withNameAndQuantile("ds"));

assertThat(spectatorRegistry).haveAtLeast(2,
new Condition<>(m -> quantilePredicate("timer").test(m.id()) && m.measure().iterator().next().value() != Double.NaN,
"a meter with at least two quantiles where both quantiles have a value"));
new Condition<>(m -> quantilePredicate("timer").test(m.id()) && m.measure().iterator().next().value() != Double.NaN,
"a meter with at least two quantiles where both quantiles have a value"));
}

private Condition<Meter> withNameAndQuantile(String name) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.micrometer.spring.samples;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication(scanBasePackages = "io.micrometer.spring.samples.components")
@EnableScheduling
public class InfluxSample {
public static void main(String[] args) {
new SpringApplicationBuilder(AtlasSample.class).profiles("influx").run(args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright 2017 Pivotal Software, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

metrics:
influx:
enabled: true
step: PT10S
readTimeout: PT30S
batchSize: 20000
Empty file modified scripts/influx.sh
100644 → 100755
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class AtlasMeterRegistry extends StepSpectatorMeterRegistry {
public AtlasMeterRegistry(AtlasConfig config, Clock clock) {
// The Spectator Atlas registry will do tag formatting for us, so we'll just pass through
// tag keys and values with the identity formatter.
super(new AtlasRegistry(new com.netflix.spectator.api.Clock() {
super(null, new AtlasRegistry(new com.netflix.spectator.api.Clock() {
@Override
public long wallTime() {
return clock.wallTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*/
public class DatadogMeterRegistry extends StepSpectatorMeterRegistry {
public DatadogMeterRegistry(DatadogConfig config, Clock clock) {
super(new DatadogRegistry(config, new com.netflix.spectator.api.Clock() {
super(config, new DatadogRegistry(config, new com.netflix.spectator.api.Clock() {
@Override
public long wallTime() {
return clock.wallTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

public class InfluxMeterRegistry extends StepSpectatorMeterRegistry {
public InfluxMeterRegistry(InfluxConfig config, Clock clock) {
super(new InfluxRegistry(config, new com.netflix.spectator.api.Clock() {
super(config, new InfluxRegistry(config, new com.netflix.spectator.api.Clock() {
@Override
public long wallTime() {
return clock.wallTime();
Expand Down

0 comments on commit 2651452

Please sign in to comment.