diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasService.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasService.java index c285c8472..b9ae07052 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasService.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasService.java @@ -58,6 +58,7 @@ public String register(AsyncHeaders headers) { log.debug("Registering schema for {}", headers.getSchemaName()); MapSchema headerSchema = new MapSchema(); + headerSchema.setName(headers.getSchemaName()); headerSchema.properties(headers); this.definitions.put(headers.getSchemaName(), headerSchema); diff --git a/springwolf-plugins/springwolf-amqp-plugin/build.gradle b/springwolf-plugins/springwolf-amqp-plugin/build.gradle index 00d3bc6c0..67ac167fc 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/build.gradle +++ b/springwolf-plugins/springwolf-amqp-plugin/build.gradle @@ -24,6 +24,8 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + implementation "io.swagger.core.v3:swagger-models-jakarta:${swaggerVersion}" + compileOnly "com.google.code.findbugs:jsr305:${jsr305Version}" permitUnusedDeclared "com.google.code.findbugs:jsr305:${jsr305Version}" diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/amqp/SpringwolfAmqpProducerConfiguration.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/amqp/SpringwolfAmqpProducerConfiguration.java index 66abee549..b2e4dd73f 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/amqp/SpringwolfAmqpProducerConfiguration.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/amqp/SpringwolfAmqpProducerConfiguration.java @@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.github.stavshamir.springwolf.asyncapi.AsyncApiService; import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfAmqpController; -import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService; import io.github.stavshamir.springwolf.producer.SpringwolfAmqpProducer; +import io.github.stavshamir.springwolf.schemas.SchemasService; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -37,9 +37,7 @@ public SpringwolfAmqpProducer springwolfAmqpProducer( @Bean @ConditionalOnMissingBean public SpringwolfAmqpController springwolfAmqpController( - AsyncApiDocketService asyncApiDocketService, - SpringwolfAmqpProducer springwolfAmqpProducer, - ObjectMapper objectMapper) { - return new SpringwolfAmqpController(asyncApiDocketService, springwolfAmqpProducer, objectMapper); + SchemasService schemasService, SpringwolfAmqpProducer springwolfAmqpProducer, ObjectMapper objectMapper) { + return new SpringwolfAmqpController(schemasService, springwolfAmqpProducer, objectMapper); } } diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfAmqpController.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfAmqpController.java index 013ee756f..11fab89fe 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfAmqpController.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfAmqpController.java @@ -4,8 +4,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto; -import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService; import io.github.stavshamir.springwolf.producer.SpringwolfAmqpProducer; +import io.github.stavshamir.springwolf.schemas.SchemasService; +import io.swagger.v3.oas.models.media.Schema; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; @@ -25,7 +26,7 @@ @RequiredArgsConstructor public class SpringwolfAmqpController implements InitializingBean { - private final AsyncApiDocketService asyncApiDocketService; + private final SchemasService schemasService; private final SpringwolfAmqpProducer producer; @@ -38,22 +39,37 @@ public void publish(@RequestParam String topic, @RequestBody MessageDto message) throw new ResponseStatusException(HttpStatus.NOT_FOUND, "AMQP producer is not enabled"); } - String payloadType = message.getPayloadType(); - if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) { - try { - Class payloadClass = Class.forName(payloadType); - Object payload = objectMapper.readValue(message.getPayload(), payloadClass); + boolean foundDefinition = false; + String messagePayloadType = message.getPayloadType(); + for (Schema value : schemasService.getDefinitions().values()) { + String schemaPayloadType = value.getName(); + // security: match against user input, but always use our controlled data from the DefaultSchemaService + if (schemaPayloadType.equals(messagePayloadType)) { + publishMessage(topic, message, schemaPayloadType); - log.debug("Publishing to amqp queue {}: {}", topic, message.getPayload()); - producer.send(topic, payload); - } catch (ClassNotFoundException | JsonProcessingException ex) { - throw new ResponseStatusException( - HttpStatus.BAD_REQUEST, - MessageFormat.format( - "Unable to create payload {0} from data: {1}", payloadType, message.getPayload())); + foundDefinition = true; + break; } - } else { - throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified."); + } + + if (!foundDefinition) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema."); + } + } + + private void publishMessage(String topic, MessageDto message, String schemaPayloadType) { + try { + Class payloadClass = Class.forName(schemaPayloadType); + Object payload = objectMapper.readValue(message.getPayload(), payloadClass); + + log.debug("Publishing to amqp queue {}: {}", topic, message.getPayload()); + producer.send(topic, payload); + } catch (ClassNotFoundException | JsonProcessingException ex) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, + MessageFormat.format( + "Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload())); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfKafkaController.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfKafkaController.java index 05b6785d4..8f8d32ac5 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfKafkaController.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfKafkaController.java @@ -4,8 +4,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto; -import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer; +import io.github.stavshamir.springwolf.schemas.SchemasService; +import io.swagger.v3.oas.models.media.Schema; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; @@ -25,7 +26,7 @@ @RequiredArgsConstructor public class SpringwolfKafkaController implements InitializingBean { - private final AsyncApiDocketService asyncApiDocketService; + private final SchemasService schemasService; private final SpringwolfKafkaProducer producer; @@ -34,28 +35,43 @@ public class SpringwolfKafkaController implements InitializingBean { @PostMapping("/publish") public void publish(@RequestParam String topic, @RequestBody MessageDto message) { if (!producer.isEnabled()) { - log.debug("Kafka producer is not enabled - message will not be published"); + log.warn("Kafka producer is not enabled - message will not be published"); throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Kafka producer is not enabled"); } - String payloadType = message.getPayloadType(); - if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) { - try { - Class payloadClass = Class.forName(payloadType); - Object payload = objectMapper.readValue(message.getPayload(), payloadClass); + boolean foundDefinition = false; + String messagePayloadType = message.getPayloadType(); + for (Schema value : schemasService.getDefinitions().values()) { + String schemaPayloadType = value.getName(); + // security: match against user input, but always use our controlled data from the DefaultSchemaService + if (schemaPayloadType.equals(messagePayloadType)) { + publishMessage(topic, message, schemaPayloadType); - String kafkaKey = - message.getBindings() != null ? message.getBindings().get("key") : null; - log.debug("Publishing to kafka topic {} with key {}: {}", topic, kafkaKey, message); - producer.send(topic, kafkaKey, message.getHeaders(), payload); - } catch (ClassNotFoundException | JsonProcessingException ex) { - throw new ResponseStatusException( - HttpStatus.BAD_REQUEST, - MessageFormat.format( - "Unable to create payload {0} from data: {1}", payloadType, message.getPayload())); + foundDefinition = true; + break; } - } else { - throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified."); + } + + if (!foundDefinition) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema."); + } + } + + private void publishMessage(String topic, MessageDto message, String schemaPayloadType) { + try { + Class payloadClass = Class.forName(schemaPayloadType); + Object payload = objectMapper.readValue(message.getPayload(), payloadClass); + + String kafkaKey = + message.getBindings() != null ? message.getBindings().get("key") : null; + log.debug("Publishing to kafka topic {} with key {}: {}", topic, kafkaKey, message); + producer.send(topic, kafkaKey, message.getHeaders(), payload); + } catch (ClassNotFoundException | JsonProcessingException ex) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, + MessageFormat.format( + "Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload())); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java index 8036e2fee..805dcfa27 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java @@ -3,10 +3,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfKafkaController; -import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService; import io.github.stavshamir.springwolf.configuration.properties.SpringwolfKafkaConfigProperties; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFactory; +import io.github.stavshamir.springwolf.schemas.SchemasService; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -25,10 +25,8 @@ public class SpringwolfKafkaProducerConfiguration { @Bean @ConditionalOnMissingBean public SpringwolfKafkaController springwolfKafkaController( - AsyncApiDocketService asyncApiDocketService, - SpringwolfKafkaProducer springwolfKafkaProducer, - ObjectMapper objectMapper) { - return new SpringwolfKafkaController(asyncApiDocketService, springwolfKafkaProducer, objectMapper); + SchemasService schemasService, SpringwolfKafkaProducer springwolfKafkaProducer, ObjectMapper objectMapper) { + return new SpringwolfKafkaController(schemasService, springwolfKafkaProducer, objectMapper); } @Bean diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerIntegrationTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerIntegrationTest.java index db6b8de83..e38fa9ba0 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerIntegrationTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/asyncapi/SpringwolfKafkaControllerIntegrationTest.java @@ -3,9 +3,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfKafkaController; -import io.github.stavshamir.springwolf.configuration.DefaultAsyncApiDocketService; import io.github.stavshamir.springwolf.configuration.properties.SpringwolfConfigProperties; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer; +import io.github.stavshamir.springwolf.schemas.DefaultSchemasService; +import io.github.stavshamir.springwolf.schemas.SchemasService; +import io.github.stavshamir.springwolf.schemas.example.ExampleJsonGenerator; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -39,7 +41,8 @@ classes = { SpringwolfKafkaController.class, SpringwolfKafkaProducer.class, - DefaultAsyncApiDocketService.class, + DefaultSchemasService.class, + ExampleJsonGenerator.class, SpringwolfConfigProperties.class, }) @TestPropertySource( @@ -49,13 +52,17 @@ "springwolf.docket.info.version=1.0", "springwolf.docket.servers.kafka.protocol=kafka", "springwolf.docket.servers.kafka.url=127.0.0.1", - "springwolf.plugin.kafka.publishing.enabled=true" + "springwolf.plugin.kafka.publishing.enabled=true", + "springwolf.use-fqn=true" }) class SpringwolfKafkaControllerIntegrationTest { @Autowired private MockMvc mvc; + @Autowired + private SchemasService schemasService; + @MockBean private SpringwolfKafkaProducer springwolfKafkaProducer; @@ -68,6 +75,8 @@ class SpringwolfKafkaControllerIntegrationTest { @BeforeEach void setup() { when(springwolfKafkaProducer.isEnabled()).thenReturn(true); + + schemasService.register(PayloadDto.class); } @Test diff --git a/springwolf-plugins/springwolf-sns-plugin/build.gradle b/springwolf-plugins/springwolf-sns-plugin/build.gradle index e79abc7f7..6640bb31f 100644 --- a/springwolf-plugins/springwolf-sns-plugin/build.gradle +++ b/springwolf-plugins/springwolf-sns-plugin/build.gradle @@ -32,6 +32,8 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + implementation "io.swagger.core.v3:swagger-models-jakarta:${swaggerVersion}" + compileOnly "com.google.code.findbugs:jsr305:${jsr305Version}" permitUnusedDeclared "com.google.code.findbugs:jsr305:${jsr305Version}" diff --git a/springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfSnsController.java b/springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfSnsController.java index 1b09d27b0..800126463 100644 --- a/springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfSnsController.java +++ b/springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfSnsController.java @@ -4,8 +4,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto; -import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService; import io.github.stavshamir.springwolf.producer.SpringwolfSnsProducer; +import io.github.stavshamir.springwolf.schemas.SchemasService; +import io.swagger.v3.oas.models.media.Schema; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; @@ -26,7 +27,7 @@ @RequiredArgsConstructor public class SpringwolfSnsController implements InitializingBean { - private final AsyncApiDocketService asyncApiDocketService; + private final SchemasService schemasService; private final SpringwolfSnsProducer producer; @@ -39,22 +40,37 @@ public void publish(@RequestParam String topic, @RequestBody MessageDto message) throw new ResponseStatusException(HttpStatus.NOT_FOUND, "SNS producer is not enabled"); } - String payloadType = message.getPayloadType(); - if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) { - try { - Class payloadClass = Class.forName(payloadType); - Object payload = objectMapper.readValue(message.getPayload(), payloadClass); + boolean foundDefinition = false; + String messagePayloadType = message.getPayloadType(); + for (Schema value : schemasService.getDefinitions().values()) { + String schemaPayloadType = value.getName(); + // security: match against user input, but always use our controlled data from the DefaultSchemaService + if (schemaPayloadType.equals(messagePayloadType)) { + publishMessage(topic, message, schemaPayloadType); - log.debug("Publishing to sns queue {}: {}", topic, payload); - producer.send(topic, MessageBuilder.withPayload(payload).build()); - } catch (ClassNotFoundException | JsonProcessingException ex) { - throw new ResponseStatusException( - HttpStatus.BAD_REQUEST, - MessageFormat.format( - "Unable to create payload {0} from data: {1}", payloadType, message.getPayload())); + foundDefinition = true; + break; } - } else { - throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified."); + } + + if (!foundDefinition) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema."); + } + } + + private void publishMessage(String topic, MessageDto message, String schemaPayloadType) { + try { + Class payloadClass = Class.forName(schemaPayloadType); + Object payload = objectMapper.readValue(message.getPayload(), payloadClass); + + log.debug("Publishing to sns topic {}: {}", topic, message); + producer.send(topic, MessageBuilder.withPayload(payload).build()); + } catch (ClassNotFoundException | JsonProcessingException ex) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, + MessageFormat.format( + "Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload())); } } diff --git a/springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/sns/SpringwolfSnsProducerConfiguration.java b/springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/sns/SpringwolfSnsProducerConfiguration.java index d76f84464..d0f58fea3 100644 --- a/springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/sns/SpringwolfSnsProducerConfiguration.java +++ b/springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/sns/SpringwolfSnsProducerConfiguration.java @@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.awspring.cloud.sns.core.SnsTemplate; import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfSnsController; -import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService; import io.github.stavshamir.springwolf.producer.SpringwolfSnsProducer; +import io.github.stavshamir.springwolf.schemas.SchemasService; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -26,10 +26,8 @@ public class SpringwolfSnsProducerConfiguration { @Bean @ConditionalOnMissingBean public SpringwolfSnsController springwolfSnsController( - AsyncApiDocketService asyncApiDocketService, - SpringwolfSnsProducer springwolfSnsProducer, - ObjectMapper objectMapper) { - return new SpringwolfSnsController(asyncApiDocketService, springwolfSnsProducer, objectMapper); + SchemasService schemasService, SpringwolfSnsProducer springwolfSnsProducer, ObjectMapper objectMapper) { + return new SpringwolfSnsController(schemasService, springwolfSnsProducer, objectMapper); } @Bean diff --git a/springwolf-plugins/springwolf-sqs-plugin/build.gradle b/springwolf-plugins/springwolf-sqs-plugin/build.gradle index cbf2a64c1..9a3e94802 100644 --- a/springwolf-plugins/springwolf-sqs-plugin/build.gradle +++ b/springwolf-plugins/springwolf-sqs-plugin/build.gradle @@ -31,6 +31,8 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + implementation "io.swagger.core.v3:swagger-models-jakarta:${swaggerVersion}" + compileOnly "com.google.code.findbugs:jsr305:${jsr305Version}" permitUnusedDeclared "com.google.code.findbugs:jsr305:${jsr305Version}" diff --git a/springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfSqsController.java b/springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfSqsController.java index 2296b534b..bae367e5f 100644 --- a/springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfSqsController.java +++ b/springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/controller/SpringwolfSqsController.java @@ -4,8 +4,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto; -import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService; import io.github.stavshamir.springwolf.producer.SpringwolfSqsProducer; +import io.github.stavshamir.springwolf.schemas.SchemasService; +import io.swagger.v3.oas.models.media.Schema; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; @@ -25,7 +26,7 @@ @RequiredArgsConstructor public class SpringwolfSqsController implements InitializingBean { - private final AsyncApiDocketService asyncApiDocketService; + private final SchemasService schemasService; private final SpringwolfSqsProducer producer; @@ -34,26 +35,41 @@ public class SpringwolfSqsController implements InitializingBean { @PostMapping("/publish") public void publish(@RequestParam String topic, @RequestBody MessageDto message) { if (!producer.isEnabled()) { - log.warn("SQS producer is not enabled - message will not be published"); - throw new ResponseStatusException(HttpStatus.NOT_FOUND, "SQS producer is not enabled"); + log.warn("Kafka producer is not enabled - message will not be published"); + throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Kafka producer is not enabled"); } - String payloadType = message.getPayloadType(); - if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) { - try { - Class payloadClass = Class.forName(payloadType); - Object payload = objectMapper.readValue(message.getPayload(), payloadClass); + boolean foundDefinition = false; + String messagePayloadType = message.getPayloadType(); + for (Schema value : schemasService.getDefinitions().values()) { + String schemaPayloadType = value.getName(); + // security: match against user input, but always use our controlled data from the DefaultSchemaService + if (schemaPayloadType.equals(messagePayloadType)) { + publishMessage(topic, message, schemaPayloadType); - log.debug("Publishing to sqs queue {}: {}", topic, payload); - producer.send(topic, payload); - } catch (ClassNotFoundException | JsonProcessingException ex) { - throw new ResponseStatusException( - HttpStatus.BAD_REQUEST, - MessageFormat.format( - "Unable to create payload {0} from data: {1}", payloadType, message.getPayload())); + foundDefinition = true; + break; } - } else { - throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified."); + } + + if (!foundDefinition) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema."); + } + } + + private void publishMessage(String topic, MessageDto message, String schemaPayloadType) { + try { + Class payloadClass = Class.forName(schemaPayloadType); + Object payload = objectMapper.readValue(message.getPayload(), payloadClass); + + log.debug("Publishing to SQS queue {}: {}", topic, message); + producer.send(topic, payload); + } catch (ClassNotFoundException | JsonProcessingException ex) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, + MessageFormat.format( + "Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload())); } } diff --git a/springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/sqs/SpringwolfSqsProducerConfiguration.java b/springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/sqs/SpringwolfSqsProducerConfiguration.java index 04d6e6120..03f912f4d 100644 --- a/springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/sqs/SpringwolfSqsProducerConfiguration.java +++ b/springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/sqs/SpringwolfSqsProducerConfiguration.java @@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfSqsController; -import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService; import io.github.stavshamir.springwolf.producer.SpringwolfSqsProducer; +import io.github.stavshamir.springwolf.schemas.SchemasService; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -32,9 +32,7 @@ public SpringwolfSqsProducer springwolfSqsProducer(List sqsTemplate @Bean @ConditionalOnMissingBean public SpringwolfSqsController springwolfSqsController( - AsyncApiDocketService asyncApiDocketService, - SpringwolfSqsProducer springwolfSqsProducer, - ObjectMapper objectMapper) { - return new SpringwolfSqsController(asyncApiDocketService, springwolfSqsProducer, objectMapper); + SchemasService schemasService, SpringwolfSqsProducer springwolfSqsProducer, ObjectMapper objectMapper) { + return new SpringwolfSqsController(schemasService, springwolfSqsProducer, objectMapper); } }