Skip to content

Commit

Permalink
Fixes GH-219 (#220)
Browse files Browse the repository at this point in the history
- Default contentType is now application/json. Messages without a contentType will have it set by default (same as docs)
- Fixed issue where manual ack was being verified on the wrong place, causing an warning to be displayed when not needed
  • Loading branch information
viniciusccarvalho authored Sep 3, 2020
1 parent 6be641a commit e132cd6
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@

/**
* Defines the Content-Type to be used for message deserialization.
* There's no default, if not set and the message does not contain a Content-Type header deserialization will fail.
* Defaults to application/json.
* @return contentType to use
*/
String contentType() default "";
String contentType() default "application/json";

/**
* Defines the name of a particular configuration used for a Subscriber.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.gcp.pubsub.exception.PubSubListenerException;
import io.micronaut.gcp.pubsub.serdes.PubSubMessageSerDes;
import io.micronaut.gcp.pubsub.serdes.PubSubMessageSerDesRegistry;
Expand Down Expand Up @@ -56,7 +57,7 @@ public BindingResult<Object> bind(ArgumentConversionContext<Object> context, Pub
} else if (bodyType.getType().equals(PubsubMessage.class)) {
result = state.getPubsubMessage();
} else {
if (!state.getPubsubMessage().containsAttributes("Content-Type")) {
if (StringUtils.isEmpty(state.getContentType()) && !state.getPubsubMessage().containsAttributes("Content-Type")) {
throw new PubSubListenerException("Could not detect Content-Type header at message and no Content-Type specified on method.");
}
PubSubMessageSerDes serDes = serDesRegistry.find(state.getContentType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> met
ProjectSubscriptionName projectSubscriptionName = PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, googleCloudConfiguration.getProjectId());
String defaultContentType = subscriptionAnnotation.get("contentType", String.class).orElse("");
String configuration = subscriptionAnnotation.get("configuration", String.class).orElse("");
MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer ackReplyConsumer) -> {
String messageContentType = message.getAttributesMap().getOrDefault("Content-Type", "");
String contentType = Optional.of(messageContentType)
.filter(StringUtils::isNotEmpty)
.orElse(defaultContentType);
DefaultPubSubAcknowledgement pubSubAcknowledgement = new DefaultPubSubAcknowledgement(consumer);
DefaultPubSubAcknowledgement pubSubAcknowledgement = new DefaultPubSubAcknowledgement(ackReplyConsumer);

PubSubConsumerState consumerState = new PubSubConsumerState(message, consumer,
PubSubConsumerState consumerState = new PubSubConsumerState(message, ackReplyConsumer,
projectSubscriptionName, contentType);
try {
BoundExecutable executable = null;
Expand All @@ -130,8 +130,12 @@ public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> met
if (!hasAckArg) { // if manual ack is not specified we auto ack message after method execution
pubSubAcknowledgement.ack();
} else {
if (!pubSubAcknowledgement.isClientAck()) {
logger.warn("Method {} was executed and no message acknowledge detected. Did you forget to invoke ack()/nack()?", method.getName());
Optional<Object> boundAck = Arrays.stream(executable.getBoundArguments()).filter(o -> (o instanceof DefaultPubSubAcknowledgement)).findFirst();
if (boundAck.isPresent()) {
DefaultPubSubAcknowledgement manualAck = (DefaultPubSubAcknowledgement) boundAck.get();
if (!manualAck.isClientAck()) {
logger.warn("Method {} was executed and no message acknowledge detected. Did you forget to invoke ack()/nack()?", method.getName());
}
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.AbstractConsumerSpec
import io.micronaut.gcp.pubsub.MockPubSubEngine
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Person
import io.micronaut.http.annotation.Body
import io.micronaut.messaging.Acknowledgement
import io.micronaut.messaging.annotation.Header
import io.micronaut.test.annotation.MicronautTest
import spock.util.concurrent.PollingConditions
Expand All @@ -31,6 +34,8 @@ class SimpleConsumerSpec extends AbstractConsumerSpec {
@Inject
ObjectMapper mapper

@Inject MockPubSubEngine mockPubSubEngine

void "simple consumer spec"() {
PollingConditions conditions = new PollingConditions(timeout: 3)
when:
Expand Down Expand Up @@ -93,6 +98,35 @@ class SimpleConsumerSpec extends AbstractConsumerSpec {
receiver.dataHolder["test-without-content-type"].name == person.name
}
}

void "receive with manual ack"() {
PollingConditions conditions = new PollingConditions(timeout: 3)
def person = new Person()
person.name = "alf"
when:
pubSubClient.publishPojoForManualAck(person)
then:
conditions.eventually {
receiver.dataHolder["test-with-manual-ack"].name == person.name
}
}

void "receive with default content type used"() {
PollingConditions conditions = new PollingConditions(timeout: 3)
def person = new Person()
person.name = "alf"
def message = PubsubMessage
.newBuilder()
.setData(ByteString.copyFrom(mapper.writeValueAsBytes(person)))
.build()

when:
mockPubSubEngine.publish(message, "test-with-default-contentType")
then:
conditions.eventually {
receiver.dataHolder["test-with-default-contentType"].name == person.name
}
}
}

@PubSubClient
Expand All @@ -113,6 +147,9 @@ interface SimplePubSubClient {
@Topic(value = "test-without-content-type", contentType = "")
String publishPojoWithoutContentType(byte[] data)

@Topic(value = "test-with-manual-ack")
String publishPojoForManualAck(Person person);

}

@PubSubListener
Expand Down Expand Up @@ -150,4 +187,14 @@ class SimpleReceiver {
void receive(Person person){
dataHolder["test-without-content-type"] = person
}

@Subscription(value = "test-with-manual-ack", contentType = "application/json")
void receive(@Body Person person, Acknowledgement ack) {
dataHolder["test-with-manual-ack"] = person
ack.ack()
}
@Subscription(value="test-with-default-contentType")
void receiveWithDefault(Person person){
dataHolder["test-with-default-contentType"] = person
}
}

0 comments on commit e132cd6

Please sign in to comment.