From cdd77b4f462d3d10fa18312744991ef573568067 Mon Sep 17 00:00:00 2001 From: dzh Date: Sat, 19 Nov 2022 17:32:43 +0800 Subject: [PATCH] add MnsDispatcher --- .../src/main/java/jframe/core/FrameEvent.java | 4 +- .../core/dispatch/DefDispatchFactory.java | 104 +++++------ .../jframe/core/plugin/DefPluginContext.java | 1 - .../loader/ext/PluginServiceClassLoader.java | 5 +- .../java/jframe/core/unit/PluginUnit.java | 28 ++- jframe/jframe-ext/pom.xml | 18 +- .../jframe/ext/dispatch/kafka/KafkaConst.java | 24 +++ .../ext/dispatch/kafka/KafkaDispatcher.java | 32 +--- .../jframe/ext/dispatch/mns/MnsConst.java | 23 +++ .../dispatch/mns/MnsConsumerDispatcher.java | 19 ++ .../ext/dispatch/mns/MnsDispatcher.java | 175 ++++++++++++++++++ .../dispatch/mns/MnsProducerDispatcher.java | 19 ++ .../ext/dispatch/ons/OnsDispatcher.java | 10 +- .../ext/dispatch/rocketmq/RmqConst.java | 15 +- .../ext/dispatch/rocketmq/RmqDispatcher.java | 10 +- .../{dispatch/rocketmq => msg}/MsgCodec.java | 2 +- .../rocketmq => msg}/TextMsgCodec.java | 3 +- .../java/jframe/ext/plugin/KafkaPlugin.java | 13 +- .../java/jframe/ext/plugin/MnsPlugin.java | 20 ++ .../main/java/jframe/ext/plugin/MqPlugin.java | 13 ++ .../jframe/ext/plugin/RocketmqPlugin.java | 11 +- 21 files changed, 416 insertions(+), 133 deletions(-) create mode 100644 jframe/jframe-ext/src/main/java/jframe/ext/dispatch/kafka/KafkaConst.java create mode 100644 jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsConst.java create mode 100644 jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsConsumerDispatcher.java create mode 100644 jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsDispatcher.java create mode 100644 jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsProducerDispatcher.java rename jframe/jframe-ext/src/main/java/jframe/ext/{dispatch/rocketmq => msg}/MsgCodec.java (87%) rename jframe/jframe-ext/src/main/java/jframe/ext/{dispatch/rocketmq => msg}/TextMsgCodec.java (92%) create mode 100644 jframe/jframe-ext/src/main/java/jframe/ext/plugin/MnsPlugin.java create mode 100644 jframe/jframe-ext/src/main/java/jframe/ext/plugin/MqPlugin.java diff --git a/jframe/jframe-core/src/main/java/jframe/core/FrameEvent.java b/jframe/jframe-core/src/main/java/jframe/core/FrameEvent.java index ef5f92a..9649c12 100644 --- a/jframe/jframe-core/src/main/java/jframe/core/FrameEvent.java +++ b/jframe/jframe-core/src/main/java/jframe/core/FrameEvent.java @@ -17,9 +17,9 @@ public class FrameEvent extends EventObject { */ private static final long serialVersionUID = 1L; - private int type; + private final int type; - public static final int Init = 1 << 0; + public static final int Init = 1; public static final int Start = 1 << 1; public static final int Stop = 1 << 2; diff --git a/jframe/jframe-core/src/main/java/jframe/core/dispatch/DefDispatchFactory.java b/jframe/jframe-core/src/main/java/jframe/core/dispatch/DefDispatchFactory.java index 303be81..33d4be1 100644 --- a/jframe/jframe-core/src/main/java/jframe/core/dispatch/DefDispatchFactory.java +++ b/jframe/jframe-core/src/main/java/jframe/core/dispatch/DefDispatchFactory.java @@ -1,5 +1,5 @@ /** - * + * */ package jframe.core.dispatch; @@ -10,68 +10,68 @@ /** * dispatch管理管理工厂 - * + * * @author dzh * @date Jun 20, 2013 9:47:12 AM */ public class DefDispatchFactory implements DispatchFactory { - private final Object _lock = new Object(); + private final Object _lock = new Object(); - private List _dList = new ArrayList(2); + private List _dList = new ArrayList(2); - private DefDispatchFactory() { + private DefDispatchFactory() { - } + } - public static final DispatchFactory newInstance() { - return new DefDispatchFactory(); - } + public static DispatchFactory newInstance() { + return new DefDispatchFactory(); + } - public Dispatcher findDispatcher(String dispatcherID) { - List list = _dList; - Dispatcher dl = null; - synchronized (_lock) { - for (Dispatcher d : list) { - if (d.getID().equals(dispatcherID)) { - dl = d; - break; - } - } - } - return dl; - } + public Dispatcher findDispatcher(String dispatcherID) { + List list = _dList; + Dispatcher dl = null; + synchronized (_lock) { + for (Dispatcher d : list) { + if (d.getID().equals(dispatcherID)) { + dl = d; + break; + } + } + } + return dl; + } - public Dispatcher createDispatcher(String dispatcherID, Config config) { - Dispatcher d = DefDispatcher.newDispatcher(dispatcherID, config); - d.start(); - synchronized (_lock) { - _dList.add(d); - } - return d; - } + public Dispatcher createDispatcher(String dispatcherID, Config config) { + Dispatcher d = DefDispatcher.newDispatcher(dispatcherID, config); + d.start(); + synchronized (_lock) { + _dList.add(d); + } + return d; + } - /** - * if dispatcherID==Null, close all delegates - */ - public void removeDispatcher(String dispatcherID) { - List list = _dList; - if (dispatcherID == null) { - synchronized (_lock) { - for (Dispatcher d : list) { - d.close(); - } - } - list.clear(); - } else { - synchronized (_lock) { - Dispatcher d = findDispatcher(dispatcherID); - if (d != null) { - d.close(); - list.remove(d); - } - } - } - } + /** + * if dispatcherID==Null, close all delegates + */ + public void removeDispatcher(String dispatcherID) { + List list = _dList; + if (dispatcherID == null) { + synchronized (_lock) { + for (Dispatcher d : list) { + d.close(); + } + } + list.clear(); + } else { + synchronized (_lock) { + Dispatcher d = findDispatcher(dispatcherID); + if (d != null) { + d.close(); + list.remove(d); + } + } + } + } } diff --git a/jframe/jframe-core/src/main/java/jframe/core/plugin/DefPluginContext.java b/jframe/jframe-core/src/main/java/jframe/core/plugin/DefPluginContext.java index 1a40190..d6dda6e 100644 --- a/jframe/jframe-core/src/main/java/jframe/core/plugin/DefPluginContext.java +++ b/jframe/jframe-core/src/main/java/jframe/core/plugin/DefPluginContext.java @@ -297,7 +297,6 @@ public void notifyPluginEvent(PluginEvent event) { } /** - * * (non-Javadoc) * * @return if find PluginRef return it, or return null diff --git a/jframe/jframe-core/src/main/java/jframe/core/plugin/loader/ext/PluginServiceClassLoader.java b/jframe/jframe-core/src/main/java/jframe/core/plugin/loader/ext/PluginServiceClassLoader.java index 09cbcbf..2453207 100644 --- a/jframe/jframe-core/src/main/java/jframe/core/plugin/loader/ext/PluginServiceClassLoader.java +++ b/jframe/jframe-core/src/main/java/jframe/core/plugin/loader/ext/PluginServiceClassLoader.java @@ -1,5 +1,5 @@ /** - * + * */ package jframe.core.plugin.loader.ext; @@ -52,9 +52,8 @@ protected void injectImportService(Field f) { /** * register export-service - * + * * @param pc - * @param p */ public void loadService(PluginCase pc) { ServiceContext sc = plc.getServiceContext(); diff --git a/jframe/jframe-core/src/main/java/jframe/core/unit/PluginUnit.java b/jframe/jframe-core/src/main/java/jframe/core/unit/PluginUnit.java index 70d96b0..603814b 100644 --- a/jframe/jframe-core/src/main/java/jframe/core/unit/PluginUnit.java +++ b/jframe/jframe-core/src/main/java/jframe/core/unit/PluginUnit.java @@ -1,18 +1,8 @@ /** - * + * */ package jframe.core.unit; -import java.io.File; -import java.io.FilenameFilter; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import jframe.core.Frame; import jframe.core.conf.Config; import jframe.core.plugin.DefPluginContext; @@ -27,13 +17,22 @@ import jframe.core.plugin.loader.ext.PluginServiceCreator; import jframe.core.signal.Signal; import jframe.core.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FilenameFilter; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; /** *

* Feature: *

  • load plug-in
  • *

    - * + * * @author dzh * @date Sep 23, 2013 2:47:41 PM * @since 1.0 @@ -75,7 +74,7 @@ public void start() throws UnitException { } /** - * + * */ private void cleanCache() { Config config = getFrame().getConfig(); @@ -89,8 +88,7 @@ private void cleanCache() { } /** - * - * @param path_plugin + * */ private List loadPlugin() { // create plugin diff --git a/jframe/jframe-ext/pom.xml b/jframe/jframe-ext/pom.xml index c8c5d9f..bd8b64e 100644 --- a/jframe/jframe-ext/pom.xml +++ b/jframe/jframe-ext/pom.xml @@ -17,40 +17,46 @@ com.google.code.gson gson - 2.8.5 + 2.10 org.apache.activemq activemq-client - 5.10.0 + 5.17.2 org.apache.activemq activemq-jms-pool - 5.15.7 + 5.17.2 org.apache.kafka kafka-clients - 2.1.0 + 3.3.1 org.apache.rocketmq rocketmq-client - 4.6.0 + 5.0.0 com.aliyun.openservices ons-client - 1.8.4.Final + 2.0.4.Final + + + + com.aliyun.mns + aliyun-sdk-mns + 1.1.9.1 diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/kafka/KafkaConst.java b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/kafka/KafkaConst.java new file mode 100644 index 0000000..0fee659 --- /dev/null +++ b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/kafka/KafkaConst.java @@ -0,0 +1,24 @@ +package jframe.ext.dispatch.kafka; + +/** + * @author dzh + * @date 2022/11/18 11:13 + */ +public interface KafkaConst { + + // default + String DEFAULT_TOPIC = "jframe"; + + // config + String FILE_KAFKA_PRODUCER = "file.kafka.producer"; + String FILE_KAFKA_CONSUMER = "file.kafka.consumer"; + String D_KAFKA_SUBSCRIBE = "d.kafka.subscribe"; + String D_KAFKA_SUBSCRIBE_REGEX = "d.kafka.subscribe.regex"; + + // msg meta + String M_KAFKA_TOPIC = "m.kafka.topic"; + String M_KAFKA_KEY = "m.kafka.key"; + String M_KAFKA_PARTITION = "m.kafka.partition"; + String M_KAFKA_TIMESTAMP = "m.kafka.timestamp"; + +} diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/kafka/KafkaDispatcher.java b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/kafka/KafkaDispatcher.java index 3d370e5..6790b9a 100644 --- a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/kafka/KafkaDispatcher.java +++ b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/kafka/KafkaDispatcher.java @@ -18,7 +18,6 @@ import java.util.Collections; import java.util.Objects; import java.util.Properties; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; /** @@ -42,25 +41,10 @@ * @version 0.0.1 * @date Dec 26, 2018 5:16:26 PM */ -public class KafkaDispatcher extends AbstractDispatcher { +public class KafkaDispatcher extends AbstractDispatcher implements KafkaConst { static Logger LOG = LoggerFactory.getLogger(KafkaDispatcher.class); - // default - public static final String DEFAULT_TOPIC = "jframe"; - - // config - public static final String FILE_KAFKA_PRODUCER = "file.kafka.producer"; - public static final String FILE_KAFKA_CONSUMER = "file.kafka.consumer"; - public static final String D_KAFKA_SUBSCRIBE = "d.kafka.subscribe"; - public static final String D_KAFKA_SUBSCRIBE_REGEX = "d.kafka.subscribe.regex"; - - // msg meta - public static final String D_KAFKA_R_TOPIC = "d.kafka.r.topic"; - public static final String D_KAFKA_R_KEY = "d.kafka.r.key"; - public static final String D_KAFKA_R_PARTITION = "d.kafka.r.partition"; - public static final String D_KAFKA_R_TIMESTAMP = "d.kafka.r.timestamp"; - private Producer> producer; private Consumer> consumer; @@ -214,20 +198,20 @@ protected void loadDefaultConsumer(Properties props) { @Override public void receive(Msg msg) { if (producer != null) { - String topic = (String) msg.getMeta(D_KAFKA_R_TOPIC); + String topic = (String) msg.getMeta(M_KAFKA_TOPIC); if (Objects.isNull(topic)) { topic = DEFAULT_TOPIC; } Integer partition = partition(msg); Long timestamp = timestamp(msg); - String key = (String) msg.getMeta(D_KAFKA_R_KEY); + String key = (String) msg.getMeta(M_KAFKA_KEY); ProducerRecord> record = new ProducerRecord<>(topic, partition, timestamp, key, msg, null); producer.send(record); } } private Long timestamp(Msg msg) { - Object ts = msg.getMeta(D_KAFKA_R_TIMESTAMP); + Object ts = msg.getMeta(M_KAFKA_TIMESTAMP); if (ts == null) return null; if (ts instanceof Long) return (Long) ts; if (ts instanceof String) return Long.parseLong((String) ts); @@ -235,7 +219,7 @@ private Long timestamp(Msg msg) { } private Integer partition(Msg msg) { - Object p = msg.getMeta(D_KAFKA_R_PARTITION); + Object p = msg.getMeta(M_KAFKA_PARTITION); if (p == null) return null; if (p instanceof Integer) return (Integer) p; if (p instanceof String) return Integer.parseInt((String) p); @@ -254,11 +238,12 @@ public Consumer> getConsumer() { public void close() { if (closed) return; // close producer - if (enableProducer()) producer.close(WAIT_CLOSED_SECOND, TimeUnit.SECONDS); + if (enableProducer()) +// producer.close(WAIT_CLOSED_SECOND, TimeUnit.SECONDS); + producer.close(Duration.ofSeconds(WAIT_CLOSED_SECOND)); // close dispatcher and consumer if (enableConsumer()) { - closed = true; if (dispatchT != null) { try { dispatchT.join(WAIT_CLOSED_SECOND * 1000L); @@ -266,6 +251,7 @@ public void close() { } } } + closed = true; super.close(); } diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsConst.java b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsConst.java new file mode 100644 index 0000000..85497d2 --- /dev/null +++ b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsConst.java @@ -0,0 +1,23 @@ +package jframe.ext.dispatch.mns; + +/** + * @author dzh + * @date 2022/11/18 11:06 + */ +public interface MnsConst { + + String FILE_MNS = "file.mns"; + + // conf field + String MNS_ACCESSKEYID = "mns.accesskeyid"; + String MNS_ACCESSKEYSECRET = "mns.accesskeysecret"; + String MNS_ACCOUNTENDPOINT = "mns.accountendpoint"; + String MNS_QUEUE = "mns.queue"; //queue name, e.g. queueName1 queueName2 + + // msg meta + String M_MNS_CODEC = "m.mns.codec"; + String M_MNS_QUEUE = "m.mns.queue"; + + // default topic + String DEFAULT_QUEUE = "jframe"; +} diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsConsumerDispatcher.java b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsConsumerDispatcher.java new file mode 100644 index 0000000..e7275c9 --- /dev/null +++ b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsConsumerDispatcher.java @@ -0,0 +1,19 @@ +package jframe.ext.dispatch.mns; + +import jframe.core.conf.Config; + +/** + * @author dzh + * @date 2022/11/17 19:20 + */ +public class MnsConsumerDispatcher extends MnsDispatcher { + + public MnsConsumerDispatcher(String id, Config config) { + super(id, config); + } + + @Override + protected boolean enableProducer() { + return false; + } +} diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsDispatcher.java b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsDispatcher.java new file mode 100644 index 0000000..6281b2c --- /dev/null +++ b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsDispatcher.java @@ -0,0 +1,175 @@ +package jframe.ext.dispatch.mns; + +import com.aliyun.mns.client.*; +import com.aliyun.mns.model.Message; +import jframe.core.conf.Config; +import jframe.core.dispatch.AbstractDispatcher; +import jframe.core.msg.Msg; +import jframe.ext.msg.MsgCodec; +import jframe.ext.msg.TextMsgCodec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.util.Objects; +import java.util.Properties; + +/** + * 限制 https://help.aliyun.com/document_detail/128343.html + * 接入点 https://help.aliyun.com/document_detail/175569.html + * http(s)://AccountId.mns.cn-shanghai.aliyuncs.com + * http://AccountId.mns.cn-shanghai-internal.aliyuncs.com + * + * @author dzh + * @date 2022/11/17 16:04 + */ +public class MnsDispatcher extends AbstractDispatcher implements AsyncCallback, MnsConst { + + static Logger LOG = LoggerFactory.getLogger(MnsDispatcher.class); + + private MNSClient client; + + private MsgCodec msgCodec; + + private String queue; //queue name, 目前支持单个队列消费 + + private volatile boolean closed; + + private Thread dispatchT; // consume dispatch thread + + public MnsDispatcher(String id, Config config) { + super(id, config); + } + + @Override + public void start() { + closed = false; + try { + initMNSClient(); + initMsgCodec(); + if (enableConsumer()) startConsumer(); + if (enableProducer()) startProducer(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + close(); + } + } + + private void initMsgCodec() { + String clazz = getConfig().getConfig(M_MNS_CODEC, TextMsgCodec.class.getName()); + try { + LOG.info("initMsgCodec {}", clazz); + msgCodec = (MsgCodec) Class.forName(clazz).newInstance(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + private void initMNSClient() { + String file = getConfig().getConfig(FILE_MNS); + if (Objects.isNull(file)) { + file = getConfig().getConfig(Config.APP_CONF) + "/mns.properties"; + } + Properties props = new Properties(); + try (FileInputStream fis = new FileInputStream(file)) { + props.load(fis); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + return; + } + LOG.info("initMNSClient {}", props); + + CloudAccount account = new CloudAccount( + props.getProperty(MNS_ACCESSKEYID), + props.getProperty(MNS_ACCESSKEYSECRET), + props.getProperty(MNS_ACCOUNTENDPOINT)); + client = account.getMNSClient(); + this.queue = props.getProperty(MNS_QUEUE, DEFAULT_QUEUE); + } + + private void startProducer() { + + } + + private void startConsumer() { +// List queues = Arrays.asList(getConfig().getConfig(MnsConst.MNS_QUEUES, "").split("\\s+")); +// if (queues.isEmpty()) { +// LOG.error("msg.queue is not defined. failed to create consumer"); +// return; +// } + this.dispatchT = new Thread(() -> { + LOG.info("{} start", Thread.currentThread().getName()); + CloudQueue q = client.getQueueRef(this.queue); + while (!closed) { + try { +// for (String queue : queues) { + Message msg = q.popMessage(60); + if (msg != null) { + dispatch(msgCodec.decode(msg.getMessageBodyAsBytes())); + q.deleteMessage(msg.getReceiptHandle()); + } +// } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + LOG.info("{} closed", Thread.currentThread().getName()); + }, "MnsConsumeThread"); + dispatchT.start(); + } + + @Override + public void receive(Msg msg) { + if (enableProducer() && client != null && client.isOpen()) { + String q = (String) msg.getMeta(M_MNS_QUEUE); + if (Objects.isNull(q)) { + q = this.queue; //default queue + } + try { + AsyncResult r = client.getQueueRef(q).asyncPutMessage(new Message(msgCodec.encode(msg)), this); +// client.getQueueRef(q).putMessage(new Message(msgCodec.encode(msg))); + LOG.debug("send msg {}, result {}", msg, r); + } catch (Exception e) { + LOG.error(e.getMessage(), e);//todo + } + } else { + LOG.error("discard msg {}", msg); + } + } + + @Override + public void close() { + if (closed) return; + if (client != null) client.close(); + + // close dispatcher and consumer + if (enableConsumer()) { + if (dispatchT != null) { + try { + dispatchT.join(60 * 1000L); + } catch (InterruptedException e) { + } + } + } + closed = true; + super.close(); + } + + protected boolean enableProducer() { + return true; + } + + protected boolean enableConsumer() { + return true; + } + + @Override + public void onSuccess(Message msg) { + LOG.debug("mns put message {} successfully", msg); + } + + @Override + public void onFail(Exception e) { + LOG.error(e.getMessage(), e); + } +} diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsProducerDispatcher.java b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsProducerDispatcher.java new file mode 100644 index 0000000..b804b9e --- /dev/null +++ b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/mns/MnsProducerDispatcher.java @@ -0,0 +1,19 @@ +package jframe.ext.dispatch.mns; + +import jframe.core.conf.Config; + +/** + * @author dzh + * @date 2022/11/17 19:20 + */ +public class MnsProducerDispatcher extends MnsDispatcher { + + public MnsProducerDispatcher(String id, Config config) { + super(id, config); + } + + @Override + protected boolean enableConsumer() { + return false; + } +} diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/ons/OnsDispatcher.java b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/ons/OnsDispatcher.java index 3ae239d..4bba3eb 100644 --- a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/ons/OnsDispatcher.java +++ b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/ons/OnsDispatcher.java @@ -4,9 +4,9 @@ import jframe.core.conf.Config; import jframe.core.dispatch.AbstractDispatcher; import jframe.core.msg.Msg; -import jframe.ext.dispatch.rocketmq.MsgCodec; import jframe.ext.dispatch.rocketmq.RmqConst; -import jframe.ext.dispatch.rocketmq.TextMsgCodec; +import jframe.ext.msg.MsgCodec; +import jframe.ext.msg.TextMsgCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,7 +152,7 @@ public Action consume(Message msg, ConsumeContext context) { } private void initMsgCodec() { - String clazz = getConfig().getConfig(D_RMQ_CODEC, TextMsgCodec.class.getName()); + String clazz = getConfig().getConfig(M_RMQ_CODEC, TextMsgCodec.class.getName()); try { msgCodec = (MsgCodec) Class.forName(clazz).newInstance(); } catch (Exception e) { @@ -163,14 +163,14 @@ private void initMsgCodec() { @Override public void receive(Msg msg) { if (producer != null) { - String topic = (String) msg.getMeta(D_RMQ_R_TOPIC); + String topic = (String) msg.getMeta(M_RMQ_TOPIC); if (Objects.isNull(topic)) { topic = DEFAULT_TOPIC; } try { Message rmqMsg = new Message(topic, - (String) msg.getMeta(D_RMQ_R_TAG), (String) msg.getMeta(D_RMQ_R_Key), + (String) msg.getMeta(M_RMQ_TAG), (String) msg.getMeta(M_RMQ_Key), msgCodec.encode(msg)); SendResult r = producer.send(rmqMsg); LOG.debug("send msg {}, sendResult {}", msg, r); diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/RmqConst.java b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/RmqConst.java index ef59b7a..6752897 100644 --- a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/RmqConst.java +++ b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/RmqConst.java @@ -6,13 +6,14 @@ */ public interface RmqConst { - public static final String FILE_RMQ_PRODUCER = "file.rmq.producer"; - public static final String FILE_RMQ_CONSUMER = "file.rmq.consumer"; + String FILE_RMQ_PRODUCER = "file.rmq.producer"; + String FILE_RMQ_CONSUMER = "file.rmq.consumer"; - public static final String D_RMQ_CODEC = "d.rmq.codec"; // MsgCodec + // msg meta + String M_RMQ_CODEC = "m.rmq.codec"; // MsgCodec + String M_RMQ_TOPIC = "m.rmq.topic"; + String M_RMQ_TAG = "m.rmq.tag"; + String M_RMQ_Key = "m.rmq.key"; - public static final String DEFAULT_TOPIC = "jframe"; - public static final String D_RMQ_R_TOPIC = "d.rmq.r.topic"; - public static final String D_RMQ_R_TAG = "d.rmq.r.tag"; - public static final String D_RMQ_R_Key = "d.rmq.r.key"; + String DEFAULT_TOPIC = "jframe"; } diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/RmqDispatcher.java b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/RmqDispatcher.java index 4941e44..005ba57 100644 --- a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/RmqDispatcher.java +++ b/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/RmqDispatcher.java @@ -3,6 +3,8 @@ import jframe.core.conf.Config; import jframe.core.dispatch.AbstractDispatcher; import jframe.core.msg.Msg; +import jframe.ext.msg.MsgCodec; +import jframe.ext.msg.TextMsgCodec; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -60,7 +62,7 @@ public void start() { } private void initMsgCodec() { - String clazz = getConfig().getConfig(D_RMQ_CODEC, TextMsgCodec.class.getName()); + String clazz = getConfig().getConfig(M_RMQ_CODEC, TextMsgCodec.class.getName()); try { msgCodec = (MsgCodec) Class.forName(clazz).newInstance(); } catch (Exception e) { @@ -162,7 +164,7 @@ private DefaultMQPushConsumer createConsumer(Properties props) throws MQClientEx consumer.setConsumeMessageBatchMaxSize(Integer.parseInt(val)); val = props.getProperty("consume.timeout", "5"); //minutes consumer.setConsumeTimeout(Long.parseLong(val)); - consumer.subscribe(props.getProperty("consume.topic", "jframe"), + consumer.subscribe(props.getProperty("consume.topic", RmqConst.DEFAULT_TOPIC), props.getProperty("consume.subExpression", "*")); consumer.setVipChannelEnabled(false); val = props.getProperty("consume.thread.max"); @@ -214,14 +216,14 @@ private DefaultMQPushConsumer createConsumer(Properties props) throws MQClientEx @Override public void receive(Msg msg) { if (producer != null) { - String topic = (String) msg.getMeta(D_RMQ_R_TOPIC); + String topic = (String) msg.getMeta(M_RMQ_TOPIC); if (Objects.isNull(topic)) { topic = DEFAULT_TOPIC; } try { Message rmqMsg = new Message(topic, - (String) msg.getMeta(D_RMQ_R_TAG), (String) msg.getMeta(D_RMQ_R_Key), + (String) msg.getMeta(M_RMQ_TAG), (String) msg.getMeta(M_RMQ_Key), msgCodec.encode(msg)); SendResult r = producer.send(rmqMsg); LOG.debug("send msg {}, sendResult {}", msg, r); diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/MsgCodec.java b/jframe/jframe-ext/src/main/java/jframe/ext/msg/MsgCodec.java similarity index 87% rename from jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/MsgCodec.java rename to jframe/jframe-ext/src/main/java/jframe/ext/msg/MsgCodec.java index dc586da..3343ce7 100644 --- a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/MsgCodec.java +++ b/jframe/jframe-ext/src/main/java/jframe/ext/msg/MsgCodec.java @@ -1,4 +1,4 @@ -package jframe.ext.dispatch.rocketmq; +package jframe.ext.msg; import jframe.core.msg.Msg; diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/TextMsgCodec.java b/jframe/jframe-ext/src/main/java/jframe/ext/msg/TextMsgCodec.java similarity index 92% rename from jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/TextMsgCodec.java rename to jframe/jframe-ext/src/main/java/jframe/ext/msg/TextMsgCodec.java index edeca6a..a45e349 100644 --- a/jframe/jframe-ext/src/main/java/jframe/ext/dispatch/rocketmq/TextMsgCodec.java +++ b/jframe/jframe-ext/src/main/java/jframe/ext/msg/TextMsgCodec.java @@ -1,9 +1,10 @@ -package jframe.ext.dispatch.rocketmq; +package jframe.ext.msg; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import jframe.core.msg.Msg; import jframe.core.msg.TextMsg; +import jframe.ext.msg.MsgCodec; import java.io.IOException; diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/plugin/KafkaPlugin.java b/jframe/jframe-ext/src/main/java/jframe/ext/plugin/KafkaPlugin.java index 86dd0d0..b1fdd22 100644 --- a/jframe/jframe-ext/src/main/java/jframe/ext/plugin/KafkaPlugin.java +++ b/jframe/jframe-ext/src/main/java/jframe/ext/plugin/KafkaPlugin.java @@ -1,23 +1,22 @@ package jframe.ext.plugin; import jframe.core.msg.Msg; -import jframe.core.plugin.PluginSender; -import jframe.ext.dispatch.kafka.KafkaDispatcher; +import jframe.ext.dispatch.kafka.KafkaConst; /** * @author dzh * @version 0.0.1 * @date Dec 27, 2018 3:05:36 PM */ -public abstract class KafkaPlugin extends PluginSender { +public class KafkaPlugin extends MqPlugin { public void send(Msg msg, String topic, Integer partition, Long timestamp, String key) { if (msg == null) return; - msg.setMeta(KafkaDispatcher.D_KAFKA_R_TOPIC, topic); - msg.setMeta(KafkaDispatcher.D_KAFKA_R_PARTITION, partition); - msg.setMeta(KafkaDispatcher.D_KAFKA_R_TIMESTAMP, timestamp); - msg.setMeta(KafkaDispatcher.D_KAFKA_R_KEY, key); + msg.setMeta(KafkaConst.M_KAFKA_TOPIC, topic); + msg.setMeta(KafkaConst.M_KAFKA_PARTITION, partition); + msg.setMeta(KafkaConst.M_KAFKA_TIMESTAMP, timestamp); + msg.setMeta(KafkaConst.M_KAFKA_KEY, key); send(msg); } diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/plugin/MnsPlugin.java b/jframe/jframe-ext/src/main/java/jframe/ext/plugin/MnsPlugin.java new file mode 100644 index 0000000..dd428d1 --- /dev/null +++ b/jframe/jframe-ext/src/main/java/jframe/ext/plugin/MnsPlugin.java @@ -0,0 +1,20 @@ +package jframe.ext.plugin; + +import jframe.core.msg.Msg; +import jframe.ext.dispatch.mns.MnsConst; + +/** + * @author dzh + * @date 2022/11/17 18:06 + */ +public class MnsPlugin extends MqPlugin { + + public void send(Msg msg, String queue) { + if (msg == null) return; + + msg.setMeta(MnsConst.M_MNS_QUEUE, queue); + + send(msg); + } + +} diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/plugin/MqPlugin.java b/jframe/jframe-ext/src/main/java/jframe/ext/plugin/MqPlugin.java new file mode 100644 index 0000000..3d006b7 --- /dev/null +++ b/jframe/jframe-ext/src/main/java/jframe/ext/plugin/MqPlugin.java @@ -0,0 +1,13 @@ +package jframe.ext.plugin; + +import jframe.core.plugin.PluginSender; + +/** + * @author dzh + * @date 2022/11/18 09:48 + */ +public class MqPlugin extends PluginSender { + + + +} diff --git a/jframe/jframe-ext/src/main/java/jframe/ext/plugin/RocketmqPlugin.java b/jframe/jframe-ext/src/main/java/jframe/ext/plugin/RocketmqPlugin.java index 610a76e..6e1e634 100644 --- a/jframe/jframe-ext/src/main/java/jframe/ext/plugin/RocketmqPlugin.java +++ b/jframe/jframe-ext/src/main/java/jframe/ext/plugin/RocketmqPlugin.java @@ -1,8 +1,7 @@ package jframe.ext.plugin; import jframe.core.msg.Msg; -import jframe.core.plugin.PluginSender; -import jframe.ext.dispatch.rocketmq.RmqDispatcher; +import jframe.ext.dispatch.rocketmq.RmqConst; /** * Rocketmq Plugin @@ -10,14 +9,14 @@ * @author dzh * @date 2019/12/25 15:22 */ -public abstract class RocketmqPlugin extends PluginSender { +public class RocketmqPlugin extends MqPlugin { public void send(Msg msg, String topic, String tag, String key) { if (msg == null) return; - msg.setMeta(RmqDispatcher.D_RMQ_R_TOPIC, topic); - msg.setMeta(RmqDispatcher.D_RMQ_R_TAG, tag); - msg.setMeta(RmqDispatcher.D_RMQ_R_Key, key); + msg.setMeta(RmqConst.M_RMQ_TOPIC, topic); + msg.setMeta(RmqConst.M_RMQ_TAG, tag); + msg.setMeta(RmqConst.M_RMQ_Key, key); send(msg); }