Skip to content

Commit

Permalink
Merge pull request #451 from Cognifide/feature/collectors-and-compara…
Browse files Browse the repository at this point in the history
…tors-configured-by-single-config-number

Collectors and comparators configured by single config number
  • Loading branch information
tkaik authored Dec 21, 2018
2 parents 91601ec + fba3756 commit 8deb8c2
Show file tree
Hide file tree
Showing 30 changed files with 389 additions and 461 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to AET will be documented in this file.

**List of changes that are finished but not yet released in any final version.**

- [PR-451](https://github.com/Cognifide/aet/pull/451) Collectors and comparators configured by single config number
- [PR-449](https://github.com/Cognifide/aet/pull/449) Improvements to the Winter Edition Theme
- [PR-354](https://github.com/Cognifide/aet/pull/354) Remove jmsEndpointConfig information from communication settings endpoint ([#352](https://github.com/Cognifide/aet/issues/352))
- [PR-412](https://github.com/Cognifide/aet/pull/412) ([PR-336](https://github.com/Cognifide/aet/pull/336), [PR-337](https://github.com/Cognifide/aet/pull/337), [PR-395](https://github.com/Cognifide/aet/pull/395)) - Added rerun functionality for suite, test and url
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* AET
*
* Copyright (C) 2013 Cognifide Limited
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.cognifide.aet.communication.api.queues;

public enum QueuesConstant implements WorkerConfig {
COLLECTOR("collectorJobs", "collectorResults"),
COMPARATOR("comparatorJobs", "comparatorResults");

public static final String NAMESPACE = "AET.";

final String jobsQueueName;
final String resultsQueueName;

QueuesConstant(String jobsQueueName, String resultsQueueName) {
this.jobsQueueName = jobsQueueName;
this.resultsQueueName = resultsQueueName;
}

@Override
public String getJobsQueueName() {
return NAMESPACE + jobsQueueName;
}

@Override
public String getResultsQueueName() {
return NAMESPACE + resultsQueueName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* AET
*
* Copyright (C) 2013 Cognifide Limited
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.cognifide.aet.communication.api.queues;

public interface WorkerConfig {

/**
* @return name of the jobs queue
*/
String getJobsQueueName();

/**
* @return name of the results queue name
*/
String getResultsQueueName();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.cognifide.aet.runner;

import com.cognifide.aet.communication.api.exceptions.AETException;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.configuration.MessagesManagerConf;
import java.io.IOException;
import java.util.HashSet;
Expand All @@ -25,7 +26,6 @@
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.metatype.annotations.Designate;
Expand All @@ -48,8 +48,6 @@ public class MessagesManager {

private static final String QUEUES_ATTRIBUTE = "Queues";

private static final String AET_QUEUE_DOMAIN = "AET.";

static final String DESTINATION_NAME_PROPERTY = "destinationName";

private MessagesManagerConf config;
Expand Down Expand Up @@ -83,13 +81,6 @@ public void remove(String correlationID) throws AETException {
}
}

public static String createFullQueueName(String name) {
if (StringUtils.isBlank(name)) {
throw new IllegalArgumentException("Queue name can't be null or empty string!");
}
return AET_QUEUE_DOMAIN + name;
}

protected Set<ObjectName> getAetQueuesObjects(MBeanServerConnection connection)
throws AETException {
ObjectName[] queues;
Expand All @@ -106,7 +97,8 @@ protected Set<ObjectName> getAetQueuesObjects(MBeanServerConnection connection)
private Set<ObjectName> filter(ObjectName[] queuesObjects) {
Set<ObjectName> queues = new HashSet<>();
for (ObjectName queueObject : queuesObjects) {
if (queueObject.getKeyProperty(DESTINATION_NAME_PROPERTY).startsWith(AET_QUEUE_DOMAIN)) {
// filters all with the AET queue namespace
if (queueObject.getKeyProperty(DESTINATION_NAME_PROPERTY).startsWith(QueuesConstant.NAMESPACE)) {
queues.add(queueObject);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.communication.api.wrappers.Run;
import com.cognifide.aet.queues.JmsUtils;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.scheduler.CollectorJobSchedulerService;
import javax.jms.Destination;
import javax.jms.JMSException;
Expand All @@ -45,11 +46,9 @@ public class RunnerMessageListener implements MessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(RunnerMessageListener.class);

private static final String API_QUEUE_IN = MessagesManager
.createFullQueueName("runner-in");
private static final String API_QUEUE_IN = QueuesConstant.NAMESPACE + "runner-in";

private static final String MAINTENANCE_QUEUE_IN = MessagesManager
.createFullQueueName("maintenance-in");
private static final String MAINTENANCE_QUEUE_IN = QueuesConstant.NAMESPACE + "maintenance-in";

private MessageConsumer inConsumer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.communication.api.wrappers.MetadataRunDecorator;
import com.cognifide.aet.communication.api.wrappers.UrlRunWrapper;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.RunnerConfiguration;
import com.cognifide.aet.runner.processing.TimeoutWatch;
import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper;
import com.cognifide.aet.runner.scheduler.CollectorJobSchedulerService;
import com.cognifide.aet.runner.scheduler.MessageWithDestination;
import com.cognifide.aet.runner.processing.TimeoutWatch;
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected String getQueueInName() {

@Override
protected String getQueueOutName() {
return "AET.collectorJobs";
return QueuesConstant.COLLECTOR.getJobsQueueName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.cognifide.aet.communication.api.metadata.Url;
import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.communication.api.util.ExecutionTimer;
import com.cognifide.aet.runner.MessagesManager;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.RunnerConfiguration;
import com.cognifide.aet.runner.processing.TimeoutWatch;
import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper;
Expand Down Expand Up @@ -184,12 +184,12 @@ public boolean isFinished() {

@Override
protected String getQueueInName() {
return MessagesManager.createFullQueueName("collectorResults");
return QueuesConstant.COLLECTOR.getResultsQueueName();
}

@Override
protected String getQueueOutName() {
return MessagesManager.createFullQueueName("comparatorJobs");
return QueuesConstant.COMPARATOR.getJobsQueueName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.cognifide.aet.communication.api.metadata.Url;
import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.communication.api.util.ExecutionTimer;
import com.cognifide.aet.runner.MessagesManager;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.RunnerConfiguration;
import com.cognifide.aet.runner.processing.TimeoutWatch;
import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper;
Expand Down Expand Up @@ -150,7 +150,7 @@ public void abort() {

@Override
protected String getQueueInName() {
return MessagesManager.createFullQueueName("comparatorResults");
return QueuesConstant.COMPARATOR.getResultsQueueName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.cognifide.aet.runner;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
Expand All @@ -41,22 +39,6 @@ public class MessagesManagerTest {
@Mock
private MessagesManagerConf config;

@Test(expected = IllegalArgumentException.class)
public void createFullQueueName_whenNameIsNull_expectException() throws Exception {
MessagesManager.createFullQueueName(null);
}

@Test(expected = IllegalArgumentException.class)
public void createFullQueueName_whenNameIsEmpty_expectException() throws Exception {
MessagesManager.createFullQueueName("");
}

@Test
public void createFullQueueName_expectFullName() throws Exception {
String fullQueueName = MessagesManager.createFullQueueName("test");
assertThat(fullQueueName, is("AET.test"));
}

@Test
public void remove_ExpectRemovingInvoked() throws Exception {
MessagesManager messagesManager = new MessagesManager();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* AET
*
* Copyright (C) 2013 Cognifide Limited
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.cognifide.aet.worker.exceptions;

/**
* Thrown when failed to create consumer.
*/
public class ConsumerInitException extends RuntimeException {

public ConsumerInitException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,42 +31,22 @@
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(
service = CollectorMessageListenerImpl.class,
immediate = true)
@Designate(ocd = CollectorMessageListenerImplConfig.class, factory = true)
public class CollectorMessageListenerImpl extends AbstractTaskMessageListener {
class CollectorMessageListener extends WorkerMessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(CollectorMessageListenerImpl.class);
private static final Logger LOGGER = LoggerFactory.getLogger(CollectorMessageListener.class);

@Reference
private JmsConnection jmsConnection;
private final CollectorDispatcher dispatcher;
private final WebDriverProvider webDriverProvider;

@Reference
private CollectorDispatcher dispatcher;

@Reference
private WebDriverProvider webDriverProvider;

private CollectorMessageListenerImplConfig config;

@Activate
void activate(CollectorMessageListenerImplConfig config) {
this.config = config;
super.doActivate(config.consumerQueueName(), config.producerQueueName(), config.pf());
}

@Deactivate
void deactivate() {
super.doDeactivate();
CollectorMessageListener(String name, CollectorDispatcher dispatcher,
WebDriverProvider webDriverProvider, JmsConnection jmsConnection, String consumerQueueName,
String producerQueueName) {
super(name, jmsConnection, consumerQueueName, producerQueueName);
this.dispatcher = dispatcher;
this.webDriverProvider = webDriverProvider;
}

@Override
Expand All @@ -75,15 +55,15 @@ public void onMessage(final Message message) {
try {
collectorJobData = JmsUtils.getFromMessage(message, CollectorJobData.class);
} catch (JMSException e) {
LOGGER.error("Invalid message obtained!", e);
LOGGER.error("[{}] Invalid message obtained!", name, e);
}
String correlationId = JmsUtils.getJMSCorrelationID(message);
String requestMessageId = JmsUtils.getJMSMessageID(message);
if (collectorJobData != null && StringUtils.isNotBlank(correlationId)
&& requestMessageId != null) {
LOGGER.info(
"CollectorJobData [{}] message arrived with {} urls. CorrelationId: {} RequestMessageId: {}",
config.name(), collectorJobData.getUrls().size(), correlationId,
"[{}] CollectorJobData message arrived with {} urls. CorrelationId: {} RequestMessageId: {}",
name, collectorJobData.getUrls().size(), correlationId,
requestMessageId);
WebCommunicationWrapper webCommunicationWrapper = null;
int collected = 0;
Expand All @@ -101,7 +81,7 @@ public void onMessage(final Message message) {
} catch (WorkerException e) {
for (Url url : collectorJobData.getUrls()) {
String errorMessage = String.format(
"Couldn't process following url `%s` because of error: %s", url.getUrl(),
"[%s] Couldn't process following url `%s` because of error: %s", name, url.getUrl(),
e.getMessage());
LOGGER.error(errorMessage, e);
// updates all steps with worker exception
Expand All @@ -118,7 +98,7 @@ public void onMessage(final Message message) {
} finally {
quitWebDriver(webCommunicationWrapper);
}
LOGGER.info("Successfully collected from {}/{} urls.", collected,
LOGGER.info("[{}] Successfully collected from {}/{} urls.", name, collected,
collectorJobData.getUrls().size());
}

Expand All @@ -140,9 +120,8 @@ private int runUrls(CollectorJobData collectorJobData, String requestMessageId,
processedUrl.setCollectionStats(timer.toStatistics());
feedbackQueue.sendObjectMessageWithCorrelationID(collectorResultData, correlationId);
} catch (Exception e) {
LOGGER.error("Unrecognized collector error", e);
LOGGER.error("[{}] Unrecognized collector error", name, e);
final String message = "Unrecognized collector error: " + e.getMessage();

CollectorStepResult collectorStepProcessingError =
CollectorStepResult.newProcessingErrorResult(message);
for (Step step : url.getSteps()) {
Expand Down Expand Up @@ -175,9 +154,4 @@ private void quitWebDriver(WebCommunicationWrapper webCommunicationWrapper) {
}
}

@Override
protected JmsConnection getJmsConnection() {
return jmsConnection;
}

}
Loading

0 comments on commit 8deb8c2

Please sign in to comment.