From 40188aba470e93af6de2b940d6b6fba8ceffff8e Mon Sep 17 00:00:00 2001 From: funa-tk <1781263+funa-tk@users.noreply.github.com> Date: Tue, 4 Feb 2020 18:03:21 +0900 Subject: [PATCH] [fix] grpc and http2 frames are mixed in a response --- .../packetproxy/encode/EncodeHTTPBase.java | 9 +- .../core/packetproxy/http2/FramesBase.java | 2 + .../java/core/packetproxy/http2/Grpc.java | 176 ++++++++++++++++++ .../java/core/packetproxy/http2/Http2.java | 53 +----- 4 files changed, 189 insertions(+), 51 deletions(-) create mode 100644 src/main/java/core/packetproxy/http2/Grpc.java diff --git a/src/main/java/core/packetproxy/encode/EncodeHTTPBase.java b/src/main/java/core/packetproxy/encode/EncodeHTTPBase.java index 4733944..09b7628 100644 --- a/src/main/java/core/packetproxy/encode/EncodeHTTPBase.java +++ b/src/main/java/core/packetproxy/encode/EncodeHTTPBase.java @@ -18,6 +18,8 @@ import java.io.InputStream; import packetproxy.http.Http; +import packetproxy.http2.FramesBase; +import packetproxy.http2.Grpc; import packetproxy.http2.Http2; import packetproxy.model.Packet; @@ -27,7 +29,7 @@ public enum HTTPVersion { HTTP1, HTTP2 } private HTTPVersion httpVersion; - private Http2 http2; + private FramesBase http2; public EncodeHTTPBase() { super("http/1.1"); @@ -40,9 +42,12 @@ public EncodeHTTPBase(String ALPN) throws Exception { httpVersion = HTTPVersion.HTTP1; } else if (ALPN.equals("http/1.0") || ALPN.equals("http/1.1")) { httpVersion = HTTPVersion.HTTP1; - } else if (ALPN.equals("h2") || ALPN.startsWith("grpc")) { + } else if (ALPN.equals("h2")) { httpVersion = HTTPVersion.HTTP2; http2 = new Http2(); + } else if (ALPN.equals("grpc") || ALPN.equals("grpc-exp")) { + httpVersion = HTTPVersion.HTTP2; + http2 = new Grpc(); } else { httpVersion = HTTPVersion.HTTP1; } diff --git a/src/main/java/core/packetproxy/http2/FramesBase.java b/src/main/java/core/packetproxy/http2/FramesBase.java index 9ed3a08..c2433b0 100644 --- a/src/main/java/core/packetproxy/http2/FramesBase.java +++ b/src/main/java/core/packetproxy/http2/FramesBase.java @@ -24,6 +24,7 @@ import packetproxy.http2.frames.Frame; import packetproxy.http2.frames.FrameUtils; +import packetproxy.model.Packet; public abstract class FramesBase { @@ -139,4 +140,5 @@ public InputStream getServerFlowControlledInputStream() { protected abstract byte[] decodeServerResponseFromFrames(byte[] frames) throws Exception; protected abstract byte[] encodeClientRequestToFrames(byte[] data) throws Exception; protected abstract byte[] encodeServerResponseToFrames(byte[] data) throws Exception; + public abstract void setGroupId(Packet packet) throws Exception; } diff --git a/src/main/java/core/packetproxy/http2/Grpc.java b/src/main/java/core/packetproxy/http2/Grpc.java new file mode 100644 index 0000000..37dcbc2 --- /dev/null +++ b/src/main/java/core/packetproxy/http2/Grpc.java @@ -0,0 +1,176 @@ +/* + * Copyright 2019 DeNA Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package packetproxy.http2; + +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http2.hpack.HpackEncoder; + +import packetproxy.common.UniqueID; +import packetproxy.http.HeaderField; +import packetproxy.http.Http; +import packetproxy.http2.frames.DataFrame; +import packetproxy.http2.frames.Frame; +import packetproxy.http2.frames.FrameUtils; +import packetproxy.http2.frames.HeadersFrame; +import packetproxy.model.Packet; + +public class Grpc extends FramesBase +{ + private StreamManager clientStreamManager = new StreamManager(); + private StreamManager serverStreamManager = new StreamManager(); + + private static String[] GRPC_RESPONSE_2ND_HEADERS = new String[]{"grpc-status","trace-proto-bin"}; + + public Grpc() throws Exception { + super(); + } + + @Override + public String getName() { + return "gRPC"; + } + + @Override + protected byte[] passFramesToDecodeClientRequest(List frames) throws Exception { return filterFrames(clientStreamManager, frames); } + @Override + protected byte[] passFramesToDecodeServerResponse(List frames) throws Exception { return filterFrames(serverStreamManager, frames); } + + private byte[] filterFrames(StreamManager streamManager, List frames) throws Exception { + for (Frame frame : frames) { + if (frame instanceof HeadersFrame) { + streamManager.write(frame); + } else if (frame instanceof DataFrame) { + streamManager.write(frame); + } + if ((frame.getFlags() & 0x01) > 0) { + List stream = streamManager.read(frame.getStreamId()); + return FrameUtils.toByteArray(stream); + } + } + return null; + } + + @Override + protected byte[] decodeClientRequestFromFrames(byte[] frames) throws Exception { return decodeFromFrames(frames); } + @Override + protected byte[] decodeServerResponseFromFrames(byte[] frames) throws Exception { return decodeFromFrames(frames); } + + private byte[] decodeFromFrames(byte[] frames) throws Exception { + ByteArrayOutputStream outHeader = new ByteArrayOutputStream(); + ByteArrayOutputStream outData = new ByteArrayOutputStream(); + Http httpHeaderSums = null; + + for (Frame frame : FrameUtils.parseFrames(frames)) { + if (frame instanceof HeadersFrame) { + HeadersFrame headersFrame = (HeadersFrame)frame; + Http http = new Http(headersFrame.getHttp()); + if(null==httpHeaderSums){ + httpHeaderSums = http; + }else{ + for(HeaderField field: http.getHeader().getFields()){ + httpHeaderSums.updateHeader(field.getName(), field.getValue()); + } + } + } else if (frame instanceof DataFrame) { + DataFrame dataFrame = (DataFrame)frame; + outData.write(dataFrame.getPayload()); + } + } + outHeader.write(httpHeaderSums.toByteArray()); + //順序をHeader, Dataにする。本当はStreamIDで管理してencode時に元の順番に戻せるようにしたい。 + //暫定でgRPC over HTTP2のレスポンスの2つめのヘッダはGRPC_RESPONSE_2ND_HEADERSに従って元の順番に戻す。 + outData.writeTo(outHeader); + Http http = new Http(outHeader.toByteArray()); + int flags = Integer.valueOf(http.getFirstHeader("X-PacketProxy-HTTP2-Flags")); + if (http.getBody() == null || http.getBody().length == 0) { + http.updateHeader("X-PacketProxy-HTTP2-Flags", String.valueOf(flags & 0xff | HeadersFrame.FLAG_END_STREAM)); + } + return http.toByteArray(); + } + + @Override + protected byte[] encodeClientRequestToFrames(byte[] http) throws Exception { return encodeToFrames(http, super.getClientHpackEncoder()); } + @Override + protected byte[] encodeServerResponseToFrames(byte[] http) throws Exception { return encodeToFrames(http, super.getServerHpackEncoder()); } + + private byte[] encodeToFrames(byte[] data, HpackEncoder encoder) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Http http = new Http(data); + int flags = Integer.valueOf(http.getFirstHeader("X-PacketProxy-HTTP2-Flags")); + if (http.getBody() != null && http.getBody().length > 0) { + http.updateHeader("X-PacketProxy-HTTP2-Flags", String.valueOf(flags & 0xff & ~HeadersFrame.FLAG_END_STREAM)); + HttpFields GRPC2ndHeaderHttpFields = new HttpFields(); + for(String k:GRPC_RESPONSE_2ND_HEADERS){ + String v = http.getFirstHeader(k); + http.removeHeader(k); + if("".equals(v)) continue; + GRPC2ndHeaderHttpFields.add(k, v); + } + + HeadersFrame headersFrame = new HeadersFrame(http); + out.write(headersFrame.toByteArrayWithoutExtra(encoder)); + + DataFrame dataFrame = new DataFrame(http); + if(GRPC2ndHeaderHttpFields.size()>0){ + dataFrame.setFlags(dataFrame.getFlags() & 0xff & ~DataFrame.FLAG_END_STREAM); + } + out.write(dataFrame.toByteArrayWithoutExtra()); + + if(GRPC2ndHeaderHttpFields.size()>0) { + Http althttp = http; + althttp.setBody(new byte[0]); + althttp.removeMatches("^(?!X-PacketProxy-HTTP2).*$"); + + for (HttpField headerField : GRPC2ndHeaderHttpFields) { + althttp.updateHeader(headerField.getName(), headerField.getValue()); + } + althttp.updateHeader("X-PacketProxy-HTTP2-Flags", String.valueOf(flags & 0xff | HeadersFrame.FLAG_END_STREAM)); + HeadersFrame headers2ndFrame = new HeadersFrame(althttp); + out.write(headers2ndFrame.toByteArrayWithoutExtra(encoder)); + } + } else { + http.updateHeader("X-PacketProxy-HTTP2-Flags", String.valueOf(flags & 0xff | HeadersFrame.FLAG_END_STREAM)); + HeadersFrame headersFrame = new HeadersFrame(http); + out.write(headersFrame.toByteArrayWithoutExtra(encoder)); + } + return out.toByteArray(); + } + + /* key: streamId, value: groupId */ + private Map groupMap = new HashMap<>(); + + public void setGroupId(Packet packet) throws Exception { + byte[] data = (packet.getDecodedData().length > 0) ? packet.getDecodedData() : packet.getModifiedData(); + Http http = new Http(data); + String streamIdStr = http.getFirstHeader("X-PacketProxy-HTTP2-Stream-Id"); + if (streamIdStr != null && streamIdStr.length() > 0) { + long streamId = Long.parseLong(streamIdStr); + if (groupMap.containsKey(streamId)) { + packet.setGroup(groupMap.get(streamId)); + } else { + long groupId = UniqueID.getInstance().createId(); + groupMap.put(streamId, groupId); + packet.setGroup(groupId); + } + } + } +} diff --git a/src/main/java/core/packetproxy/http2/Http2.java b/src/main/java/core/packetproxy/http2/Http2.java index d0574c1..38ee421 100644 --- a/src/main/java/core/packetproxy/http2/Http2.java +++ b/src/main/java/core/packetproxy/http2/Http2.java @@ -20,14 +20,10 @@ import java.util.List; import java.util.Map; -import org.eclipse.jetty.http.HttpField; -import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http2.hpack.HpackEncoder; import packetproxy.common.UniqueID; -import packetproxy.http.HeaderField; import packetproxy.http.Http; -import packetproxy.http.HttpHeader; import packetproxy.http2.frames.DataFrame; import packetproxy.http2.frames.Frame; import packetproxy.http2.frames.FrameUtils; @@ -39,8 +35,6 @@ public class Http2 extends FramesBase private StreamManager clientStreamManager = new StreamManager(); private StreamManager serverStreamManager = new StreamManager(); - private static String[] GRPC_RESPONSE_2ND_HEADERS = new String[]{"grpc-status","trace-proto-bin"}; - public Http2() throws Exception { super(); } @@ -76,31 +70,17 @@ private byte[] filterFrames(StreamManager streamManager, List frames) thr protected byte[] decodeServerResponseFromFrames(byte[] frames) throws Exception { return decodeFromFrames(frames); } private byte[] decodeFromFrames(byte[] frames) throws Exception { - ByteArrayOutputStream outHeader = new ByteArrayOutputStream(); - ByteArrayOutputStream outData = new ByteArrayOutputStream(); - Http httpHeaderSums = null; - + ByteArrayOutputStream out = new ByteArrayOutputStream(); for (Frame frame : FrameUtils.parseFrames(frames)) { if (frame instanceof HeadersFrame) { HeadersFrame headersFrame = (HeadersFrame)frame; - Http http = new Http(headersFrame.getHttp()); - if(null==httpHeaderSums){ - httpHeaderSums = http; - }else{ - for(HeaderField field: http.getHeader().getFields()){ - httpHeaderSums.updateHeader(field.getName(), field.getValue()); - } - } + out.write(headersFrame.getHttp()); } else if (frame instanceof DataFrame) { DataFrame dataFrame = (DataFrame)frame; - outData.write(dataFrame.getPayload()); + out.write(dataFrame.getPayload()); } } - outHeader.write(httpHeaderSums.toByteArray()); - //順序をHeader, Dataにする。本当はStreamIDで管理してencode時に元の順番に戻せるようにしたい。 - //暫定でgRPC over HTTP2のレスポンスの2つめのヘッダはGRPC_RESPONSE_2ND_HEADERSに従って元の順番に戻す。 - outData.writeTo(outHeader); - Http http = new Http(outHeader.toByteArray()); + Http http = new Http(out.toByteArray()); int flags = Integer.valueOf(http.getFirstHeader("X-PacketProxy-HTTP2-Flags")); if (http.getBody() == null || http.getBody().length == 0) { http.updateHeader("X-PacketProxy-HTTP2-Flags", String.valueOf(flags & 0xff | HeadersFrame.FLAG_END_STREAM)); @@ -119,35 +99,10 @@ private byte[] encodeToFrames(byte[] data, HpackEncoder encoder) throws Exceptio int flags = Integer.valueOf(http.getFirstHeader("X-PacketProxy-HTTP2-Flags")); if (http.getBody() != null && http.getBody().length > 0) { http.updateHeader("X-PacketProxy-HTTP2-Flags", String.valueOf(flags & 0xff & ~HeadersFrame.FLAG_END_STREAM)); - HttpFields GRPC2ndHeaderHttpFields = new HttpFields(); - for(String k:GRPC_RESPONSE_2ND_HEADERS){ - String v = http.getFirstHeader(k); - http.removeHeader(k); - if("".equals(v)) continue; - GRPC2ndHeaderHttpFields.add(k, v); - } - HeadersFrame headersFrame = new HeadersFrame(http); out.write(headersFrame.toByteArrayWithoutExtra(encoder)); - DataFrame dataFrame = new DataFrame(http); - if(GRPC2ndHeaderHttpFields.size()>0){ - dataFrame.setFlags(dataFrame.getFlags() & 0xff & ~DataFrame.FLAG_END_STREAM); - } out.write(dataFrame.toByteArrayWithoutExtra()); - - if(GRPC2ndHeaderHttpFields.size()>0) { - Http althttp = http; - althttp.setBody(new byte[0]); - althttp.removeMatches("^(?!X-PacketProxy-HTTP2).*$"); - - for (HttpField headerField : GRPC2ndHeaderHttpFields) { - althttp.updateHeader(headerField.getName(), headerField.getValue()); - } - althttp.updateHeader("X-PacketProxy-HTTP2-Flags", String.valueOf(flags & 0xff | HeadersFrame.FLAG_END_STREAM)); - HeadersFrame headers2ndFrame = new HeadersFrame(althttp); - out.write(headers2ndFrame.toByteArrayWithoutExtra(encoder)); - } } else { http.updateHeader("X-PacketProxy-HTTP2-Flags", String.valueOf(flags & 0xff | HeadersFrame.FLAG_END_STREAM)); HeadersFrame headersFrame = new HeadersFrame(http);