diff --git a/maven-plugin/pom.xml b/maven-plugin/pom.xml
index 0c01c63c11e..b758020ceef 100644
--- a/maven-plugin/pom.xml
+++ b/maven-plugin/pom.xml
@@ -72,6 +72,14 @@
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ 11
+
+
diff --git a/maven-plugin/src/main/java/io/confluent/kafka/schemaregistry/maven/DownloadSchemaRegistryMojo.java b/maven-plugin/src/main/java/io/confluent/kafka/schemaregistry/maven/DownloadSchemaRegistryMojo.java
index 4559f10d9e6..221a6995bf9 100644
--- a/maven-plugin/src/main/java/io/confluent/kafka/schemaregistry/maven/DownloadSchemaRegistryMojo.java
+++ b/maven-plugin/src/main/java/io/confluent/kafka/schemaregistry/maven/DownloadSchemaRegistryMojo.java
@@ -39,8 +39,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
@Mojo(name = "download")
public class DownloadSchemaRegistryMojo extends SchemaRegistryMojo {
@@ -54,30 +52,39 @@ public class DownloadSchemaRegistryMojo extends SchemaRegistryMojo {
@Parameter(required = true)
File outputDirectory;
- Map downloadSchemas(Collection subjects)
+ Map downloadSchemas(Collection subjectVersions)
throws MojoExecutionException {
Map results = new LinkedHashMap<>();
- for (String subject : subjects) {
+ for (SubjectVersion subjectVersion : subjectVersions) {
SchemaMetadata schemaMetadata;
try {
- getLog().info(String.format("Downloading latest metadata for %s.", subject));
- schemaMetadata = this.client().getLatestSchemaMetadata(subject);
+ if(subjectVersion.version == -1) {
+ getLog().info(String.format("Downloading latest metadata for %s.", subjectVersion));
+ schemaMetadata = this.client().getLatestSchemaMetadata(subjectVersion.subject);
+ } else {
+ getLog().info(String.format("Downloading latest metadata for %s.", subjectVersion));
+ schemaMetadata = this.client().getSchemaMetadata(subjectVersion.subject, subjectVersion.version);
+ }
Optional schema =
this.client().parseSchema(
schemaMetadata.getSchemaType(),
schemaMetadata.getSchema(),
schemaMetadata.getReferences());
if (schema.isPresent()) {
- results.put(subject, schema.get());
+ results.put(subjectVersion.subject, schema.get());
} else {
throw new MojoExecutionException(
- String.format("Error while parsing schema for %s", subject)
+ String.format("Error while parsing schema for %s", subjectVersion.subject)
);
}
} catch (Exception ex) {
throw new MojoExecutionException(
- String.format("Exception thrown while downloading metadata for %s.", subject),
+ String.format(
+ "Exception thrown while downloading metadata for %s with version %s.",
+ subjectVersion.subject,
+ (subjectVersion.version == -1) ? "latest" : subjectVersion.version
+ ),
ex
);
}
@@ -112,21 +119,6 @@ public void execute() throws MojoExecutionException, MojoFailureException {
throw new MojoExecutionException("Exception thrown while creating outputDirectory", ex);
}
- List patterns = new ArrayList<>();
-
- for (String subject : subjectPatterns) {
- try {
- getLog().debug(String.format("Creating pattern for '%s'", subject));
- Pattern pattern = Pattern.compile(subject);
- patterns.add(pattern);
- } catch (Exception ex) {
- throw new IllegalStateException(
- String.format("Exception thrown while creating pattern '%s'", subject),
- ex
- );
- }
- }
-
Collection allSubjects;
try {
getLog().info("Getting all subjects on schema registry...");
@@ -135,20 +127,20 @@ public void execute() throws MojoExecutionException, MojoFailureException {
throw new MojoExecutionException("Exception thrown", ex);
}
- getLog().info(String.format("Schema Registry has %s subject(s).", allSubjects.size()));
- Set subjectsToDownload = new LinkedHashSet<>();
-
+ Set subjectsToDownload = new LinkedHashSet<>();
for (String subject : allSubjects) {
- for (Pattern pattern : patterns) {
- getLog()
- .debug(String.format("Checking '%s' against pattern '%s'", subject, pattern.pattern()));
- Matcher matcher = pattern.matcher(subject);
-
- if (matcher.matches()) {
+ for (String subjectPattern : subjectPatterns) {
+ int version = -1;
+ if(subjectPattern.indexOf(":") != -1) {
+ String[] patternAndVersion = subjectPattern.split(":");
+ subjectPattern = patternAndVersion[0];
+ version = Integer.valueOf(patternAndVersion[1]);
+ }
+ getLog().debug(String.format("Checking '%s' against pattern '%s'", subject, subjectPattern));
+ if(subject.matches(subjectPattern)) {
getLog().debug(String.format("'%s' matches pattern '%s' so downloading.", subject,
- pattern.pattern()));
- subjectsToDownload.add(subject);
- break;
+ subjectPattern));
+ subjectsToDownload.add(new SubjectVersion(subject, version));
}
}
}
@@ -192,4 +184,5 @@ private String getExtension(ParsedSchema parsedSchema) {
return ".txt";
}
}
+
}
diff --git a/maven-plugin/src/main/java/io/confluent/kafka/schemaregistry/maven/SubjectVersion.java b/maven-plugin/src/main/java/io/confluent/kafka/schemaregistry/maven/SubjectVersion.java
new file mode 100644
index 00000000000..a3d2297f756
--- /dev/null
+++ b/maven-plugin/src/main/java/io/confluent/kafka/schemaregistry/maven/SubjectVersion.java
@@ -0,0 +1,11 @@
+package io.confluent.kafka.schemaregistry.maven;
+
+public class SubjectVersion {
+ public final String subject;
+ public final int version;
+
+ public SubjectVersion(String subject, int version) {
+ this.subject = subject;
+ this.version = version;
+ }
+}
diff --git a/maven-plugin/src/test/java/io/confluent/kafka/schemaregistry/maven/DownloadSchemaRegistryMojoTest.java b/maven-plugin/src/test/java/io/confluent/kafka/schemaregistry/maven/DownloadSchemaRegistryMojoTest.java
index 29b4694e76f..acba1287331 100644
--- a/maven-plugin/src/test/java/io/confluent/kafka/schemaregistry/maven/DownloadSchemaRegistryMojoTest.java
+++ b/maven-plugin/src/test/java/io/confluent/kafka/schemaregistry/maven/DownloadSchemaRegistryMojoTest.java
@@ -27,10 +27,12 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
public class DownloadSchemaRegistryMojoTest extends SchemaRegistryTest {
@@ -45,10 +47,10 @@ public void createMojo() {
@Test
public void specificSubjects() throws IOException, RestClientException, MojoFailureException, MojoExecutionException {
this.mojo.outputDirectory = this.tempDirectory;
+ this.mojo.subjectPatterns.clear();
List filesToDownload = new ArrayList<>();
List filesNotToDownload = new ArrayList<>();
- this.mojo.subjectPatterns.clear();
for (int i = 0; i < 100; i++) {
String keySubject = String.format("TestSubject%03d-key", i);
@@ -70,13 +72,53 @@ public void specificSubjects() throws IOException, RestClientException, MojoFail
filesNotToDownload.add(valueSchemaFile);
}
}
+
this.mojo.execute();
- for (File file: filesToDownload) {
+
+ for (File file : filesToDownload) {
Assert.assertThat(file.exists(), is(true));
}
- for (File file: filesNotToDownload) {
+ for (File file : filesNotToDownload) {
Assert.assertThat(file.exists(), is(false));
}
}
+ @Test
+ public void specificSubjectAndVersion() throws IOException, RestClientException, MojoFailureException, MojoExecutionException {
+ this.mojo.outputDirectory = this.tempDirectory;
+ this.mojo.subjectPatterns.clear();
+
+ String valueSubject = "TestSubject-value";
+ Schema valueSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
+ this.mojo.client().register(valueSubject, new AvroSchema(valueSchema), 1, 101);
+
+ Schema valueSchemaUpdated = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.NULL)));
+ this.mojo.client().register(valueSubject, new AvroSchema(valueSchemaUpdated), 2, 102);
+
+ File valueSchemaFile = new File(this.tempDirectory, valueSubject + ".avsc");
+
+ String subjectPattern = "^TestSubject-(key|value)$:2";
+ this.mojo.subjectPatterns.add(subjectPattern);
+
+ this.mojo.execute();
+
+ Assert.assertThat(valueSchemaFile.exists(), is(true));
+ Assert.assertThat(new Schema.Parser().parse(valueSchemaFile), is(valueSchemaUpdated));
+ }
+
+ @Test(expected = MojoExecutionException.class)
+ public void specificSubjectAndVersionNotFound() throws IOException, RestClientException, MojoFailureException, MojoExecutionException {
+ this.mojo.outputDirectory = this.tempDirectory;
+ this.mojo.subjectPatterns.clear();
+
+ String valueSubject = "TestSubject-value";
+ Schema valueSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
+ this.mojo.client().register(valueSubject, new AvroSchema(valueSchema), 1, 101);
+
+ String subjectPattern = "^TestSubject-(key|value)$:9999";
+ this.mojo.subjectPatterns.add(subjectPattern);
+
+ this.mojo.execute();
+ }
+
}