From 82b7ef00ab56250dde7782699761db6b7f3eccd5 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Tue, 5 Dec 2023 23:08:15 +0530 Subject: [PATCH] Use event bus for gradle (#214) Co-authored-by: Sonu Kumar --- build.gradle | 2 +- .../rqueue/config/RqueueEventBusConfig.java | 40 ++++ .../config/RqueueListenerBaseConfig.java | 54 ++++- .../sonus21/rqueue/core/MessageScheduler.java | 42 ++-- .../core/ProcessingQueueMessageScheduler.java | 17 +- .../rqueue/core/RqueueBeanProvider.java | 6 +- .../core/ScheduledQueueMessageScheduler.java | 15 +- .../core/eventbus/EventBusErrorHandler.java | 32 +++ .../rqueue/core/eventbus/RqueueEventBus.java | 46 +++++ .../listener/PostProcessingHandler.java | 12 +- .../RqueueMessageListenerContainer.java | 14 +- .../sonus21/rqueue/metrics/RqueueMetrics.java | 20 +- .../RqueueJobMetricsAggregatorService.java | 33 ++-- .../impl/RqueueSystemManagerServiceImpl.java | 15 +- .../core/MessageSchedulerDisabledTest.java | 27 ++- .../rqueue/core/MessageSchedulerTest.java | 11 +- .../rqueue/core/MessageSchedulingTest.java | 20 +- .../ProcessingQueueMessageSchedulerTest.java | 46 +++-- .../core/RedisAndNormalSchedulingTest.java | 40 +++- .../core/RedisScheduleTriggerHandlerTest.java | 19 +- .../ScheduledQueueMessageSchedulerTest.java | 48 ++--- .../listener/ConcurrentListenerTest.java | 8 +- .../listener/PriorityGroupListenerTest.java | 19 +- .../rqueue/listener/RqueueExecutorTest.java | 7 +- .../RqueueMessageListenerContainerTest.java | 184 ++++++++++-------- .../rqueue/listener/RqueueMiddlewareTest.java | 8 +- .../rqueue/metrics/RqueueMetricsTest.java | 14 +- .../RqueueSystemManagerServiceTest.java | 10 +- ...queueTaskMetricsAggregatorServiceTest.java | 8 +- .../RqueueSystemManagerServiceImplTest.java | 10 +- .../spring/boot/RqueueMetricsAutoConfig.java | 9 +- .../unit/RqueueMetricsAutoConfigTest.java | 22 ++- .../test/PauseUnpauseEventListener.java | 16 +- .../test/service/QueueRegistryUpdater.java | 21 +- .../test/service/RqueueEventListener.java | 14 +- .../rqueue/spring/RqueueListenerConfig.java | 7 +- 36 files changed, 641 insertions(+), 275 deletions(-) create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueEventBusConfig.java create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/core/eventbus/EventBusErrorHandler.java create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/core/eventbus/RqueueEventBus.java diff --git a/build.gradle b/build.gradle index 76900cad..cfdc37cf 100644 --- a/build.gradle +++ b/build.gradle @@ -74,7 +74,7 @@ ext { subprojects { group = 'com.github.sonus21' - version = '2.13.1-RELEASE' + version = '2.13.2-SNAPSHOT' dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueEventBusConfig.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueEventBusConfig.java new file mode 100644 index 00000000..b03e14e0 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueEventBusConfig.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.config; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Getter +@Setter +public class RqueueEventBusConfig { + + @Value("${rqueue.event.bus.core.pool.size:2}") + private int corePoolSize; + + @Value("${rqueue.event.bus.max.pool.size:10}") + private int maxPoolSize; + + @Value("${rqueue.event.bus.queue.capacity:100}") + private int queueCapacity; + + @Value("${rqueue.event.bus.keep.alive.time:60000}") + private int keepAliveTime; +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java index a6681c7f..fb9adb51 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory; import com.github.sonus21.rqueue.core.ScheduledQueueMessageScheduler; +import com.github.sonus21.rqueue.core.eventbus.EventBusErrorHandler; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl; import com.github.sonus21.rqueue.dao.RqueueStringDao; import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl; @@ -37,6 +39,8 @@ import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled; import com.github.sonus21.rqueue.utils.pebble.ResourceLoader; import com.github.sonus21.rqueue.utils.pebble.RqueuePebbleExtension; +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; import com.mitchellbosecke.pebble.PebbleEngine; import com.mitchellbosecke.pebble.spring.extension.SpringExtension; import com.mitchellbosecke.pebble.spring.reactive.PebbleReactiveViewResolver; @@ -45,11 +49,13 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * This is a base configuration class for Rqueue, that is used in Spring and Spring boot Rqueue libs @@ -105,8 +111,8 @@ protected MessageConverterProvider getMessageConverterProvider() { * Database for different ops. * * @param beanFactory configurable bean factory - * @param versionKey Rqueue db version key - * @param dbVersion database version + * @param versionKey Rqueue db version key + * @param dbVersion database version * @return {@link RedisConnectionFactory} object. */ @Bean @@ -150,6 +156,11 @@ public RqueueWebConfig rqueueWebConfig() { return new RqueueWebConfig(); } + @Bean + public RqueueEventBusConfig rqueueEventBusConfig() { + return new RqueueEventBusConfig(); + } + @Bean public RqueueSchedulerConfig rqueueSchedulerConfig() { return new RqueueSchedulerConfig(); @@ -190,8 +201,14 @@ public RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory() * @return {@link ScheduledQueueMessageScheduler} object */ @Bean - public ScheduledQueueMessageScheduler scheduledMessageScheduler() { - return new ScheduledQueueMessageScheduler(); + public ScheduledQueueMessageScheduler scheduledMessageScheduler( + RqueueSchedulerConfig rqueueSchedulerConfig, + RqueueConfig rqueueConfig, + RqueueEventBus eventBus, + RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, + @Qualifier("rqueueRedisLongTemplate") + RedisTemplate redisTemplate) { + return new ScheduledQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, redisTemplate); } /** @@ -201,8 +218,14 @@ public ScheduledQueueMessageScheduler scheduledMessageScheduler() { * @return {@link ProcessingQueueMessageScheduler} object */ @Bean - public ProcessingQueueMessageScheduler processingMessageScheduler() { - return new ProcessingQueueMessageScheduler(); + public ProcessingQueueMessageScheduler processingMessageScheduler( + RqueueSchedulerConfig rqueueSchedulerConfig, + RqueueConfig rqueueConfig, + RqueueEventBus eventBus, + RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, + @Qualifier("rqueueRedisLongTemplate") + RedisTemplate redisTemplate) { + return new ProcessingQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, redisTemplate); } @Bean @@ -265,7 +288,7 @@ public RqueueInternalPubSubChannel rqueueInternalPubSubChannel( RqueueConfig rqueueConfig, RqueueBeanProvider rqueueBeanProvider, @Qualifier("stringRqueueRedisTemplate") - RqueueRedisTemplate stringRqueueRedisTemplate) { + RqueueRedisTemplate stringRqueueRedisTemplate) { return new RqueueInternalPubSubChannel( rqueueRedisListenerContainerFactory, rqueueMessageListenerContainer, @@ -273,4 +296,19 @@ public RqueueInternalPubSubChannel rqueueInternalPubSubChannel( stringRqueueRedisTemplate, rqueueBeanProvider); } + + @Bean + public RqueueEventBus rqueueEventBus(ApplicationEventPublisher applicationEventPublisher, + RqueueEventBusConfig rqueueEventBusConfig) { + ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); + threadPoolTaskExecutor.setCorePoolSize(rqueueEventBusConfig.getCorePoolSize()); + threadPoolTaskExecutor.setMaxPoolSize(rqueueEventBusConfig.getMaxPoolSize()); + threadPoolTaskExecutor.setKeepAliveSeconds(rqueueEventBusConfig.getKeepAliveTime()); + threadPoolTaskExecutor.setQueueCapacity(rqueueEventBusConfig.getQueueCapacity()); + threadPoolTaskExecutor.setThreadNamePrefix("RqueueEventBusAsyncExecutor-"); + threadPoolTaskExecutor.initialize(); + EventBus eventBus = new AsyncEventBus(threadPoolTaskExecutor); + eventBus.register(new EventBusErrorHandler()); + return new RqueueEventBus(eventBus, applicationEventPublisher); + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java index 2998b6ad..fee1d463 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 Sonu Kumar + * Copyright (c) 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * You may not use this file except in compliance with the License. @@ -21,10 +21,13 @@ import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.ThreadUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.eventbus.Subscribe; import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -33,29 +36,23 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; -import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.ApplicationListener; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultScriptExecutor; import org.springframework.data.redis.core.script.RedisScript; -import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; -public abstract class MessageScheduler implements DisposableBean, - ApplicationListener { +public abstract class MessageScheduler implements DisposableBean { private final Object monitor = new Object(); - @Autowired - protected RqueueSchedulerConfig rqueueSchedulerConfig; - @Autowired - protected RqueueConfig rqueueConfig; + protected final RqueueSchedulerConfig rqueueSchedulerConfig; + protected final RqueueConfig rqueueConfig; + private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory; + private final RedisTemplate redisTemplate; private RedisScript redisScript; private DefaultScriptExecutor defaultScriptExecutor; private Map queueRunningState; @@ -66,14 +63,21 @@ public abstract class MessageScheduler implements DisposableBean, protected RedisScheduleTriggerHandler redisScheduleTriggerHandler; private ThreadPoolTaskScheduler scheduler; - @Autowired - private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory; - @Autowired - @Qualifier("rqueueRedisLongTemplate") - private RedisTemplate redisTemplate; private Map errorCount; + protected MessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig, + RqueueConfig rqueueConfig, + RqueueEventBus eventBus, + RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, + RedisTemplate redisTemplate) { + this.rqueueSchedulerConfig = rqueueSchedulerConfig; + this.rqueueConfig = rqueueConfig; + this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory; + this.redisTemplate = redisTemplate; + eventBus.register(this); + } + protected abstract Logger getLogger(); protected abstract long getNextScheduleTime(String queueName, long currentTime, Long value); @@ -224,9 +228,9 @@ private void initQueue(String queueName) { queueRunningState.put(queueName, false); } - @Override - @Async + @Subscribe public void onApplicationEvent(RqueueBootstrapEvent event) { + getLogger().info("{} Even received", event); synchronized (monitor) { doStop(); if (!rqueueSchedulerConfig.isEnabled()) { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/ProcessingQueueMessageScheduler.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/ProcessingQueueMessageScheduler.java index 0fc6082f..086af754 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/ProcessingQueueMessageScheduler.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/ProcessingQueueMessageScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,20 +18,31 @@ import static java.lang.Long.max; +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.listener.QueueDetail; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import com.github.sonus21.rqueue.utils.Constants; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; +import org.springframework.data.redis.core.RedisTemplate; @Slf4j public class ProcessingQueueMessageScheduler extends MessageScheduler { private Map queueNameToDelay; + public ProcessingQueueMessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig, + RqueueConfig rqueueConfig, RqueueEventBus eventBus, + RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, + RedisTemplate redisTemplate) { + super(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, + redisTemplate); + } + + @Override protected void initialize() { super.initialize(); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueBeanProvider.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueBeanProvider.java index e0ec8189..fc02f5bd 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueBeanProvider.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueBeanProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import com.github.sonus21.rqueue.common.RqueueLockManager; import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueWebConfig; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.core.support.MessageProcessor; import com.github.sonus21.rqueue.dao.RqueueJobDao; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; @@ -28,7 +29,6 @@ import lombok.Getter; import lombok.Setter; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationEventPublisher; @Getter @Setter @@ -38,7 +38,7 @@ public class RqueueBeanProvider { @Autowired private RqueueSystemConfigDao rqueueSystemConfigDao; @Autowired private RqueueJobDao rqueueJobDao; @Autowired private RqueueWebConfig rqueueWebConfig; - @Autowired private ApplicationEventPublisher applicationEventPublisher; + @Autowired private RqueueEventBus rqueueEventBus; @Autowired private RqueueLockManager rqueueLockManager; @Autowired(required = false) diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/ScheduledQueueMessageScheduler.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/ScheduledQueueMessageScheduler.java index 04eaeece..02815f04 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/ScheduledQueueMessageScheduler.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/ScheduledQueueMessageScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,25 @@ package com.github.sonus21.rqueue.core; +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; +import org.springframework.data.redis.core.RedisTemplate; @Slf4j public class ScheduledQueueMessageScheduler extends MessageScheduler { + + public ScheduledQueueMessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig, + RqueueConfig rqueueConfig, RqueueEventBus eventBus, + RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, + RedisTemplate redisTemplate) { + super(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, + redisTemplate); + } + @Override protected Logger getLogger() { return log; diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/eventbus/EventBusErrorHandler.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/eventbus/EventBusErrorHandler.java new file mode 100644 index 00000000..ebed1dbb --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/eventbus/EventBusErrorHandler.java @@ -0,0 +1,32 @@ +/* + * Copyright 2023 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.core.eventbus; + +import com.google.common.eventbus.SubscriberExceptionContext; +import com.google.common.eventbus.SubscriberExceptionHandler; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class EventBusErrorHandler implements SubscriberExceptionHandler { + + @Override + public void handleException(Throwable exception, SubscriberExceptionContext context) { + if (exception instanceof RuntimeException) { + log.error("Error while running event {}", context.getEvent(), exception); + } + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/eventbus/RqueueEventBus.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/eventbus/RqueueEventBus.java new file mode 100644 index 00000000..b200ba57 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/eventbus/RqueueEventBus.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.core.eventbus; + +import com.google.common.eventbus.EventBus; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; + +public class RqueueEventBus { + + private final EventBus eventBus; + private final ApplicationEventPublisher applicationEventPublisher; + + public RqueueEventBus(EventBus eventBus, ApplicationEventPublisher applicationEventPublisher) { + this.eventBus = eventBus; + this.applicationEventPublisher = applicationEventPublisher; + } + + public void publish(ApplicationEvent event) { + applicationEventPublisher.publishEvent(event); + eventBus.post(event); + } + + public void register(Object listener) { + eventBus.register(listener); + } + + public void unregister(Object listener) { + eventBus.unregister(listener); + } + +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java index ac8c49b2..8640ffbe 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import com.github.sonus21.rqueue.config.RqueueWebConfig; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; import com.github.sonus21.rqueue.exception.UnknownSwitchCase; import com.github.sonus21.rqueue.models.db.QueueConfig; @@ -32,13 +33,12 @@ import java.io.Serializable; import lombok.extern.slf4j.Slf4j; import org.slf4j.event.Level; -import org.springframework.context.ApplicationEventPublisher; @Slf4j @SuppressWarnings("java:S107") class PostProcessingHandler extends PrefixLogger { - private final ApplicationEventPublisher applicationEventPublisher; + private final RqueueEventBus eventBus; private final RqueueWebConfig rqueueWebConfig; private final RqueueMessageTemplate rqueueMessageTemplate; private final TaskExecutionBackOff taskExecutionBackoff; @@ -47,13 +47,13 @@ class PostProcessingHandler extends PrefixLogger { PostProcessingHandler( RqueueWebConfig rqueueWebConfig, - ApplicationEventPublisher applicationEventPublisher, + RqueueEventBus eventBus, RqueueMessageTemplate rqueueMessageTemplate, TaskExecutionBackOff taskExecutionBackoff, MessageProcessorHandler messageProcessorHandler, RqueueSystemConfigDao rqueueSystemConfigDao) { super(log, null); - this.applicationEventPublisher = applicationEventPublisher; + this.eventBus = eventBus; this.rqueueWebConfig = rqueueWebConfig; this.rqueueMessageTemplate = rqueueMessageTemplate; this.taskExecutionBackoff = taskExecutionBackoff; @@ -109,7 +109,7 @@ private void publishEvent(JobImpl job, RqueueMessage rqueueMessage, MessageStatu updateMetadata(job, rqueueMessage, messageStatus); if (rqueueWebConfig.isCollectListenerStats()) { RqueueExecutionEvent event = new RqueueExecutionEvent(job); - applicationEventPublisher.publishEvent(event); + eventBus.publish(event); } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index a2d9d847..79404d1f 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -284,7 +284,7 @@ private void initialize() { this.postProcessingHandler = new PostProcessingHandler( rqueueBeanProvider.getRqueueWebConfig(), - rqueueBeanProvider.getApplicationEventPublisher(), + rqueueBeanProvider.getRqueueEventBus(), rqueueMessageTemplate, taskExecutionBackOff, new MessageProcessorHandler( @@ -443,8 +443,8 @@ public void start() { running = true; doStart(); rqueueBeanProvider - .getApplicationEventPublisher() - .publishEvent(new RqueueBootstrapEvent(EVENT_SOURCE, true)); + .getRqueueEventBus() + .publish(new RqueueBootstrapEvent(EVENT_SOURCE, true)); lifecycleMgr.notifyAll(); } } @@ -564,8 +564,8 @@ public void stop() { synchronized (lifecycleMgr) { running = false; rqueueBeanProvider - .getApplicationEventPublisher() - .publishEvent(new RqueueBootstrapEvent(EVENT_SOURCE, false)); + .getRqueueEventBus() + .publish(new RqueueBootstrapEvent(EVENT_SOURCE, false)); doStop(); lifecycleMgr.notifyAll(); } @@ -737,7 +737,7 @@ void pauseUnpauseQueue(String queue, boolean pause) { unpause(queue); } RqueueQueuePauseEvent event = new RqueueQueuePauseEvent(EVENT_SOURCE, queue, pause); - rqueueBeanProvider.getApplicationEventPublisher().publishEvent(event); + rqueueBeanProvider.getRqueueEventBus().publish(event); } private void unpause(String queue) { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java index 18d60f59..693934c6 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,21 +18,26 @@ import com.github.sonus21.rqueue.config.MetricsProperties; import com.github.sonus21.rqueue.core.EndpointRegistry; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueStringDao; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; +import com.google.common.eventbus.Subscribe; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Gauge.Builder; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; + + /** * RqueueMetrics register metrics related to queue. A queue can have 4 types of metrics like * queue.size, processing.queue.size and scheduled.queue.size. Some messages can be in dead letter * queue if dead letter queue is configured. */ +@Slf4j public class RqueueMetrics implements RqueueMetricsRegistry { static final String QUEUE_KEY = "key"; @@ -41,12 +46,15 @@ public class RqueueMetrics implements RqueueMetricsRegistry { private static final String PROCESSING_QUEUE_SIZE = "processing.queue.size"; private static final String DEAD_LETTER_QUEUE_SIZE = "dead.letter.queue.size"; private final QueueCounter queueCounter; - @Autowired private MetricsProperties metricsProperties; + private final RqueueEventBus eventBus; + @Autowired private MetricsProperties metricsProperties; @Autowired private MeterRegistry meterRegistry; @Autowired private RqueueStringDao rqueueStringDao; - public RqueueMetrics(QueueCounter queueCounter) { + public RqueueMetrics(QueueCounter queueCounter, RqueueEventBus eventBus) { this.queueCounter = queueCounter; + this.eventBus = eventBus; + this.eventBus.register(this); } private long size(String name, boolean isZset) { @@ -98,9 +106,9 @@ private void monitor() { } } - @Override - @Async + @Subscribe public void onApplicationEvent(RqueueBootstrapEvent event) { + log.info("{} Event received", event); if (event.isStartup()) { monitor(); } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/RqueueJobMetricsAggregatorService.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/RqueueJobMetricsAggregatorService.java index 702db8ee..5f1e9e8e 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/RqueueJobMetricsAggregatorService.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/RqueueJobMetricsAggregatorService.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueWebConfig; import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueQStatsDao; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.aggregator.QueueEvents; @@ -32,24 +33,32 @@ import com.github.sonus21.rqueue.utils.DateTimeUtils; import com.github.sonus21.rqueue.utils.ThreadUtils; import com.github.sonus21.rqueue.utils.TimeoutUtils; +import com.google.common.eventbus.Subscribe; +import java.time.Duration; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; import org.springframework.context.SmartLifecycle; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.time.Duration; -import java.time.LocalDate; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.*; @Component @Slf4j -public class RqueueJobMetricsAggregatorService - implements ApplicationListener, DisposableBean, SmartLifecycle { +public class RqueueJobMetricsAggregatorService implements DisposableBean, SmartLifecycle { private final RqueueConfig rqueueConfig; private final RqueueWebConfig rqueueWebConfig; @@ -68,11 +77,13 @@ public RqueueJobMetricsAggregatorService( RqueueConfig rqueueConfig, RqueueWebConfig rqueueWebConfig, RqueueLockManager rqueueLockManager, - RqueueQStatsDao rqueueQStatsDao) { + RqueueQStatsDao rqueueQStatsDao, + RqueueEventBus eventBus) { this.rqueueConfig = rqueueConfig; this.rqueueWebConfig = rqueueWebConfig; this.rqueueLockManager = rqueueLockManager; this.rqueueQStatsDao = rqueueQStatsDao; + eventBus.register(this); } @Override @@ -153,7 +164,7 @@ public boolean isRunning() { } } - @Override + @Subscribe public void onApplicationEvent(RqueueExecutionEvent event) { synchronized (aggregatorLock) { if (log.isTraceEnabled()) { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java index 1e7e3d8f..8d9aabc7 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.core.EndpointRegistry; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueStringDao; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; import com.github.sonus21.rqueue.listener.QueueDetail; @@ -40,9 +41,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.google.common.eventbus.Subscribe; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import reactor.core.publisher.Mono; @@ -55,18 +56,22 @@ public class RqueueSystemManagerServiceImpl implements RqueueSystemManagerServic private final RqueueStringDao rqueueStringDao; private final RqueueSystemConfigDao rqueueSystemConfigDao; private final RqueueMessageMetadataService rqueueMessageMetadataService; + private ScheduledExecutorService executorService; + @Autowired public RqueueSystemManagerServiceImpl( RqueueConfig rqueueConfig, RqueueStringDao rqueueStringDao, RqueueSystemConfigDao rqueueSystemConfigDao, - RqueueMessageMetadataService rqueueMessageMetadataService) { + RqueueMessageMetadataService rqueueMessageMetadataService, + RqueueEventBus eventBus) { this.rqueueConfig = rqueueConfig; this.rqueueStringDao = rqueueStringDao; this.rqueueSystemConfigDao = rqueueSystemConfigDao; this.rqueueMessageMetadataService = rqueueMessageMetadataService; + eventBus.register(this); } private List queueKeys(QueueConfig queueConfig) { @@ -172,9 +177,9 @@ private void createOrUpdateConfigs(List queueDetails) { } } - @Override - @Async + @Subscribe public void onApplicationEvent(RqueueBootstrapEvent event) { + log.info("{} event received", event); if (event.isStartup()) { List queueDetails = EndpointRegistry.getActiveQueueDetails(); if (queueDetails.isEmpty()) { diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerDisabledTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerDisabledTest.java index e01c05d4..b7d877fd 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerDisabledTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerDisabledTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import com.github.sonus21.rqueue.CoreUnitTest; import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; import com.github.sonus21.rqueue.utils.TestUtils; @@ -32,7 +33,6 @@ import org.apache.commons.lang3.reflect.FieldUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -42,19 +42,28 @@ @CoreUnitTest class MessageSchedulerDisabledTest extends TestBase { - @InjectMocks - private final ScheduledQueueMessageScheduler messageScheduler = - new ScheduledQueueMessageScheduler(); - private final String slowQueue = "slow-queue"; private final QueueDetail slowQueueDetail = TestUtils.createQueueDetail(slowQueue); - @Mock private RqueueSchedulerConfig rqueueSchedulerConfig; - @Mock private RqueueConfig rqueueConfig; - @Mock private RedisTemplate redisTemplate; + @Mock + private RqueueSchedulerConfig rqueueSchedulerConfig; + @Mock + private RqueueConfig rqueueConfig; + @Mock + private RedisTemplate redisTemplate; + + @Mock + private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory; + + @Mock + private RqueueEventBus rqueueEventBus; + + private ScheduledQueueMessageScheduler messageScheduler; @BeforeEach public void init() { MockitoAnnotations.openMocks(this); + messageScheduler = new ScheduledQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, + rqueueEventBus, rqueueRedisListenerContainerFactory, redisTemplate); EndpointRegistry.delete(); EndpointRegistry.register(slowQueueDetail); } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java index bafefd9a..8111cb7a 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 Sonu Kumar + * Copyright (c) 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * You may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; import com.github.sonus21.rqueue.core.ScheduledQueueMessageSchedulerTest.TestScheduledQueueMessageScheduler; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; import com.github.sonus21.rqueue.utils.TestUtils; @@ -33,11 +34,9 @@ import org.apache.commons.lang3.reflect.FieldUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; @CoreUnitTest class MessageSchedulerTest extends TestBase { @@ -52,15 +51,17 @@ class MessageSchedulerTest extends TestBase { @Mock private RqueueConfig rqueueConfig; @Mock - private RedisMessageListenerContainer rqueueRedisMessageListenerContainer; + private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory; @Mock private RedisTemplate redisTemplate; - @InjectMocks + @Mock + private RqueueEventBus eventBus; private TestScheduledQueueMessageScheduler messageScheduler; @BeforeEach public void init() { MockitoAnnotations.openMocks(this); + messageScheduler = new TestScheduledQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, redisTemplate); queueNameToQueueDetail.put(slowQueue, slowQueueDetail); queueNameToQueueDetail.put(fastQueue, fastQueueDetail); } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulingTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulingTest.java index e5bf012c..6179a6cb 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulingTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023 Sonu Kumar + * Copyright (c) 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * You may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ import static com.github.sonus21.rqueue.utils.TimeoutUtils.sleep; import static com.github.sonus21.rqueue.utils.TimeoutUtils.waitFor; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -30,18 +29,15 @@ import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; import com.github.sonus21.rqueue.core.ProcessingQueueMessageSchedulerTest.ProcessingQTestMessageScheduler; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; -import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.TestUtils; import com.github.sonus21.rqueue.utils.ThreadUtils; -import com.github.sonus21.rqueue.utils.TimeoutUtils; import com.github.sonus21.test.TestTaskScheduler; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -50,8 +46,6 @@ import org.springframework.data.redis.RedisConnectionFailureException; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.TooManyClusterRedirectionsException; -import org.springframework.data.redis.connection.DefaultMessage; -import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; @@ -59,22 +53,28 @@ @SuppressWarnings("unchecked") class MessageSchedulingTest extends TestBase { - @InjectMocks - private final ProcessingQTestMessageScheduler messageScheduler = new ProcessingQTestMessageScheduler(); private final String queue = "queue"; private final QueueDetail queueDetail = TestUtils.createQueueDetail(queue); @Mock private RqueueSchedulerConfig rqueueSchedulerConfig; @Mock private RqueueConfig rqueueConfig; + + @Mock + private RqueueEventBus rqueueEventBus; + @Mock private RedisTemplate redisTemplate; @Mock private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory; + private ProcessingQTestMessageScheduler messageScheduler; + @BeforeEach public void init() { MockitoAnnotations.openMocks(this); + messageScheduler = new ProcessingQTestMessageScheduler(rqueueSchedulerConfig, rqueueConfig, + rqueueEventBus, rqueueRedisListenerContainerFactory, redisTemplate); EndpointRegistry.delete(); EndpointRegistry.register(queueDetail); doReturn(1).when(rqueueSchedulerConfig).getProcessingMessageThreadPoolSize(); diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ProcessingQueueMessageSchedulerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ProcessingQueueMessageSchedulerTest.java index 7cfd8dac..c32dd139 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ProcessingQueueMessageSchedulerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ProcessingQueueMessageSchedulerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,25 +16,26 @@ package com.github.sonus21.rqueue.core; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.doReturn; - import com.github.sonus21.TestBase; import com.github.sonus21.rqueue.CoreUnitTest; +import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.utils.TestUtils; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.doReturn; @CoreUnitTest class ProcessingQueueMessageSchedulerTest extends TestBase { @@ -43,14 +44,23 @@ class ProcessingQueueMessageSchedulerTest extends TestBase { private final String fastQueue = "fast-queue"; private final QueueDetail slowQueueDetail = TestUtils.createQueueDetail(slowQueue); private final QueueDetail fastQueueDetail = TestUtils.createQueueDetail(fastQueue); - @Mock private RedisTemplate redisTemplate; - @Mock private RqueueSchedulerConfig rqueueSchedulerConfig; - @Mock private RedisMessageListenerContainer redisMessageListenerContainer; - @InjectMocks private ProcessingQueueMessageScheduler messageScheduler; + @Mock + private RedisTemplate redisTemplate; + @Mock + private RqueueSchedulerConfig rqueueSchedulerConfig; + @Mock + private RqueueEventBus eventBus; + @Mock + private RqueueConfig rqueueConfig; + @Mock + private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory; + private ProcessingQueueMessageScheduler messageScheduler; @BeforeEach public void init() { MockitoAnnotations.openMocks(this); + messageScheduler = new ProcessingQTestMessageScheduler(rqueueSchedulerConfig, rqueueConfig, + eventBus, rqueueRedisListenerContainerFactory, redisTemplate); EndpointRegistry.delete(); EndpointRegistry.register(slowQueueDetail); EndpointRegistry.register(fastQueueDetail); @@ -93,7 +103,13 @@ static class ProcessingQTestMessageScheduler extends ProcessingQueueMessageSched private final AtomicInteger schedulesCalls; private final AtomicInteger addTaskCalls; - ProcessingQTestMessageScheduler() { + ProcessingQTestMessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig, + RqueueConfig rqueueConfig, + RqueueEventBus eventBus, + RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, + RedisTemplate redisTemplate) { + super(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, + redisTemplate); this.schedulesCalls = new AtomicInteger(0); this.addTaskCalls = new AtomicInteger(0); } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/RedisAndNormalSchedulingTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/RedisAndNormalSchedulingTest.java index 684e9ed9..10ba1ec3 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/RedisAndNormalSchedulingTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/RedisAndNormalSchedulingTest.java @@ -1,10 +1,34 @@ +/* + * Copyright 2023 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + package com.github.sonus21.rqueue.core; +import static com.github.sonus21.rqueue.utils.TimeoutUtils.sleep; +import static com.github.sonus21.rqueue.utils.TimeoutUtils.waitFor; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; + import com.github.sonus21.TestBase; import com.github.sonus21.rqueue.CoreUnitTest; import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; import com.github.sonus21.rqueue.core.ScheduledQueueMessageSchedulerTest.TestScheduledQueueMessageScheduler; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; import com.github.sonus21.rqueue.utils.Constants; @@ -12,6 +36,8 @@ import com.github.sonus21.rqueue.utils.ThreadUtils; import com.github.sonus21.rqueue.utils.TimeoutUtils; import com.github.sonus21.test.TestTaskScheduler; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -23,15 +49,6 @@ import org.springframework.data.redis.connection.DefaultMessage; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.github.sonus21.rqueue.utils.TimeoutUtils.sleep; -import static com.github.sonus21.rqueue.utils.TimeoutUtils.waitFor; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; @CoreUnitTest @Slf4j @@ -49,12 +66,15 @@ class RedisAndNormalSchedulingTest extends TestBase { private RedisTemplate redisTemplate; @Mock private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory; - @InjectMocks + @Mock + private RqueueEventBus rqueueEventBus; private TestScheduledQueueMessageScheduler messageScheduler; @BeforeEach public void init() { MockitoAnnotations.openMocks(this); + messageScheduler = new TestScheduledQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, + rqueueEventBus, rqueueRedisListenerContainerFactory, redisTemplate); EndpointRegistry.delete(); EndpointRegistry.register(fastQueueDetail); EndpointRegistry.register(slowQueueDetail); diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/RedisScheduleTriggerHandlerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/RedisScheduleTriggerHandlerTest.java index 4c4eaa4a..5c6f61b2 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/RedisScheduleTriggerHandlerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/RedisScheduleTriggerHandlerTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2023 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + package com.github.sonus21.rqueue.core; import com.github.sonus21.TestBase; @@ -14,6 +30,7 @@ import org.springframework.data.redis.connection.DefaultMessage; import org.springframework.data.redis.connection.MessageListener; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -74,7 +91,7 @@ public void init() { scheduler = new Scheduler(); redisScheduleTriggerHandler = new RedisScheduleTriggerHandler(log, rqueueRedisListenerContainerFactory, rqueueSchedulerConfig, - List.of(slowQueue), scheduler, + Collections.singletonList(slowQueue), scheduler, (e) -> { return slowQueueDetail.getScheduledQueueChannelName(); }); diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ScheduledQueueMessageSchedulerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ScheduledQueueMessageSchedulerTest.java index 25e91f1e..99954af7 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ScheduledQueueMessageSchedulerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ScheduledQueueMessageSchedulerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 Sonu Kumar + * Copyright (c) 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * You may not use this file except in compliance with the License. @@ -16,39 +16,20 @@ package com.github.sonus21.rqueue.core; -import static com.github.sonus21.rqueue.utils.TimeoutUtils.sleep; -import static com.github.sonus21.rqueue.utils.TimeoutUtils.waitFor; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; - import com.github.sonus21.TestBase; import com.github.sonus21.rqueue.CoreUnitTest; import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; -import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.TestUtils; import com.github.sonus21.rqueue.utils.ThreadUtils; import com.github.sonus21.rqueue.utils.TimeoutUtils; import com.github.sonus21.test.TestTaskScheduler; -import java.util.Map; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.reflect.FieldUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -60,6 +41,18 @@ import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.github.sonus21.rqueue.utils.TimeoutUtils.sleep; +import static com.github.sonus21.rqueue.utils.TimeoutUtils.waitFor; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; @CoreUnitTest @SuppressWarnings("unchecked") @@ -74,15 +67,18 @@ class ScheduledQueueMessageSchedulerTest extends TestBase { @Mock private RqueueConfig rqueueConfig; @Mock + private RqueueEventBus eventBus; + @Mock private RedisTemplate redisTemplate; @Mock private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory; - @InjectMocks private TestScheduledQueueMessageScheduler messageScheduler; @BeforeEach public void init() { MockitoAnnotations.openMocks(this); + messageScheduler = new TestScheduledQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, + eventBus, rqueueRedisListenerContainerFactory, redisTemplate); EndpointRegistry.delete(); EndpointRegistry.register(fastQueueDetail); EndpointRegistry.register(slowQueueDetail); @@ -319,7 +315,13 @@ static class TestScheduledQueueMessageScheduler extends ScheduledQueueMessageSch final AtomicInteger scheduleCounter; final AtomicInteger addTaskCounter; - TestScheduledQueueMessageScheduler() { + public TestScheduledQueueMessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig, + RqueueConfig rqueueConfig, + RqueueEventBus eventBus, + RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, + RedisTemplate redisTemplate) { + super(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, + redisTemplate); this.scheduleCounter = new AtomicInteger(0); this.addTaskCounter = new AtomicInteger(0); } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/ConcurrentListenerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/ConcurrentListenerTest.java index 1fed4dad..d896bf23 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/ConcurrentListenerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/ConcurrentListenerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; import com.github.sonus21.rqueue.models.db.MessageMetadata; import com.github.sonus21.rqueue.models.enums.MessageStatus; @@ -52,7 +53,6 @@ import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.support.StaticApplicationContext; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -67,7 +67,7 @@ class ConcurrentListenerTest extends TestBase { private static final long executionTime = 50L; @Mock private RqueueMessageHandler rqueueMessageHandler; @Mock private RedisConnectionFactory redisConnectionFactory; - @Mock private ApplicationEventPublisher applicationEventPublisher; + @Mock private RqueueEventBus rqueueEventBus; @Mock private RqueueMessageTemplate rqueueMessageTemplate; @Mock private RqueueSystemConfigDao rqueueSystemConfigDao; @Mock private RqueueMessageMetadataService rqueueMessageMetadataService; @@ -83,7 +83,7 @@ public void init() throws IllegalAccessException { beanProvider.setRqueueConfig(rqueueConfig); beanProvider.setRqueueMessageHandler(rqueueMessageHandler); beanProvider.setRqueueSystemConfigDao(rqueueSystemConfigDao); - beanProvider.setApplicationEventPublisher(applicationEventPublisher); + beanProvider.setRqueueEventBus(rqueueEventBus); beanProvider.setRqueueMessageTemplate(rqueueMessageTemplate); beanProvider.setRqueueMessageMetadataService(rqueueMessageMetadataService); beanProvider.setRqueueLockManager(rqueueLockManager); diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/PriorityGroupListenerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/PriorityGroupListenerTest.java index 7546bcc0..63262ad9 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/PriorityGroupListenerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/PriorityGroupListenerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainerTest.BootstrapEventListener; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainerTest.TestEventBroadcaster; @@ -43,6 +44,7 @@ import java.util.Collections; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.eventbus.EventBus; import lombok.Getter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -65,11 +67,13 @@ class PriorityGroupListenerTest extends TestBase { private static final long VISIBILITY_TIMEOUT = 900000L; @Mock private RqueueMessageHandler rqueueMessageHandler; @Mock private RedisConnectionFactory redisConnectionFactory; - @Mock private ApplicationEventPublisher applicationEventPublisher; + @Mock private RqueueEventBus rqueueEventBus; @Mock private RqueueMessageTemplate rqueueMessageTemplate; @Mock private RqueueSystemConfigDao rqueueSystemConfigDao; @Mock private RqueueMessageMetadataService rqueueMessageMetadataService; + @Mock private ApplicationEventPublisher applicationEventPublisher; private RqueueBeanProvider beanProvider; + private EventBus eventBus; @BeforeEach public void init() throws IllegalAccessException { @@ -79,9 +83,10 @@ public void init() throws IllegalAccessException { beanProvider.setRqueueConfig(rqueueConfig); beanProvider.setRqueueMessageHandler(rqueueMessageHandler); beanProvider.setRqueueSystemConfigDao(rqueueSystemConfigDao); - beanProvider.setApplicationEventPublisher(applicationEventPublisher); + beanProvider.setRqueueEventBus(rqueueEventBus); beanProvider.setRqueueMessageTemplate(rqueueMessageTemplate); beanProvider.setRqueueMessageMetadataService(rqueueMessageMetadataService); + eventBus = new EventBus(); } @Test @@ -94,11 +99,13 @@ void priorityGroupListener() throws Exception { "slowMessageListener", SlowMessageListenerWithPriority.class); applicationContext.registerSingleton( "fastMessageListener", FastMessageListenerWithPriority.class); - applicationContext.registerSingleton("applicationEventPublisher", TestEventBroadcaster.class); + applicationContext.registerSingleton("rqueueEventBus", TestEventBroadcaster.class); + RqueueMessageHandler messageHandler = applicationContext.getBean("messageHandler", RqueueMessageHandler.class); + TestEventBroadcaster eventBroadcaster = - applicationContext.getBean("applicationEventPublisher", TestEventBroadcaster.class); + (TestEventBroadcaster) applicationContext.getBean("rqueueEventBus", eventBus, applicationEventPublisher); eventBroadcaster.subscribe(listener); messageHandler.setApplicationContext(applicationContext); @@ -110,7 +117,7 @@ void priorityGroupListener() throws Exception { .when(rqueueSystemConfigDao) .getConfigByNames(any()); - beanProvider.setApplicationEventPublisher(eventBroadcaster); + beanProvider.setRqueueEventBus(eventBroadcaster); beanProvider.setRqueueMessageHandler(messageHandler); RqueueMessageListenerContainer container = new TestListenerContainer(messageHandler); diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueExecutorTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueExecutorTest.java index 66d509a3..181d294b 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueExecutorTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueExecutorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.core.support.MessageProcessor; import com.github.sonus21.rqueue.core.support.RqueueMessageUtils; import com.github.sonus21.rqueue.dao.RqueueJobDao; @@ -91,7 +92,7 @@ class RqueueExecutorTest extends TestBase { @Mock private RedisTemplate redisTemplate; @Mock - private ApplicationEventPublisher applicationEventPublisher; + private RqueueEventBus rqueueEventBus; @Mock private RqueueMessageTemplate messageTemplate; @@ -118,7 +119,7 @@ public void init() throws IllegalAccessException { postProcessingHandler = new PostProcessingHandler( rqueueWebConfig, - applicationEventPublisher, + rqueueEventBus, messageTemplate, taskBackOff, messageProcessorHandler, diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java index 46684c50..88696c93 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; import com.github.sonus21.rqueue.models.db.MessageMetadata; import com.github.sonus21.rqueue.models.db.QueueConfig; @@ -46,6 +47,7 @@ import com.github.sonus21.rqueue.utils.TimeoutUtils; import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService; import com.github.sonus21.test.TestTaskExecutor; +import com.google.common.eventbus.EventBus; import io.lettuce.core.RedisCommandExecutionException; import java.util.Collections; import java.util.LinkedList; @@ -79,14 +81,26 @@ class RqueueMessageListenerContainerTest extends TestBase { private static final String fastProcessingQueueChannel = "rqueue-processing-channel::" + fastQueue; private static final long VISIBILITY_TIMEOUT = 900000L; - @Mock private RqueueMessageHandler rqueueMessageHandler; - @Mock private RedisConnectionFactory redisConnectionFactory; - @Mock private ApplicationEventPublisher applicationEventPublisher; - @Mock private RqueueMessageTemplate rqueueMessageTemplate; - @Mock private RqueueSystemConfigDao rqueueSystemConfigDao; - @Mock private RqueueMessageMetadataService rqueueMessageMetadataService; - @Mock private RqueueWebConfig rqueueWebConfig; - @Mock private RqueueLockManager rqueueLockManager; + @Mock + private RqueueMessageHandler rqueueMessageHandler; + @Mock + private RedisConnectionFactory redisConnectionFactory; + @Mock + private RqueueEventBus rqueueEventBus; + @Mock + private RqueueMessageTemplate rqueueMessageTemplate; + @Mock + private RqueueSystemConfigDao rqueueSystemConfigDao; + @Mock + private RqueueMessageMetadataService rqueueMessageMetadataService; + @Mock + private RqueueWebConfig rqueueWebConfig; + @Mock + private RqueueLockManager rqueueLockManager; + + @Mock + private ApplicationEventPublisher applicationEventPublisher; + private EventBus eventBus; private RqueueMessageListenerContainer container; private RqueueBeanProvider beanProvider; @@ -98,12 +112,13 @@ public void init() throws IllegalAccessException { beanProvider.setRqueueConfig(rqueueConfig); beanProvider.setRqueueMessageHandler(rqueueMessageHandler); beanProvider.setRqueueSystemConfigDao(rqueueSystemConfigDao); - beanProvider.setApplicationEventPublisher(applicationEventPublisher); + beanProvider.setRqueueEventBus(rqueueEventBus); beanProvider.setRqueueMessageTemplate(rqueueMessageTemplate); beanProvider.setRqueueMessageMetadataService(rqueueMessageMetadataService); beanProvider.setRqueueWebConfig(rqueueWebConfig); beanProvider.setRqueueLockManager(rqueueLockManager); container = new TestListenerContainer(rqueueMessageHandler); + eventBus = new EventBus(); } @Test @@ -219,29 +234,34 @@ void messagesAreGettingFetchedFromRedis() throws Exception { applicationContext.registerSingleton("messageHandler", RqueueMessageHandler.class); applicationContext.registerSingleton("slowMessageListener", SlowMessageListener.class); applicationContext.registerSingleton("fastMessageListener", FastMessageListener.class); + applicationContext.registerSingleton("rqueueEventBus", TestEventBroadcaster.class); RqueueMessageHandler messageHandler = applicationContext.getBean("messageHandler", RqueueMessageHandler.class); + + TestEventBroadcaster eventBroadcaster = + (TestEventBroadcaster) applicationContext.getBean("rqueueEventBus", eventBus, applicationEventPublisher); messageHandler.setApplicationContext(applicationContext); messageHandler.afterPropertiesSet(); beanProvider.setRqueueMessageHandler(rqueueMessageHandler); + RqueueMessageListenerContainer container = new TestListenerContainer(messageHandler); AtomicInteger fastQueueCounter = new AtomicInteger(0); AtomicInteger slowQueueCounter = new AtomicInteger(0); doAnswer( - invocation -> { - fastQueueCounter.incrementAndGet(); - return null; - }) + invocation -> { + fastQueueCounter.incrementAndGet(); + return null; + }) .when(rqueueMessageTemplate) .pop(fastQueue, fastProcessingQueue, fastProcessingQueueChannel, VISIBILITY_TIMEOUT, 1); doAnswer( - invocation -> { - slowQueueCounter.incrementAndGet(); - return null; - }) + invocation -> { + slowQueueCounter.incrementAndGet(); + return null; + }) .when(rqueueMessageTemplate) .pop(slowQueue, slowProcessingQueue, slowProcessingChannel, VISIBILITY_TIMEOUT, 1); container.afterPropertiesSet(); @@ -273,15 +293,16 @@ void messageFetcherRetryWorking() throws Exception { messageHandler.afterPropertiesSet(); Map messageMetadataMap = new ConcurrentHashMap<>(); doReturn(true).when(rqueueLockManager).acquireLock(anyString(), anyString(), any()); - doAnswer(i-> messageMetadataMap.get(i.getArgument(0))).when(rqueueMessageMetadataService).get(any()); + doAnswer(i -> messageMetadataMap.get(i.getArgument(0))).when(rqueueMessageMetadataService) + .get(any()); doAnswer( - i -> { - RqueueMessage rqueueMessage = i.getArgument(0); - MessageMetadata messageMetadata = - new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED); - messageMetadataMap.put(messageMetadata.getId(), messageMetadata); - return messageMetadata; - }) + i -> { + RqueueMessage rqueueMessage = i.getArgument(0); + MessageMetadata messageMetadata = + new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED); + messageMetadataMap.put(messageMetadata.getId(), messageMetadata); + return messageMetadata; + }) .when(rqueueMessageMetadataService) .getOrCreateMessageMetadata(any()); @@ -289,15 +310,15 @@ void messageFetcherRetryWorking() throws Exception { RqueueMessageListenerContainer container = new TestListenerContainer(messageHandler); doAnswer( - invocation -> { - if (fastQueueCounter.get() < 2) { - if (fastQueueCounter.incrementAndGet() == 1) { - throw new RedisCommandExecutionException("Some error occurred"); - } - return Collections.singletonList(message); - } - return null; - }) + invocation -> { + if (fastQueueCounter.get() < 2) { + if (fastQueueCounter.incrementAndGet() == 1) { + throw new RedisCommandExecutionException("Some error occurred"); + } + return Collections.singletonList(message); + } + return null; + }) .when(rqueueMessageTemplate) .pop(fastQueue, fastProcessingQueue, fastProcessingQueueChannel, VISIBILITY_TIMEOUT, 1); FastMessageListener fastMessageListener = @@ -335,49 +356,50 @@ void messageHandlersAreInvoked() throws Exception { String slowQueueMessage = "This is slow queue"; Map messageMetadataMap = new ConcurrentHashMap<>(); doAnswer( - i -> { - RqueueMessage rqueueMessage = i.getArgument(0); - MessageMetadata messageMetadata = - new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED); - messageMetadataMap.put(messageMetadata.getId(), messageMetadata); - return messageMetadata; - }) + i -> { + RqueueMessage rqueueMessage = i.getArgument(0); + MessageMetadata messageMetadata = + new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED); + messageMetadataMap.put(messageMetadata.getId(), messageMetadata); + return messageMetadata; + }) .when(rqueueMessageMetadataService) .getOrCreateMessageMetadata(any()); - doAnswer(i->messageMetadataMap.get(i.getArgument(0))).when(rqueueMessageMetadataService).get(any()); + doAnswer(i -> messageMetadataMap.get(i.getArgument(0))).when(rqueueMessageMetadataService) + .get(any()); doAnswer( - invocation -> { - if (slowQueueCounter.get() == 0) { - slowQueueCounter.incrementAndGet(); - return Collections.singletonList( - RqueueMessage.builder() - .queueName(slowQueue) - .message(slowQueueMessage) - .processAt(System.currentTimeMillis()) - .queuedTime(System.nanoTime()) - .id(UUID.randomUUID().toString()) - .build()); - } - return null; - }) + invocation -> { + if (slowQueueCounter.get() == 0) { + slowQueueCounter.incrementAndGet(); + return Collections.singletonList( + RqueueMessage.builder() + .queueName(slowQueue) + .message(slowQueueMessage) + .processAt(System.currentTimeMillis()) + .queuedTime(System.nanoTime()) + .id(UUID.randomUUID().toString()) + .build()); + } + return null; + }) .when(rqueueMessageTemplate) .pop(slowQueue, slowProcessingQueue, slowProcessingChannel, VISIBILITY_TIMEOUT, 1); doAnswer( - invocation -> { - if (fastQueueCounter.get() == 0) { - fastQueueCounter.incrementAndGet(); - return Collections.singletonList( - RqueueMessage.builder() - .queueName(fastQueue) - .message(fastQueueMessage) - .processAt(System.currentTimeMillis()) - .queuedTime(System.nanoTime()) - .id(UUID.randomUUID().toString()) - .build()); - } - return null; - }) + invocation -> { + if (fastQueueCounter.get() == 0) { + fastQueueCounter.incrementAndGet(); + return Collections.singletonList( + RqueueMessage.builder() + .queueName(fastQueue) + .message(fastQueueMessage) + .processAt(System.currentTimeMillis()) + .queuedTime(System.nanoTime()) + .id(UUID.randomUUID().toString()) + .build()); + } + return null; + }) .when(rqueueMessageTemplate) .pop(fastQueue, fastProcessingQueue, fastProcessingQueueChannel, VISIBILITY_TIMEOUT, 1); container.afterPropertiesSet(); @@ -424,12 +446,12 @@ void pausedQueueShouldNotBePolled() throws Exception { StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("messageHandler", RqueueMessageHandler.class); applicationContext.registerSingleton("slowMessageListener", SlowMessageListener.class); - applicationContext.registerSingleton("applicationEventPublisher", TestEventBroadcaster.class); + applicationContext.registerSingleton("rqueueEventBus", TestEventBroadcaster.class); RqueueMessageHandler messageHandler = applicationContext.getBean("messageHandler", RqueueMessageHandler.class); TestEventBroadcaster eventBroadcaster = - applicationContext.getBean("applicationEventPublisher", TestEventBroadcaster.class); + (TestEventBroadcaster) applicationContext.getBean("rqueueEventBus", eventBus, applicationEventPublisher); eventBroadcaster.subscribe(listener); messageHandler.setApplicationContext(applicationContext); @@ -438,7 +460,7 @@ void pausedQueueShouldNotBePolled() throws Exception { queueConfig.setPaused(true); doReturn(queueConfig).when(rqueueSystemConfigDao).getConfigByName(slowQueue); - beanProvider.setApplicationEventPublisher(eventBroadcaster); + beanProvider.setRqueueEventBus(eventBroadcaster); beanProvider.setRqueueMessageHandler(messageHandler); RqueueMessageListenerContainer container = new TestListenerContainer(messageHandler); @@ -496,23 +518,25 @@ public void onApplicationEvent(RqueueBootstrapEvent event) { } @SuppressWarnings("unchecked") - static class TestEventBroadcaster implements ApplicationEventPublisher { + static class TestEventBroadcaster extends RqueueEventBus { - List listenerList = new LinkedList<>(); + private final List listenerList = new LinkedList<>(); + + public TestEventBroadcaster(EventBus eventBus, + ApplicationEventPublisher applicationEventPublisher) { + super(eventBus, applicationEventPublisher); + } void subscribe(ApplicationListener listener) { listenerList.add(listener); } @Override - public void publishEvent(ApplicationEvent event) { + public void publish(ApplicationEvent event) { for (ApplicationListener listener : listenerList) { listener.onApplicationEvent(event); } } - - @Override - public void publishEvent(Object event) {} } private class TestListenerContainer extends RqueueMessageListenerContainer { diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMiddlewareTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMiddlewareTest.java index e2d820e9..e8ce2acb 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMiddlewareTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMiddlewareTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.core.context.Context; import com.github.sonus21.rqueue.core.context.DefaultContext; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.core.middleware.ContextMiddleware; import com.github.sonus21.rqueue.core.middleware.Middleware; import com.github.sonus21.rqueue.core.middleware.PermissionMiddleware; @@ -75,7 +76,6 @@ import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.messaging.converter.MessageConverter; @CoreUnitTest @@ -106,7 +106,7 @@ class RqueueMiddlewareTest extends TestBase { @Mock private RqueueSystemConfigDao rqueueSystemConfigDao; @Mock - private ApplicationEventPublisher applicationEventPublisher; + private RqueueEventBus rqueueEventBus; private RqueueMessage rqueueMessage = new RqueueMessage(); private PostProcessingHandler postProcessingHandler; private MessageMetadata defaultMessageMetadata; @@ -132,7 +132,7 @@ null, new MessageProcessor() { postProcessingHandler = new PostProcessingHandler( rqueueWebConfig, - applicationEventPublisher, + rqueueEventBus, messageTemplate, taskBackOff, messageProcessorHandler, diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java index 27d3926b..2feb6a30 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import com.github.sonus21.rqueue.CoreUnitTest; import com.github.sonus21.rqueue.config.MetricsProperties; import com.github.sonus21.rqueue.core.EndpointRegistry; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueStringDao; import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; @@ -54,6 +55,7 @@ class RqueueMetricsTest extends TestBase { TestUtils.createQueueDetail(simpleQueue, deadLetterQueue); @Mock private RqueueStringDao rqueueStringDao; @Mock private QueueCounter queueCounter; + @Mock private RqueueEventBus eventBus; @BeforeEach public void init() { @@ -91,7 +93,7 @@ private void verifyQueueStatistics( private void verifyCounterRegisterMethodIsCalled(Tags tags) throws IllegalAccessException { MeterRegistry meterRegistry = new SimpleMeterRegistry(); metricsProperties.setMetricTags(tags); - RqueueMetrics metrics = rqueueMetrics(meterRegistry, metricsProperties); + RqueueMetrics metrics = rqueueMetrics(meterRegistry, metricsProperties, eventBus); metrics.onApplicationEvent(new RqueueBootstrapEvent("Test", true)); verify(queueCounter, times(1)) .registerQueue( @@ -109,9 +111,11 @@ private void verifyCounterRegisterMethodIsCalled(Tags tags) throws IllegalAccess } private RqueueMetrics rqueueMetrics( - MeterRegistry meterRegistry, MetricsProperties metricsProperties) + MeterRegistry meterRegistry, + MetricsProperties metricsProperties, + RqueueEventBus eventBus) throws IllegalAccessException { - RqueueMetrics metrics = new RqueueMetrics(queueCounter); + RqueueMetrics metrics = new RqueueMetrics(queueCounter, eventBus); FieldUtils.writeField(metrics, "meterRegistry", meterRegistry, true); FieldUtils.writeField(metrics, "metricsProperties", metricsProperties, true); return metrics; @@ -153,7 +157,7 @@ void queueStatistics() throws IllegalAccessException { .when(rqueueStringDao) .getListSize(anyString()); MeterRegistry meterRegistry = new SimpleMeterRegistry(); - RqueueMetrics metrics = rqueueMetrics(meterRegistry, metricsProperties); + RqueueMetrics metrics = rqueueMetrics(meterRegistry, metricsProperties,eventBus); FieldUtils.writeField(metrics, "rqueueStringDao", rqueueStringDao, true); metrics.onApplicationEvent(new RqueueBootstrapEvent("Test", true)); verifyQueueStatistics(meterRegistry, simpleQueue, 100, 10, 300, 0); diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerServiceTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerServiceTest.java index 91b328d3..5c101234 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerServiceTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerServiceTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import com.github.sonus21.TestBase; import com.github.sonus21.rqueue.CoreUnitTest; import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueStringDao; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; import com.github.sonus21.rqueue.listener.QueueDetail; @@ -59,6 +60,9 @@ class RqueueSystemManagerServiceTest extends TestBase { @Mock private RqueueStringDao rqueueStringDao; @Mock private RqueueSystemConfigDao rqueueSystemConfigDao; @Mock private RqueueMessageMetadataService rqueueMessageMetadataService; + + @Mock + private RqueueEventBus eventBus; private RqueueSystemManagerService rqueueSystemManagerService; private Set queues; @@ -67,7 +71,9 @@ public void init() { MockitoAnnotations.openMocks(this); rqueueSystemManagerService = new RqueueSystemManagerServiceImpl( - rqueueConfig, rqueueStringDao, rqueueSystemConfigDao, rqueueMessageMetadataService); + rqueueConfig, rqueueStringDao, + rqueueSystemConfigDao, + rqueueMessageMetadataService, eventBus); queues = new HashSet<>(); queues.add(slowQueue); queues.add(fastQueue); diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/RqueueTaskMetricsAggregatorServiceTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/RqueueTaskMetricsAggregatorServiceTest.java index bc835062..a65ea23c 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/RqueueTaskMetricsAggregatorServiceTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/RqueueTaskMetricsAggregatorServiceTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import com.github.sonus21.rqueue.core.Job; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueJobDao; import com.github.sonus21.rqueue.dao.RqueueQStatsDao; import com.github.sonus21.rqueue.exception.TimedOutException; @@ -70,6 +71,9 @@ class RqueueTaskMetricsAggregatorServiceTest extends TestBase { @Mock private RqueueMessageMetadataService rqueueMessageMetadataService; @Mock private RqueueJobDao rqueueJobDao; @Mock private RqueueMessageTemplate rqueueMessageTemplate; + + @Mock + private RqueueEventBus eventBus; private RqueueJobMetricsAggregatorService rqueueJobMetricsAggregatorService; @BeforeEach @@ -77,7 +81,7 @@ public void initService() throws IllegalAccessException { MockitoAnnotations.openMocks(this); rqueueJobMetricsAggregatorService = new RqueueJobMetricsAggregatorService( - rqueueConfig, rqueueWebConfig, rqueueLockManager, rqueueQStatsDao); + rqueueConfig, rqueueWebConfig, rqueueLockManager, rqueueQStatsDao, eventBus); doReturn(true).when(rqueueWebConfig).isCollectListenerStats(); doReturn(1).when(rqueueWebConfig).getStatsAggregatorThreadCount(); doReturn(100).when(rqueueWebConfig).getAggregateEventWaitTimeInSecond(); diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImplTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImplTest.java index aaa6f1dd..27e56376 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImplTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImplTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ import com.github.sonus21.rqueue.CoreUnitTest; import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.core.EndpointRegistry; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.dao.RqueueStringDao; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; import com.github.sonus21.rqueue.listener.QueueDetail; @@ -65,6 +66,9 @@ class RqueueSystemManagerServiceImplTest extends TestBase { @Mock private RqueueSystemConfigDao rqueueSystemConfigDao; @Mock private RqueueConfig rqueueConfig; @Mock private RqueueMessageMetadataService rqueueMessageMetadataService; + + @Mock + private RqueueEventBus eventBus; private RqueueSystemManagerServiceImpl rqueueSystemManagerService; @BeforeEach @@ -73,7 +77,9 @@ public void init() { EndpointRegistry.delete(); rqueueSystemManagerService = new RqueueSystemManagerServiceImpl( - rqueueConfig, rqueueStringDao, rqueueSystemConfigDao, rqueueMessageMetadataService); + rqueueConfig, rqueueStringDao, + rqueueSystemConfigDao, rqueueMessageMetadataService, + eventBus); slowQueueConfig.setId(TestUtils.getQueueConfigKey(slowQueue)); fastQueueConfig.setId(TestUtils.getQueueConfigKey(fastQueue)); EndpointRegistry.register(slowQueueDetail); diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueMetricsAutoConfig.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueMetricsAutoConfig.java index 0a331066..9162984a 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueMetricsAutoConfig.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueMetricsAutoConfig.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package com.github.sonus21.rqueue.spring.boot; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.metrics.QueueCounter; import com.github.sonus21.rqueue.metrics.RqueueCounter; import com.github.sonus21.rqueue.metrics.RqueueMetrics; @@ -46,7 +47,9 @@ public class RqueueMetricsAutoConfig { @Bean public RqueueMetricsRegistry rqueueMetricsRegistry( - MetricsProperties metricsProperties, RqueueMetricsProperties rqueueMetricsProperties) { + MetricsProperties metricsProperties, + RqueueMetricsProperties rqueueMetricsProperties, + RqueueEventBus eventBus) { Tags actualTags = Tags.empty(); for (Entry e : getTags(metricsProperties).entrySet()) { actualTags = Tags.concat(actualTags, e.getKey(), e.getValue()); @@ -56,7 +59,7 @@ public RqueueMetricsRegistry rqueueMetricsRegistry( } rqueueMetricsProperties.setMetricTags(actualTags); QueueCounter queueCounter = new QueueCounter(); - return new RqueueMetrics(queueCounter); + return new RqueueMetrics(queueCounter, eventBus); } @SuppressWarnings("unchecked") diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueMetricsAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueMetricsAutoConfigTest.java index da6fdc58..e132740c 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueMetricsAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueMetricsAutoConfigTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import com.github.sonus21.TestBase; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.metrics.RqueueMetricsRegistry; import com.github.sonus21.rqueue.spring.boot.RqueueMetricsAutoConfig; import com.github.sonus21.rqueue.spring.boot.RqueueMetricsProperties; @@ -28,20 +29,33 @@ import java.util.Collections; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.springframework.boot.actuate.autoconfigure.metrics.MetricsProperties; @SpringBootUnitTest @Slf4j class RqueueMetricsAutoConfigTest extends TestBase { + @Mock + private RqueueEventBus rqueueEventBus; + + @BeforeEach + public void init() throws IllegalAccessException { + MockitoAnnotations.openMocks(this); + } + + @Test void rqueueMetricsRegistryNoAdditionalTags() { RqueueMetricsAutoConfig rqueueMetricsAutoConfig = new RqueueMetricsAutoConfig(); MetricsProperties metricsProperties = new MetricsProperties(); RqueueMetricsProperties rqueueMetricsProperties = new RqueueMetricsProperties(); RqueueMetricsRegistry rqueueMetricsRegistry = - rqueueMetricsAutoConfig.rqueueMetricsRegistry(metricsProperties, rqueueMetricsProperties); + rqueueMetricsAutoConfig.rqueueMetricsRegistry(metricsProperties, + rqueueMetricsProperties, rqueueEventBus); assertNotNull(rqueueMetricsRegistry); assertEquals(Tags.empty(), rqueueMetricsProperties.getMetricTags()); } @@ -53,7 +67,7 @@ void rqueueMetricsRegistryTagsFromRqueueProperties() { RqueueMetricsProperties rqueueMetricsProperties = new RqueueMetricsProperties(); rqueueMetricsProperties.setTags(Collections.singletonMap("dc", "test")); RqueueMetricsRegistry rqueueMetricsRegistry = - rqueueMetricsAutoConfig.rqueueMetricsRegistry(metricsProperties, rqueueMetricsProperties); + rqueueMetricsAutoConfig.rqueueMetricsRegistry(metricsProperties, rqueueMetricsProperties, rqueueEventBus); assertNotNull(rqueueMetricsRegistry); assertEquals(Tags.of("dc", "test"), rqueueMetricsProperties.getMetricTags()); } @@ -73,7 +87,7 @@ void rqueueMetricsRegistryMergedTags() throws IllegalAccessException { FieldUtils.writeField( metricsProperties, "tags", Collections.singletonMap("region", "ap-south-1"), true); RqueueMetricsRegistry rqueueMetricsRegistry = - rqueueMetricsAutoConfig.rqueueMetricsRegistry(metricsProperties, rqueueMetricsProperties); + rqueueMetricsAutoConfig.rqueueMetricsRegistry(metricsProperties, rqueueMetricsProperties, rqueueEventBus); assertNotNull(rqueueMetricsRegistry); assertEquals( Tags.of("region", "ap-south-1", "dc", "test"), rqueueMetricsProperties.getMetricTags()); diff --git a/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/PauseUnpauseEventListener.java b/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/PauseUnpauseEventListener.java index b9664fda..ca7e49d8 100644 --- a/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/PauseUnpauseEventListener.java +++ b/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/PauseUnpauseEventListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,20 +16,28 @@ package com.github.sonus21.rqueue.test; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.models.event.RqueueQueuePauseEvent; +import com.google.common.eventbus.Subscribe; import java.util.LinkedList; import java.util.List; import lombok.Getter; -import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @Component @Getter -public class PauseUnpauseEventListener implements ApplicationListener { +public class PauseUnpauseEventListener { + + private final RqueueEventBus eventBus; + + public PauseUnpauseEventListener(RqueueEventBus eventBus) { + this.eventBus = eventBus; + eventBus.register(this); + } private final List eventList = new LinkedList<>(); - @Override + @Subscribe public void onApplicationEvent(RqueueQueuePauseEvent event) { eventList.add(event); } diff --git a/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/service/QueueRegistryUpdater.java b/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/service/QueueRegistryUpdater.java index 83be90d8..6c97c1be 100644 --- a/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/service/QueueRegistryUpdater.java +++ b/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/service/QueueRegistryUpdater.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,23 +19,32 @@ import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.core.RqueueEndpointManager; import com.github.sonus21.rqueue.core.RqueueMessageSender; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.models.enums.RqueueMode; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; -import lombok.AllArgsConstructor; +import com.google.common.eventbus.Subscribe; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @Component -@AllArgsConstructor @Slf4j -public class QueueRegistryUpdater implements ApplicationListener { +public class QueueRegistryUpdater { private final RqueueMessageSender rqueueMessageSender; private final RqueueEndpointManager rqueueEndpointManager; private final RqueueConfig rqueueConfig; - @Override + public QueueRegistryUpdater(RqueueMessageSender rqueueMessageSender, + RqueueEndpointManager rqueueEndpointManager, RqueueConfig rqueueConfig, + RqueueEventBus eventBus) { + this.rqueueMessageSender = rqueueMessageSender; + this.rqueueEndpointManager = rqueueEndpointManager; + this.rqueueConfig = rqueueConfig; + eventBus.register(this); + } + + + @Subscribe public void onApplicationEvent(RqueueBootstrapEvent event) { log.info("Mode {} Event {}", rqueueConfig.getMode(), event); if (!RqueueMode.PRODUCER.equals(rqueueConfig.getMode()) || !event.isStartup()) { diff --git a/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/service/RqueueEventListener.java b/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/service/RqueueEventListener.java index 0f8742b5..a5f588b6 100644 --- a/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/service/RqueueEventListener.java +++ b/rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/service/RqueueEventListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,15 +15,21 @@ */ package com.github.sonus21.rqueue.test.service; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; +import com.google.common.eventbus.Subscribe; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.springframework.stereotype.Component; @Component public class RqueueEventListener { + public RqueueEventListener(RqueueEventBus eventBus) { + eventBus.register(this); + } + + private Queue executionEvents = new ConcurrentLinkedQueue<>(); public void clearQueue() { @@ -34,7 +40,7 @@ public int getEventCount() { return executionEvents.size(); } - @EventListener + @Subscribe public void listen(RqueueExecutionEvent event) { executionEvents.add(event); } diff --git a/rqueue-spring/src/main/java/com/github/sonus21/rqueue/spring/RqueueListenerConfig.java b/rqueue-spring/src/main/java/com/github/sonus21/rqueue/spring/RqueueListenerConfig.java index 5f421bc2..cd760c2b 100644 --- a/rqueue-spring/src/main/java/com/github/sonus21/rqueue/spring/RqueueListenerConfig.java +++ b/rqueue-spring/src/main/java/com/github/sonus21/rqueue/spring/RqueueListenerConfig.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Sonu Kumar + * Copyright 2023 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import com.github.sonus21.rqueue.core.RqueueMessageManager; import com.github.sonus21.rqueue.core.RqueueMessageSender; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus; import com.github.sonus21.rqueue.core.impl.ReactiveRqueueMessageEnqueuerImpl; import com.github.sonus21.rqueue.core.impl.RqueueEndpointManagerImpl; import com.github.sonus21.rqueue.core.impl.RqueueMessageEnqueuerImpl; @@ -105,9 +106,9 @@ public RqueueMessageEnqueuer rqueueMessageEnqueuer( @Bean @Conditional(MetricsEnabled.class) @DependsOn({"meterRegistry", "rqueueMetricsProperties"}) - public RqueueMetricsRegistry rqueueMetricsRegistry() { + public RqueueMetricsRegistry rqueueMetricsRegistry(RqueueEventBus eventBus) { QueueCounter queueCounter = new QueueCounter(); - return new RqueueMetrics(queueCounter); + return new RqueueMetrics(queueCounter, eventBus); } @Bean