Skip to content

Commit

Permalink
Feat/asyncapi 3: more fixes (#550)
Browse files Browse the repository at this point in the history
* chore: typo

* fix: correct Listener=receive, Publisher=send operationAction

* feat(kafka): handle kafka message binding key

* chore: update example json

* feat(core): all operation include the action in the id

refactor: rename operationName to operationId

* feat(cloud-stream): align asyncapi.json structure

* feat(ui): show message when bindings are missing

* chore: resolve open FIXME

* chore: fix dependencies
  • Loading branch information
timonback committed Jan 19, 2024
1 parent 1f373d0 commit 0fe6a84
Show file tree
Hide file tree
Showing 23 changed files with 250 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public AsyncOperation getAsyncOperation(AsyncListener annotation) {

@Override
public OperationAction getOperationType() {
return OperationAction.SEND;
return OperationAction.RECEIVE;
}
};
}
Expand All @@ -142,7 +142,7 @@ public AsyncOperation getAsyncOperation(AsyncPublisher annotation) {

@Override
public OperationAction getOperationType() {
return OperationAction.RECEIVE;
return OperationAction.SEND;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static Map<String, ChannelObject> mergeChannels(List<Map.Entry<String, Ch
* Messages within operations are merged
*
* @param operationEntries Ordered pairs of operation name to Operation
* @return A map of operationName to a single Operation
* @return A map of operationId to a single Operation
*/
public static Map<String, Operation> mergeOperations(List<Map.Entry<String, Operation>> operationEntries) {
Map<String, Operation> mergedOperations = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,14 @@ private Map.Entry<String, ChannelObject> buildChannel(MethodAndAnnotation<A> met
private Map.Entry<String, Operation> buildOperation(MethodAndAnnotation<A> methodAndAnnotation) {
AsyncOperation operationAnnotation =
this.asyncAnnotationProvider.getAsyncOperation(methodAndAnnotation.annotation());
String operationName = resolver.resolveStringValue(operationAnnotation.channelName());
String channelName = resolver.resolveStringValue(operationAnnotation.channelName());
String operationId = channelName + "_" + this.asyncAnnotationProvider.getOperationType().type + "_"
+ methodAndAnnotation.method.getName();

Operation operation = buildOperation(operationAnnotation, methodAndAnnotation.method(), operationName);
Operation operation = buildOperation(operationAnnotation, methodAndAnnotation.method(), channelName);
operation.setAction(this.asyncAnnotationProvider.getOperationType());

return Map.entry(operationName, operation);
return Map.entry(operationId, operation);
}

private Operation buildOperation(AsyncOperation asyncOperation, Method method, String channelName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static Map<String, MessageBinding> processMessageBindingFromAnnotation(
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(
ProcessedMessageBinding::getType, ProcessedMessageBinding::getBinding, (e1, e2) -> e2));
ProcessedMessageBinding::getType, ProcessedMessageBinding::getBinding, (e1, e2) -> e1));
}

public static void processAsyncMessageAnnotation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private Stream<Map.Entry<String, Operation>> mapClassToOperation(Class<?> compon
}

String channelName = bindingFactory.getChannelName(classAnnotation);
String operationId = channelName + "_receive_" + component.getSimpleName();
String operationId = channelName + "_" + OperationAction.RECEIVE + "_" + component.getSimpleName();

Operation operation = buildOperation(classAnnotation, annotatedMethods);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private Map.Entry<String, Operation> mapMethodToOperation(Method method) {
MethodAnnotation annotation = AnnotationUtil.findAnnotationOrThrow(methodAnnotationClass, method);

String channelName = bindingFactory.getChannelName(annotation);
String operationId = channelName + "_receive_" + method.getName();
String operationId = channelName + "_" + OperationAction.RECEIVE + "_" + method.getName();
Class<?> payload = payloadClassExtractor.extractFrom(method);

Operation operation = buildOperation(annotation, payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/**
* Internal interface to allow post-processing of a new schema (and their definition) after detection.
* <br/>
* It is closely coupled with the data structure of the SchemaService.
* It is closely coupled with the data structure of the SchemasService.
*/
public interface SchemasPostProcessor {
void process(Schema schema, Map<String, Schema> definitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ private AsyncAPI getAsyncAPITestObject() {

Operation newUserOperation = Operation.builder()
.action(OperationAction.SEND)
// FIXME: Generate Ref from Channel Instance
.channel(ChannelReference.builder().ref("#/channels/new-user").build())
.channel(ChannelReference.fromChannel("new-user"))
.messages(List.of(MessageReference.toChannelMessage("new-user", message.getName())))
.bindings(Map.of("kafka", operationBinding))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ void shouldNotMergeDifferentChannelNames() {
}

@Test
void shouldNotMergeDifferentOperationNames() {
void shouldNotMergeDifferentoperationIds() {
// given
String operationName1 = "operation1";
String operationName2 = "operation2";
String operationId1 = "operation1";
String operationId2 = "operation2";
Operation publisherOperation = Operation.builder().build();
Operation subscriberOperation = Operation.builder().build();

// when
Map<String, Operation> mergedOperations = ChannelMerger.mergeOperations(Arrays.asList(
Map.entry(operationName1, publisherOperation), Map.entry(operationName2, subscriberOperation)));
Map.entry(operationId1, publisherOperation), Map.entry(operationId2, subscriberOperation)));

// then
assertThat(mergedOperations).hasSize(2);
Expand All @@ -66,9 +66,9 @@ void shouldMergeEqualChannelNamesIntoOneChannel() {
}

@Test
void shouldMergeEqualOperationNamesIntoOneOperation() {
void shouldMergeEqualoperationIdsIntoOneOperation() {
// given
String operationName = "operation";
String operationId = "operation";
Operation publishOperation = Operation.builder()
.action(OperationAction.SEND)
.title("publisher")
Expand All @@ -79,8 +79,8 @@ void shouldMergeEqualOperationNamesIntoOneOperation() {
.build();

// when
Map<String, Operation> mergedOperations = ChannelMerger.mergeOperations(Arrays.asList(
Map.entry(operationName, publishOperation), Map.entry(operationName, subscribeOperation)));
Map<String, Operation> mergedOperations = ChannelMerger.mergeOperations(
Arrays.asList(Map.entry(operationId, publishOperation), Map.entry(operationId, subscribeOperation)));

// then
assertThat(mergedOperations).hasSize(1);
Expand Down Expand Up @@ -108,18 +108,18 @@ void shouldUseFirstChannelFound() {
@Test
void shouldUseFirstOperationFound() {
// given
String operationName = "operation";
String operationId = "operation";
Operation senderOperation =
Operation.builder().action(OperationAction.SEND).build();
Operation receiverOperation =
Operation.builder().action(OperationAction.RECEIVE).build();

// when
Map<String, Operation> mergedOperations = ChannelMerger.mergeOperations(
Arrays.asList(Map.entry(operationName, senderOperation), Map.entry(operationName, receiverOperation)));
Arrays.asList(Map.entry(operationId, senderOperation), Map.entry(operationId, receiverOperation)));

// then
assertThat(mergedOperations).hasSize(1).hasEntrySatisfying(operationName, it -> {
assertThat(mergedOperations).hasSize(1).hasEntrySatisfying(operationId, it -> {
assertThat(it.getAction()).isEqualTo(OperationAction.SEND);
});
}
Expand Down Expand Up @@ -171,7 +171,7 @@ void shouldMergeDifferentMessagesForSameChannel() {
void shouldMergeDifferentMessageForSameOperation() {
// given
String channelName = "channel";
String operationName = "operation";
String operationId = "operation";
MessageObject message1 = MessageObject.builder()
.messageId("message1")
.name(String.class.getCanonicalName())
Expand Down Expand Up @@ -209,13 +209,13 @@ void shouldMergeDifferentMessageForSameOperation() {

// when
Map<String, Operation> mergedOperations = ChannelMerger.mergeOperations(List.of(
Map.entry(operationName, senderOperation1),
Map.entry(operationName, senderOperation2),
Map.entry(operationName, senderOperation3)));
Map.entry(operationId, senderOperation1),
Map.entry(operationId, senderOperation2),
Map.entry(operationId, senderOperation3)));

// then expectedMessage only includes message1 and message2.
// Message3 is not included as it is identical in terms of payload type (Message#name) to message 2
assertThat(mergedOperations).hasSize(1).hasEntrySatisfying(operationName, it -> {
assertThat(mergedOperations).hasSize(1).hasEntrySatisfying(operationId, it -> {
assertThat(it.getMessages()).containsExactlyInAnyOrder(messageRef1, messageRef2);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ void scan_componentHasListenerMethodWithAllAttributes() {
.build();

assertThat(actualChannels).containsExactly(Map.entry("test-channel", expectedChannel));
assertThat(actualOperations).containsExactly(Map.entry("test-channel", expectedOperation));
assertThat(actualOperations)
.containsExactly(Map.entry("test-channel_send_methodWithAnnotation", expectedOperation));
}

@Test
Expand Down Expand Up @@ -275,8 +276,8 @@ void scan_componentHasMultipleListenerAnnotations() {
"test-channel-2", expectedChannel2));
assertThat(actualOperations)
.containsExactlyInAnyOrderEntriesOf(Map.of(
"test-channel-1", expectedOperation1,
"test-channel-2", expectedOperation2));
"test-channel-1_send_methodWithMultipleAnnotation", expectedOperation1,
"test-channel-2_send_methodWithMultipleAnnotation", expectedOperation2));
}

@Test
Expand Down Expand Up @@ -318,7 +319,8 @@ void scan_componentHasAsyncMethodAnnotation() {
.build();

assertThat(actualChannels).containsExactly(Map.entry("test-channel", expectedChannel));
assertThat(actualOperations).containsExactly(Map.entry("test-channel", expectedOperation));
assertThat(actualOperations)
.containsExactly(Map.entry("test-channel_send_methodWithAnnotation", expectedOperation));
}

private static class ClassWithoutListenerAnnotation {
Expand Down Expand Up @@ -435,7 +437,8 @@ void scan_componentHasOnlyDeclaredMethods(Class<?> clazz) {
.build();

assertThat(actualChannels).containsExactly(Map.entry("test-channel", expectedChannel));
assertThat(actualOperations).containsExactly(Map.entry("test-channel", expectedOperation));
assertThat(actualOperations)
.containsExactly(Map.entry("test-channel_send_methodFromInterface", expectedOperation));
}

private static class ClassImplementingInterface implements ClassInterface<String> {
Expand Down Expand Up @@ -510,7 +513,8 @@ void scan_componentHasListenerMethodWithMetaAnnotation() {
.build();

assertThat(actualChannels).containsExactly(Map.entry("test-channel", expectedChannel));
assertThat(actualOperations).containsExactly(Map.entry("test-channel", expectedOperation));
assertThat(actualOperations)
.containsExactly(Map.entry("test-channel_send_methodFromInterface", expectedOperation));
}

public static class ClassWithMetaAnnotation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,12 @@
}
]
},
"example-producer-channel-publisher": {
"action": "receive",
"example-producer-channel-publisher_send_sendMessage": {
"action": "send",
"channel": {
"$ref": "#/channels/example-producer-channel-publisher"
},
"title": "example-producer-channel-publisher_receive",
"title": "example-producer-channel-publisher_send",
"description": "Custom, optional description defined in the AsyncPublisher annotation",
"bindings": {
"amqp": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,7 @@
"another-topic": {
"messages": {
"io.github.stavshamir.springwolf.example.cloudstream.dtos.AnotherPayloadDto": {
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"payload": {
"$ref": "#/components/schemas/AnotherPayloadDto"
},
"name": "io.github.stavshamir.springwolf.example.cloudstream.dtos.AnotherPayloadDto",
"title": "AnotherPayloadDto",
"bindings": {
"kafka": { }
}
"$ref": "#/components/messages/io.github.stavshamir.springwolf.example.cloudstream.dtos.AnotherPayloadDto"
}
},
"bindings": {
Expand All @@ -45,17 +35,7 @@
"example-topic": {
"messages": {
"io.github.stavshamir.springwolf.example.cloudstream.dtos.ExamplePayloadDto": {
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"payload": {
"$ref": "#/components/schemas/ExamplePayloadDto"
},
"name": "io.github.stavshamir.springwolf.example.cloudstream.dtos.ExamplePayloadDto",
"title": "ExamplePayloadDto",
"bindings": {
"kafka": { }
}
"$ref": "#/components/messages/io.github.stavshamir.springwolf.example.cloudstream.dtos.ExamplePayloadDto"
}
},
"bindings": {
Expand Down Expand Up @@ -132,7 +112,40 @@
"example": { }
}
},
"messages": { }
"messages": {
"io.github.stavshamir.springwolf.example.cloudstream.dtos.AnotherPayloadDto": {
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"payload": {
"schemaFormat": "application/vnd.aai.asyncapi+json;version=3.0.0",
"schema": {
"$ref": "#/components/schemas/AnotherPayloadDto"
}
},
"name": "io.github.stavshamir.springwolf.example.cloudstream.dtos.AnotherPayloadDto",
"title": "AnotherPayloadDto",
"bindings": {
"kafka": { }
}
},
"io.github.stavshamir.springwolf.example.cloudstream.dtos.ExamplePayloadDto": {
"headers": {
"$ref": "#/components/schemas/HeadersNotDocumented"
},
"payload": {
"schemaFormat": "application/vnd.aai.asyncapi+json;version=3.0.0",
"schema": {
"$ref": "#/components/schemas/ExamplePayloadDto"
}
},
"name": "io.github.stavshamir.springwolf.example.cloudstream.dtos.ExamplePayloadDto",
"title": "ExamplePayloadDto",
"bindings": {
"kafka": { }
}
}
}
},
"operations": {
"another-topic_publish_consumerMethod": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,34 +151,34 @@
}
},
"operations": {
"another-queue": {
"another-queue_receive_receiveAnotherPayload": {
"action": "receive",
"channel": {
"$ref": "#/channels/another-queue"
},
"title": "another-queue_receive",
"description": "Custom, optional description defined in the AsyncPublisher annotation",
"bindings": {
"jms": {
"internal-field": "customValue",
"nested": {
"key": "nestedValue"
}
}
"jms": { }
},
"messages": [
{
"$ref": "#/channels/another-queue/messages/io.github.stavshamir.springwolf.example.jms.dtos.AnotherPayloadDto"
}
]
},
"another-queue_receive_receiveAnotherPayload": {
"action": "receive",
"another-queue_send_sendMessage": {
"action": "send",
"channel": {
"$ref": "#/channels/another-queue"
},
"title": "another-queue_send",
"description": "Custom, optional description defined in the AsyncPublisher annotation",
"bindings": {
"jms": { }
"jms": {
"internal-field": "customValue",
"nested": {
"key": "nestedValue"
}
}
},
"messages": [
{
Expand Down
Loading

0 comments on commit 0fe6a84

Please sign in to comment.