From 8869dfdbf48fe673e9d3826d62c40d1448e71b2f Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Fri, 7 Dec 2018 15:31:07 +0100 Subject: [PATCH 01/11] worker message listeners spawned by single number --- .../exceptions/ConsumerInitException.java | 26 ++++ .../AbstractTaskMessageListener.java | 57 ------- ...mpl.java => CollectorMessageListener.java} | 63 +++----- .../CollectorMessageListenerImplConfig.java | 55 ------- ...pl.java => ComparatorMessageListener.java} | 58 +++---- .../ComparatorMessageListenerImplConfig.java | 55 ------- .../WorkersListenersFactoryConfig.java | 86 +++++++++++ .../listeners/WorkersListenersService.java | 142 ++++++++++++++++++ ...steners.CollectorMessageListenerImpl-B.cfg | 22 --- ...steners.CollectorMessageListenerImpl-C.cfg | 22 --- ...steners.CollectorMessageListenerImpl-D.cfg | 22 --- ...steners.CollectorMessageListenerImpl-E.cfg | 22 --- ...teners.ComparatorMessageListenerImpl-A.cfg | 22 --- ...teners.ComparatorMessageListenerImpl-B.cfg | 22 --- ...teners.ComparatorMessageListenerImpl-C.cfg | 22 --- ...teners.ComparatorMessageListenerImpl-D.cfg | 22 --- ...teners.ComparatorMessageListenerImpl-E.cfg | 22 --- ...ker.listeners.WorkersListenersService.cfg} | 12 +- 18 files changed, 302 insertions(+), 450 deletions(-) create mode 100644 core/worker/src/main/java/com/cognifide/aet/worker/exceptions/ConsumerInitException.java delete mode 100644 core/worker/src/main/java/com/cognifide/aet/worker/listeners/AbstractTaskMessageListener.java rename core/worker/src/main/java/com/cognifide/aet/worker/listeners/{CollectorMessageListenerImpl.java => CollectorMessageListener.java} (78%) delete mode 100644 core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListenerImplConfig.java rename core/worker/src/main/java/com/cognifide/aet/worker/listeners/{ComparatorMessageListenerImpl.java => ComparatorMessageListener.java} (69%) delete mode 100644 core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListenerImplConfig.java create mode 100644 core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersFactoryConfig.java create mode 100644 core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java delete mode 100644 osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-B.cfg delete mode 100644 osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-C.cfg delete mode 100644 osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-D.cfg delete mode 100644 osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-E.cfg delete mode 100644 osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-A.cfg delete mode 100644 osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-B.cfg delete mode 100644 osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-C.cfg delete mode 100644 osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-D.cfg delete mode 100644 osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-E.cfg rename osgi-dependencies/configs/src/main/resources/{com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-A.cfg => com.cognifide.aet.worker.listeners.WorkersListenersService.cfg} (67%) diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/exceptions/ConsumerInitException.java b/core/worker/src/main/java/com/cognifide/aet/worker/exceptions/ConsumerInitException.java new file mode 100644 index 000000000..127660b4a --- /dev/null +++ b/core/worker/src/main/java/com/cognifide/aet/worker/exceptions/ConsumerInitException.java @@ -0,0 +1,26 @@ +/** + * AET + * + * Copyright (C) 2013 Cognifide Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.cognifide.aet.worker.exceptions; + +/** + * Thrown when failed to create consumer. + */ +public class ConsumerInitException extends RuntimeException { + + public ConsumerInitException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/AbstractTaskMessageListener.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/AbstractTaskMessageListener.java deleted file mode 100644 index 5b3892a68..000000000 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/AbstractTaskMessageListener.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * AET - * - * Copyright (C) 2013 Cognifide Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package com.cognifide.aet.worker.listeners; - -import com.cognifide.aet.communication.api.queues.JmsConnection; -import com.cognifide.aet.queues.JmsUtils; -import com.cognifide.aet.worker.results.FeedbackQueue; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -public abstract class AbstractTaskMessageListener implements MessageListener { - - private Session jmsSession; - - private MessageConsumer consumer; - - FeedbackQueue feedbackQueue; - - void doActivate(String consumerQueueName, String producerQueueName, String prefetchSize) { - - String queueName = consumerQueueName + "?consumer.prefetchSize=" + prefetchSize; - try { - jmsSession = getJmsConnection().getJmsSession(); - consumer = jmsSession.createConsumer(jmsSession.createQueue(queueName)); - consumer.setMessageListener(this); - feedbackQueue = new FeedbackQueue(jmsSession, producerQueueName); - } catch (JMSException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - - void doDeactivate() { - if (feedbackQueue != null) { - feedbackQueue.close(); - } - JmsUtils.closeQuietly(consumer); - JmsUtils.closeQuietly(jmsSession); - } - - protected abstract JmsConnection getJmsConnection(); - -} diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListenerImpl.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListener.java similarity index 78% rename from core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListenerImpl.java rename to core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListener.java index a9c04e5fb..2f81e05d6 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListenerImpl.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListener.java @@ -21,52 +21,35 @@ import com.cognifide.aet.communication.api.metadata.CollectorStepResult; import com.cognifide.aet.communication.api.metadata.Step; import com.cognifide.aet.communication.api.metadata.Url; -import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.communication.api.util.ExecutionTimer; import com.cognifide.aet.job.api.collector.WebCommunicationWrapper; import com.cognifide.aet.queues.JmsUtils; import com.cognifide.aet.worker.api.CollectorDispatcher; import com.cognifide.aet.worker.drivers.WebDriverProvider; import com.cognifide.aet.worker.exceptions.WorkerException; +import com.cognifide.aet.worker.results.FeedbackQueue; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageListener; import org.apache.commons.lang3.StringUtils; -import org.osgi.service.component.annotations.Activate; -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Deactivate; -import org.osgi.service.component.annotations.Reference; -import org.osgi.service.metatype.annotations.Designate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Component( - service = CollectorMessageListenerImpl.class, - immediate = true) -@Designate(ocd = CollectorMessageListenerImplConfig.class, factory = true) -public class CollectorMessageListenerImpl extends AbstractTaskMessageListener { +class CollectorMessageListener implements MessageListener { - private static final Logger LOGGER = LoggerFactory.getLogger(CollectorMessageListenerImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(CollectorMessageListener.class); - @Reference - private JmsConnection jmsConnection; + private final String name; + private final CollectorDispatcher dispatcher; + private final FeedbackQueue feedbackQueue; + private final WebDriverProvider webDriverProvider; - @Reference - private CollectorDispatcher dispatcher; - - @Reference - private WebDriverProvider webDriverProvider; - - private CollectorMessageListenerImplConfig config; - - @Activate - void activate(CollectorMessageListenerImplConfig config) { - this.config = config; - super.doActivate(config.consumerQueueName(), config.producerQueueName(), config.pf()); - } - - @Deactivate - void deactivate() { - super.doDeactivate(); + CollectorMessageListener(String name, CollectorDispatcher dispatcher, FeedbackQueue feedbackQueue, + WebDriverProvider webDriverProvider) { + this.name = name; + this.dispatcher = dispatcher; + this.feedbackQueue = feedbackQueue; + this.webDriverProvider = webDriverProvider; } @Override @@ -75,15 +58,15 @@ public void onMessage(final Message message) { try { collectorJobData = JmsUtils.getFromMessage(message, CollectorJobData.class); } catch (JMSException e) { - LOGGER.error("Invalid message obtained!", e); + LOGGER.error("[{}] Invalid message obtained!", name, e); } String correlationId = JmsUtils.getJMSCorrelationID(message); String requestMessageId = JmsUtils.getJMSMessageID(message); if (collectorJobData != null && StringUtils.isNotBlank(correlationId) && requestMessageId != null) { LOGGER.info( - "CollectorJobData [{}] message arrived with {} urls. CorrelationId: {} RequestMessageId: {}", - config.name(), collectorJobData.getUrls().size(), correlationId, + "[{}] CollectorJobData message arrived with {} urls. CorrelationId: {} RequestMessageId: {}", + name, collectorJobData.getUrls().size(), correlationId, requestMessageId); WebCommunicationWrapper webCommunicationWrapper = null; int collected = 0; @@ -101,7 +84,7 @@ public void onMessage(final Message message) { } catch (WorkerException e) { for (Url url : collectorJobData.getUrls()) { String errorMessage = String.format( - "Couldn't process following url `%s` because of error: %s", url.getUrl(), + "[%s] Couldn't process following url `%s` because of error: %s", name, url.getUrl(), e.getMessage()); LOGGER.error(errorMessage, e); // updates all steps with worker exception @@ -118,7 +101,7 @@ public void onMessage(final Message message) { } finally { quitWebDriver(webCommunicationWrapper); } - LOGGER.info("Successfully collected from {}/{} urls.", collected, + LOGGER.info("[{}] Successfully collected from {}/{} urls.", name, collected, collectorJobData.getUrls().size()); } @@ -140,9 +123,8 @@ private int runUrls(CollectorJobData collectorJobData, String requestMessageId, processedUrl.setCollectionStats(timer.toStatistics()); feedbackQueue.sendObjectMessageWithCorrelationID(collectorResultData, correlationId); } catch (Exception e) { - LOGGER.error("Unrecognized collector error", e); + LOGGER.error("[{}] Unrecognized collector error", name, e); final String message = "Unrecognized collector error: " + e.getMessage(); - CollectorStepResult collectorStepProcessingError = CollectorStepResult.newProcessingErrorResult(message); for (Step step : url.getSteps()) { @@ -175,9 +157,4 @@ private void quitWebDriver(WebCommunicationWrapper webCommunicationWrapper) { } } - @Override - protected JmsConnection getJmsConnection() { - return jmsConnection; - } - } diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListenerImplConfig.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListenerImplConfig.java deleted file mode 100644 index d6f993384..000000000 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListenerImplConfig.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * AET - * - * Copyright (C) 2013 Cognifide Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package com.cognifide.aet.worker.listeners; - -import org.osgi.service.metatype.annotations.AttributeDefinition; -import org.osgi.service.metatype.annotations.ObjectClassDefinition; - -@ObjectClassDefinition(name = "AET Collector Message Listener") -public @interface CollectorMessageListenerImplConfig { - - String LISTENER_NAME_LABEL = "Collector name"; - String LISTENER_NAME_DESC = "Name of collector. Used in logs only"; - String LISTENER_NAME_DEFAULT_VALUE = "Collector"; - - String CONSUMER_QUEUE_NAME_LABEL = "Consumer queue name"; - String CONSUMER_QUEUE_NAME_DEFAULT_VALUE = "AET.collectorJobs"; - - String PRODUCER_QUEUE_NAME_LABEL = "Producer queue name"; - String PRODUCER_QUEUE_NAME_DEFAULT_VALUE = "AET.collectorResults"; - - String PREFETCH_SIZE_LABEL = "Prefetch size"; - String PREFETCH_SIZE_DESC = "http://activemq.apache.org/what-is-the-prefetch-limit-for.html"; - String PREFETCH_SIZE_DEFAULT_VALUE = "1"; - - @AttributeDefinition( - name = LISTENER_NAME_LABEL, - description = LISTENER_NAME_DESC) - String name() default LISTENER_NAME_DEFAULT_VALUE; - - @AttributeDefinition( - name = CONSUMER_QUEUE_NAME_LABEL) - String consumerQueueName() default CONSUMER_QUEUE_NAME_DEFAULT_VALUE; - - @AttributeDefinition( - name = PRODUCER_QUEUE_NAME_LABEL) - String producerQueueName() default PRODUCER_QUEUE_NAME_DEFAULT_VALUE; - - @AttributeDefinition( - name = PREFETCH_SIZE_LABEL, - description = PREFETCH_SIZE_DESC) - String pf() default PREFETCH_SIZE_DEFAULT_VALUE; -} diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListenerImpl.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListener.java similarity index 69% rename from core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListenerImpl.java rename to core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListener.java index f43b5db79..36e7b10f6 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListenerImpl.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListener.java @@ -22,46 +22,30 @@ import com.cognifide.aet.communication.api.metadata.Comparator; import com.cognifide.aet.communication.api.metadata.ComparatorStepResult; import com.cognifide.aet.communication.api.metadata.Step; -import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.job.api.comparator.ComparatorProperties; import com.cognifide.aet.queues.JmsUtils; import com.cognifide.aet.worker.api.ComparatorDispatcher; +import com.cognifide.aet.worker.results.FeedbackQueue; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageListener; import org.apache.commons.lang3.StringUtils; -import org.osgi.service.component.annotations.Activate; -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Deactivate; -import org.osgi.service.component.annotations.Reference; -import org.osgi.service.metatype.annotations.Designate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Component( - service = ComparatorMessageListenerImpl.class, - immediate = true) -@Designate(ocd = ComparatorMessageListenerImplConfig.class, factory = true) -public class ComparatorMessageListenerImpl extends AbstractTaskMessageListener { +class ComparatorMessageListener implements MessageListener { - private static final Logger LOGGER = LoggerFactory.getLogger(ComparatorMessageListenerImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ComparatorMessageListener.class); - @Reference - private JmsConnection jmsConnection; + private final String name; + private final ComparatorDispatcher dispatcher; + private final FeedbackQueue feedbackQueue; - @Reference - private ComparatorDispatcher dispatcher; - - private ComparatorMessageListenerImplConfig config; - - @Activate - void activate(ComparatorMessageListenerImplConfig config) { - this.config = config; - super.doActivate(config.consumerQueueName(), config.producerQueueName(), config.pf()); - } - - @Deactivate - void deactivate() { - super.doDeactivate(); + ComparatorMessageListener(String name, + ComparatorDispatcher dispatcher, FeedbackQueue feedbackQueue) { + this.name = name; + this.dispatcher = dispatcher; + this.feedbackQueue = feedbackQueue; } @Override @@ -76,7 +60,8 @@ public void onMessage(final Message message) { if (comparatorJobData != null && StringUtils.isNotBlank(jmsCorrelationId)) { LOGGER.info( - "ComparatorJobData [{}] message arrived. CorrelationId: {} TestName: {} UrlName: {}", + "[{}] ComparatorJobData [{}] message arrived. CorrelationId: {} TestName: {} UrlName: {}", + name, comparatorJobData, jmsCorrelationId, comparatorJobData.getTestName(), @@ -84,17 +69,19 @@ public void onMessage(final Message message) { final Step step = comparatorJobData.getStep(); final ComparatorProperties properties = new ComparatorProperties( comparatorJobData.getCompany(), - comparatorJobData.getProject(), step.getPattern(), step.getStepResult().getArtifactId(), step.getStepResult().getPayload()); + comparatorJobData.getProject(), step.getPattern(), step.getStepResult().getArtifactId(), + step.getStepResult().getPayload()); for (Comparator comparator : step.getComparators()) { - LOGGER.info("Start comparison for comparator {} in step {}", comparator, step); + LOGGER.info("[{}] Start comparison for comparator {} in step {}", name, comparator, step); ComparatorResultData.Builder resultBuilder = ComparatorResultData .newBuilder(comparatorJobData.getTestName(), comparatorJobData.getUrlName(), step.getIndex()); try { Comparator processedComparator = dispatcher.run(comparator, properties); LOGGER.info( - "Comparison successfully ended. CorrelationId: {} TestName: {} Url: {} Comparator: {}", + "[{}] Comparison successfully ended. CorrelationId: {} TestName: {} Url: {} Comparator: {}", + name, jmsCorrelationId, comparatorJobData.getTestName(), comparatorJobData.getUrlName(), @@ -102,7 +89,7 @@ public void onMessage(final Message message) { resultBuilder.withComparisonResult(processedComparator) .withStatus(JobStatus.SUCCESS); } catch (Exception e) { - LOGGER.error("Exception during compare. CorrelationId: {}", jmsCorrelationId, e); + LOGGER.error("[{}] Exception during compare. CorrelationId: {}", name, jmsCorrelationId, e); final ComparatorStepResult errorResult = new ComparatorStepResult(null, ComparatorStepResult.Status.PROCESSING_ERROR); errorResult.addError(e.getMessage()); @@ -117,9 +104,4 @@ public void onMessage(final Message message) { } } - @Override - protected JmsConnection getJmsConnection() { - return jmsConnection; - } - } diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListenerImplConfig.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListenerImplConfig.java deleted file mode 100644 index 8b3f8089d..000000000 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListenerImplConfig.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * AET - * - * Copyright (C) 2013 Cognifide Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package com.cognifide.aet.worker.listeners; - -import org.osgi.service.metatype.annotations.AttributeDefinition; -import org.osgi.service.metatype.annotations.ObjectClassDefinition; - -@ObjectClassDefinition(name = "AET Comparator Message Listener") -public @interface ComparatorMessageListenerImplConfig { - - String LISTENER_NAME_LABEL = "Comparator name"; - String LISTENER_NAME_DESC = "Name of comparator. Used in logs only"; - String LISTENER_NAME_DEFAULT_VALUE = "Comparator"; - - String CONSUMER_QUEUE_NAME_LABEL = "Consumer queue name"; - String CONSUMER_QUEUE_NAME_DEFAULT_VALUE = "AET.comparatorJobs"; - - String PRODUCER_QUEUE_NAME_LABEL = "Producer queue name"; - String PRODUCER_QUEUE_NAME_DEFAULT_VALUE = "AET.comparatorResults"; - - String PREFETCH_SIZE_LABEL = "Prefetch size"; - String PREFETCH_SIZE_DESC = "http://activemq.apache.org/what-is-the-prefetch-limit-for.html"; - String PREFETCH_SIZE_DEFAULT_VALUE = "1"; - - @AttributeDefinition( - name = LISTENER_NAME_LABEL, - description = LISTENER_NAME_DESC) - String name() default LISTENER_NAME_DEFAULT_VALUE; - - @AttributeDefinition( - name = CONSUMER_QUEUE_NAME_LABEL) - String consumerQueueName() default CONSUMER_QUEUE_NAME_DEFAULT_VALUE; - - @AttributeDefinition( - name = PRODUCER_QUEUE_NAME_LABEL) - String producerQueueName() default PRODUCER_QUEUE_NAME_DEFAULT_VALUE; - - @AttributeDefinition( - name = PREFETCH_SIZE_LABEL, - description = PREFETCH_SIZE_DESC) - String pf() default PREFETCH_SIZE_DEFAULT_VALUE; -} diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersFactoryConfig.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersFactoryConfig.java new file mode 100644 index 000000000..9d26f206d --- /dev/null +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersFactoryConfig.java @@ -0,0 +1,86 @@ +/** + * AET + * + * Copyright (C) 2013 Cognifide Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.cognifide.aet.worker.listeners; + +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.AttributeType; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; + +@ObjectClassDefinition(name = "AET Workers Listener Factory") +public @interface WorkersListenersFactoryConfig { + + String COLLECTORS_NO_ENV = "COLLECTORS_NO"; + String COMPARATORS_NO_ENV = "COMPARATORS_NO"; + + String COLLECTOR_CONSUMER_QUEUE_NAME_LABEL = "Collectors consumer queue name"; + String COLLECTOR_CONSUMER_QUEUE_NAME_DEFAULT_VALUE = "AET.collectorJobs"; + + String COLLECTOR_PRODUCER_QUEUE_NAME_LABEL = "Collectors producer queue name"; + String COLLECTOR_PRODUCER_QUEUE_NAME_DEFAULT_VALUE = "AET.collectorResults"; + + String COLLECTOR_PREFETCH_SIZE_LABEL = "Collectors prefetch size"; + String COLLECTOR_PREFETCH_SIZE_DESC = "http://activemq.apache.org/what-is-the-prefetch-limit-for.html"; + String COLLECTOR_PREFETCH_SIZE_DEFAULT_VALUE = "1"; + + String COLLECTOR_INSTANCES_NO_LABEL = "Number of collector instances, might be overwritten by env variable " + COLLECTORS_NO_ENV; + int COLLECTOR_INSTANCES_NO_DEFAULT_VALUE = 5; + + String COMPARATOR_CONSUMER_QUEUE_NAME_LABEL = "Comparator consumer queue name"; + String COMPARATOR_CONSUMER_QUEUE_NAME_DEFAULT_VALUE = "AET.comparatorJobs"; + + String COMPARATOR_PRODUCER_QUEUE_NAME_LABEL = "Comparator producer queue name"; + String COMPARATOR_PRODUCER_QUEUE_NAME_DEFAULT_VALUE = "AET.comparatorResults"; + + String COMPARATOR_PREFETCH_SIZE_LABEL = "Comparators prefetch size"; + String COMPARATOR_PREFETCH_SIZE_DESC = "http://activemq.apache.org/what-is-the-prefetch-limit-for.html"; + String COMPARATOR_PREFETCH_SIZE_DEFAULT_VALUE = "1"; + + String COMPARATOR_INSTANCES_NO_LABEL = "Number of comparator instances, might be overwritten by env variable " + COMPARATORS_NO_ENV; + int COMPARATOR_INSTANCES_NO_DEFAULT_VALUE = 5; + + @AttributeDefinition( + name = COLLECTOR_CONSUMER_QUEUE_NAME_LABEL) + String collectorConsumerQueueName() default COLLECTOR_CONSUMER_QUEUE_NAME_DEFAULT_VALUE; + + @AttributeDefinition( + name = COLLECTOR_PRODUCER_QUEUE_NAME_LABEL) + String collectorProducerQueueName() default COLLECTOR_PRODUCER_QUEUE_NAME_DEFAULT_VALUE; + + @AttributeDefinition( + name = COLLECTOR_PREFETCH_SIZE_LABEL, + description = COLLECTOR_PREFETCH_SIZE_DESC) + String collectorPrefetchSize() default COLLECTOR_PREFETCH_SIZE_DEFAULT_VALUE; + + @AttributeDefinition(name = COLLECTOR_INSTANCES_NO_LABEL, type = AttributeType.INTEGER) + int collectorInstancesNo() default COLLECTOR_INSTANCES_NO_DEFAULT_VALUE; + + @AttributeDefinition( + name = COMPARATOR_CONSUMER_QUEUE_NAME_LABEL) + String comparatorConsumerQueueName() default COMPARATOR_CONSUMER_QUEUE_NAME_DEFAULT_VALUE; + + @AttributeDefinition( + name = COMPARATOR_PRODUCER_QUEUE_NAME_LABEL) + String comparatorProducerQueueName() default COMPARATOR_PRODUCER_QUEUE_NAME_DEFAULT_VALUE; + + @AttributeDefinition( + name = COMPARATOR_PREFETCH_SIZE_LABEL, + description = COMPARATOR_PREFETCH_SIZE_DESC) + String comparatorPrefetchSize() default COMPARATOR_PREFETCH_SIZE_DEFAULT_VALUE; + + @AttributeDefinition(name = COMPARATOR_INSTANCES_NO_LABEL, type = AttributeType.INTEGER) + int comparatorInstancesNo() default COMPARATOR_INSTANCES_NO_DEFAULT_VALUE; + +} diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java new file mode 100644 index 000000000..04683ea9c --- /dev/null +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java @@ -0,0 +1,142 @@ +/** + * AET + * + * Copyright (C) 2013 Cognifide Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.cognifide.aet.worker.listeners; + +import com.cognifide.aet.communication.api.queues.JmsConnection; +import com.cognifide.aet.queues.JmsUtils; +import com.cognifide.aet.worker.api.CollectorDispatcher; +import com.cognifide.aet.worker.api.ComparatorDispatcher; +import com.cognifide.aet.worker.drivers.WebDriverProvider; +import com.cognifide.aet.worker.exceptions.ConsumerInitException; +import com.cognifide.aet.worker.results.FeedbackQueue; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.IntStream; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import org.apache.commons.lang3.StringUtils; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.metatype.annotations.Designate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Component( + service = WorkersListenersService.class, + immediate = true) +@Designate(ocd = WorkersListenersFactoryConfig.class) +public class WorkersListenersService { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkersListenersService.class); + + @Reference + private JmsConnection jmsConnection; + + @Reference + private WebDriverProvider webDriverProvider; + + @Reference + private CollectorDispatcher collectorDispatcher; + + @Reference + private ComparatorDispatcher comparatorDispatcher; + + private WorkersListenersFactoryConfig config; + + private Session jmsSession; + private FeedbackQueue collectorFeedbackQueue; + private FeedbackQueue comparatorFeedbackQueue; + private Set consumers; + + @Activate + void activate(WorkersListenersFactoryConfig config) { + this.config = config; + try { + jmsSession = jmsConnection.getJmsSession(); + collectorFeedbackQueue = new FeedbackQueue(jmsSession, config.collectorProducerQueueName()); + comparatorFeedbackQueue = new FeedbackQueue(jmsSession, config.comparatorProducerQueueName()); + consumers = new HashSet<>(); + consumers.addAll(spawnCollectors(config)); + consumers.addAll(spawnComparators(config)); + } catch (JMSException e) { + LOGGER.error("Failed to activate WorkersListenersService", e); + throw new IllegalStateException(e.getMessage(), e); + } + } + + private Set spawnListeners(String consumerQueueName, String prefetchSize, + int noOfInstances, + Function getListenerInstance) { + final Set consumersSet = new HashSet<>(); + final String queueName = consumerQueueName + "?consumer.prefetchSize=" + prefetchSize; + IntStream.of(noOfInstances) + .forEach(no -> { + try { + MessageConsumer consumer = jmsSession.createConsumer(jmsSession.createQueue(queueName)); + MessageListener messageListener = getListenerInstance.apply(no); + consumer.setMessageListener(messageListener); + consumersSet.add(consumer); + } catch (JMSException e) { + LOGGER.error("Failed to create consumer {} for {}", no, consumerQueueName, e); + throw new ConsumerInitException( + String.format("Failed to create consumer %s for %s", no, consumerQueueName), e); + } + }); + return consumersSet; + } + + private Integer getenvOrDefault(String envVarName, int defaultValue) { + return Optional.ofNullable(System.getenv(envVarName)) + .filter(StringUtils::isNotBlank) + .map(Integer::parseInt) + .orElse(defaultValue); + } + + private Set spawnCollectors(WorkersListenersFactoryConfig config) { + return spawnListeners(config.collectorConsumerQueueName(), config.collectorPrefetchSize(), + getenvOrDefault(WorkersListenersFactoryConfig.COLLECTORS_NO_ENV, + config.collectorInstancesNo()), + no -> new CollectorMessageListener("Collector" + no, collectorDispatcher, + collectorFeedbackQueue, webDriverProvider)); + } + + private Set spawnComparators(WorkersListenersFactoryConfig config) { + return spawnListeners(config.comparatorConsumerQueueName(), config.comparatorPrefetchSize(), + getenvOrDefault(WorkersListenersFactoryConfig.COMPARATORS_NO_ENV, + config.comparatorInstancesNo()), + no -> new ComparatorMessageListener("Comparator" + no, comparatorDispatcher, + comparatorFeedbackQueue)); + } + + @Deactivate + void deactivate() { + if (collectorFeedbackQueue != null) { + collectorFeedbackQueue.close(); + } + if (comparatorFeedbackQueue != null) { + comparatorFeedbackQueue.close(); + } + consumers.forEach(JmsUtils::closeQuietly); + JmsUtils.closeQuietly(jmsSession); + } + +} diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-B.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-B.cfg deleted file mode 100644 index 502908159..000000000 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-B.cfg +++ /dev/null @@ -1,22 +0,0 @@ -# -# AET -# -# Copyright (C) 2013 Cognifide Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name=Collector2 -consumerQueueName=AET.collectorJobs -producerQueueName=AET.collectorResults -pf=1 diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-C.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-C.cfg deleted file mode 100644 index f61f21fec..000000000 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-C.cfg +++ /dev/null @@ -1,22 +0,0 @@ -# -# AET -# -# Copyright (C) 2013 Cognifide Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name=Collector3 -consumerQueueName=AET.collectorJobs -producerQueueName=AET.collectorResults -pf=1 diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-D.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-D.cfg deleted file mode 100644 index a470ac23e..000000000 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-D.cfg +++ /dev/null @@ -1,22 +0,0 @@ -# -# AET -# -# Copyright (C) 2013 Cognifide Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name=Collector4 -consumerQueueName=AET.collectorJobs -producerQueueName=AET.collectorResults -pf=1 diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-E.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-E.cfg deleted file mode 100644 index 365056125..000000000 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-E.cfg +++ /dev/null @@ -1,22 +0,0 @@ -# -# AET -# -# Copyright (C) 2013 Cognifide Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name=Collector5 -consumerQueueName=AET.collectorJobs -producerQueueName=AET.collectorResults -pf=1 diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-A.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-A.cfg deleted file mode 100644 index f0785d7f1..000000000 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-A.cfg +++ /dev/null @@ -1,22 +0,0 @@ -# -# AET -# -# Copyright (C) 2013 Cognifide Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name=Comparator1 -consumerQueueName=AET.comparatorJobs -producerQueueName=AET.comparatorResults -pf=1 \ No newline at end of file diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-B.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-B.cfg deleted file mode 100644 index f84693e0d..000000000 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-B.cfg +++ /dev/null @@ -1,22 +0,0 @@ -# -# AET -# -# Copyright (C) 2013 Cognifide Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name=Comparator2 -consumerQueueName=AET.comparatorJobs -producerQueueName=AET.comparatorResults -pf=1 \ No newline at end of file diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-C.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-C.cfg deleted file mode 100644 index 5882f53c0..000000000 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-C.cfg +++ /dev/null @@ -1,22 +0,0 @@ -# -# AET -# -# Copyright (C) 2013 Cognifide Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name=Comparator3 -consumerQueueName=AET.comparatorJobs -producerQueueName=AET.comparatorResults -pf=1 \ No newline at end of file diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-D.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-D.cfg deleted file mode 100644 index f35657964..000000000 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-D.cfg +++ /dev/null @@ -1,22 +0,0 @@ -# -# AET -# -# Copyright (C) 2013 Cognifide Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name=Comparator4 -consumerQueueName=AET.comparatorJobs -producerQueueName=AET.comparatorResults -pf=1 \ No newline at end of file diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-E.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-E.cfg deleted file mode 100644 index a7848c2ea..000000000 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.ComparatorMessageListenerImpl-E.cfg +++ /dev/null @@ -1,22 +0,0 @@ -# -# AET -# -# Copyright (C) 2013 Cognifide Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name=Comparator5 -consumerQueueName=AET.comparatorJobs -producerQueueName=AET.comparatorResults -pf=1 \ No newline at end of file diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-A.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.WorkersListenersService.cfg similarity index 67% rename from osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-A.cfg rename to osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.WorkersListenersService.cfg index 49ce73857..cf8bda1dd 100644 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.CollectorMessageListenerImpl-A.cfg +++ b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.WorkersListenersService.cfg @@ -16,7 +16,11 @@ # limitations under the License. # -name=Collector1 -consumerQueueName=AET.collectorJobs -producerQueueName=AET.collectorResults -pf=1 +collectorConsumerQueueName=AET.collectorJobs +collectorProducerQueueName=AET.collectorResults +collectorPrefetchSize=1 +collectorInstancesNo=5 +comparatorConsumerQueueName=AET.comparatorJobs +comparatorProducerQueueName=AET.comparatorResults +comparatorPrefetchSize=1 +comparatorInstancesNo=5 From 33a3f9607d0acc463a56a03ac5b19c76d512f85e Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Mon, 10 Dec 2018 13:42:56 +0100 Subject: [PATCH 02/11] Enable configuring number of workers by single number --- .../listeners/CollectorMessageListener.java | 15 ++- .../listeners/ComparatorMessageListener.java | 17 ++-- .../listeners/WorkerMessageListener.java | 85 +++++++++++++++++ .../listeners/WorkersListenersService.java | 93 +++++++------------ ...ava => WorkersListenersServiceConfig.java} | 2 +- 5 files changed, 130 insertions(+), 82 deletions(-) create mode 100644 core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkerMessageListener.java rename core/worker/src/main/java/com/cognifide/aet/worker/listeners/{WorkersListenersFactoryConfig.java => WorkersListenersServiceConfig.java} (98%) diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListener.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListener.java index 2f81e05d6..03c6bab0b 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListener.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/CollectorMessageListener.java @@ -21,34 +21,31 @@ import com.cognifide.aet.communication.api.metadata.CollectorStepResult; import com.cognifide.aet.communication.api.metadata.Step; import com.cognifide.aet.communication.api.metadata.Url; +import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.communication.api.util.ExecutionTimer; import com.cognifide.aet.job.api.collector.WebCommunicationWrapper; import com.cognifide.aet.queues.JmsUtils; import com.cognifide.aet.worker.api.CollectorDispatcher; import com.cognifide.aet.worker.drivers.WebDriverProvider; import com.cognifide.aet.worker.exceptions.WorkerException; -import com.cognifide.aet.worker.results.FeedbackQueue; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageListener; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class CollectorMessageListener implements MessageListener { +class CollectorMessageListener extends WorkerMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(CollectorMessageListener.class); - private final String name; private final CollectorDispatcher dispatcher; - private final FeedbackQueue feedbackQueue; private final WebDriverProvider webDriverProvider; - CollectorMessageListener(String name, CollectorDispatcher dispatcher, FeedbackQueue feedbackQueue, - WebDriverProvider webDriverProvider) { - this.name = name; + CollectorMessageListener(String name, CollectorDispatcher dispatcher, + WebDriverProvider webDriverProvider, JmsConnection jmsConnection, String consumerQueueName, + String producerQueueName) { + super(name, jmsConnection, consumerQueueName, producerQueueName); this.dispatcher = dispatcher; - this.feedbackQueue = feedbackQueue; this.webDriverProvider = webDriverProvider; } diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListener.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListener.java index 36e7b10f6..c68de78e2 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListener.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/ComparatorMessageListener.java @@ -22,30 +22,26 @@ import com.cognifide.aet.communication.api.metadata.Comparator; import com.cognifide.aet.communication.api.metadata.ComparatorStepResult; import com.cognifide.aet.communication.api.metadata.Step; +import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.job.api.comparator.ComparatorProperties; import com.cognifide.aet.queues.JmsUtils; import com.cognifide.aet.worker.api.ComparatorDispatcher; -import com.cognifide.aet.worker.results.FeedbackQueue; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageListener; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class ComparatorMessageListener implements MessageListener { +class ComparatorMessageListener extends WorkerMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(ComparatorMessageListener.class); - private final String name; private final ComparatorDispatcher dispatcher; - private final FeedbackQueue feedbackQueue; - ComparatorMessageListener(String name, - ComparatorDispatcher dispatcher, FeedbackQueue feedbackQueue) { - this.name = name; + ComparatorMessageListener(String name, ComparatorDispatcher dispatcher, + JmsConnection jmsConnection, String consumerQueueName, String producerQueueName) { + super(name, jmsConnection, consumerQueueName, producerQueueName); this.dispatcher = dispatcher; - this.feedbackQueue = feedbackQueue; } @Override @@ -89,7 +85,8 @@ public void onMessage(final Message message) { resultBuilder.withComparisonResult(processedComparator) .withStatus(JobStatus.SUCCESS); } catch (Exception e) { - LOGGER.error("[{}] Exception during compare. CorrelationId: {}", name, jmsCorrelationId, e); + LOGGER + .error("[{}] Exception during compare. CorrelationId: {}", name, jmsCorrelationId, e); final ComparatorStepResult errorResult = new ComparatorStepResult(null, ComparatorStepResult.Status.PROCESSING_ERROR); errorResult.addError(e.getMessage()); diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkerMessageListener.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkerMessageListener.java new file mode 100644 index 000000000..7fa994287 --- /dev/null +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkerMessageListener.java @@ -0,0 +1,85 @@ +/** + * AET + * + * Copyright (C) 2013 Cognifide Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.cognifide.aet.worker.listeners; + +import com.cognifide.aet.communication.api.queues.JmsConnection; +import com.cognifide.aet.queues.JmsUtils; +import com.cognifide.aet.worker.results.FeedbackQueue; +import java.util.Objects; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class WorkerMessageListener implements MessageListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkerMessageListener.class); + + private final Session jmsSession; + private final MessageConsumer consumer; + + final String name; + final FeedbackQueue feedbackQueue; + + WorkerMessageListener(String name, JmsConnection jmsConnection, + String consumerQueueName, + String producerQueueName) { + LOGGER.info("Starting {}", name); + this.name = name; + try { + jmsSession = jmsConnection.getJmsSession(); + feedbackQueue = new FeedbackQueue(jmsSession, producerQueueName); + consumer = jmsSession.createConsumer(jmsSession.createQueue(consumerQueueName)); + consumer.setMessageListener(this); + } catch (JMSException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + void close() { + if (feedbackQueue != null) { + feedbackQueue.close(); + } + JmsUtils.closeQuietly(consumer); + JmsUtils.closeQuietly(jmsSession); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerMessageListener that = (WorkerMessageListener) o; + return Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "{" + + "name='" + name + '\'' + + '}'; + } +} diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java index 04683ea9c..9cf4da5f5 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java @@ -16,21 +16,14 @@ package com.cognifide.aet.worker.listeners; import com.cognifide.aet.communication.api.queues.JmsConnection; -import com.cognifide.aet.queues.JmsUtils; import com.cognifide.aet.worker.api.CollectorDispatcher; import com.cognifide.aet.worker.api.ComparatorDispatcher; import com.cognifide.aet.worker.drivers.WebDriverProvider; -import com.cognifide.aet.worker.exceptions.ConsumerInitException; -import com.cognifide.aet.worker.results.FeedbackQueue; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.IntStream; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; import org.apache.commons.lang3.StringUtils; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -43,7 +36,7 @@ @Component( service = WorkersListenersService.class, immediate = true) -@Designate(ocd = WorkersListenersFactoryConfig.class) +@Designate(ocd = WorkersListenersServiceConfig.class) public class WorkersListenersService { private static final Logger LOGGER = LoggerFactory.getLogger(WorkersListenersService.class); @@ -60,46 +53,24 @@ public class WorkersListenersService { @Reference private ComparatorDispatcher comparatorDispatcher; - private WorkersListenersFactoryConfig config; + private WorkersListenersServiceConfig config; - private Session jmsSession; - private FeedbackQueue collectorFeedbackQueue; - private FeedbackQueue comparatorFeedbackQueue; - private Set consumers; + private Set consumers; @Activate - void activate(WorkersListenersFactoryConfig config) { - this.config = config; - try { - jmsSession = jmsConnection.getJmsSession(); - collectorFeedbackQueue = new FeedbackQueue(jmsSession, config.collectorProducerQueueName()); - comparatorFeedbackQueue = new FeedbackQueue(jmsSession, config.comparatorProducerQueueName()); - consumers = new HashSet<>(); - consumers.addAll(spawnCollectors(config)); - consumers.addAll(spawnComparators(config)); - } catch (JMSException e) { - LOGGER.error("Failed to activate WorkersListenersService", e); - throw new IllegalStateException(e.getMessage(), e); - } + void activate(WorkersListenersServiceConfig config) { + consumers = new HashSet<>(); + consumers.addAll(spawnCollectors(config)); + consumers.addAll(spawnComparators(config)); } - private Set spawnListeners(String consumerQueueName, String prefetchSize, - int noOfInstances, - Function getListenerInstance) { - final Set consumersSet = new HashSet<>(); - final String queueName = consumerQueueName + "?consumer.prefetchSize=" + prefetchSize; - IntStream.of(noOfInstances) + private Set spawnListeners(int noOfInstances, + Function getListenerInstance) { + final Set consumersSet = new HashSet<>(); + IntStream.rangeClosed(1, noOfInstances) .forEach(no -> { - try { - MessageConsumer consumer = jmsSession.createConsumer(jmsSession.createQueue(queueName)); - MessageListener messageListener = getListenerInstance.apply(no); - consumer.setMessageListener(messageListener); - consumersSet.add(consumer); - } catch (JMSException e) { - LOGGER.error("Failed to create consumer {} for {}", no, consumerQueueName, e); - throw new ConsumerInitException( - String.format("Failed to create consumer %s for %s", no, consumerQueueName), e); - } + WorkerMessageListener listener = getListenerInstance.apply(no); + consumersSet.add(listener); }); return consumersSet; } @@ -111,32 +82,30 @@ private Integer getenvOrDefault(String envVarName, int defaultValue) { .orElse(defaultValue); } - private Set spawnCollectors(WorkersListenersFactoryConfig config) { - return spawnListeners(config.collectorConsumerQueueName(), config.collectorPrefetchSize(), - getenvOrDefault(WorkersListenersFactoryConfig.COLLECTORS_NO_ENV, - config.collectorInstancesNo()), - no -> new CollectorMessageListener("Collector" + no, collectorDispatcher, - collectorFeedbackQueue, webDriverProvider)); + private Set spawnCollectors(WorkersListenersServiceConfig config) { + final String queueName = + config.collectorConsumerQueueName() + "?consumer.prefetchSize=" + config + .collectorPrefetchSize(); + return spawnListeners(getenvOrDefault(WorkersListenersServiceConfig.COLLECTORS_NO_ENV, + config.collectorInstancesNo()), + no -> new CollectorMessageListener("Collector-" + no, collectorDispatcher, + webDriverProvider, jmsConnection, queueName, config.collectorProducerQueueName())); } - private Set spawnComparators(WorkersListenersFactoryConfig config) { - return spawnListeners(config.comparatorConsumerQueueName(), config.comparatorPrefetchSize(), - getenvOrDefault(WorkersListenersFactoryConfig.COMPARATORS_NO_ENV, - config.comparatorInstancesNo()), - no -> new ComparatorMessageListener("Comparator" + no, comparatorDispatcher, - comparatorFeedbackQueue)); + private Set spawnComparators(WorkersListenersServiceConfig config) { + final String queueName = + config.comparatorConsumerQueueName() + "?consumer.prefetchSize=" + config + .comparatorPrefetchSize(); + return spawnListeners(getenvOrDefault(WorkersListenersServiceConfig.COMPARATORS_NO_ENV, + config.comparatorInstancesNo()), + no -> new ComparatorMessageListener("Comparator-" + no, comparatorDispatcher, + jmsConnection, queueName, config.comparatorProducerQueueName())); } @Deactivate void deactivate() { - if (collectorFeedbackQueue != null) { - collectorFeedbackQueue.close(); - } - if (comparatorFeedbackQueue != null) { - comparatorFeedbackQueue.close(); - } - consumers.forEach(JmsUtils::closeQuietly); - JmsUtils.closeQuietly(jmsSession); + LOGGER.info("Closing Workers Listeners with: {}", consumers); + consumers.forEach(WorkerMessageListener::close); } } diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersFactoryConfig.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java similarity index 98% rename from core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersFactoryConfig.java rename to core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java index 9d26f206d..784f66914 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersFactoryConfig.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java @@ -20,7 +20,7 @@ import org.osgi.service.metatype.annotations.ObjectClassDefinition; @ObjectClassDefinition(name = "AET Workers Listener Factory") -public @interface WorkersListenersFactoryConfig { +public @interface WorkersListenersServiceConfig { String COLLECTORS_NO_ENV = "COLLECTORS_NO"; String COMPARATORS_NO_ENV = "COMPARATORS_NO"; From 95768a1040301121273d295f65bcefa24bd898c7 Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Mon, 10 Dec 2018 14:58:30 +0100 Subject: [PATCH 03/11] Unify workers constants --- .../cognifide/aet/queues/QueuesConstant.java | 40 +++++++++++++++++++ .../cognifide/aet/queues/WorkerConfig.java | 30 ++++++++++++++ .../processing/steps/CollectDispatcher.java | 5 ++- .../steps/CollectionResultsRouter.java | 6 +-- .../steps/ComparisonResultsRouter.java | 4 +- .../listeners/WorkersListenersService.java | 13 +++--- .../WorkersListenersServiceConfig.java | 28 ------------- ...rker.listeners.WorkersListenersService.cfg | 4 -- 8 files changed, 85 insertions(+), 45 deletions(-) create mode 100644 core/communication/src/main/java/com/cognifide/aet/queues/QueuesConstant.java create mode 100644 core/communication/src/main/java/com/cognifide/aet/queues/WorkerConfig.java diff --git a/core/communication/src/main/java/com/cognifide/aet/queues/QueuesConstant.java b/core/communication/src/main/java/com/cognifide/aet/queues/QueuesConstant.java new file mode 100644 index 000000000..c725ee418 --- /dev/null +++ b/core/communication/src/main/java/com/cognifide/aet/queues/QueuesConstant.java @@ -0,0 +1,40 @@ +/** + * AET + * + * Copyright (C) 2013 Cognifide Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.cognifide.aet.queues; + +public enum QueuesConstant implements WorkerConfig { + COLLECTOR("collectorJobs", "collectorResults"), + COMPARATOR("comparatorJobs", "comparatorResults"); + + final String jobsQueueName; + final String resultsQueueName; + private static final String NAMESPACE = "AET."; + + QueuesConstant(String jobsQueueName, String resultsQueueName) { + this.jobsQueueName = jobsQueueName; + this.resultsQueueName = resultsQueueName; + } + + @Override + public String getJobsQueueName() { + return NAMESPACE + jobsQueueName; + } + + @Override + public String getResultsQueueName() { + return NAMESPACE + resultsQueueName; + } +} diff --git a/core/communication/src/main/java/com/cognifide/aet/queues/WorkerConfig.java b/core/communication/src/main/java/com/cognifide/aet/queues/WorkerConfig.java new file mode 100644 index 000000000..97cc001c7 --- /dev/null +++ b/core/communication/src/main/java/com/cognifide/aet/queues/WorkerConfig.java @@ -0,0 +1,30 @@ +/** + * AET + * + * Copyright (C) 2013 Cognifide Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.cognifide.aet.queues; + +public interface WorkerConfig { + + /** + * @return name of the jobs queue + */ + String getJobsQueueName(); + + /** + * @return name of the results queue name + */ + String getResultsQueueName(); + +} diff --git a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectDispatcher.java b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectDispatcher.java index 68fb1b214..d2293b353 100644 --- a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectDispatcher.java +++ b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectDispatcher.java @@ -21,11 +21,12 @@ import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.communication.api.wrappers.MetadataRunDecorator; import com.cognifide.aet.communication.api.wrappers.UrlRunWrapper; +import com.cognifide.aet.queues.QueuesConstant; import com.cognifide.aet.runner.RunnerConfiguration; +import com.cognifide.aet.runner.processing.TimeoutWatch; import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper; import com.cognifide.aet.runner.scheduler.CollectorJobSchedulerService; import com.cognifide.aet.runner.scheduler.MessageWithDestination; -import com.cognifide.aet.runner.processing.TimeoutWatch; import com.google.common.collect.Queues; import java.util.ArrayList; import java.util.Collections; @@ -105,7 +106,7 @@ protected String getQueueInName() { @Override protected String getQueueOutName() { - return "AET.collectorJobs"; + return QueuesConstant.COLLECTOR.getJobsQueueName(); } @Override diff --git a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectionResultsRouter.java b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectionResultsRouter.java index 7dd844d21..cbb4e5a3f 100644 --- a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectionResultsRouter.java +++ b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectionResultsRouter.java @@ -25,7 +25,7 @@ import com.cognifide.aet.communication.api.metadata.Url; import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.communication.api.util.ExecutionTimer; -import com.cognifide.aet.runner.MessagesManager; +import com.cognifide.aet.queues.QueuesConstant; import com.cognifide.aet.runner.RunnerConfiguration; import com.cognifide.aet.runner.processing.TimeoutWatch; import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper; @@ -184,12 +184,12 @@ public boolean isFinished() { @Override protected String getQueueInName() { - return MessagesManager.createFullQueueName("collectorResults"); + return QueuesConstant.COLLECTOR.getResultsQueueName(); } @Override protected String getQueueOutName() { - return MessagesManager.createFullQueueName("comparatorJobs"); + return QueuesConstant.COMPARATOR.getJobsQueueName(); } @Override diff --git a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/ComparisonResultsRouter.java b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/ComparisonResultsRouter.java index c2349a1b1..6f5055041 100644 --- a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/ComparisonResultsRouter.java +++ b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/ComparisonResultsRouter.java @@ -25,7 +25,7 @@ import com.cognifide.aet.communication.api.metadata.Url; import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.communication.api.util.ExecutionTimer; -import com.cognifide.aet.runner.MessagesManager; +import com.cognifide.aet.queues.QueuesConstant; import com.cognifide.aet.runner.RunnerConfiguration; import com.cognifide.aet.runner.processing.TimeoutWatch; import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper; @@ -150,7 +150,7 @@ public void abort() { @Override protected String getQueueInName() { - return MessagesManager.createFullQueueName("comparatorResults"); + return QueuesConstant.COMPARATOR.getResultsQueueName(); } @Override diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java index 9cf4da5f5..b19406074 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java @@ -16,6 +16,7 @@ package com.cognifide.aet.worker.listeners; import com.cognifide.aet.communication.api.queues.JmsConnection; +import com.cognifide.aet.queues.QueuesConstant; import com.cognifide.aet.worker.api.CollectorDispatcher; import com.cognifide.aet.worker.api.ComparatorDispatcher; import com.cognifide.aet.worker.drivers.WebDriverProvider; @@ -53,12 +54,11 @@ public class WorkersListenersService { @Reference private ComparatorDispatcher comparatorDispatcher; - private WorkersListenersServiceConfig config; - private Set consumers; @Activate void activate(WorkersListenersServiceConfig config) { + LOGGER.info("Starting Workers Listeners Service with {}", config); consumers = new HashSet<>(); consumers.addAll(spawnCollectors(config)); consumers.addAll(spawnComparators(config)); @@ -84,22 +84,23 @@ private Integer getenvOrDefault(String envVarName, int defaultValue) { private Set spawnCollectors(WorkersListenersServiceConfig config) { final String queueName = - config.collectorConsumerQueueName() + "?consumer.prefetchSize=" + config + QueuesConstant.COLLECTOR.getJobsQueueName() + "?consumer.prefetchSize=" + config .collectorPrefetchSize(); return spawnListeners(getenvOrDefault(WorkersListenersServiceConfig.COLLECTORS_NO_ENV, config.collectorInstancesNo()), no -> new CollectorMessageListener("Collector-" + no, collectorDispatcher, - webDriverProvider, jmsConnection, queueName, config.collectorProducerQueueName())); + webDriverProvider, jmsConnection, queueName, + QueuesConstant.COLLECTOR.getResultsQueueName())); } private Set spawnComparators(WorkersListenersServiceConfig config) { final String queueName = - config.comparatorConsumerQueueName() + "?consumer.prefetchSize=" + config + QueuesConstant.COMPARATOR.getJobsQueueName() + "?consumer.prefetchSize=" + config .comparatorPrefetchSize(); return spawnListeners(getenvOrDefault(WorkersListenersServiceConfig.COMPARATORS_NO_ENV, config.comparatorInstancesNo()), no -> new ComparatorMessageListener("Comparator-" + no, comparatorDispatcher, - jmsConnection, queueName, config.comparatorProducerQueueName())); + jmsConnection, queueName, QueuesConstant.COMPARATOR.getResultsQueueName())); } @Deactivate diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java index 784f66914..dd1340eae 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java @@ -25,12 +25,6 @@ String COLLECTORS_NO_ENV = "COLLECTORS_NO"; String COMPARATORS_NO_ENV = "COMPARATORS_NO"; - String COLLECTOR_CONSUMER_QUEUE_NAME_LABEL = "Collectors consumer queue name"; - String COLLECTOR_CONSUMER_QUEUE_NAME_DEFAULT_VALUE = "AET.collectorJobs"; - - String COLLECTOR_PRODUCER_QUEUE_NAME_LABEL = "Collectors producer queue name"; - String COLLECTOR_PRODUCER_QUEUE_NAME_DEFAULT_VALUE = "AET.collectorResults"; - String COLLECTOR_PREFETCH_SIZE_LABEL = "Collectors prefetch size"; String COLLECTOR_PREFETCH_SIZE_DESC = "http://activemq.apache.org/what-is-the-prefetch-limit-for.html"; String COLLECTOR_PREFETCH_SIZE_DEFAULT_VALUE = "1"; @@ -38,12 +32,6 @@ String COLLECTOR_INSTANCES_NO_LABEL = "Number of collector instances, might be overwritten by env variable " + COLLECTORS_NO_ENV; int COLLECTOR_INSTANCES_NO_DEFAULT_VALUE = 5; - String COMPARATOR_CONSUMER_QUEUE_NAME_LABEL = "Comparator consumer queue name"; - String COMPARATOR_CONSUMER_QUEUE_NAME_DEFAULT_VALUE = "AET.comparatorJobs"; - - String COMPARATOR_PRODUCER_QUEUE_NAME_LABEL = "Comparator producer queue name"; - String COMPARATOR_PRODUCER_QUEUE_NAME_DEFAULT_VALUE = "AET.comparatorResults"; - String COMPARATOR_PREFETCH_SIZE_LABEL = "Comparators prefetch size"; String COMPARATOR_PREFETCH_SIZE_DESC = "http://activemq.apache.org/what-is-the-prefetch-limit-for.html"; String COMPARATOR_PREFETCH_SIZE_DEFAULT_VALUE = "1"; @@ -51,14 +39,6 @@ String COMPARATOR_INSTANCES_NO_LABEL = "Number of comparator instances, might be overwritten by env variable " + COMPARATORS_NO_ENV; int COMPARATOR_INSTANCES_NO_DEFAULT_VALUE = 5; - @AttributeDefinition( - name = COLLECTOR_CONSUMER_QUEUE_NAME_LABEL) - String collectorConsumerQueueName() default COLLECTOR_CONSUMER_QUEUE_NAME_DEFAULT_VALUE; - - @AttributeDefinition( - name = COLLECTOR_PRODUCER_QUEUE_NAME_LABEL) - String collectorProducerQueueName() default COLLECTOR_PRODUCER_QUEUE_NAME_DEFAULT_VALUE; - @AttributeDefinition( name = COLLECTOR_PREFETCH_SIZE_LABEL, description = COLLECTOR_PREFETCH_SIZE_DESC) @@ -67,14 +47,6 @@ @AttributeDefinition(name = COLLECTOR_INSTANCES_NO_LABEL, type = AttributeType.INTEGER) int collectorInstancesNo() default COLLECTOR_INSTANCES_NO_DEFAULT_VALUE; - @AttributeDefinition( - name = COMPARATOR_CONSUMER_QUEUE_NAME_LABEL) - String comparatorConsumerQueueName() default COMPARATOR_CONSUMER_QUEUE_NAME_DEFAULT_VALUE; - - @AttributeDefinition( - name = COMPARATOR_PRODUCER_QUEUE_NAME_LABEL) - String comparatorProducerQueueName() default COMPARATOR_PRODUCER_QUEUE_NAME_DEFAULT_VALUE; - @AttributeDefinition( name = COMPARATOR_PREFETCH_SIZE_LABEL, description = COMPARATOR_PREFETCH_SIZE_DESC) diff --git a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.WorkersListenersService.cfg b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.WorkersListenersService.cfg index cf8bda1dd..6ba804e2a 100644 --- a/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.WorkersListenersService.cfg +++ b/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.WorkersListenersService.cfg @@ -16,11 +16,7 @@ # limitations under the License. # -collectorConsumerQueueName=AET.collectorJobs -collectorProducerQueueName=AET.collectorResults collectorPrefetchSize=1 collectorInstancesNo=5 -comparatorConsumerQueueName=AET.comparatorJobs -comparatorProducerQueueName=AET.comparatorResults comparatorPrefetchSize=1 comparatorInstancesNo=5 From e8522e14bac37f5987dba67d830b3d9bc10fdeeb Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Mon, 10 Dec 2018 15:33:23 +0100 Subject: [PATCH 04/11] Update docs --- .../WorkersListenersServiceConfig.java | 14 ++++++--- documentation/src/main/wiki/AdvancedSetup.md | 29 ++++++++----------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java index dd1340eae..e5334fa4d 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java @@ -29,14 +29,16 @@ String COLLECTOR_PREFETCH_SIZE_DESC = "http://activemq.apache.org/what-is-the-prefetch-limit-for.html"; String COLLECTOR_PREFETCH_SIZE_DEFAULT_VALUE = "1"; - String COLLECTOR_INSTANCES_NO_LABEL = "Number of collector instances, might be overwritten by env variable " + COLLECTORS_NO_ENV; + String COLLECTOR_INSTANCES_NO_LABEL = "Number of collector instances"; + String COLLECTOR_INSTANCES_NO_DESC = "Might be overwritten by env variable" + COLLECTORS_NO_ENV; int COLLECTOR_INSTANCES_NO_DEFAULT_VALUE = 5; String COMPARATOR_PREFETCH_SIZE_LABEL = "Comparators prefetch size"; String COMPARATOR_PREFETCH_SIZE_DESC = "http://activemq.apache.org/what-is-the-prefetch-limit-for.html"; String COMPARATOR_PREFETCH_SIZE_DEFAULT_VALUE = "1"; - String COMPARATOR_INSTANCES_NO_LABEL = "Number of comparator instances, might be overwritten by env variable " + COMPARATORS_NO_ENV; + String COMPARATOR_INSTANCES_NO_LABEL = "Number of comparator instances"; + String COMPARATOR_INSTANCES_NO_DESC = "Might be overwritten by env variable" + COMPARATORS_NO_ENV; int COMPARATOR_INSTANCES_NO_DEFAULT_VALUE = 5; @AttributeDefinition( @@ -44,7 +46,9 @@ description = COLLECTOR_PREFETCH_SIZE_DESC) String collectorPrefetchSize() default COLLECTOR_PREFETCH_SIZE_DEFAULT_VALUE; - @AttributeDefinition(name = COLLECTOR_INSTANCES_NO_LABEL, type = AttributeType.INTEGER) + @AttributeDefinition(name = COLLECTOR_INSTANCES_NO_LABEL, + description = COLLECTOR_INSTANCES_NO_DESC, + type = AttributeType.INTEGER) int collectorInstancesNo() default COLLECTOR_INSTANCES_NO_DEFAULT_VALUE; @AttributeDefinition( @@ -52,7 +56,9 @@ description = COMPARATOR_PREFETCH_SIZE_DESC) String comparatorPrefetchSize() default COMPARATOR_PREFETCH_SIZE_DEFAULT_VALUE; - @AttributeDefinition(name = COMPARATOR_INSTANCES_NO_LABEL, type = AttributeType.INTEGER) + @AttributeDefinition(name = COMPARATOR_INSTANCES_NO_LABEL, + description = COMPARATOR_INSTANCES_NO_DESC, + type = AttributeType.INTEGER) int comparatorInstancesNo() default COMPARATOR_INSTANCES_NO_DEFAULT_VALUE; } diff --git a/documentation/src/main/wiki/AdvancedSetup.md b/documentation/src/main/wiki/AdvancedSetup.md index 529c711db..6464019cf 100644 --- a/documentation/src/main/wiki/AdvancedSetup.md +++ b/documentation/src/main/wiki/AdvancedSetup.md @@ -63,23 +63,18 @@ of Apache server which hosts the AET Report application - e.g. `http://aet-repor ##### Collectors and comparators configuration - The services are **AET Collector Message Listener** and **AET Comparator Message Listener**. There must be at least one of each of those services configured. Below there are listed the properties of each of above mentioned services with required values. - -###### AET Collector Message Listener - -| Property name | Value | -| ------------- | ----- | -| Collector name | Has to be unique within Collector Message Listeners. | -| Consumer queue name | Fixed value `AET.collectorJobs` | -| Producer queue name | Fixed value `AET.collectorResults` | -| Embedded Proxy Server Port | Has to be unique within Collector Message Listeners. | - -###### AET Comparator Message Listener -| Property name | Value | -| ------------- | ----- | -| Comparator name | Has to be unique within Comparator Message Listeners. | -| Consumer queue name | Fixed value `AET.comparatorJobs` | -| Producer queue name | Fixed value `AET.comparatorResults` | +The service is **AET Workers Listeners Service**. +To enable proper working of AET instance, you should configure at least 1 collector and 1 comparator. + +| Property name | Default value | Comment | +| ------------- | ----- | ----- | +| Number of collector instances | `6` | Might be overwritten by env variable `COLLECTORS_NO` | +| Collectors prefetch size | `1` | Read more [here](http://activemq.apache.org/what-is-the-prefetch-limit-for.html) | +| Number of comparator instances | `5` | Might be overwritten by env variable `COMPARATORS_NO` | +| Comparators prefetch size | `1` | Read more [here](http://activemq.apache.org/what-is-the-prefetch-limit-for.html) | + +> **Important note** +> Number of collector instances should be the number of browsers available through all Selenium Grid Nodes. ##### Chrome options configuration From a3b9424903bfade39dbb9b8c1bfe57189e715fa7 Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Mon, 10 Dec 2018 15:51:51 +0100 Subject: [PATCH 05/11] Update upgrade notes --- documentation/src/main/wiki/UpgradeNotes.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/documentation/src/main/wiki/UpgradeNotes.md b/documentation/src/main/wiki/UpgradeNotes.md index 1a9bed72e..7ad5886be 100644 --- a/documentation/src/main/wiki/UpgradeNotes.md +++ b/documentation/src/main/wiki/UpgradeNotes.md @@ -7,6 +7,16 @@ You may see all changes in the [Changelog](https://github.com/Cognifide/aet/blob ## Unreleased +### Users + +### Admins + +#### [PR-451](https://github.com/Cognifide/aet/pull/451) Collectors and comparators configured by single config number + +Remove all `CollectorMessageListenerImpl` and `ComparatorMessageListenerImpl` config files. +Create `com.cognifide.aet.worker.listeners.WorkersListenersService.cfg` and configure proper +number of collectors and comparators there. + ## Version 3.0.0 From 680f5ad0897356cdc418a549d1c50b36e00213cc Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Mon, 10 Dec 2018 15:52:18 +0100 Subject: [PATCH 06/11] Updated changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 03c18f46f..6894b266a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to AET will be documented in this file. **List of changes that are finished but not yet released in any final version.** +- [PR-451](https://github.com/Cognifide/aet/pull/451) Collectors and comparators configured by single config number - [PR-449](https://github.com/Cognifide/aet/pull/449) Improvements to the Winter Edition Theme - [PR-354](https://github.com/Cognifide/aet/pull/354) Remove jmsEndpointConfig information from communication settings endpoint ([#352](https://github.com/Cognifide/aet/issues/352)) - [PR-412](https://github.com/Cognifide/aet/pull/412) ([PR-336](https://github.com/Cognifide/aet/pull/336), [PR-337](https://github.com/Cognifide/aet/pull/337), [PR-395](https://github.com/Cognifide/aet/pull/395)) - Added rerun functionality for suite, test and url From 18fc957c0def545a8a8973aef8a2b15f408a9376 Mon Sep 17 00:00:00 2001 From: Wiiitek Date: Tue, 11 Dec 2018 23:35:09 +0100 Subject: [PATCH 07/11] refactor queue constants and namespace --- api/communication-api/pom.xml | 2 +- .../api}/queues/QueuesConstant.java | 5 +++-- .../api}/queues/WorkerConfig.java | 2 +- api/datastorage-api/pom.xml | 2 +- api/jobs-api/pom.xml | 2 +- api/pom.xml | 2 +- api/validation-api/pom.xml | 2 +- client/aet-maven-plugin/pom.xml | 2 +- client/client-core/pom.xml | 2 +- client/pom.xml | 2 +- core/cleaner/pom.xml | 2 +- core/communication/pom.xml | 2 +- core/datastorage/pom.xml | 2 +- core/jobs/pom.xml | 2 +- core/pom.xml | 2 +- core/runner/pom.xml | 2 +- .../cognifide/aet/runner/MessagesManager.java | 14 +++----------- .../aet/runner/RunnerMessageListener.java | 7 +++---- .../processing/steps/CollectDispatcher.java | 2 +- .../steps/CollectionResultsRouter.java | 2 +- .../steps/ComparisonResultsRouter.java | 2 +- .../aet/runner/MessagesManagerTest.java | 18 ------------------ core/validation/pom.xml | 2 +- core/worker/pom.xml | 2 +- .../listeners/WorkersListenersService.java | 2 +- documentation/pom.xml | 2 +- integration-tests/sample-site/pom.xml | 2 +- integration-tests/test-suite/pom.xml | 2 +- osgi-dependencies/configs/pom.xml | 2 +- osgi-dependencies/pom.xml | 2 +- osgi-dependencies/proxy/pom.xml | 2 +- osgi-dependencies/selenium/pom.xml | 2 +- osgi-dependencies/w3chtml5validator/pom.xml | 2 +- pom.xml | 2 +- report/pom.xml | 2 +- rest-endpoint/pom.xml | 2 +- test-executor/pom.xml | 2 +- .../cognifide/aet/executor/SuiteExecutor.java | 3 ++- zip/pom.xml | 2 +- 39 files changed, 45 insertions(+), 70 deletions(-) rename {core/communication/src/main/java/com/cognifide/aet => api/communication-api/src/main/java/com/cognifide/aet/communication/api}/queues/QueuesConstant.java (92%) rename {core/communication/src/main/java/com/cognifide/aet => api/communication-api/src/main/java/com/cognifide/aet/communication/api}/queues/WorkerConfig.java (93%) diff --git a/api/communication-api/pom.xml b/api/communication-api/pom.xml index 8251446be..e7f39ce09 100644 --- a/api/communication-api/pom.xml +++ b/api/communication-api/pom.xml @@ -24,7 +24,7 @@ api com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT communication-api diff --git a/core/communication/src/main/java/com/cognifide/aet/queues/QueuesConstant.java b/api/communication-api/src/main/java/com/cognifide/aet/communication/api/queues/QueuesConstant.java similarity index 92% rename from core/communication/src/main/java/com/cognifide/aet/queues/QueuesConstant.java rename to api/communication-api/src/main/java/com/cognifide/aet/communication/api/queues/QueuesConstant.java index c725ee418..36de1d741 100644 --- a/core/communication/src/main/java/com/cognifide/aet/queues/QueuesConstant.java +++ b/api/communication-api/src/main/java/com/cognifide/aet/communication/api/queues/QueuesConstant.java @@ -13,15 +13,16 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package com.cognifide.aet.queues; +package com.cognifide.aet.communication.api.queues; public enum QueuesConstant implements WorkerConfig { COLLECTOR("collectorJobs", "collectorResults"), COMPARATOR("comparatorJobs", "comparatorResults"); + public static final String NAMESPACE = "AET."; + final String jobsQueueName; final String resultsQueueName; - private static final String NAMESPACE = "AET."; QueuesConstant(String jobsQueueName, String resultsQueueName) { this.jobsQueueName = jobsQueueName; diff --git a/core/communication/src/main/java/com/cognifide/aet/queues/WorkerConfig.java b/api/communication-api/src/main/java/com/cognifide/aet/communication/api/queues/WorkerConfig.java similarity index 93% rename from core/communication/src/main/java/com/cognifide/aet/queues/WorkerConfig.java rename to api/communication-api/src/main/java/com/cognifide/aet/communication/api/queues/WorkerConfig.java index 97cc001c7..43329dd70 100644 --- a/core/communication/src/main/java/com/cognifide/aet/queues/WorkerConfig.java +++ b/api/communication-api/src/main/java/com/cognifide/aet/communication/api/queues/WorkerConfig.java @@ -13,7 +13,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package com.cognifide.aet.queues; +package com.cognifide.aet.communication.api.queues; public interface WorkerConfig { diff --git a/api/datastorage-api/pom.xml b/api/datastorage-api/pom.xml index 368a413e6..8cf1b55f8 100644 --- a/api/datastorage-api/pom.xml +++ b/api/datastorage-api/pom.xml @@ -24,7 +24,7 @@ api com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT datastorage-api diff --git a/api/jobs-api/pom.xml b/api/jobs-api/pom.xml index 40d3ba7c0..812b27b5e 100644 --- a/api/jobs-api/pom.xml +++ b/api/jobs-api/pom.xml @@ -24,7 +24,7 @@ api com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT jobs-api diff --git a/api/pom.xml b/api/pom.xml index 4eaff1a3b..c1704e244 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT api diff --git a/api/validation-api/pom.xml b/api/validation-api/pom.xml index b0a86b4f7..ea16425f8 100644 --- a/api/validation-api/pom.xml +++ b/api/validation-api/pom.xml @@ -24,7 +24,7 @@ api com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT validation-api diff --git a/client/aet-maven-plugin/pom.xml b/client/aet-maven-plugin/pom.xml index 5f510c860..13b39fa21 100644 --- a/client/aet-maven-plugin/pom.xml +++ b/client/aet-maven-plugin/pom.xml @@ -24,7 +24,7 @@ client com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT aet-maven-plugin diff --git a/client/client-core/pom.xml b/client/client-core/pom.xml index 4aa5b88da..46020733e 100644 --- a/client/client-core/pom.xml +++ b/client/client-core/pom.xml @@ -24,7 +24,7 @@ client com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT client-core diff --git a/client/pom.xml b/client/pom.xml index e63221418..7bf7962a6 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT client diff --git a/core/cleaner/pom.xml b/core/cleaner/pom.xml index f51377427..635c0b1b9 100644 --- a/core/cleaner/pom.xml +++ b/core/cleaner/pom.xml @@ -24,7 +24,7 @@ core com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT cleaner diff --git a/core/communication/pom.xml b/core/communication/pom.xml index 2d088dba5..6d3d1a02b 100644 --- a/core/communication/pom.xml +++ b/core/communication/pom.xml @@ -24,7 +24,7 @@ core com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT communication diff --git a/core/datastorage/pom.xml b/core/datastorage/pom.xml index d93226fb2..bea369adf 100644 --- a/core/datastorage/pom.xml +++ b/core/datastorage/pom.xml @@ -24,7 +24,7 @@ core com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT datastorage diff --git a/core/jobs/pom.xml b/core/jobs/pom.xml index 8ef6359c0..77bff8c7d 100644 --- a/core/jobs/pom.xml +++ b/core/jobs/pom.xml @@ -24,7 +24,7 @@ core com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT jobs diff --git a/core/pom.xml b/core/pom.xml index 8af9e42b4..419a6a560 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT core diff --git a/core/runner/pom.xml b/core/runner/pom.xml index 093c0f716..1dc5ab862 100644 --- a/core/runner/pom.xml +++ b/core/runner/pom.xml @@ -24,7 +24,7 @@ core com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT runner diff --git a/core/runner/src/main/java/com/cognifide/aet/runner/MessagesManager.java b/core/runner/src/main/java/com/cognifide/aet/runner/MessagesManager.java index 1fd752e43..03f48cbf5 100644 --- a/core/runner/src/main/java/com/cognifide/aet/runner/MessagesManager.java +++ b/core/runner/src/main/java/com/cognifide/aet/runner/MessagesManager.java @@ -16,6 +16,7 @@ package com.cognifide.aet.runner; import com.cognifide.aet.communication.api.exceptions.AETException; +import com.cognifide.aet.communication.api.queues.QueuesConstant; import com.cognifide.aet.runner.configuration.MessagesManagerConf; import java.io.IOException; import java.util.HashSet; @@ -25,7 +26,6 @@ import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; -import org.apache.commons.lang3.StringUtils; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.metatype.annotations.Designate; @@ -48,8 +48,6 @@ public class MessagesManager { private static final String QUEUES_ATTRIBUTE = "Queues"; - private static final String AET_QUEUE_DOMAIN = "AET."; - static final String DESTINATION_NAME_PROPERTY = "destinationName"; private MessagesManagerConf config; @@ -83,13 +81,6 @@ public void remove(String correlationID) throws AETException { } } - public static String createFullQueueName(String name) { - if (StringUtils.isBlank(name)) { - throw new IllegalArgumentException("Queue name can't be null or empty string!"); - } - return AET_QUEUE_DOMAIN + name; - } - protected Set getAetQueuesObjects(MBeanServerConnection connection) throws AETException { ObjectName[] queues; @@ -106,7 +97,8 @@ protected Set getAetQueuesObjects(MBeanServerConnection connection) private Set filter(ObjectName[] queuesObjects) { Set queues = new HashSet<>(); for (ObjectName queueObject : queuesObjects) { - if (queueObject.getKeyProperty(DESTINATION_NAME_PROPERTY).startsWith(AET_QUEUE_DOMAIN)) { + // filters all with the AET queue namespace + if (queueObject.getKeyProperty(DESTINATION_NAME_PROPERTY).startsWith(QueuesConstant.NAMESPACE)) { queues.add(queueObject); } } diff --git a/core/runner/src/main/java/com/cognifide/aet/runner/RunnerMessageListener.java b/core/runner/src/main/java/com/cognifide/aet/runner/RunnerMessageListener.java index 1e3a06de5..43c60372c 100644 --- a/core/runner/src/main/java/com/cognifide/aet/runner/RunnerMessageListener.java +++ b/core/runner/src/main/java/com/cognifide/aet/runner/RunnerMessageListener.java @@ -22,6 +22,7 @@ import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.communication.api.wrappers.Run; import com.cognifide.aet.queues.JmsUtils; +import com.cognifide.aet.communication.api.queues.QueuesConstant; import com.cognifide.aet.runner.scheduler.CollectorJobSchedulerService; import javax.jms.Destination; import javax.jms.JMSException; @@ -45,11 +46,9 @@ public class RunnerMessageListener implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(RunnerMessageListener.class); - private static final String API_QUEUE_IN = MessagesManager - .createFullQueueName("runner-in"); + private static final String API_QUEUE_IN = QueuesConstant.NAMESPACE + ".runner-in"; - private static final String MAINTENANCE_QUEUE_IN = MessagesManager - .createFullQueueName("maintenance-in"); + private static final String MAINTENANCE_QUEUE_IN = QueuesConstant.NAMESPACE + "maintenance-in"; private MessageConsumer inConsumer; diff --git a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectDispatcher.java b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectDispatcher.java index d2293b353..51db09021 100644 --- a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectDispatcher.java +++ b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectDispatcher.java @@ -21,7 +21,7 @@ import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.communication.api.wrappers.MetadataRunDecorator; import com.cognifide.aet.communication.api.wrappers.UrlRunWrapper; -import com.cognifide.aet.queues.QueuesConstant; +import com.cognifide.aet.communication.api.queues.QueuesConstant; import com.cognifide.aet.runner.RunnerConfiguration; import com.cognifide.aet.runner.processing.TimeoutWatch; import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper; diff --git a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectionResultsRouter.java b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectionResultsRouter.java index cbb4e5a3f..fd16cfd6c 100644 --- a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectionResultsRouter.java +++ b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/CollectionResultsRouter.java @@ -25,7 +25,7 @@ import com.cognifide.aet.communication.api.metadata.Url; import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.communication.api.util.ExecutionTimer; -import com.cognifide.aet.queues.QueuesConstant; +import com.cognifide.aet.communication.api.queues.QueuesConstant; import com.cognifide.aet.runner.RunnerConfiguration; import com.cognifide.aet.runner.processing.TimeoutWatch; import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper; diff --git a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/ComparisonResultsRouter.java b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/ComparisonResultsRouter.java index 6f5055041..cf3d19892 100644 --- a/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/ComparisonResultsRouter.java +++ b/core/runner/src/main/java/com/cognifide/aet/runner/processing/steps/ComparisonResultsRouter.java @@ -25,7 +25,7 @@ import com.cognifide.aet.communication.api.metadata.Url; import com.cognifide.aet.communication.api.queues.JmsConnection; import com.cognifide.aet.communication.api.util.ExecutionTimer; -import com.cognifide.aet.queues.QueuesConstant; +import com.cognifide.aet.communication.api.queues.QueuesConstant; import com.cognifide.aet.runner.RunnerConfiguration; import com.cognifide.aet.runner.processing.TimeoutWatch; import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper; diff --git a/core/runner/src/test/java/com/cognifide/aet/runner/MessagesManagerTest.java b/core/runner/src/test/java/com/cognifide/aet/runner/MessagesManagerTest.java index 43efece64..df0a19a22 100644 --- a/core/runner/src/test/java/com/cognifide/aet/runner/MessagesManagerTest.java +++ b/core/runner/src/test/java/com/cognifide/aet/runner/MessagesManagerTest.java @@ -15,8 +15,6 @@ */ package com.cognifide.aet.runner; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; @@ -41,22 +39,6 @@ public class MessagesManagerTest { @Mock private MessagesManagerConf config; - @Test(expected = IllegalArgumentException.class) - public void createFullQueueName_whenNameIsNull_expectException() throws Exception { - MessagesManager.createFullQueueName(null); - } - - @Test(expected = IllegalArgumentException.class) - public void createFullQueueName_whenNameIsEmpty_expectException() throws Exception { - MessagesManager.createFullQueueName(""); - } - - @Test - public void createFullQueueName_expectFullName() throws Exception { - String fullQueueName = MessagesManager.createFullQueueName("test"); - assertThat(fullQueueName, is("AET.test")); - } - @Test public void remove_ExpectRemovingInvoked() throws Exception { MessagesManager messagesManager = new MessagesManager(); diff --git a/core/validation/pom.xml b/core/validation/pom.xml index 9eb039ee3..09f25dc6b 100644 --- a/core/validation/pom.xml +++ b/core/validation/pom.xml @@ -24,7 +24,7 @@ core com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT validation diff --git a/core/worker/pom.xml b/core/worker/pom.xml index fde635fcf..ca82dbb44 100644 --- a/core/worker/pom.xml +++ b/core/worker/pom.xml @@ -24,7 +24,7 @@ core com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT worker diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java index b19406074..735dc0ae2 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersService.java @@ -16,7 +16,7 @@ package com.cognifide.aet.worker.listeners; import com.cognifide.aet.communication.api.queues.JmsConnection; -import com.cognifide.aet.queues.QueuesConstant; +import com.cognifide.aet.communication.api.queues.QueuesConstant; import com.cognifide.aet.worker.api.CollectorDispatcher; import com.cognifide.aet.worker.api.ComparatorDispatcher; import com.cognifide.aet.worker.drivers.WebDriverProvider; diff --git a/documentation/pom.xml b/documentation/pom.xml index 5f97da63f..0fc7b577d 100644 --- a/documentation/pom.xml +++ b/documentation/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT documentation diff --git a/integration-tests/sample-site/pom.xml b/integration-tests/sample-site/pom.xml index 6324df3aa..d29fdcd05 100644 --- a/integration-tests/sample-site/pom.xml +++ b/integration-tests/sample-site/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT ../.. diff --git a/integration-tests/test-suite/pom.xml b/integration-tests/test-suite/pom.xml index bd5b7cad9..2ae90039c 100644 --- a/integration-tests/test-suite/pom.xml +++ b/integration-tests/test-suite/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT ../.. diff --git a/osgi-dependencies/configs/pom.xml b/osgi-dependencies/configs/pom.xml index 9ee1d1650..999880ec1 100644 --- a/osgi-dependencies/configs/pom.xml +++ b/osgi-dependencies/configs/pom.xml @@ -24,7 +24,7 @@ osgi-dependencies com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT configs diff --git a/osgi-dependencies/pom.xml b/osgi-dependencies/pom.xml index d38985ce5..1378388ab 100644 --- a/osgi-dependencies/pom.xml +++ b/osgi-dependencies/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT osgi-dependencies diff --git a/osgi-dependencies/proxy/pom.xml b/osgi-dependencies/proxy/pom.xml index f5781a8c5..2f0e2c5ce 100644 --- a/osgi-dependencies/proxy/pom.xml +++ b/osgi-dependencies/proxy/pom.xml @@ -24,7 +24,7 @@ osgi-dependencies com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT proxy diff --git a/osgi-dependencies/selenium/pom.xml b/osgi-dependencies/selenium/pom.xml index be863eeba..8211e4322 100644 --- a/osgi-dependencies/selenium/pom.xml +++ b/osgi-dependencies/selenium/pom.xml @@ -24,7 +24,7 @@ osgi-dependencies com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT selenium diff --git a/osgi-dependencies/w3chtml5validator/pom.xml b/osgi-dependencies/w3chtml5validator/pom.xml index 3beb98084..b32a4eef8 100644 --- a/osgi-dependencies/w3chtml5validator/pom.xml +++ b/osgi-dependencies/w3chtml5validator/pom.xml @@ -24,7 +24,7 @@ osgi-dependencies com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT w3chtml5validator diff --git a/pom.xml b/pom.xml index ed9ac74c4..cae830f74 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.cognifide.aet aet-root - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT pom AET diff --git a/report/pom.xml b/report/pom.xml index a84b5315e..74d9baf52 100644 --- a/report/pom.xml +++ b/report/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT report diff --git a/rest-endpoint/pom.xml b/rest-endpoint/pom.xml index 70133dda8..f65c8f4de 100644 --- a/rest-endpoint/pom.xml +++ b/rest-endpoint/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT rest-endpoint diff --git a/test-executor/pom.xml b/test-executor/pom.xml index 4bd337399..4810f0c1a 100644 --- a/test-executor/pom.xml +++ b/test-executor/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT test-executor diff --git a/test-executor/src/main/java/com/cognifide/aet/executor/SuiteExecutor.java b/test-executor/src/main/java/com/cognifide/aet/executor/SuiteExecutor.java index 79acb2597..a547613a6 100644 --- a/test-executor/src/main/java/com/cognifide/aet/executor/SuiteExecutor.java +++ b/test-executor/src/main/java/com/cognifide/aet/executor/SuiteExecutor.java @@ -21,6 +21,7 @@ import com.cognifide.aet.communication.api.metadata.Suite; import com.cognifide.aet.communication.api.metadata.ValidatorException; import com.cognifide.aet.communication.api.queues.JmsConnection; +import com.cognifide.aet.communication.api.queues.QueuesConstant; import com.cognifide.aet.executor.configuration.SuiteExecutorConf; import com.cognifide.aet.executor.http.HttpSuiteExecutionResultWrapper; import com.cognifide.aet.executor.model.TestRun; @@ -64,7 +65,7 @@ public class SuiteExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(SuiteExecutor.class); - private static final String RUNNER_IN_QUEUE = "AET.runner-in"; + private static final String RUNNER_IN_QUEUE = QueuesConstant.NAMESPACE + "runner-in"; private static final String HTML_REPORT_URL_FORMAT = "%s/report.html?company=%s&project=%s&correlationId=%s"; diff --git a/zip/pom.xml b/zip/pom.xml index c61c6f510..a495868bb 100644 --- a/zip/pom.xml +++ b/zip/pom.xml @@ -24,7 +24,7 @@ aet-root com.cognifide.aet - 3.1.1-SNAPSHOT + 3.2.0-SNAPSHOT zip From 9c232be48930abe2b06417aa743792170c0d2e20 Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Fri, 14 Dec 2018 10:09:25 +0100 Subject: [PATCH 08/11] Update AdvancedSetup.md --- documentation/src/main/wiki/AdvancedSetup.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/documentation/src/main/wiki/AdvancedSetup.md b/documentation/src/main/wiki/AdvancedSetup.md index 6464019cf..e72f765c1 100644 --- a/documentation/src/main/wiki/AdvancedSetup.md +++ b/documentation/src/main/wiki/AdvancedSetup.md @@ -68,7 +68,7 @@ To enable proper working of AET instance, you should configure at least 1 collec | Property name | Default value | Comment | | ------------- | ----- | ----- | -| Number of collector instances | `6` | Might be overwritten by env variable `COLLECTORS_NO` | +| Number of collector instances | `5` | Might be overwritten by env variable `COLLECTORS_NO` | | Collectors prefetch size | `1` | Read more [here](http://activemq.apache.org/what-is-the-prefetch-limit-for.html) | | Number of comparator instances | `5` | Might be overwritten by env variable `COMPARATORS_NO` | | Comparators prefetch size | `1` | Read more [here](http://activemq.apache.org/what-is-the-prefetch-limit-for.html) | @@ -79,4 +79,4 @@ To enable proper working of AET instance, you should configure at least 1 collec ##### Chrome options configuration The `AET Chrome WebDriver Factory` component configuration (`chromeOptions` property) allows you to configure a list of options/arguments -which will be passed to the Chrome browser binary. The default list of Chrome options is: `--disable-plugins`, `--headless`, `--hide-scrollbars`, `--disable-gpu`. \ No newline at end of file +which will be passed to the Chrome browser binary. The default list of Chrome options is: `--disable-plugins`, `--headless`, `--hide-scrollbars`, `--disable-gpu`. From dfaeda5d8439a04421fab5c787906ea9459547bd Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Fri, 14 Dec 2018 10:10:50 +0100 Subject: [PATCH 09/11] Update UpgradeNotes.md --- documentation/src/main/wiki/UpgradeNotes.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/documentation/src/main/wiki/UpgradeNotes.md b/documentation/src/main/wiki/UpgradeNotes.md index 7ad5886be..e98483a22 100644 --- a/documentation/src/main/wiki/UpgradeNotes.md +++ b/documentation/src/main/wiki/UpgradeNotes.md @@ -16,6 +16,7 @@ You may see all changes in the [Changelog](https://github.com/Cognifide/aet/blob Remove all `CollectorMessageListenerImpl` and `ComparatorMessageListenerImpl` config files. Create `com.cognifide.aet.worker.listeners.WorkersListenersService.cfg` and configure proper number of collectors and comparators there. +You can find example config file [here](https://github.com/Cognifide/aet/blob/master/osgi-dependencies/configs/src/main/resources/com.cognifide.aet.worker.listeners.WorkersListenersService.cfg). ## Version 3.0.0 @@ -84,4 +85,4 @@ allowAutoCreate=true server=192.168.123.100 port=8080 ``` - \ No newline at end of file + From 6d9ba47831cb459f4013b4215ca6d239ff77b4a7 Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Tue, 18 Dec 2018 09:33:13 +0100 Subject: [PATCH 10/11] fixed invalid aet queue runner in --- .../java/com/cognifide/aet/runner/RunnerMessageListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/runner/src/main/java/com/cognifide/aet/runner/RunnerMessageListener.java b/core/runner/src/main/java/com/cognifide/aet/runner/RunnerMessageListener.java index 43c60372c..9f3dd840c 100644 --- a/core/runner/src/main/java/com/cognifide/aet/runner/RunnerMessageListener.java +++ b/core/runner/src/main/java/com/cognifide/aet/runner/RunnerMessageListener.java @@ -46,7 +46,7 @@ public class RunnerMessageListener implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(RunnerMessageListener.class); - private static final String API_QUEUE_IN = QueuesConstant.NAMESPACE + ".runner-in"; + private static final String API_QUEUE_IN = QueuesConstant.NAMESPACE + "runner-in"; private static final String MAINTENANCE_QUEUE_IN = QueuesConstant.NAMESPACE + "maintenance-in"; From 1a0ce0d517484826104563a4dc304b2dc203eaab Mon Sep 17 00:00:00 2001 From: Maciej Laskowski Date: Tue, 18 Dec 2018 14:15:01 +0100 Subject: [PATCH 11/11] unify label and service name --- .../aet/worker/listeners/WorkersListenersServiceConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java index e5334fa4d..f2264340c 100644 --- a/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java +++ b/core/worker/src/main/java/com/cognifide/aet/worker/listeners/WorkersListenersServiceConfig.java @@ -19,7 +19,7 @@ import org.osgi.service.metatype.annotations.AttributeType; import org.osgi.service.metatype.annotations.ObjectClassDefinition; -@ObjectClassDefinition(name = "AET Workers Listener Factory") +@ObjectClassDefinition(name = "AET Workers Listener Service") public @interface WorkersListenersServiceConfig { String COLLECTORS_NO_ENV = "COLLECTORS_NO";