Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple RabbitTemplate beans #50

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,8 @@ public class RabbitMqTracingManualConfig {
private Tracer tracer;

@Bean
public RabbitMqSendTracingAspect rabbitMqSendTracingAspect(RabbitTemplate rabbitTemplate,
RabbitMqSpanDecorator spanDecorator) {
Assert.notNull(rabbitTemplate.getMessageConverter(), "RabbitTemplate has no message converter configured");

return new RabbitMqSendTracingAspect(tracer, rabbitTemplate.getExchange(), rabbitTemplate.getRoutingKey(),
rabbitTemplate.getMessageConverter(), spanDecorator);
public RabbitMqSendTracingAspect rabbitMqSendTracingAspect(RabbitMqSpanDecorator spanDecorator) {
return new RabbitMqSendTracingAspect(tracer, spanDecorator);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,8 @@ public class RabbitMqTracingAutoConfiguration {
*/
@ConditionalOnBean(RabbitTemplate.class)
@Bean
public RabbitMqSendTracingAspect rabbitMqSendTracingAspect(RabbitTemplate rabbitTemplate,
RabbitMqSpanDecorator spanDecorator) {
Assert.notNull(rabbitTemplate.getMessageConverter(), "RabbitTemplate has no message converter configured");

return new RabbitMqSendTracingAspect(tracer, rabbitTemplate.getExchange(), rabbitTemplate.getRoutingKey(),
rabbitTemplate.getMessageConverter(), spanDecorator);
public RabbitMqSendTracingAspect rabbitMqSendTracingAspect(RabbitMqSpanDecorator spanDecorator) {
return new RabbitMqSendTracingAspect(tracer, spanDecorator);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.opentracing.contrib.spring.rabbitmq;

import io.opentracing.Tracer;
import java.util.Optional;
import lombok.AllArgsConstructor;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
Expand All @@ -32,9 +33,6 @@
class RabbitMqSendTracingAspect {

private final Tracer tracer;
private final String exchange;
private final String routingKey;
private final MessageConverter messageConverter;
private final RabbitMqSpanDecorator spanDecorator;

/**
Expand All @@ -43,9 +41,7 @@ class RabbitMqSendTracingAspect {
@Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.send(..)) && args(message)",
argNames = "pjp,message")
public Object traceRabbitSend(ProceedingJoinPoint pjp, Object message) throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(this.exchange, this.routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 0));
return tagAndProceed(pjp, null, null, message, 0, false);
}

/**
Expand All @@ -54,9 +50,7 @@ public Object traceRabbitSend(ProceedingJoinPoint pjp, Object message) throws Th
@Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.send(..)) && args(routingKey, message)",
argNames = "pjp,routingKey,message")
public Object traceRabbitSend(ProceedingJoinPoint pjp, String routingKey, Object message) throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(this.exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 1));
return tagAndProceed(pjp, null, routingKey, message, 1, false);
}

/**
Expand All @@ -66,9 +60,7 @@ public Object traceRabbitSend(ProceedingJoinPoint pjp, String routingKey, Object
"routingKey, message)", argNames = "pjp,exchange, routingKey, message")
public Object traceRabbitSend(ProceedingJoinPoint pjp, String exchange, String routingKey, Object message)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 2));
return tagAndProceed(pjp, exchange, routingKey, message, 2, false);
}

/**
Expand All @@ -77,9 +69,7 @@ public Object traceRabbitSend(ProceedingJoinPoint pjp, String exchange, String r
@Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.convertAndSend(..)) " +
"&& args(message)", argNames = "pjp,message")
public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, Object message) throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(this.exchange, this.routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 0));
return tagAndProceed(pjp, null, null, message, 0, false);
}

/**
Expand All @@ -90,9 +80,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, Object message)
public Object traceRabbitConvertAndSend(
ProceedingJoinPoint pjp, String routingKey, Object message)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(this.exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 1));
return tagAndProceed(pjp, null, routingKey, message, 1, false);
}

/**
Expand All @@ -103,9 +91,7 @@ public Object traceRabbitConvertAndSend(
public Object traceRabbitConvertAndSend(
ProceedingJoinPoint pjp, String exchange, String routingKey, Object message)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 2));
return tagAndProceed(pjp, exchange, routingKey, message, 2, false);
}

/**
Expand All @@ -115,9 +101,7 @@ public Object traceRabbitConvertAndSend(
" && args(message, messagePostProcessor)", argNames = "pjp,message,messagePostProcessor")
public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, Object message,
MessagePostProcessor messagePostProcessor) throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(this.exchange, this.routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 0));
return tagAndProceed(pjp, null, null, message, 0, false);
}

/**
Expand All @@ -128,9 +112,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, Object message,
argNames = "pjp,routingKey,message,messagePostProcessor")
public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(this.exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 1));
return tagAndProceed(pjp, null, routingKey, message, 1, false);
}

/**
Expand All @@ -141,9 +123,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String routingK
argNames = "pjp,exchange,routingKey,message,messagePostProcessor")
public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp,String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 2));
return tagAndProceed(pjp, exchange, routingKey, message, 2, false);
}

/**
Expand All @@ -155,9 +135,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp,String exchange,
public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String routingKey, Object message,
MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(this.exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 1));
return tagAndProceed(pjp, null, routingKey, message, 1, false);
}

/**
Expand All @@ -169,9 +147,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String routingK
public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String exchange, String routingKey, Object message,
CorrelationData correlationData)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 2));
return tagAndProceed(pjp, exchange, routingKey, message, 2, false);
}

/**
Expand All @@ -180,9 +156,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String exchange
@Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.sendAndReceive(..))" +
" && args(message)", argNames = "pjp,message")
public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, Object message) throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(this.exchange, this.routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 0));
return tagAndProceed(pjp, null, null, message, 0, true);
}

/**
Expand All @@ -191,9 +165,7 @@ public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, Object message)
@Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.sendAndReceive(..))" +
" && args(routingKey, message)", argNames = "pjp,routingKey,message")
public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, String routingKey, Object message) throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 1));
return tagAndProceed(pjp, null, routingKey, message, 1, true);
}

/**
Expand All @@ -203,9 +175,7 @@ public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, String routingK
" && args(exchange, routingKey, message)", argNames = "pjp,exchange,routingKey,message")
public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, String exchange, String routingKey, Object message)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 2));
return tagAndProceed(pjp, exchange, routingKey, message, 2, true);
}

// Intercept public methods that eventually delegate to RabbitTemplate.doSendAndReceive
Expand All @@ -218,10 +188,7 @@ public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, String exchange
public Object traceRabbitSendAndReceive(
ProceedingJoinPoint pjp, String exchange, String routingKey, Message message, CorrelationData correlationData)
throws Throwable {
return createTracingHelper()
.nullResponseMeansTimeout((RabbitTemplate) pjp.getTarget())
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 2));
return tagAndProceed(pjp, exchange, routingKey, message, 2, true);
}

/**
Expand All @@ -231,9 +198,7 @@ public Object traceRabbitSendAndReceive(
" && args(message)", argNames = "pjp,message")
public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, Object message)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 0));
return tagAndProceed(pjp, null, null, message, 0, true);
}

/**
Expand All @@ -243,9 +208,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, Object m
" && args(routingKey, message)", argNames = "pjp,routingKey,message")
public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String routingKey, Object message)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 1));
return tagAndProceed(pjp, null, routingKey, message, 1, true);
}

/**
Expand All @@ -256,9 +219,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String r
public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String exchange, String routingKey,
Object message)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 2));
return tagAndProceed(pjp, exchange, routingKey, message, 2, true);
}

/**
Expand All @@ -269,9 +230,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String e
public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, Object message,
MessagePostProcessor messagePostProcessor)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 0));
return tagAndProceed(pjp, null, null, message, 0, true);
}

/**
Expand All @@ -283,9 +242,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, Object m
public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String routingKey, Object message,
MessagePostProcessor messagePostProcessor)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 1));
return tagAndProceed(pjp, null, routingKey, message, 1, true);
}

/**
Expand All @@ -297,9 +254,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String r
public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String exchange, String routingKey,
Object message, MessagePostProcessor messagePostProcessor)
throws Throwable {
return createTracingHelper()
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 2));
return tagAndProceed(pjp, exchange, routingKey, message, 2, true);
}

/**
Expand All @@ -312,14 +267,27 @@ public Object traceRabbitConvertSendAndReceive(
ProceedingJoinPoint pjp, String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
throws Throwable {
return createTracingHelper()
.nullResponseMeansTimeout((RabbitTemplate) pjp.getTarget())
.doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) ->
proceedReplacingMessage(pjp, convertedMessage, 2));
return tagAndProceed(pjp, exchange, routingKey, message, 2, true);
}

private RabbitMqSendTracingHelper createTracingHelper() {
return new RabbitMqSendTracingHelper(tracer, messageConverter, spanDecorator);
private Object tagAndProceed(ProceedingJoinPoint pjp, String exchange, String routingKey,
Object message, int messageIndex, boolean nullResponseMeansTimeout)
throws Throwable {
if (pjp.getTarget() instanceof RabbitTemplate) {
RabbitTemplate rabbitTemplate = (RabbitTemplate) pjp.getTarget();
MessageConverter converter = rabbitTemplate.getMessageConverter();
String exactlyRoutingKey = Optional.ofNullable(routingKey).orElseGet(rabbitTemplate::getRoutingKey);
String exactlyExchange = Optional.ofNullable(exchange).orElseGet(rabbitTemplate::getExchange);

RabbitMqSendTracingHelper helper = new RabbitMqSendTracingHelper(tracer, converter, spanDecorator);
if (nullResponseMeansTimeout) {
helper.nullResponseMeansTimeout(rabbitTemplate);
}
return helper
.doWithTracingHeadersMessage(exactlyExchange, exactlyRoutingKey, message,
(convertedMessage) -> proceedReplacingMessage(pjp, convertedMessage, messageIndex));
}
return pjp.proceed(pjp.getArgs());
}

private Object proceedReplacingMessage(ProceedingJoinPoint pjp, Message convertedMessage, int messageArgumentIndex)
Expand Down
Loading