Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Download a specific version of schemas by maven plugin #1701

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Mojo(name = "download")
public class DownloadSchemaRegistryMojo extends SchemaRegistryMojo {
Expand All @@ -54,30 +52,39 @@ public class DownloadSchemaRegistryMojo extends SchemaRegistryMojo {
@Parameter(required = true)
File outputDirectory;

Map<String, ParsedSchema> downloadSchemas(Collection<String> subjects)
Map<String, ParsedSchema> downloadSchemas(Collection<SubjectVersion> subjectVersions)
throws MojoExecutionException {
Map<String, ParsedSchema> results = new LinkedHashMap<>();

for (String subject : subjects) {
for (SubjectVersion subjectVersion : subjectVersions) {
SchemaMetadata schemaMetadata;
try {
getLog().info(String.format("Downloading latest metadata for %s.", subject));
schemaMetadata = this.client().getLatestSchemaMetadata(subject);
if(subjectVersion.version == -1) {
getLog().info(String.format("Downloading latest metadata for %s.", subjectVersion));
schemaMetadata = this.client().getLatestSchemaMetadata(subjectVersion.subject);
} else {
getLog().info(String.format("Downloading latest metadata for %s.", subjectVersion));
schemaMetadata = this.client().getSchemaMetadata(subjectVersion.subject, subjectVersion.version);
}
Optional<ParsedSchema> schema =
this.client().parseSchema(
schemaMetadata.getSchemaType(),
schemaMetadata.getSchema(),
schemaMetadata.getReferences());
if (schema.isPresent()) {
results.put(subject, schema.get());
results.put(subjectVersion.subject, schema.get());
} else {
throw new MojoExecutionException(
String.format("Error while parsing schema for %s", subject)
String.format("Error while parsing schema for %s", subjectVersion.subject)
);
}
} catch (Exception ex) {
throw new MojoExecutionException(
String.format("Exception thrown while downloading metadata for %s.", subject),
String.format(
"Exception thrown while downloading metadata for %s with version %s.",
subjectVersion.subject,
(subjectVersion.version == -1) ? "latest" : subjectVersion.version
),
ex
);
}
Expand Down Expand Up @@ -112,21 +119,6 @@ public void execute() throws MojoExecutionException, MojoFailureException {
throw new MojoExecutionException("Exception thrown while creating outputDirectory", ex);
}

List<Pattern> patterns = new ArrayList<>();

for (String subject : subjectPatterns) {
try {
getLog().debug(String.format("Creating pattern for '%s'", subject));
Pattern pattern = Pattern.compile(subject);
patterns.add(pattern);
} catch (Exception ex) {
throw new IllegalStateException(
String.format("Exception thrown while creating pattern '%s'", subject),
ex
);
}
}

Collection<String> allSubjects;
try {
getLog().info("Getting all subjects on schema registry...");
Expand All @@ -135,20 +127,20 @@ public void execute() throws MojoExecutionException, MojoFailureException {
throw new MojoExecutionException("Exception thrown", ex);
}

getLog().info(String.format("Schema Registry has %s subject(s).", allSubjects.size()));
Set<String> subjectsToDownload = new LinkedHashSet<>();

Set<SubjectVersion> subjectsToDownload = new LinkedHashSet<>();
for (String subject : allSubjects) {
for (Pattern pattern : patterns) {
getLog()
.debug(String.format("Checking '%s' against pattern '%s'", subject, pattern.pattern()));
Matcher matcher = pattern.matcher(subject);

if (matcher.matches()) {
for (String subjectPattern : subjectPatterns) {
int version = -1;
if(subjectPattern.indexOf(":") != -1) {
String[] patternAndVersion = subjectPattern.split(":");
subjectPattern = patternAndVersion[0];
version = Integer.valueOf(patternAndVersion[1]);
}
getLog().debug(String.format("Checking '%s' against pattern '%s'", subject, subjectPattern));
if(subject.matches(subjectPattern)) {
getLog().debug(String.format("'%s' matches pattern '%s' so downloading.", subject,
pattern.pattern()));
subjectsToDownload.add(subject);
break;
subjectPattern));
subjectsToDownload.add(new SubjectVersion(subject, version));
}
}
}
Expand Down Expand Up @@ -192,4 +184,5 @@ private String getExtension(ParsedSchema parsedSchema) {
return ".txt";
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.confluent.kafka.schemaregistry.maven;

public class SubjectVersion {
public final String subject;
public final int version;

public SubjectVersion(String subject, int version) {
this.subject = subject;
this.version = version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.maven.plugin.MojoExecutionException;
import org.apache.maven.plugin.MojoFailureException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -28,6 +31,8 @@
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.CoreMatchers.is;

public class DownloadSchemaRegistryMojoTest extends SchemaRegistryTest {
DownloadSchemaRegistryMojo mojo;

Expand All @@ -38,12 +43,13 @@ public void createMojo() {
}

@Test
public void specificSubjects() throws IOException, RestClientException {
int version = 1;

List<File> files = new ArrayList<>();
public void specificSubjects() throws IOException, RestClientException, MojoFailureException, MojoExecutionException {
this.mojo.outputDirectory = this.tempDirectory;
this.mojo.subjectPatterns.clear();

List<File> filesToDownload = new ArrayList<>();
List<File> filesNotToDownload = new ArrayList<>();

for (int i = 0; i < 100; i++) {
String keySubject = String.format("TestSubject%03d-key", i);
String valueSubject = String.format("TestSubject%03d-value", i);
Expand All @@ -55,12 +61,62 @@ public void specificSubjects() throws IOException, RestClientException {
File valueSchemaFile = new File(this.tempDirectory, valueSubject + ".avsc");

if (i % 10 == 0) {
String subjectPattern = String.format("^TestSubject%03d-(Key|Value)$", i);
files.add(keySchemaFile);
files.add(valueSchemaFile);
String subjectPattern = String.format("^TestSubject%03d-(key|value)$", i);
filesToDownload.add(keySchemaFile);
filesToDownload.add(valueSchemaFile);
this.mojo.subjectPatterns.add(subjectPattern);
} else {
filesNotToDownload.add(keySchemaFile);
filesNotToDownload.add(valueSchemaFile);
}
}

this.mojo.execute();

for (File file : filesToDownload) {
Assert.assertThat(file.exists(), is(true));
}
for (File file : filesNotToDownload) {
Assert.assertThat(file.exists(), is(false));
}
}

@Test
public void specificSubjectAndVersion() throws IOException, RestClientException, MojoFailureException, MojoExecutionException {
this.mojo.outputDirectory = this.tempDirectory;
this.mojo.subjectPatterns.clear();

String valueSubject = "TestSubject-value";
Schema valueSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
this.mojo.client().register(valueSubject, new AvroSchema(valueSchema), 1, 101);

Schema valueSchemaUpdated = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.NULL)));
this.mojo.client().register(valueSubject, new AvroSchema(valueSchemaUpdated), 2, 102);

File valueSchemaFile = new File(this.tempDirectory, valueSubject + ".avsc");

String subjectPattern = "^TestSubject-(key|value)$:2";
this.mojo.subjectPatterns.add(subjectPattern);

this.mojo.execute();

Assert.assertThat(valueSchemaFile.exists(), is(true));
Assert.assertThat(new Schema.Parser().parse(valueSchemaFile), is(valueSchemaUpdated));
}

@Test(expected = MojoExecutionException.class)
public void specificSubjectAndVersionNotFound() throws IOException, RestClientException, MojoFailureException, MojoExecutionException {
this.mojo.outputDirectory = this.tempDirectory;
this.mojo.subjectPatterns.clear();

String valueSubject = "TestSubject-value";
Schema valueSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
this.mojo.client().register(valueSubject, new AvroSchema(valueSchema), 1, 101);

String subjectPattern = "^TestSubject-(key|value)$:9999";
this.mojo.subjectPatterns.add(subjectPattern);

this.mojo.execute();
}

}