Skip to content

Commit

Permalink
Merge pull request #87 from DeNA/fix_slow_response
Browse files Browse the repository at this point in the history
OutputStreamがflushされてなくて処理が遅くなっていたのを修正
  • Loading branch information
funa-tk authored Apr 12, 2021
2 parents d3ebe08 + 639f095 commit 67ba904
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/main/java/core/packetproxy/Duplex.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ public byte[] callOnServerChunkSendForced(byte[] data) throws Exception {
public void callOnClientChunkFlowControl(byte[] data) throws Exception {
if (isEnabledDuplexEventListener() == false) {
clientOutputForFlowControl.write(data);
clientOutputForFlowControl.flush();
}
for (DuplexEventListener listener: duplexEventListenerList.getListeners(DuplexEventListener.class)) {
listener.onClientChunkFlowControl(data);
Expand All @@ -227,6 +228,7 @@ public InputStream getClientChunkFlowControlSink() throws Exception {
public void callOnServerChunkFlowControl(byte[] data) throws Exception {
if (isEnabledDuplexEventListener() == false) {
serverOutputForFlowControl.write(data);
serverOutputForFlowControl.flush();
}
for (DuplexEventListener listener: duplexEventListenerList.getListeners(DuplexEventListener.class)) {
listener.onServerChunkFlowControl(data);
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/core/packetproxy/DuplexAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void run() {
int inputLen = 0;
while ((inputLen = getClientChunkFlowControlSink().read(inputBuf)) > 0) {
client_output.write(inputBuf, 0, inputLen);
client_output.flush();
}
flow_controlled_client_input.close();
client_output.close();
Expand All @@ -143,6 +144,7 @@ public void run() {
int inputLen = 0;
while ((inputLen = getServerChunkFlowControlSink().read(inputBuf)) > 0) {
server_output.write(inputBuf, 0, inputLen);
server_output.flush();
}
flow_controlled_server_input.close();
server_output.close();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/core/packetproxy/encode/Encoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,11 @@ public void setGroupId(Packet packet) throws Exception {
*/
public void putToClientFlowControlledQueue(byte[] output_data) throws Exception {
clientOutputForFlowControl.write(output_data);
clientOutputForFlowControl.flush();
}
public void putToServerFlowControlledQueue(byte[] output_data) throws Exception {
serverOutputForFlowControl.write(output_data);
serverOutputForFlowControl.flush();
}
public InputStream getClientFlowControlledInputStream() {
return clientInputForFlowControl;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/core/packetproxy/http/Https.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public static SSLSocket[] createBothSideSSLSockets(Socket clientSocket, InputStr
OutputStream proxyOut = serverSocket.getOutputStream();
InputStream proxyIn = serverSocket.getInputStream();
proxyOut.write(String.format("CONNECT %s:%d HTTP/1.1\r\nHost: %s\r\n\r\n", serverAddr.getHostString(), serverAddr.getPort(), serverAddr.getHostString()).getBytes());
proxyOut.flush();
int length = 0;
byte[] input_data = new byte[1024];
while ((length = proxyIn.read(input_data, 0, input_data.length)) != -1) {
Expand Down Expand Up @@ -138,6 +139,7 @@ public static SSLSocket[] createBothSideSSLSockets(Socket clientSocket, InputStr
OutputStream proxyOut = serverSocket.getOutputStream();
InputStream proxyIn = serverSocket.getInputStream();
proxyOut.write(String.format("CONNECT %s:%d HTTP/1.1\r\nHost: %s\r\n\r\n", serverAddr.getHostString(), serverAddr.getPort(), serverAddr.getHostString()).getBytes());
proxyOut.flush();
int length = 0;
byte[] input_data = new byte[1024];
while ((length = proxyIn.read(input_data, 0, input_data.length)) != -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public HttpsProxySocketEndpoint(SSLSocket proxySocket, InetSocketAddress serverA
serverAddr.getHostString(),
serverAddr.getPort(),
serverAddr.getHostString()).getBytes());
proxyOut.flush();

proxyIn = socket.getInputStream();

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/core/packetproxy/http2/FlowControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void appendWindowSize(int appendWindowSize) {
public void enqueue(Frame frame) throws Exception {
synchronized (queue) {
queue.write(frame.getPayload());
queue.flush();
if ((frame.getFlags() & DataFrame.FLAG_END_STREAM) > 0) {
end_flag = true;
}
Expand All @@ -69,6 +70,7 @@ public Stream dequeue(int connectionWindowSize) throws Exception {
byte[] remaining = ArrayUtils.subarray(queue.toByteArray(), dataLen, queue.size());
queue.reset();
queue.write(remaining);
queue.flush();

Stream stream = new Stream();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private void writeData(FlowControl flow) throws Exception {
if (stream != null) {
connectionWindowSize -= stream.payloadSize();
outputForFlowControl.write(stream.toByteArray());
outputForFlowControl.flush();
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/core/packetproxy/http2/FrameManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private void analyzeFrame(Frame frame) throws Exception {
flag_receive_peer_settings = true;
if (flag_send_end_settings == false && flag_send_settings == true) {
flowControlManager.getOutputStream().write(FrameUtils.END_SETTINGS);
flowControlManager.getOutputStream().flush();
flag_send_end_settings = true;
}
}
Expand Down Expand Up @@ -129,21 +130,25 @@ public List<Frame> readHeadersDataFrames() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
public void putToFlowControlledQueue(byte[] frameData) throws Exception {
baos.write(frameData);
baos.flush();
int length = 0;
while ((length = FrameUtils.checkDelimiter(baos.toByteArray())) > 0) {
byte[] frame = ArrayUtils.subarray(baos.toByteArray(), 0, length);
byte[] remaining = ArrayUtils.subarray(baos.toByteArray(), length, baos.size());
baos.reset();
baos.write(remaining);
baos.flush();
if (FrameUtils.isPreface(frame)) {
flowControlManager.getOutputStream().write(frame);
flowControlManager.getOutputStream().flush();
} else {
Frame f = new Frame(frame);
flowControlManager.write(f);
if (f.getType() == Frame.Type.SETTINGS) {
flag_send_settings = true;
if (flag_send_end_settings == false && flag_receive_peer_settings == true) {
flowControlManager.getOutputStream().write(FrameUtils.END_SETTINGS);
flowControlManager.getOutputStream().flush();
flag_send_end_settings = true;
}
}
Expand Down

0 comments on commit 67ba904

Please sign in to comment.