Skip to content

Commit

Permalink
Use event bus for gradle (#214)
Browse files Browse the repository at this point in the history
Co-authored-by: Sonu Kumar <sonu@git>
  • Loading branch information
sonus21 and Sonu Kumar authored Dec 5, 2023
1 parent c3d69a4 commit 82b7ef0
Show file tree
Hide file tree
Showing 36 changed files with 641 additions and 275 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ ext {

subprojects {
group = 'com.github.sonus21'
version = '2.13.1-RELEASE'
version = '2.13.2-SNAPSHOT'

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
@Getter
@Setter
public class RqueueEventBusConfig {

@Value("${rqueue.event.bus.core.pool.size:2}")
private int corePoolSize;

@Value("${rqueue.event.bus.max.pool.size:10}")
private int maxPoolSize;

@Value("${rqueue.event.bus.queue.capacity:100}")
private int queueCapacity;

@Value("${rqueue.event.bus.keep.alive.time:60000}")
private int keepAliveTime;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
import com.github.sonus21.rqueue.core.ScheduledQueueMessageScheduler;
import com.github.sonus21.rqueue.core.eventbus.EventBusErrorHandler;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
import com.github.sonus21.rqueue.dao.RqueueStringDao;
import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl;
Expand All @@ -37,6 +39,8 @@
import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled;
import com.github.sonus21.rqueue.utils.pebble.ResourceLoader;
import com.github.sonus21.rqueue.utils.pebble.RqueuePebbleExtension;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.mitchellbosecke.pebble.PebbleEngine;
import com.mitchellbosecke.pebble.spring.extension.SpringExtension;
import com.mitchellbosecke.pebble.spring.reactive.PebbleReactiveViewResolver;
Expand All @@ -45,11 +49,13 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* This is a base configuration class for Rqueue, that is used in Spring and Spring boot Rqueue libs
Expand Down Expand Up @@ -105,8 +111,8 @@ protected MessageConverterProvider getMessageConverterProvider() {
* Database for different ops.
*
* @param beanFactory configurable bean factory
* @param versionKey Rqueue db version key
* @param dbVersion database version
* @param versionKey Rqueue db version key
* @param dbVersion database version
* @return {@link RedisConnectionFactory} object.
*/
@Bean
Expand Down Expand Up @@ -150,6 +156,11 @@ public RqueueWebConfig rqueueWebConfig() {
return new RqueueWebConfig();
}

@Bean
public RqueueEventBusConfig rqueueEventBusConfig() {
return new RqueueEventBusConfig();
}

@Bean
public RqueueSchedulerConfig rqueueSchedulerConfig() {
return new RqueueSchedulerConfig();
Expand Down Expand Up @@ -190,8 +201,14 @@ public RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory()
* @return {@link ScheduledQueueMessageScheduler} object
*/
@Bean
public ScheduledQueueMessageScheduler scheduledMessageScheduler() {
return new ScheduledQueueMessageScheduler();
public ScheduledQueueMessageScheduler scheduledMessageScheduler(
RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig,
RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
@Qualifier("rqueueRedisLongTemplate")
RedisTemplate<String, Long> redisTemplate) {
return new ScheduledQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, redisTemplate);
}

/**
Expand All @@ -201,8 +218,14 @@ public ScheduledQueueMessageScheduler scheduledMessageScheduler() {
* @return {@link ProcessingQueueMessageScheduler} object
*/
@Bean
public ProcessingQueueMessageScheduler processingMessageScheduler() {
return new ProcessingQueueMessageScheduler();
public ProcessingQueueMessageScheduler processingMessageScheduler(
RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig,
RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
@Qualifier("rqueueRedisLongTemplate")
RedisTemplate<String, Long> redisTemplate) {
return new ProcessingQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, redisTemplate);
}

@Bean
Expand Down Expand Up @@ -265,12 +288,27 @@ public RqueueInternalPubSubChannel rqueueInternalPubSubChannel(
RqueueConfig rqueueConfig,
RqueueBeanProvider rqueueBeanProvider,
@Qualifier("stringRqueueRedisTemplate")
RqueueRedisTemplate<String> stringRqueueRedisTemplate) {
RqueueRedisTemplate<String> stringRqueueRedisTemplate) {
return new RqueueInternalPubSubChannel(
rqueueRedisListenerContainerFactory,
rqueueMessageListenerContainer,
rqueueConfig,
stringRqueueRedisTemplate,
rqueueBeanProvider);
}

@Bean
public RqueueEventBus rqueueEventBus(ApplicationEventPublisher applicationEventPublisher,
RqueueEventBusConfig rqueueEventBusConfig) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(rqueueEventBusConfig.getCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(rqueueEventBusConfig.getMaxPoolSize());
threadPoolTaskExecutor.setKeepAliveSeconds(rqueueEventBusConfig.getKeepAliveTime());
threadPoolTaskExecutor.setQueueCapacity(rqueueEventBusConfig.getQueueCapacity());
threadPoolTaskExecutor.setThreadNamePrefix("RqueueEventBusAsyncExecutor-");
threadPoolTaskExecutor.initialize();
EventBus eventBus = new AsyncEventBus(threadPoolTaskExecutor);
eventBus.register(new EventBusErrorHandler());
return new RqueueEventBus(eventBus, applicationEventPublisher);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 Sonu Kumar
* Copyright (c) 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand All @@ -21,10 +21,13 @@
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.Subscribe;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
Expand All @@ -33,29 +36,23 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public abstract class MessageScheduler implements DisposableBean,
ApplicationListener<RqueueBootstrapEvent> {
public abstract class MessageScheduler implements DisposableBean {

private final Object monitor = new Object();
@Autowired
protected RqueueSchedulerConfig rqueueSchedulerConfig;
@Autowired
protected RqueueConfig rqueueConfig;
protected final RqueueSchedulerConfig rqueueSchedulerConfig;
protected final RqueueConfig rqueueConfig;
private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
private final RedisTemplate<String, Long> redisTemplate;
private RedisScript<Long> redisScript;
private DefaultScriptExecutor<String> defaultScriptExecutor;
private Map<String, Boolean> queueRunningState;
Expand All @@ -66,14 +63,21 @@ public abstract class MessageScheduler implements DisposableBean,
protected RedisScheduleTriggerHandler redisScheduleTriggerHandler;

private ThreadPoolTaskScheduler scheduler;
@Autowired
private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;

@Autowired
@Qualifier("rqueueRedisLongTemplate")
private RedisTemplate<String, Long> redisTemplate;
private Map<String, Integer> errorCount;

protected MessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig,
RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
RedisTemplate<String, Long> redisTemplate) {
this.rqueueSchedulerConfig = rqueueSchedulerConfig;
this.rqueueConfig = rqueueConfig;
this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory;
this.redisTemplate = redisTemplate;
eventBus.register(this);
}

protected abstract Logger getLogger();

protected abstract long getNextScheduleTime(String queueName, long currentTime, Long value);
Expand Down Expand Up @@ -224,9 +228,9 @@ private void initQueue(String queueName) {
queueRunningState.put(queueName, false);
}

@Override
@Async
@Subscribe
public void onApplicationEvent(RqueueBootstrapEvent event) {
getLogger().info("{} Even received", event);
synchronized (monitor) {
doStop();
if (!rqueueSchedulerConfig.isEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,20 +18,31 @@

import static java.lang.Long.max;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import com.github.sonus21.rqueue.listener.QueueDetail;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.github.sonus21.rqueue.utils.Constants;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.springframework.data.redis.core.RedisTemplate;

@Slf4j
public class ProcessingQueueMessageScheduler extends MessageScheduler {

private Map<String, Long> queueNameToDelay;

public ProcessingQueueMessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig, RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
RedisTemplate<String, Long> redisTemplate) {
super(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory,
redisTemplate);
}


@Override
protected void initialize() {
super.initialize();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import com.github.sonus21.rqueue.common.RqueueLockManager;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.dao.RqueueJobDao;
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
Expand All @@ -28,7 +29,6 @@
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;

@Getter
@Setter
Expand All @@ -38,7 +38,7 @@ public class RqueueBeanProvider {
@Autowired private RqueueSystemConfigDao rqueueSystemConfigDao;
@Autowired private RqueueJobDao rqueueJobDao;
@Autowired private RqueueWebConfig rqueueWebConfig;
@Autowired private ApplicationEventPublisher applicationEventPublisher;
@Autowired private RqueueEventBus rqueueEventBus;
@Autowired private RqueueLockManager rqueueLockManager;

@Autowired(required = false)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,12 +16,25 @@

package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.springframework.data.redis.core.RedisTemplate;

@Slf4j
public class ScheduledQueueMessageScheduler extends MessageScheduler {


public ScheduledQueueMessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig, RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
RedisTemplate<String, Long> redisTemplate) {
super(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory,
redisTemplate);
}

@Override
protected Logger getLogger() {
return log;
Expand Down
Loading

0 comments on commit 82b7ef0

Please sign in to comment.