diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/metrics/MetricsContainer.java b/core/src/main/java/io/confluent/kafka/schemaregistry/metrics/MetricsContainer.java
index 8ea795471d5..aefa293541b 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/metrics/MetricsContainer.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/metrics/MetricsContainer.java
@@ -66,6 +66,9 @@ public class MetricsContainer {
   public static final String METRIC_NAME_JSON_SCHEMAS_DELETED = "json-schemas-deleted";
   public static final String METRIC_NAME_PB_SCHEMAS_CREATED = "protobuf-schemas-created";
   public static final String METRIC_NAME_PB_SCHEMAS_DELETED = "protobuf-schemas-deleted";
+  public static final String METRIC_NAME_SCHEMA_CACHE_SIZE = "schema-cache-size";
+  public static final String METRIC_NAME_SCHEMA_CACHE_HIT = "schema-cache-hit";
+  public static final String METRIC_NAME_SCHEMA_CACHE_MISS = "schema-cache-miss";
 
   private final Metrics metrics;
   private final Map<String, String> configuredTags;
@@ -86,6 +89,9 @@ public class MetricsContainer {
   private final SchemaRegistryMetric avroSchemasDeleted;
   private final SchemaRegistryMetric jsonSchemasDeleted;
   private final SchemaRegistryMetric protobufSchemasDeleted;
+  private final SchemaRegistryMetric schemaCacheHit;
+  private final SchemaRegistryMetric schemaCacheMiss;
+  private final SchemaRegistryMetric schemaCacheSize;
 
   private final MetricsContext metricsContext;
 
@@ -146,6 +152,23 @@ public MetricsContainer(SchemaRegistryConfig config, String kafkaClusterId) {
 
     this.protobufSchemasDeleted = createMetric(METRIC_NAME_PB_SCHEMAS_DELETED,
             "Number of deleted Protobuf schemas", new CumulativeCount());
+
+    this.schemaCacheHit = createMetric(
+        METRIC_NAME_SCHEMA_CACHE_HIT,
+        "Number of times the local schema cache has been hit",
+        new CumulativeCount()
+    );
+
+    this.schemaCacheMiss = createMetric(
+        METRIC_NAME_SCHEMA_CACHE_MISS,
+        "Number of times the local schema cache has been missed",
+        new CumulativeCount()
+    );
+
+    this.schemaCacheSize = createMetric(
+        METRIC_NAME_SCHEMA_CACHE_SIZE,
+        "Size of the local schema cache",
+        new CumulativeCount());
   }
 
   public Metrics getMetrics() {
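The three counters above are registered through MetricsContainer's createMetric helper, which exposes each metric over JMX as an MBean named kafka.schema.registry:type=<metric-name> with a single attribute of the same name (the convention the MetricsTest change below relies on). A minimal in-process probe as a sketch, assuming the registry runs in the same JVM with the default JMX reporter enabled; the class name is illustrative:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class CacheMetricsProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Each counter is published as kafka.schema.registry:type=<name>,
        // and its current value is read via a same-named attribute.
        for (String name : new String[] {
            "schema-cache-size", "schema-cache-hit", "schema-cache-miss"}) {
          ObjectName mbean = new ObjectName("kafka.schema.registry:type=" + name);
          System.out.println(name + " = " + server.getAttribute(mbean, name));
        }
      }
    }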
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java
index f3b4460b772..61bc3165ed5 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java
@@ -165,6 +165,18 @@ public class SchemaRegistryConfig extends RestConfig {
   public static final String COMPATIBILITY_CONFIG = "avro.compatibility.level";
   public static final String SCHEMA_COMPATIBILITY_CONFIG = "schema.compatibility.level";
 
+  /**
+   * schema.cache.maximum.weight
+   */
+  public static final String SCHEMA_CACHE_MAXIMUM_WEIGHT_CONFIG = "schema.cache.maximum.weight";
+  public static final int SCHEMA_CACHE_MAXIMUM_WEIGHT_DEFAULT = 1000000;
+
+  /**
+   * schema.cache.use.weight
+   */
+  public static final String SCHEMA_CACHE_USE_WEIGHT_CONFIG = "schema.cache.use.weight";
+  public static final boolean SCHEMA_CACHE_USE_WEIGHT_DEFAULT = false;
+
   /**
    * schema.cache.size
    */
@@ -322,6 +334,10 @@ public class SchemaRegistryConfig extends RestConfig {
       "The expiration in seconds for entries accessed in the cache.";
   protected static final String SCHEMA_CANONICALIZE_ON_CONSUME_DOC =
       "A list of schema types to canonicalize on consume, to be used if canonicalization changes.";
+  protected static final String SCHEMA_CACHE_MAXIMUM_WEIGHT_DOC =
+      "The maximum total weight of the local schema cache, where each entry weighs the length "
+      + "of its canonical schema string.";
+  protected static final String SCHEMA_CACHE_USE_WEIGHT_DOC =
+      "Whether to use a weight-based or size-based local schema cache.";
   protected static final String METADATA_ENCODER_SECRET_DOC =
       "The secret used to encrypt and decrypt encoder keysets. "
       + "Use a random string with high entropy.";
@@ -513,6 +529,14 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
     .define(SCHEMA_CANONICALIZE_ON_CONSUME_CONFIG, ConfigDef.Type.LIST, "",
         ConfigDef.Importance.LOW, SCHEMA_CANONICALIZE_ON_CONSUME_DOC
     )
+    .define(SCHEMA_CACHE_MAXIMUM_WEIGHT_CONFIG,
+        ConfigDef.Type.INT, SCHEMA_CACHE_MAXIMUM_WEIGHT_DEFAULT,
+        ConfigDef.Importance.LOW, SCHEMA_CACHE_MAXIMUM_WEIGHT_DOC
+    )
+    .define(SCHEMA_CACHE_USE_WEIGHT_CONFIG,
+        ConfigDef.Type.BOOLEAN, SCHEMA_CACHE_USE_WEIGHT_DEFAULT,
+        ConfigDef.Importance.LOW, SCHEMA_CACHE_USE_WEIGHT_DOC
+    )
     .define(METADATA_ENCODER_SECRET_CONFIG,
         ConfigDef.Type.PASSWORD, null,
         ConfigDef.Importance.HIGH, METADATA_ENCODER_SECRET_DOC
     )
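Taken together, schema.cache.use.weight selects which bound applies: when false (the default) the cache holds at most schema.cache.size entries, and when true it is capped by schema.cache.maximum.weight instead, measured in characters of canonical schema text (see the KafkaSchemaRegistry change below). A hypothetical schema-registry.properties fragment enabling the weight bound; the values are illustrative, not recommendations:

    # Bound the local schema cache by total canonical-schema length,
    # not by entry count.
    schema.cache.use.weight=true
    schema.cache.maximum.weight=1000000
    # Only consulted when schema.cache.use.weight=false.
    schema.cache.size=1000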
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
index 3dbbd37b5db..d994893e35b 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
@@ -66,6 +66,7 @@
 import io.confluent.rest.RestConfig;
 import io.confluent.rest.exceptions.RestException;
 import io.confluent.rest.NamedURI;
+import com.google.common.cache.Weigher;
 
 import java.util.Map;
 import java.util.List;
@@ -188,21 +189,42 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config,
     this.groupId = config.getString(SchemaRegistryConfig.SCHEMAREGISTRY_GROUP_ID_CONFIG);
     this.metricsContainer = new MetricsContainer(config, this.kafkaClusterId);
     this.providers = initProviders(config);
-    this.schemaCache = CacheBuilder.newBuilder()
-        .maximumSize(config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_SIZE_CONFIG))
+    this.schemaCache = this.buildLocalSchemaCache();
+    this.lookupCache = lookupCache();
+    this.idGenerator = identityGenerator(config);
+    this.kafkaStore = kafkaStore(config);
+    this.metadataEncoder = new MetadataEncoderService(this);
+    this.ruleSetHandler = new RuleSetHandler();
+  }
+
+  private LoadingCache<RawSchema, ParsedSchema> buildLocalSchemaCache() {
+    if (config.getBoolean(SchemaRegistryConfig.SCHEMA_CACHE_USE_WEIGHT_CONFIG)) {
+      return CacheBuilder.newBuilder()
+          .maximumWeight(config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_MAXIMUM_WEIGHT_CONFIG))
+          .weigher((Weigher<RawSchema, ParsedSchema>) (k, v) -> v.canonicalString().length())
         .expireAfterAccess(
-            config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_EXPIRY_SECS_CONFIG), TimeUnit.SECONDS)
+              config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_EXPIRY_SECS_CONFIG),
+              TimeUnit.SECONDS
+          )
         .build(new CacheLoader<RawSchema, ParsedSchema>() {
           @Override
           public ParsedSchema load(RawSchema s) throws Exception {
             return loadSchema(s.getSchema(), s.isNew(), s.isNormalize());
           }
         });
-    this.lookupCache = lookupCache();
-    this.idGenerator = identityGenerator(config);
-    this.kafkaStore = kafkaStore(config);
-    this.metadataEncoder = new MetadataEncoderService(this);
-    this.ruleSetHandler = new RuleSetHandler();
+    }
+    return CacheBuilder.newBuilder()
+        .maximumSize(config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_SIZE_CONFIG))
+        .expireAfterAccess(
+            config.getInt(SchemaRegistryConfig.SCHEMA_CACHE_EXPIRY_SECS_CONFIG),
+            TimeUnit.SECONDS
+        )
+        .build(new CacheLoader<RawSchema, ParsedSchema>() {
+          @Override
+          public ParsedSchema load(RawSchema s) throws Exception {
+            return loadSchema(s.getSchema(), s.isNew(), s.isNormalize());
+          }
+        });
   }
 
   private Map<String, SchemaProvider> initProviders(SchemaRegistryConfig config) {
diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java
index ea73f05a32d..13a0d9d02b0 100644
--- a/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java
+++ b/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java
@@ -33,6 +33,9 @@
 import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_DELETED_COUNT;
 import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_MASTER_SLAVE_ROLE;
 import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_REGISTERED_COUNT;
+import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_SCHEMA_CACHE_SIZE;
+import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_SCHEMA_CACHE_HIT;
+import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_SCHEMA_CACHE_MISS;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -96,6 +99,45 @@ public void testSchemaCreatedCount() throws Exception {
     assertEquals((double) schemaCount, mBeanServer.getAttribute(avroDeleted, METRIC_NAME_AVRO_SCHEMAS_DELETED));
   }
 
+  @Test
+  public void testLocalSchemaCacheMetrics() throws Exception {
+    MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+    ObjectName schemaCacheSize =
+        new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_SCHEMA_CACHE_SIZE);
+    ObjectName schemaCacheHit =
+        new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_SCHEMA_CACHE_HIT);
+    ObjectName schemaCacheMiss =
+        new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_SCHEMA_CACHE_MISS);
+
+    RestService service = restApp.restClient;
+    String subject = "testTopic1";
+    int schemaCount = 3;
+    List<String> schemas = TestUtils.getRandomCanonicalAvroString(schemaCount);
+
+    // Register and verify new schemas; each registration is a cache miss.
+    int schemaIdCounter = 1;
+    for (int i = 0; i < schemaCount; i++) {
+      String schema = schemas.get(i);
+      TestUtils.registerAndVerifySchema(service, schema, schemaIdCounter++, subject);
+    }
+
+    // Re-registering the same schemas should only increase cache hits.
+    for (int i = 0; i < schemaCount; i++) {
+      String schemaString = schemas.get(i);
+      service.registerSchema(schemaString, subject);
+    }
+
+    // Deleting the schemas should also only increase cache hits.
+    for (Integer i = 1; i < schemaIdCounter; i++) {
+      assertEquals(i, service.deleteSchemaVersion(RestService.DEFAULT_REQUEST_PROPERTIES,
+          subject, i.toString()));
+    }
+
+    assertEquals((double) schemaCount, mBeanServer.getAttribute(schemaCacheSize, METRIC_NAME_SCHEMA_CACHE_SIZE));
+    assertEquals((double) schemaCount, mBeanServer.getAttribute(schemaCacheMiss, METRIC_NAME_SCHEMA_CACHE_MISS));
+    assertEquals((double) schemaCount * 3, mBeanServer.getAttribute(schemaCacheHit, METRIC_NAME_SCHEMA_CACHE_HIT));
+  }
+
   @Test
   public void testApiCallMetrics() throws Exception {
     MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
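In the weight branch of buildLocalSchemaCache, each entry is charged v.canonicalString().length(), so schema.cache.maximum.weight effectively caps the total characters of canonical schema text held in memory rather than the number of entries. A self-contained sketch of the same Guava pattern using plain strings; the class name and values are illustrative and not part of this change:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.cache.Weigher;

    public class WeightedCacheDemo {
      public static void main(String[] args) throws Exception {
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            // A bound on the sum of entry weights, not a per-entry limit.
            .maximumWeight(20)
            // Charge each entry the length of its value, analogous to
            // weighing a schema by its canonical string length.
            .weigher((Weigher<String, String>) (k, v) -> v.length())
            .recordStats()
            .build(new CacheLoader<String, String>() {
              @Override
              public String load(String key) {
                return key + key + key; // stand-in for parsing a schema
              }
            });
        cache.get("abc");   // value weight 9
        cache.get("defg");  // value weight 12; total 21 > 20, so Guava evicts
        System.out.println("entries=" + cache.size() + ", " + cache.stats());
      }
    }

Note that Guava performs weight-based eviction during writes and does not guarantee which entry near the limit is removed, so a weight-bounded cache can briefly hold slightly more than its cap.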