Skip to content

Commit

Permalink
Fix subject redirection from message on large registries
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexisSouquiere committed Jan 17, 2024
1 parent 15e7ee1 commit 22d6a57
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 9 deletions.
4 changes: 2 additions & 2 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -603,9 +603,9 @@ class TopicData extends Root {
}

_redirectToSchema(id) {
const { selectedCluster } = this.state;
const { selectedCluster, selectedTopic } = this.state;

this.getApi(uriSchemaId(selectedCluster, id)).then(response => {
this.getApi(uriSchemaId(selectedCluster, id, selectedTopic)).then(response => {
if (response.data) {
this.props.history.push({
pathname: `/ui/${selectedCluster}/schema/details/${response.data.subject}`,
Expand Down
4 changes: 2 additions & 2 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ export const uriSchemaRegistry = (clusterId, search, pageNumber) => {
return `${apiUrl}/${clusterId}/schema?&search=${search}&page=${pageNumber}`;
};

export const uriSchemaId = (clusterId, id) => {
return `${apiUrl}/${clusterId}/schema/id/${id}`;
export const uriSchemaId = (clusterId, id, topic) => {
return `${apiUrl}/${clusterId}/schema/id/${id}?topic=${topic}`;
};

export const uriSchemaVersions = (clusterId, subject) => {
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/org/akhq/controllers/SchemaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,19 @@ private Schema registerSchema(String cluster, @Body Schema schema) throws IOExce

@Get("api/{cluster}/schema/id/{id}")
@Operation(tags = {"schema registry"}, summary = "Find a schema by id")
public Schema redirectId(
public Schema getSubjectBySchemaIdAndTopic(
HttpRequest<?> request,
String cluster,
Integer id
) throws IOException, RestClientException, ExecutionException, InterruptedException {
Integer id,
@QueryValue String topic
) throws IOException, RestClientException {
// TODO Do the check on the subject name too
checkIfClusterAllowed(cluster);

return this.schemaRepository
.getById(cluster, id)
return this.schemaRepository.getSubjectsBySchemaId(cluster, id)
.stream()
.filter(s -> s.getSubject().contains(topic))
.findFirst()
.orElse(null);
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/akhq/models/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public class Schema {

private String exception;

public Schema(int schemaId, String subject, int version) {
this.id = schemaId;
this.subject = subject;
this.version = version;
}

public Schema(Schema schema, Schema.Config config) {
this.id = schema.id;
this.subject = schema.subject;
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ public boolean exist(String clusterId, String subject) throws IOException, RestC
return found;
}

public List<Schema> getSubjectsBySchemaId(String clusterId, int id) throws IOException, RestClientException {
Optional<RestService> maybeRegistryRestClient = Optional.ofNullable(kafkaModule
.getRegistryRestClient(clusterId));
if(maybeRegistryRestClient.isEmpty()){
return List.of();
}

return maybeRegistryRestClient.get()
.getAllVersionsById(id)
.stream()
.map(v -> new Schema(id, v.getSubject(), v.getVersion()))
.collect(Collectors.toList());
}

public Optional<Schema> getById(String clusterId, Integer id) throws IOException, RestClientException, ExecutionException, InterruptedException {
for (String subject: this.all(clusterId, Optional.empty(), List.of())) {
for (Schema version: this.getAllVersions(clusterId, subject)) {
Expand Down

0 comments on commit 22d6a57

Please sign in to comment.