Skip to content

Commit

Permalink
Refactor/avoid user controlled input (#381)
Browse files Browse the repository at this point in the history
* refactor(plugin): avoid user controlled input

* refactor(plugin): avoid user controlled input

* chore: fixes after rebase

* feat(core): add schema name to AsyncHeaders

* test(kafka): fix SpringwolfKafkaControllerIntegrationTest
  • Loading branch information
timonback authored Oct 27, 2023
1 parent adc1d80 commit 90ea6dd
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public String register(AsyncHeaders headers) {
log.debug("Registering schema for {}", headers.getSchemaName());

MapSchema headerSchema = new MapSchema();
headerSchema.setName(headers.getSchemaName());
headerSchema.properties(headers);

this.definitions.put(headers.getSchemaName(), headerSchema);
Expand Down
2 changes: 2 additions & 0 deletions springwolf-plugins/springwolf-amqp-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"

implementation "io.swagger.core.v3:swagger-models-jakarta:${swaggerVersion}"

compileOnly "com.google.code.findbugs:jsr305:${jsr305Version}"
permitUnusedDeclared "com.google.code.findbugs:jsr305:${jsr305Version}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.AsyncApiService;
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfAmqpController;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfAmqpProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down Expand Up @@ -37,9 +37,7 @@ public SpringwolfAmqpProducer springwolfAmqpProducer(
@Bean
@ConditionalOnMissingBean
public SpringwolfAmqpController springwolfAmqpController(
AsyncApiDocketService asyncApiDocketService,
SpringwolfAmqpProducer springwolfAmqpProducer,
ObjectMapper objectMapper) {
return new SpringwolfAmqpController(asyncApiDocketService, springwolfAmqpProducer, objectMapper);
SchemasService schemasService, SpringwolfAmqpProducer springwolfAmqpProducer, ObjectMapper objectMapper) {
return new SpringwolfAmqpController(schemasService, springwolfAmqpProducer, objectMapper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfAmqpProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import io.swagger.v3.oas.models.media.Schema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -25,7 +26,7 @@
@RequiredArgsConstructor
public class SpringwolfAmqpController implements InitializingBean {

private final AsyncApiDocketService asyncApiDocketService;
private final SchemasService schemasService;

private final SpringwolfAmqpProducer producer;

Expand All @@ -38,22 +39,37 @@ public void publish(@RequestParam String topic, @RequestBody MessageDto message)
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "AMQP producer is not enabled");
}

String payloadType = message.getPayloadType();
if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) {
try {
Class<?> payloadClass = Class.forName(payloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);
boolean foundDefinition = false;
String messagePayloadType = message.getPayloadType();
for (Schema<?> value : schemasService.getDefinitions().values()) {
String schemaPayloadType = value.getName();
// security: match against user input, but always use our controlled data from the DefaultSchemaService
if (schemaPayloadType.equals(messagePayloadType)) {
publishMessage(topic, message, schemaPayloadType);

log.debug("Publishing to amqp queue {}: {}", topic, message.getPayload());
producer.send(topic, payload);
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", payloadType, message.getPayload()));
foundDefinition = true;
break;
}
} else {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified.");
}

if (!foundDefinition) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema.");
}
}

private void publishMessage(String topic, MessageDto message, String schemaPayloadType) {
try {
Class<?> payloadClass = Class.forName(schemaPayloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);

log.debug("Publishing to amqp queue {}: {}", topic, message.getPayload());
producer.send(topic, payload);
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import io.swagger.v3.oas.models.media.Schema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -25,7 +26,7 @@
@RequiredArgsConstructor
public class SpringwolfKafkaController implements InitializingBean {

private final AsyncApiDocketService asyncApiDocketService;
private final SchemasService schemasService;

private final SpringwolfKafkaProducer producer;

Expand All @@ -34,28 +35,43 @@ public class SpringwolfKafkaController implements InitializingBean {
@PostMapping("/publish")
public void publish(@RequestParam String topic, @RequestBody MessageDto message) {
if (!producer.isEnabled()) {
log.debug("Kafka producer is not enabled - message will not be published");
log.warn("Kafka producer is not enabled - message will not be published");
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Kafka producer is not enabled");
}

String payloadType = message.getPayloadType();
if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) {
try {
Class<?> payloadClass = Class.forName(payloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);
boolean foundDefinition = false;
String messagePayloadType = message.getPayloadType();
for (Schema<?> value : schemasService.getDefinitions().values()) {
String schemaPayloadType = value.getName();
// security: match against user input, but always use our controlled data from the DefaultSchemaService
if (schemaPayloadType.equals(messagePayloadType)) {
publishMessage(topic, message, schemaPayloadType);

String kafkaKey =
message.getBindings() != null ? message.getBindings().get("key") : null;
log.debug("Publishing to kafka topic {} with key {}: {}", topic, kafkaKey, message);
producer.send(topic, kafkaKey, message.getHeaders(), payload);
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", payloadType, message.getPayload()));
foundDefinition = true;
break;
}
} else {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified.");
}

if (!foundDefinition) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema.");
}
}

private void publishMessage(String topic, MessageDto message, String schemaPayloadType) {
try {
Class<?> payloadClass = Class.forName(schemaPayloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);

String kafkaKey =
message.getBindings() != null ? message.getBindings().get("key") : null;
log.debug("Publishing to kafka topic {} with key {}: {}", topic, kafkaKey, message);
producer.send(topic, kafkaKey, message.getHeaders(), payload);
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfKafkaController;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.configuration.properties.SpringwolfKafkaConfigProperties;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFactory;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand All @@ -25,10 +25,8 @@ public class SpringwolfKafkaProducerConfiguration {
@Bean
@ConditionalOnMissingBean
public SpringwolfKafkaController springwolfKafkaController(
AsyncApiDocketService asyncApiDocketService,
SpringwolfKafkaProducer springwolfKafkaProducer,
ObjectMapper objectMapper) {
return new SpringwolfKafkaController(asyncApiDocketService, springwolfKafkaProducer, objectMapper);
SchemasService schemasService, SpringwolfKafkaProducer springwolfKafkaProducer, ObjectMapper objectMapper) {
return new SpringwolfKafkaController(schemasService, springwolfKafkaProducer, objectMapper);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfKafkaController;
import io.github.stavshamir.springwolf.configuration.DefaultAsyncApiDocketService;
import io.github.stavshamir.springwolf.configuration.properties.SpringwolfConfigProperties;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import io.github.stavshamir.springwolf.schemas.DefaultSchemasService;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import io.github.stavshamir.springwolf.schemas.example.ExampleJsonGenerator;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -39,7 +41,8 @@
classes = {
SpringwolfKafkaController.class,
SpringwolfKafkaProducer.class,
DefaultAsyncApiDocketService.class,
DefaultSchemasService.class,
ExampleJsonGenerator.class,
SpringwolfConfigProperties.class,
})
@TestPropertySource(
Expand All @@ -49,13 +52,17 @@
"springwolf.docket.info.version=1.0",
"springwolf.docket.servers.kafka.protocol=kafka",
"springwolf.docket.servers.kafka.url=127.0.0.1",
"springwolf.plugin.kafka.publishing.enabled=true"
"springwolf.plugin.kafka.publishing.enabled=true",
"springwolf.use-fqn=true"
})
class SpringwolfKafkaControllerIntegrationTest {

@Autowired
private MockMvc mvc;

@Autowired
private SchemasService schemasService;

@MockBean
private SpringwolfKafkaProducer springwolfKafkaProducer;

Expand All @@ -68,6 +75,8 @@ class SpringwolfKafkaControllerIntegrationTest {
@BeforeEach
void setup() {
when(springwolfKafkaProducer.isEnabled()).thenReturn(true);

schemasService.register(PayloadDto.class);
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions springwolf-plugins/springwolf-sns-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"

implementation "io.swagger.core.v3:swagger-models-jakarta:${swaggerVersion}"

compileOnly "com.google.code.findbugs:jsr305:${jsr305Version}"
permitUnusedDeclared "com.google.code.findbugs:jsr305:${jsr305Version}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfSnsProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import io.swagger.v3.oas.models.media.Schema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -26,7 +27,7 @@
@RequiredArgsConstructor
public class SpringwolfSnsController implements InitializingBean {

private final AsyncApiDocketService asyncApiDocketService;
private final SchemasService schemasService;

private final SpringwolfSnsProducer producer;

Expand All @@ -39,22 +40,37 @@ public void publish(@RequestParam String topic, @RequestBody MessageDto message)
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "SNS producer is not enabled");
}

String payloadType = message.getPayloadType();
if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) {
try {
Class<?> payloadClass = Class.forName(payloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);
boolean foundDefinition = false;
String messagePayloadType = message.getPayloadType();
for (Schema<?> value : schemasService.getDefinitions().values()) {
String schemaPayloadType = value.getName();
// security: match against user input, but always use our controlled data from the DefaultSchemaService
if (schemaPayloadType.equals(messagePayloadType)) {
publishMessage(topic, message, schemaPayloadType);

log.debug("Publishing to sns queue {}: {}", topic, payload);
producer.send(topic, MessageBuilder.withPayload(payload).build());
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", payloadType, message.getPayload()));
foundDefinition = true;
break;
}
} else {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified.");
}

if (!foundDefinition) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema.");
}
}

private void publishMessage(String topic, MessageDto message, String schemaPayloadType) {
try {
Class<?> payloadClass = Class.forName(schemaPayloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);

log.debug("Publishing to sns topic {}: {}", topic, message);
producer.send(topic, MessageBuilder.withPayload(payload).build());
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.sns.core.SnsTemplate;
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfSnsController;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfSnsProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand All @@ -26,10 +26,8 @@ public class SpringwolfSnsProducerConfiguration {
@Bean
@ConditionalOnMissingBean
public SpringwolfSnsController springwolfSnsController(
AsyncApiDocketService asyncApiDocketService,
SpringwolfSnsProducer springwolfSnsProducer,
ObjectMapper objectMapper) {
return new SpringwolfSnsController(asyncApiDocketService, springwolfSnsProducer, objectMapper);
SchemasService schemasService, SpringwolfSnsProducer springwolfSnsProducer, ObjectMapper objectMapper) {
return new SpringwolfSnsController(schemasService, springwolfSnsProducer, objectMapper);
}

@Bean
Expand Down
2 changes: 2 additions & 0 deletions springwolf-plugins/springwolf-sqs-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"

implementation "io.swagger.core.v3:swagger-models-jakarta:${swaggerVersion}"

compileOnly "com.google.code.findbugs:jsr305:${jsr305Version}"
permitUnusedDeclared "com.google.code.findbugs:jsr305:${jsr305Version}"

Expand Down
Loading

0 comments on commit 90ea6dd

Please sign in to comment.