From 9eee7f51e8bff56a20b84ed6c8f2b97e8744be92 Mon Sep 17 00:00:00 2001 From: MORI Shingo Date: Mon, 12 Apr 2021 17:38:39 +0900 Subject: [PATCH 1/2] =?UTF-8?q?Forwarder=E3=81=8C=E9=81=85=E3=81=84?= =?UTF-8?q?=E5=95=8F=E9=A1=8C=E3=82=92=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/core/packetproxy/encode/Encoder.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/core/packetproxy/encode/Encoder.java b/src/main/java/core/packetproxy/encode/Encoder.java index 0b566a3..93e950b 100644 --- a/src/main/java/core/packetproxy/encode/Encoder.java +++ b/src/main/java/core/packetproxy/encode/Encoder.java @@ -167,9 +167,11 @@ public void setGroupId(Packet packet) throws Exception { */ public void putToClientFlowControlledQueue(byte[] output_data) throws Exception { clientOutputForFlowControl.write(output_data); + clientOutputForFlowControl.flush(); } public void putToServerFlowControlledQueue(byte[] output_data) throws Exception { serverOutputForFlowControl.write(output_data); + serverOutputForFlowControl.flush(); } public InputStream getClientFlowControlledInputStream() { return clientInputForFlowControl; From 639f0951d38537a0ec8a98a4d29b9b737b85ce10 Mon Sep 17 00:00:00 2001 From: MORI Shingo Date: Mon, 12 Apr 2021 18:29:47 +0900 Subject: [PATCH 2/2] =?UTF-8?q?OutputStream=E3=81=8Cflush=E3=81=95?= =?UTF-8?q?=E3=82=8C=E3=81=A6=E3=81=AA=E3=81=84=E7=AE=87=E6=89=80=E3=82=92?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/core/packetproxy/Duplex.java | 2 ++ src/main/java/core/packetproxy/DuplexAsync.java | 2 ++ src/main/java/core/packetproxy/http/Https.java | 2 ++ .../java/core/packetproxy/http/HttpsProxySocketEndpoint.java | 1 + src/main/java/core/packetproxy/http2/FlowControl.java | 2 ++ src/main/java/core/packetproxy/http2/FlowControlManager.java | 1 + src/main/java/core/packetproxy/http2/FrameManager.java | 5 +++++ 7 files changed, 15 insertions(+) diff --git a/src/main/java/core/packetproxy/Duplex.java b/src/main/java/core/packetproxy/Duplex.java index 7f06186..991c587 100644 --- a/src/main/java/core/packetproxy/Duplex.java +++ b/src/main/java/core/packetproxy/Duplex.java @@ -210,6 +210,7 @@ public byte[] callOnServerChunkSendForced(byte[] data) throws Exception { public void callOnClientChunkFlowControl(byte[] data) throws Exception { if (isEnabledDuplexEventListener() == false) { clientOutputForFlowControl.write(data); + clientOutputForFlowControl.flush(); } for (DuplexEventListener listener: duplexEventListenerList.getListeners(DuplexEventListener.class)) { listener.onClientChunkFlowControl(data); @@ -227,6 +228,7 @@ public InputStream getClientChunkFlowControlSink() throws Exception { public void callOnServerChunkFlowControl(byte[] data) throws Exception { if (isEnabledDuplexEventListener() == false) { serverOutputForFlowControl.write(data); + serverOutputForFlowControl.flush(); } for (DuplexEventListener listener: duplexEventListenerList.getListeners(DuplexEventListener.class)) { listener.onServerChunkFlowControl(data); diff --git a/src/main/java/core/packetproxy/DuplexAsync.java b/src/main/java/core/packetproxy/DuplexAsync.java index bc3e770..b5d7b9a 100644 --- a/src/main/java/core/packetproxy/DuplexAsync.java +++ b/src/main/java/core/packetproxy/DuplexAsync.java @@ -121,6 +121,7 @@ public void run() { int inputLen = 0; while ((inputLen = getClientChunkFlowControlSink().read(inputBuf)) > 0) { client_output.write(inputBuf, 0, inputLen); + client_output.flush(); } flow_controlled_client_input.close(); client_output.close(); @@ -143,6 +144,7 @@ public void run() { int inputLen = 0; while ((inputLen = getServerChunkFlowControlSink().read(inputBuf)) > 0) { server_output.write(inputBuf, 0, inputLen); + server_output.flush(); } flow_controlled_server_input.close(); server_output.close(); diff --git a/src/main/java/core/packetproxy/http/Https.java b/src/main/java/core/packetproxy/http/Https.java index fc1bd06..a9e12d9 100644 --- a/src/main/java/core/packetproxy/http/Https.java +++ b/src/main/java/core/packetproxy/http/Https.java @@ -95,6 +95,7 @@ public static SSLSocket[] createBothSideSSLSockets(Socket clientSocket, InputStr OutputStream proxyOut = serverSocket.getOutputStream(); InputStream proxyIn = serverSocket.getInputStream(); proxyOut.write(String.format("CONNECT %s:%d HTTP/1.1\r\nHost: %s\r\n\r\n", serverAddr.getHostString(), serverAddr.getPort(), serverAddr.getHostString()).getBytes()); + proxyOut.flush(); int length = 0; byte[] input_data = new byte[1024]; while ((length = proxyIn.read(input_data, 0, input_data.length)) != -1) { @@ -138,6 +139,7 @@ public static SSLSocket[] createBothSideSSLSockets(Socket clientSocket, InputStr OutputStream proxyOut = serverSocket.getOutputStream(); InputStream proxyIn = serverSocket.getInputStream(); proxyOut.write(String.format("CONNECT %s:%d HTTP/1.1\r\nHost: %s\r\n\r\n", serverAddr.getHostString(), serverAddr.getPort(), serverAddr.getHostString()).getBytes()); + proxyOut.flush(); int length = 0; byte[] input_data = new byte[1024]; while ((length = proxyIn.read(input_data, 0, input_data.length)) != -1) { diff --git a/src/main/java/core/packetproxy/http/HttpsProxySocketEndpoint.java b/src/main/java/core/packetproxy/http/HttpsProxySocketEndpoint.java index 9c30a9b..24ac61f 100644 --- a/src/main/java/core/packetproxy/http/HttpsProxySocketEndpoint.java +++ b/src/main/java/core/packetproxy/http/HttpsProxySocketEndpoint.java @@ -37,6 +37,7 @@ public HttpsProxySocketEndpoint(SSLSocket proxySocket, InetSocketAddress serverA serverAddr.getHostString(), serverAddr.getPort(), serverAddr.getHostString()).getBytes()); + proxyOut.flush(); proxyIn = socket.getInputStream(); diff --git a/src/main/java/core/packetproxy/http2/FlowControl.java b/src/main/java/core/packetproxy/http2/FlowControl.java index ce4a611..b46b8c0 100644 --- a/src/main/java/core/packetproxy/http2/FlowControl.java +++ b/src/main/java/core/packetproxy/http2/FlowControl.java @@ -46,6 +46,7 @@ public void appendWindowSize(int appendWindowSize) { public void enqueue(Frame frame) throws Exception { synchronized (queue) { queue.write(frame.getPayload()); + queue.flush(); if ((frame.getFlags() & DataFrame.FLAG_END_STREAM) > 0) { end_flag = true; } @@ -69,6 +70,7 @@ public Stream dequeue(int connectionWindowSize) throws Exception { byte[] remaining = ArrayUtils.subarray(queue.toByteArray(), dataLen, queue.size()); queue.reset(); queue.write(remaining); + queue.flush(); Stream stream = new Stream(); diff --git a/src/main/java/core/packetproxy/http2/FlowControlManager.java b/src/main/java/core/packetproxy/http2/FlowControlManager.java index 23d1aa9..931368e 100644 --- a/src/main/java/core/packetproxy/http2/FlowControlManager.java +++ b/src/main/java/core/packetproxy/http2/FlowControlManager.java @@ -57,6 +57,7 @@ private void writeData(FlowControl flow) throws Exception { if (stream != null) { connectionWindowSize -= stream.payloadSize(); outputForFlowControl.write(stream.toByteArray()); + outputForFlowControl.flush(); } } diff --git a/src/main/java/core/packetproxy/http2/FrameManager.java b/src/main/java/core/packetproxy/http2/FrameManager.java index e9afb53..3ed367e 100644 --- a/src/main/java/core/packetproxy/http2/FrameManager.java +++ b/src/main/java/core/packetproxy/http2/FrameManager.java @@ -84,6 +84,7 @@ private void analyzeFrame(Frame frame) throws Exception { flag_receive_peer_settings = true; if (flag_send_end_settings == false && flag_send_settings == true) { flowControlManager.getOutputStream().write(FrameUtils.END_SETTINGS); + flowControlManager.getOutputStream().flush(); flag_send_end_settings = true; } } @@ -129,14 +130,17 @@ public List readHeadersDataFrames() throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); public void putToFlowControlledQueue(byte[] frameData) throws Exception { baos.write(frameData); + baos.flush(); int length = 0; while ((length = FrameUtils.checkDelimiter(baos.toByteArray())) > 0) { byte[] frame = ArrayUtils.subarray(baos.toByteArray(), 0, length); byte[] remaining = ArrayUtils.subarray(baos.toByteArray(), length, baos.size()); baos.reset(); baos.write(remaining); + baos.flush(); if (FrameUtils.isPreface(frame)) { flowControlManager.getOutputStream().write(frame); + flowControlManager.getOutputStream().flush(); } else { Frame f = new Frame(frame); flowControlManager.write(f); @@ -144,6 +148,7 @@ public void putToFlowControlledQueue(byte[] frameData) throws Exception { flag_send_settings = true; if (flag_send_end_settings == false && flag_receive_peer_settings == true) { flowControlManager.getOutputStream().write(FrameUtils.END_SETTINGS); + flowControlManager.getOutputStream().flush(); flag_send_end_settings = true; } }