Skip to content

Commit

Permalink
修复一些idea给出的建议问题
Browse files Browse the repository at this point in the history
  • Loading branch information
codingmiao committed Nov 12, 2024
1 parent 65a22ef commit 6bd1629
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public static boolean receiveServerBytes(CommonConfig config, byte[] responseBod
if (null != sessionIdCallBack) {
sessionIdCallBack.cb(sessionId);
} else {
log.warn("没有对应的SessionIdCallBack {}", sessionIdCallBack);
log.warn("没有对应的SessionIdCallBack {}", sessionId);
}
}
case Constant.ScCommands.CloseSession -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,14 @@

/**
* 可发送的SessionBytes,被发送后会触发回调函数并告知是否成功
*
* @author liuyu
* @date 2024/10/20
*/
public class SendAbleSessionBytes {
public interface CallBack{
public record SendAbleSessionBytes(SessionBytes sessionBytes,
org.wowtools.hppt.common.pojo.SendAbleSessionBytes.CallBack callBack) {
public interface CallBack {
void cb(boolean success);
}

public final SessionBytes sessionBytes;

public final CallBack callBack;

public SendAbleSessionBytes(SessionBytes sessionBytes, CallBack callBack) {
this.sessionBytes = sessionBytes;
this.callBack = callBack;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private static List<SendAbleSessionBytes> merge(List<SendAbleSessionBytes> bytes
SessionBytes sessionBytes = new SessionBytes(sessionId, BytesUtil.merge(mergeCell.bytesList));
SendAbleSessionBytes.CallBack callBack;
if (mergeCell.callBacks.size() == 1) {
callBack = mergeCell.callBacks.get(0);
callBack = mergeCell.callBacks.getFirst();
} else {
callBack = (success) -> {
for (SendAbleSessionBytes.CallBack callBack1 : mergeCell.callBacks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,14 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
disposeServerSession(session, "channelInactive");
}

private static final class CallBack implements SendAbleSessionBytes.CallBack {
private final CompletableFuture<Boolean> future;

public CallBack(CompletableFuture<Boolean> future) {
this.future = future;
}
private record CallBack(CompletableFuture<Boolean> future) implements SendAbleSessionBytes.CallBack {

@Override
public void cb(boolean success) {
//锁住当前线程直至字节发送成功,避免缓冲区积压过多数据或后发先至问题
future.complete(success);
}
}
public void cb(boolean success) {
//锁住当前线程直至字节发送成功,避免缓冲区积压过多数据或后发先至问题
future.complete(success);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,33 +177,21 @@ public static void replyToClient(CommonConfig config, ServerSessionManager serve
}

//处理SendAbleSessionBytes的回调
private static final class SendAbleSessionBytesResult {
private final boolean success;
private final SendAbleSessionBytes.CallBack callBack;

public SendAbleSessionBytesResult(boolean success, SendAbleSessionBytes.CallBack callBack) {
this.success = success;
this.callBack = callBack;
}
private record SendAbleSessionBytesResult(boolean success, SendAbleSessionBytes.CallBack callBack) {
}

@Slf4j
private static final class CbRunnable implements Runnable {
private final SendAbleSessionBytesResult sendAbleSessionBytesResult;

public CbRunnable(SendAbleSessionBytesResult sendAbleSessionBytesResult) {
this.sendAbleSessionBytesResult = sendAbleSessionBytesResult;
}
private record CbRunnable(SendAbleSessionBytesResult sendAbleSessionBytesResult) implements Runnable {

@Override
public void run() {
try {
sendAbleSessionBytesResult.callBack.cb(sendAbleSessionBytesResult.success);
} catch (Exception e) {
log.warn("CbRunnable err", e);
public void run() {
try {
sendAbleSessionBytesResult.callBack.cb(sendAbleSessionBytesResult.success);
} catch (Exception e) {
log.warn("CbRunnable err", e);
}
}
}
}

private static final BufferPool<SendAbleSessionBytesResult> sendAbleSessionBytesResultQueue
= new BufferPool<>("ServerTalker.sendAbleSessionBytesResultQueue");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* @author liuyu
* @date 2024/3/20
*/
class PostCtx {
public class PostCtx {
final String cookie;
final BufferPool<byte[]> sendQueue = new BufferPool<>(">PostCtx-sendQueue");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;
import org.wowtools.hppt.common.util.BytesUtil;
import org.wowtools.hppt.common.util.NettyObjectBuilder;
import org.wowtools.hppt.run.ss.common.ServerSessionService;
Expand All @@ -15,6 +16,7 @@
* @author liuyu
* @date 2024/4/15
*/
@Slf4j
public class RHpptServerSessionService extends ServerSessionService<ChannelHandlerContext> {

private EventLoopGroup group;
Expand Down Expand Up @@ -70,7 +72,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E

@Override
protected void sendBytesToClient(ChannelHandlerContext ctx, byte[] bytes) {
BytesUtil.writeToChannelHandlerContext(ctx, bytes);
Throwable e = BytesUtil.writeToChannelHandlerContext(ctx, bytes);
if (null != e) {
log.warn("sendBytesToClient err", e);
ctx.close();
}
}

@Override
Expand Down

0 comments on commit 6bd1629

Please sign in to comment.