Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/avoid user controlled input #381

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public String register(AsyncHeaders headers) {
log.debug("Registering schema for {}", headers.getSchemaName());

MapSchema headerSchema = new MapSchema();
headerSchema.setName(headers.getSchemaName());
headerSchema.properties(headers);

this.definitions.put(headers.getSchemaName(), headerSchema);
Expand Down
2 changes: 2 additions & 0 deletions springwolf-plugins/springwolf-amqp-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"

implementation "io.swagger.core.v3:swagger-models-jakarta:${swaggerVersion}"

compileOnly "com.google.code.findbugs:jsr305:${jsr305Version}"
permitUnusedDeclared "com.google.code.findbugs:jsr305:${jsr305Version}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.AsyncApiService;
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfAmqpController;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfAmqpProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down Expand Up @@ -37,9 +37,7 @@ public SpringwolfAmqpProducer springwolfAmqpProducer(
@Bean
@ConditionalOnMissingBean
public SpringwolfAmqpController springwolfAmqpController(
AsyncApiDocketService asyncApiDocketService,
SpringwolfAmqpProducer springwolfAmqpProducer,
ObjectMapper objectMapper) {
return new SpringwolfAmqpController(asyncApiDocketService, springwolfAmqpProducer, objectMapper);
SchemasService schemasService, SpringwolfAmqpProducer springwolfAmqpProducer, ObjectMapper objectMapper) {
return new SpringwolfAmqpController(schemasService, springwolfAmqpProducer, objectMapper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfAmqpProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import io.swagger.v3.oas.models.media.Schema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -25,7 +26,7 @@
@RequiredArgsConstructor
public class SpringwolfAmqpController implements InitializingBean {
timonback marked this conversation as resolved.
Show resolved Hide resolved

private final AsyncApiDocketService asyncApiDocketService;
private final SchemasService schemasService;

private final SpringwolfAmqpProducer producer;

Expand All @@ -38,22 +39,37 @@ public void publish(@RequestParam String topic, @RequestBody MessageDto message)
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "AMQP producer is not enabled");
}

String payloadType = message.getPayloadType();
if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) {
try {
Class<?> payloadClass = Class.forName(payloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);
boolean foundDefinition = false;
String messagePayloadType = message.getPayloadType();
for (Schema<?> value : schemasService.getDefinitions().values()) {
String schemaPayloadType = value.getName();
// security: match against user input, but always use our controlled data from the DefaultSchemaService
if (schemaPayloadType.equals(messagePayloadType)) {
publishMessage(topic, message, schemaPayloadType);

log.debug("Publishing to amqp queue {}: {}", topic, message.getPayload());
producer.send(topic, payload);
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", payloadType, message.getPayload()));
foundDefinition = true;
break;
}
} else {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified.");
}

if (!foundDefinition) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema.");
}
}

private void publishMessage(String topic, MessageDto message, String schemaPayloadType) {
try {
Class<?> payloadClass = Class.forName(schemaPayloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);

log.debug("Publishing to amqp queue {}: {}", topic, message.getPayload());
producer.send(topic, payload);
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import io.swagger.v3.oas.models.media.Schema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -25,7 +26,7 @@
@RequiredArgsConstructor
public class SpringwolfKafkaController implements InitializingBean {

private final AsyncApiDocketService asyncApiDocketService;
private final SchemasService schemasService;

private final SpringwolfKafkaProducer producer;

Expand All @@ -34,28 +35,43 @@ public class SpringwolfKafkaController implements InitializingBean {
@PostMapping("/publish")
public void publish(@RequestParam String topic, @RequestBody MessageDto message) {
if (!producer.isEnabled()) {
log.debug("Kafka producer is not enabled - message will not be published");
log.warn("Kafka producer is not enabled - message will not be published");
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Kafka producer is not enabled");
}

String payloadType = message.getPayloadType();
if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) {
try {
Class<?> payloadClass = Class.forName(payloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);
boolean foundDefinition = false;
String messagePayloadType = message.getPayloadType();
for (Schema<?> value : schemasService.getDefinitions().values()) {
String schemaPayloadType = value.getName();
// security: match against user input, but always use our controlled data from the DefaultSchemaService
if (schemaPayloadType.equals(messagePayloadType)) {
publishMessage(topic, message, schemaPayloadType);

String kafkaKey =
message.getBindings() != null ? message.getBindings().get("key") : null;
log.debug("Publishing to kafka topic {} with key {}: {}", topic, kafkaKey, message);
producer.send(topic, kafkaKey, message.getHeaders(), payload);
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", payloadType, message.getPayload()));
foundDefinition = true;
break;
}
} else {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified.");
}

if (!foundDefinition) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema.");
}
}

private void publishMessage(String topic, MessageDto message, String schemaPayloadType) {
try {
Class<?> payloadClass = Class.forName(schemaPayloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);

String kafkaKey =
message.getBindings() != null ? message.getBindings().get("key") : null;
log.debug("Publishing to kafka topic {} with key {}: {}", topic, kafkaKey, message);
producer.send(topic, kafkaKey, message.getHeaders(), payload);
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfKafkaController;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.configuration.properties.SpringwolfKafkaConfigProperties;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFactory;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand All @@ -25,10 +25,8 @@ public class SpringwolfKafkaProducerConfiguration {
@Bean
@ConditionalOnMissingBean
public SpringwolfKafkaController springwolfKafkaController(
AsyncApiDocketService asyncApiDocketService,
SpringwolfKafkaProducer springwolfKafkaProducer,
ObjectMapper objectMapper) {
return new SpringwolfKafkaController(asyncApiDocketService, springwolfKafkaProducer, objectMapper);
SchemasService schemasService, SpringwolfKafkaProducer springwolfKafkaProducer, ObjectMapper objectMapper) {
return new SpringwolfKafkaController(schemasService, springwolfKafkaProducer, objectMapper);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfKafkaController;
import io.github.stavshamir.springwolf.configuration.DefaultAsyncApiDocketService;
import io.github.stavshamir.springwolf.configuration.properties.SpringwolfConfigProperties;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import io.github.stavshamir.springwolf.schemas.DefaultSchemasService;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import io.github.stavshamir.springwolf.schemas.example.ExampleJsonGenerator;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -39,7 +41,8 @@
classes = {
SpringwolfKafkaController.class,
SpringwolfKafkaProducer.class,
DefaultAsyncApiDocketService.class,
DefaultSchemasService.class,
ExampleJsonGenerator.class,
SpringwolfConfigProperties.class,
})
@TestPropertySource(
Expand All @@ -49,13 +52,17 @@
"springwolf.docket.info.version=1.0",
"springwolf.docket.servers.kafka.protocol=kafka",
"springwolf.docket.servers.kafka.url=127.0.0.1",
"springwolf.plugin.kafka.publishing.enabled=true"
"springwolf.plugin.kafka.publishing.enabled=true",
"springwolf.use-fqn=true"
})
class SpringwolfKafkaControllerIntegrationTest {

@Autowired
private MockMvc mvc;

@Autowired
private SchemasService schemasService;

@MockBean
private SpringwolfKafkaProducer springwolfKafkaProducer;

Expand All @@ -68,6 +75,8 @@ class SpringwolfKafkaControllerIntegrationTest {
@BeforeEach
void setup() {
when(springwolfKafkaProducer.isEnabled()).thenReturn(true);

schemasService.register(PayloadDto.class);
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions springwolf-plugins/springwolf-sns-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"

implementation "io.swagger.core.v3:swagger-models-jakarta:${swaggerVersion}"

compileOnly "com.google.code.findbugs:jsr305:${jsr305Version}"
permitUnusedDeclared "com.google.code.findbugs:jsr305:${jsr305Version}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfSnsProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import io.swagger.v3.oas.models.media.Schema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -26,7 +27,7 @@
@RequiredArgsConstructor
public class SpringwolfSnsController implements InitializingBean {

private final AsyncApiDocketService asyncApiDocketService;
private final SchemasService schemasService;

private final SpringwolfSnsProducer producer;

Expand All @@ -39,22 +40,37 @@ public void publish(@RequestParam String topic, @RequestBody MessageDto message)
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "SNS producer is not enabled");
}

String payloadType = message.getPayloadType();
if (payloadType.startsWith(asyncApiDocketService.getAsyncApiDocket().getBasePackage())) {
try {
Class<?> payloadClass = Class.forName(payloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);
boolean foundDefinition = false;
String messagePayloadType = message.getPayloadType();
for (Schema<?> value : schemasService.getDefinitions().values()) {
String schemaPayloadType = value.getName();
// security: match against user input, but always use our controlled data from the DefaultSchemaService
if (schemaPayloadType.equals(messagePayloadType)) {
publishMessage(topic, message, schemaPayloadType);

log.debug("Publishing to sns queue {}: {}", topic, payload);
producer.send(topic, MessageBuilder.withPayload(payload).build());
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", payloadType, message.getPayload()));
foundDefinition = true;
break;
}
} else {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No payloadType specified.");
}

if (!foundDefinition) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST, "Specified payloadType is not a registered springwolf schema.");
}
}

private void publishMessage(String topic, MessageDto message, String schemaPayloadType) {
try {
Class<?> payloadClass = Class.forName(schemaPayloadType);
Object payload = objectMapper.readValue(message.getPayload(), payloadClass);

log.debug("Publishing to sns topic {}: {}", topic, message);
producer.send(topic, MessageBuilder.withPayload(payload).build());
} catch (ClassNotFoundException | JsonProcessingException ex) {
throw new ResponseStatusException(
HttpStatus.BAD_REQUEST,
MessageFormat.format(
"Unable to create payload {0} from data: {1}", schemaPayloadType, message.getPayload()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.sns.core.SnsTemplate;
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfSnsController;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfSnsProducer;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand All @@ -26,10 +26,8 @@ public class SpringwolfSnsProducerConfiguration {
@Bean
@ConditionalOnMissingBean
public SpringwolfSnsController springwolfSnsController(
AsyncApiDocketService asyncApiDocketService,
SpringwolfSnsProducer springwolfSnsProducer,
ObjectMapper objectMapper) {
return new SpringwolfSnsController(asyncApiDocketService, springwolfSnsProducer, objectMapper);
SchemasService schemasService, SpringwolfSnsProducer springwolfSnsProducer, ObjectMapper objectMapper) {
return new SpringwolfSnsController(schemasService, springwolfSnsProducer, objectMapper);
}

@Bean
Expand Down
2 changes: 2 additions & 0 deletions springwolf-plugins/springwolf-sqs-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"

implementation "io.swagger.core.v3:swagger-models-jakarta:${swaggerVersion}"

compileOnly "com.google.code.findbugs:jsr305:${jsr305Version}"
permitUnusedDeclared "com.google.code.findbugs:jsr305:${jsr305Version}"

Expand Down
Loading