Skip to content

Commit

Permalink
Code changes for the feedback on issue hobbit-project#45
Browse files Browse the repository at this point in the history
  • Loading branch information
altafhusen-mr committed Nov 23, 2019
1 parent e57f792 commit 67f46f1
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.hobbit.core.components.AbstractCommandReceivingComponent;
import org.hobbit.core.components.PlatformConnector;
import org.hobbit.core.data.usage.ResourceUsageInformation;
import org.hobbit.core.rabbit.CustomConsumer;
import org.hobbit.core.rabbit.QueueingConsumer;
import org.hobbit.core.rabbit.RabbitMQUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,6 +26,12 @@ public class SystemResourceUsageRequester implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(SystemResourceUsageRequester.class);

/**
* Consumer of the queue that is used to receive responses for messages that are
* sent via the command queue and for which an answer is expected.
*/
private QueueingConsumer responseConsumer = null;

public static SystemResourceUsageRequester create(PlatformConnector connector, String sessionId) {
try {
Channel cmdChannel = connector.getFactoryForOutgoingCmdQueues().createChannel();
Expand All @@ -34,7 +40,9 @@ public static SystemResourceUsageRequester create(PlatformConnector connector, S
// if (responseQueueName == null) {
responseQueueName = incomingChannel.queueDeclare().getQueue();
// }
return new SystemResourceUsageRequester(cmdChannel, incomingChannel, responseQueueName, sessionId);
QueueingConsumer responseConsumer = new QueueingConsumer(cmdChannel);
incomingChannel.basicConsume(responseQueueName, responseConsumer);
return new SystemResourceUsageRequester(cmdChannel, incomingChannel, responseQueueName, responseConsumer, sessionId);
} catch (Exception e) {
LOGGER.error("Exception while creating SystemResourceUsageRequester. Returning null.", e);
}
Expand All @@ -46,11 +54,7 @@ public static SystemResourceUsageRequester create(PlatformConnector connector, S
* sent via the command queue and for which an answer is expected.
*/
private String responseQueueName = null;
/**
* Consumer of the queue that is used to receive responses for messages that are
* sent via the command queue and for which an answer is expected.
*/
private CustomConsumer responseConsumer = null;

/**
* Channel that is used for the command queue but not owned by this class (i.e.,
* it won't be closed).
Expand All @@ -64,9 +68,8 @@ public static SystemResourceUsageRequester create(PlatformConnector connector, S
protected Gson gson = new Gson();

protected SystemResourceUsageRequester(Channel cmdChannel, Channel incomingChannel, String responseQueueName,
String sessionId) throws IOException {
CustomConsumer responseConsumer = new CustomConsumer(cmdChannel);
incomingChannel.basicConsume(responseQueueName, responseConsumer);
QueueingConsumer responseConsumer, String sessionId) {

this.cmdChannel = cmdChannel;
this.incomingChannel = incomingChannel;
this.responseQueueName = responseQueueName;
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/hobbit/core/rabbit/DataReceiverImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected DataReceiverImpl(RabbitQueue queue, DataHandler handler, int maxParall
throws IOException {
this.queue = queue;
this.dataHandler = handler;
CustomConsumer consumer = new CustomConsumer(queue.channel);
QueueingConsumer consumer = new QueueingConsumer(queue.channel);
queue.channel.basicConsume(queue.name, true, consumer);
queue.channel.basicQos(maxParallelProcessedMsgs);
executor = Executors.newFixedThreadPool(maxParallelProcessedMsgs);
Expand Down Expand Up @@ -143,7 +143,7 @@ public static Builder builder() {
* @return a Runnable instance that will handle incoming messages as soon as it
* will be executed
*/
protected TerminatableRunnable buildMsgReceivingTask(CustomConsumer consumer) {
protected TerminatableRunnable buildMsgReceivingTask(QueueingConsumer consumer) {
return new MsgReceivingTask(consumer);
}

Expand All @@ -161,11 +161,11 @@ protected Runnable buildMsgProcessingTask(Delivery delivery) {

protected class MsgReceivingTask implements TerminatableRunnable {

private CustomConsumer consumer;
private QueueingConsumer consumer;
private boolean runFlag = true;
private boolean terminatedFlag = false;

public MsgReceivingTask(CustomConsumer consumer) {
public MsgReceivingTask(QueueingConsumer consumer) {
this.consumer = consumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,32 @@
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;

public class CustomConsumer extends DefaultConsumer {
/**
* This class extends the {@link DefaultConsumer} class
* with blocking semantics. The class provides a linked blocking queue
* of {@link Delivery} class to fetch the next delivery message.
*
* @author Altaf & Sourabh
*
*/
public class QueueingConsumer extends DefaultConsumer {

private LinkedBlockingQueue<Delivery> deliveryQueue;


public CustomConsumer(Channel channel) {
public QueueingConsumer(Channel channel) {
this(channel, new LinkedBlockingQueue<Delivery>());
}

public CustomConsumer(Channel channel, LinkedBlockingQueue<Delivery> deliveryQueue) {
public QueueingConsumer(Channel channel, LinkedBlockingQueue<Delivery> deliveryQueue) {
super(channel);
this.deliveryQueue = deliveryQueue;
}
@Override

/**
* Adds the delivery object to linked blocking queue for every receive
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/org/hobbit/core/rabbit/SimpleFileReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,28 @@ public class SimpleFileReceiver {

protected static final long DEFAULT_TIMEOUT = 1000;

protected QueueingConsumer consumer;

public static SimpleFileReceiver create(RabbitQueueFactory factory, String queueName) throws IOException {
return create(factory.createDefaultRabbitQueue(queueName));
}

public static SimpleFileReceiver create(RabbitQueue queue) throws IOException {
CustomConsumer consumer = new CustomConsumer(queue.channel);
QueueingConsumer consumer = new QueueingConsumer(queue.channel);
queue.channel.basicConsume(queue.name, true, consumer);
queue.channel.basicQos(20);
return new SimpleFileReceiver(queue, consumer);
}

protected RabbitQueue queue;
protected CustomConsumer consumer;

protected Map<String, FileReceiveState> fileStates = new HashMap<>();
protected boolean terminated = false;
protected int errorCount = 0;
protected ExecutorService executor = Executors.newCachedThreadPool();
protected long waitingForMsgTimeout = DEFAULT_TIMEOUT;

protected SimpleFileReceiver(RabbitQueue queue, CustomConsumer consumer) {
protected SimpleFileReceiver(RabbitQueue queue, QueueingConsumer consumer) {
this.queue = queue;
this.consumer = consumer;
}
Expand Down

0 comments on commit 67f46f1

Please sign in to comment.