Skip to content

Commit

Permalink
Netty改造
Browse files Browse the repository at this point in the history
  • Loading branch information
hupengfei123 committed Jun 19, 2019
1 parent 1fa7e32 commit e4ea9ed
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.example.rpcclient.client;

import Domain.Response;

/**
* @program: springBootPractice
* @description:
* @author: hu_pf
* @create: 2019-06-18 18:39
**/
public class MessageFuture {

private volatile boolean success = false;
private Response response;
private final Object object = new Object();

public Response getMessage(){
synchronized (object){
while (!success){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}

public void setMessage(Response response){
synchronized (object){
this.response = response;
this.success = true;
object.notify();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.example.rpcclient.client;

import Domain.Response;
import com.google.gson.Gson;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
* @program: springBootPractice
* @description:客户端连接
* @author: hu_pf
* @create: 2019-06-18 18:31
**/
public class NettyClientConnect {

private final Map<Long,MessageFuture> futureMap = new ConcurrentHashMap<>();
private CountDownLatch countDownLatch = new CountDownLatch(1);

public void connect(String requestJson,Long threadId){
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().
addLast(new StringDecoder()).
addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Gson gson = new Gson();
Response response = gson.fromJson(msg, Response.class);
MessageFuture messageFuture = futureMap.get(threadId);
messageFuture.setMessage(response);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
futureMap.put(threadId,new MessageFuture());
countDownLatch.countDown();
ByteBuf encoded = ctx.alloc().buffer(4 * requestJson.length());
encoded.writeBytes(requestJson.getBytes());
ctx.writeAndFlush(encoded);
}
});
}
}).connect("127.0.0.1", 20006);
}

public Response getResponse(Long threadId){
MessageFuture messageFuture = null;
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
messageFuture = futureMap.get(threadId);
return messageFuture.getMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
import Domain.Response;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

Expand All @@ -30,28 +37,32 @@ public class RpcDynamicPro implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String requestJson = objectToJson(method, args);
Socket client = new Socket("127.0.0.1", 20006);
client.setSoTimeout(10000);
//获取Socket的输出流,用来发送数据到服务端
PrintStream out = new PrintStream(client.getOutputStream());
//获取Socket的输入流,用来接收从服务端发送过来的数据
BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
//发送数据到服务端
out.println(requestJson);
Response response = new Response();
Gson gson = new Gson();
try {
//从服务器端接收数据有个时间限制(系统自设,也可以自己设置),超过了这个时间,便会抛出该异常
String responsJson = buf.readLine();
response = gson.fromJson(responsJson, Response.class);
} catch (SocketTimeoutException e) {
log.info("Time out, No response");
}
if (client != null) {
//如果构造函数建立起了连接,则关闭套接字,如果没有建立起连接,自然不用关闭
client.close(); //只关闭socket,其关联的输入输出流也会被关闭
}
return response.getResult();
String returnMsg = "";
// Socket client = new Socket("127.0.0.1", 20006);
// client.setSoTimeout(10000);
// //获取Socket的输出流,用来发送数据到服务端
// PrintStream out = new PrintStream(client.getOutputStream());
// //获取Socket的输入流,用来接收从服务端发送过来的数据
// BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
// //发送数据到服务端
// out.println(requestJson);
// Response response = new Response();
// Gson gson = new Gson();
// try {
// //从服务器端接收数据有个时间限制(系统自设,也可以自己设置),超过了这个时间,便会抛出该异常
// String responsJson = buf.readLine();
// response = gson.fromJson(responsJson, Response.class);
// } catch (SocketTimeoutException e) {
// log.info("Time out, No response");
// }
// if (client != null) {
// //如果构造函数建立起了连接,则关闭套接字,如果没有建立起连接,自然不用关闭
// client.close(); //只关闭socket,其关联的输入输出流也会被关闭
// }
Long threadId = Thread.currentThread().getId();
NettyClientConnect nettyClientConnect = new NettyClientConnect();
nettyClientConnect.connect(requestJson,threadId);
return nettyClientConnect.getResponse(threadId).getResult();
}

public String objectToJson(Method method, Object[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
package com.example.rpcserver.configuration;

import com.example.rpcserver.rpcHandle.CommonDeal;
import com.example.rpcserver.rpcHandle.ServerThread;
import com.example.rpcserver.service.SendMessageImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import lombok.extern.log4j.Log4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
Expand Down Expand Up @@ -49,18 +60,40 @@ private String getClassName(String beanClassName) {
return className;
}

public void startPort() throws IOException {
//服务端在20006端口监听客户端请求的TCP连接
ServerSocket server = new ServerSocket(20006);
Socket client = null;
boolean f = true;
while (f) {
//等待客户端的连接,如果没有获取连接
client = server.accept();
System.out.println("与客户端连接成功!");
//为每个客户端连接开启一个线程
new Thread(new ServerThread(client)).start();
}
server.close();
public void startPort() throws IOException{
// //服务端在20006端口监听客户端请求的TCP连接
// ServerSocket server = new ServerSocket(20006);
// Socket client = null;
// boolean f = true;
// while (f) {
// //等待客户端的连接,如果没有获取连接
// client = server.accept();
// System.out.println("与客户端连接成功!");
// //为每个客户端连接开启一个线程
// new Thread(new ServerThread(client)).start();
// }
// server.close();

ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
.group(boos, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//获得实现类处理过后的返回值
String invokeMethodMes = CommonDeal.getInvokeMethodMes(msg);
ByteBuf encoded = ctx.alloc().buffer(4 * invokeMethodMes.length());
encoded.writeBytes(invokeMethodMes.getBytes());
ctx.writeAndFlush(encoded);
}
});
}
}).bind(20006);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.example.rpcserver.rpcHandle;

import Domain.ClassTypeAdapterFactory;
import Domain.Request;
import Domain.Response;
import com.example.rpcserver.configuration.InitRpcConfig;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.log4j.Log4j;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
* @program: springBootPractice
* @description:
* @author: hu_pf
* @create: 2019-06-17 17:40
**/
@Log4j
public class CommonDeal {

public static String getInvokeMethodMes(String str){
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory());
Gson gson = gsonBuilder.create();
Request request = gson.fromJson(str, Request.class);
return gson.toJson(invokeMethod(request));
}

private static Response invokeMethod(Request request) {
String className = request.getClassName();
String methodName = request.getMethodName();
Object[] parameters = request.getParameters();
Class<?>[] parameTypes = request.getParameTypes();
Object o = InitRpcConfig.rpcServiceMap.get(className);
Response response = new Response();
try {
Method method = o.getClass().getDeclaredMethod(methodName, parameTypes);
Object invokeMethod = method.invoke(o, parameters);
response.setResult(invokeMethod);
} catch (NoSuchMethodException e) {
log.info("没有找到" + methodName);
} catch (IllegalAccessException e) {
log.info("执行错误" + parameters);
} catch (InvocationTargetException e) {
log.info("执行错误" + parameters);
}
return response;
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
package com.example.rpcserver.rpcHandle;

import Domain.ClassTypeAdapterFactory;
import Domain.Request;
import Domain.Response;
import com.example.rpcserver.configuration.InitRpcConfig;
import com.example.rpcserver.service.SendMessageImpl;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.log4j.Log4j;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;

@Log4j
Expand All @@ -28,9 +19,7 @@ public ServerThread(Socket client) {
@Override
public void run() {
try {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapterFactory(new ClassTypeAdapterFactory());
Gson gson = gsonBuilder.create();

//获取Socket的输出流,用来向客户端发送数据
PrintStream out = new PrintStream(client.getOutputStream());
//获取Socket的输入流,用来接收从客户端发送过来的数据
Expand All @@ -39,13 +28,10 @@ public void run() {
while (flag) {
//接收从客户端发送过来的数据
String str = buf.readLine();
Request request = gson.fromJson(str, Request.class);
if (str == null || "".equals(str)) {
flag = false;
} else {
Response response = invokeMethod(request);
String res = gson.toJson(response);
out.println(res);
out.println(CommonDeal.getInvokeMethodMes(str));
}
}
out.close();
Expand All @@ -55,24 +41,5 @@ public void run() {
}
}

public Response invokeMethod(Request request) {
String className = request.getClassName();
String methodName = request.getMethodName();
Object[] parameters = request.getParameters();
Class<?>[] parameTypes = request.getParameTypes();
Object o = InitRpcConfig.rpcServiceMap.get(className);
Response response = new Response();
try {
Method method = o.getClass().getDeclaredMethod(methodName, parameTypes);
Object invokeMethod = method.invoke(o, parameters);
response.setResult(invokeMethod);
} catch (NoSuchMethodException e) {
log.info("没有找到" + methodName);
} catch (IllegalAccessException e) {
log.info("执行错误" + parameters);
} catch (InvocationTargetException e) {
log.info("执行错误" + parameters);
}
return response;
}

}

0 comments on commit e4ea9ed

Please sign in to comment.