Skip to content

Commit

Permalink
add MnsDispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
dzh committed Nov 19, 2022
1 parent 7f0d98a commit cdd77b4
Show file tree
Hide file tree
Showing 21 changed files with 416 additions and 133 deletions.
4 changes: 2 additions & 2 deletions jframe/jframe-core/src/main/java/jframe/core/FrameEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ public class FrameEvent extends EventObject {
*/
private static final long serialVersionUID = 1L;

private int type;
private final int type;

public static final int Init = 1 << 0;
public static final int Init = 1;
public static final int Start = 1 << 1;
public static final int Stop = 1 << 2;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
*
*
*/
package jframe.core.dispatch;

Expand All @@ -10,68 +10,68 @@

/**
* dispatch管理管理工厂
*
*
* @author dzh
* @date Jun 20, 2013 9:47:12 AM
*/
public class DefDispatchFactory implements DispatchFactory {

private final Object _lock = new Object();
private final Object _lock = new Object();

private List<Dispatcher> _dList = new ArrayList<Dispatcher>(2);
private List<Dispatcher> _dList = new ArrayList<Dispatcher>(2);

private DefDispatchFactory() {
private DefDispatchFactory() {

}
}

public static final DispatchFactory newInstance() {
return new DefDispatchFactory();
}
public static DispatchFactory newInstance() {
return new DefDispatchFactory();
}

public Dispatcher findDispatcher(String dispatcherID) {
List<Dispatcher> list = _dList;
Dispatcher dl = null;
synchronized (_lock) {
for (Dispatcher d : list) {
if (d.getID().equals(dispatcherID)) {
dl = d;
break;
}
}
}
return dl;
}
public Dispatcher findDispatcher(String dispatcherID) {
List<Dispatcher> list = _dList;
Dispatcher dl = null;
synchronized (_lock) {
for (Dispatcher d : list) {
if (d.getID().equals(dispatcherID)) {
dl = d;
break;
}
}
}
return dl;
}

public Dispatcher createDispatcher(String dispatcherID, Config config) {
Dispatcher d = DefDispatcher.newDispatcher(dispatcherID, config);
d.start();
synchronized (_lock) {
_dList.add(d);
}
return d;
}
public Dispatcher createDispatcher(String dispatcherID, Config config) {
Dispatcher d = DefDispatcher.newDispatcher(dispatcherID, config);
d.start();
synchronized (_lock) {
_dList.add(d);
}
return d;
}

/**
* if dispatcherID==Null, close all delegates
*/
public void removeDispatcher(String dispatcherID) {
List<Dispatcher> list = _dList;
if (dispatcherID == null) {
synchronized (_lock) {
for (Dispatcher d : list) {
d.close();
}
}
list.clear();
} else {
synchronized (_lock) {
Dispatcher d = findDispatcher(dispatcherID);
if (d != null) {
d.close();
list.remove(d);
}
}
}
}
/**
* if dispatcherID==Null, close all delegates
*/
public void removeDispatcher(String dispatcherID) {
List<Dispatcher> list = _dList;
if (dispatcherID == null) {
synchronized (_lock) {
for (Dispatcher d : list) {
d.close();
}
}
list.clear();
} else {
synchronized (_lock) {
Dispatcher d = findDispatcher(dispatcherID);
if (d != null) {
d.close();
list.remove(d);
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ public void notifyPluginEvent(PluginEvent event) {
}

/**
*
* (non-Javadoc)
*
* @return if find PluginRef return it, or return null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
*
*
*/
package jframe.core.plugin.loader.ext;

Expand Down Expand Up @@ -52,9 +52,8 @@ protected void injectImportService(Field f) {

/**
* register export-service
*
*
* @param pc
* @param p
*/
public void loadService(PluginCase pc) {
ServiceContext sc = plc.getServiceContext();
Expand Down
28 changes: 13 additions & 15 deletions jframe/jframe-core/src/main/java/jframe/core/unit/PluginUnit.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
/**
*
*
*/
package jframe.core.unit;

import java.io.File;
import java.io.FilenameFilter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jframe.core.Frame;
import jframe.core.conf.Config;
import jframe.core.plugin.DefPluginContext;
Expand All @@ -27,13 +17,22 @@
import jframe.core.plugin.loader.ext.PluginServiceCreator;
import jframe.core.signal.Signal;
import jframe.core.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FilenameFilter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
* <p>
* Feature:
* <li>load plug-in</li>
* </p>
*
*
* @author dzh
* @date Sep 23, 2013 2:47:41 PM
* @since 1.0
Expand Down Expand Up @@ -75,7 +74,7 @@ public void start() throws UnitException {
}

/**
*
*
*/
private void cleanCache() {
Config config = getFrame().getConfig();
Expand All @@ -89,8 +88,7 @@ private void cleanCache() {
}

/**
*
* @param path_plugin
*
*/
private List<Plugin> loadPlugin() {
// create plugin
Expand Down
18 changes: 12 additions & 6 deletions jframe/jframe-ext/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,46 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
<version>2.10</version>
</dependency>

<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.10.0</version>
<version>5.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
<version>5.15.7</version>
<version>5.17.2</version>
</dependency>

<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
<version>3.3.1</version>
</dependency>

<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.0</version>
<version>5.0.0</version>
</dependency>

<!-- rocketmq aliyun-->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
<version>2.0.4.Final</version>
</dependency>
<!-- aliyun mns-->
<dependency>
<groupId>com.aliyun.mns</groupId>
<artifactId>aliyun-sdk-mns</artifactId>
<version>1.1.9.1</version>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package jframe.ext.dispatch.kafka;

/**
* @author dzh
* @date 2022/11/18 11:13
*/
public interface KafkaConst {

// default
String DEFAULT_TOPIC = "jframe";

// config
String FILE_KAFKA_PRODUCER = "file.kafka.producer";
String FILE_KAFKA_CONSUMER = "file.kafka.consumer";
String D_KAFKA_SUBSCRIBE = "d.kafka.subscribe";
String D_KAFKA_SUBSCRIBE_REGEX = "d.kafka.subscribe.regex";

// msg meta
String M_KAFKA_TOPIC = "m.kafka.topic";
String M_KAFKA_KEY = "m.kafka.key";
String M_KAFKA_PARTITION = "m.kafka.partition";
String M_KAFKA_TIMESTAMP = "m.kafka.timestamp";

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

/**
Expand All @@ -42,25 +41,10 @@
* @version 0.0.1
* @date Dec 26, 2018 5:16:26 PM
*/
public class KafkaDispatcher extends AbstractDispatcher {
public class KafkaDispatcher extends AbstractDispatcher implements KafkaConst {

static Logger LOG = LoggerFactory.getLogger(KafkaDispatcher.class);

// default
public static final String DEFAULT_TOPIC = "jframe";

// config
public static final String FILE_KAFKA_PRODUCER = "file.kafka.producer";
public static final String FILE_KAFKA_CONSUMER = "file.kafka.consumer";
public static final String D_KAFKA_SUBSCRIBE = "d.kafka.subscribe";
public static final String D_KAFKA_SUBSCRIBE_REGEX = "d.kafka.subscribe.regex";

// msg meta
public static final String D_KAFKA_R_TOPIC = "d.kafka.r.topic";
public static final String D_KAFKA_R_KEY = "d.kafka.r.key";
public static final String D_KAFKA_R_PARTITION = "d.kafka.r.partition";
public static final String D_KAFKA_R_TIMESTAMP = "d.kafka.r.timestamp";

private Producer<String, Msg<?>> producer;
private Consumer<String, Msg<?>> consumer;

Expand Down Expand Up @@ -214,28 +198,28 @@ protected void loadDefaultConsumer(Properties props) {
@Override
public void receive(Msg<?> msg) {
if (producer != null) {
String topic = (String) msg.getMeta(D_KAFKA_R_TOPIC);
String topic = (String) msg.getMeta(M_KAFKA_TOPIC);
if (Objects.isNull(topic)) {
topic = DEFAULT_TOPIC;
}
Integer partition = partition(msg);
Long timestamp = timestamp(msg);
String key = (String) msg.getMeta(D_KAFKA_R_KEY);
String key = (String) msg.getMeta(M_KAFKA_KEY);
ProducerRecord<String, Msg<?>> record = new ProducerRecord<>(topic, partition, timestamp, key, msg, null);
producer.send(record);
}
}

private Long timestamp(Msg<?> msg) {
Object ts = msg.getMeta(D_KAFKA_R_TIMESTAMP);
Object ts = msg.getMeta(M_KAFKA_TIMESTAMP);
if (ts == null) return null;
if (ts instanceof Long) return (Long) ts;
if (ts instanceof String) return Long.parseLong((String) ts);
return null;
}

private Integer partition(Msg<?> msg) {
Object p = msg.getMeta(D_KAFKA_R_PARTITION);
Object p = msg.getMeta(M_KAFKA_PARTITION);
if (p == null) return null;
if (p instanceof Integer) return (Integer) p;
if (p instanceof String) return Integer.parseInt((String) p);
Expand All @@ -254,18 +238,20 @@ public Consumer<String, Msg<?>> getConsumer() {
public void close() {
if (closed) return;
// close producer
if (enableProducer()) producer.close(WAIT_CLOSED_SECOND, TimeUnit.SECONDS);
if (enableProducer())
// producer.close(WAIT_CLOSED_SECOND, TimeUnit.SECONDS);
producer.close(Duration.ofSeconds(WAIT_CLOSED_SECOND));

// close dispatcher and consumer
if (enableConsumer()) {
closed = true;
if (dispatchT != null) {
try {
dispatchT.join(WAIT_CLOSED_SECOND * 1000L);
} catch (InterruptedException e) {
}
}
}
closed = true;
super.close();
}

Expand Down
Loading

0 comments on commit cdd77b4

Please sign in to comment.