From 49262547c05d0f2031a2c6cbc42781716751c93c Mon Sep 17 00:00:00 2001 From: Ben Kilimnik <47846691+benkilimnik@users.noreply.github.com> Date: Thu, 9 Nov 2023 10:58:25 -0800 Subject: [PATCH] [Fix CQL Sticher 4/4] Move StreamID assignment to ParseFramesLoop (#1732) Summary: Populates a map of streamIDs to deque of frames in `ParseFramesLoop` instead of `ParseFrames`. This should provide a small efficiency boost, as we won't have to loop over the frames twice. This PR relies on #1761 due to the way timestamps are updated using `ParseResult`. Related issues: https://github.com/pixie-io/pixie/issues/1375 Type of change: /kind cleanup Test Plan: Updated parsing tests to use new interface Signed-off-by: Benjamin Kilimnik --- .../socket_tracer/data_stream.cc | 6 +- .../protocols/common/event_parser.h | 80 +++--- .../protocols/common/event_parser_test.cc | 5 +- .../socket_tracer/protocols/cql/parse_test.cc | 67 +++-- .../socket_tracer/protocols/dns/parse_test.cc | 267 ++++++++++-------- .../protocols/http/parse_test.cc | 204 +++++++------ .../protocols/kafka/parse_test.cc | 26 +- .../protocols/mysql/parse_test.cc | 105 ++++--- 8 files changed, 435 insertions(+), 325 deletions(-) diff --git a/src/stirling/source_connectors/socket_tracer/data_stream.cc b/src/stirling/source_connectors/socket_tracer/data_stream.cc index 73d1bd3932a..3d0be9cc31f 100644 --- a/src/stirling/source_connectors/socket_tracer/data_stream.cc +++ b/src/stirling/source_connectors/socket_tracer/data_stream.cc @@ -103,7 +103,7 @@ void DataStream::ProcessBytesToFrames(message_type_t type, TStateType* state) { bool keep_processing = has_new_events_ || attempt_sync || conn_closed(); - protocols::ParseResult parse_result; + protocols::ParseResult parse_result; parse_result.state = ParseState::kNeedsMoreData; parse_result.end_position = 0; @@ -134,7 +134,9 @@ void DataStream::ProcessBytesToFrames(message_type_t type, TStateType* state) { keep_processing = false; } - stat_valid_frames_ += parse_result.frame_positions.size(); + for (const auto& [stream, positions] : parse_result.frame_positions) { + stat_valid_frames_ += positions.size(); + } stat_invalid_frames_ += parse_result.invalid_frames; stat_raw_data_gaps_ += keep_processing; diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h b/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h index 387effd54f1..cb4eb55ec6d 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h @@ -68,9 +68,10 @@ inline bool operator==(const StartEndPos& lhs, const StartEndPos& rhs) { } // A ParseResult returns a vector of parsed frames, and also some position markers. +template struct ParseResult { // Positions of frame start and end positions in the source buffer. - std::vector frame_positions; + absl::flat_hash_map> frame_positions; // Position of where parsing ended consuming the source buffer. // This is total bytes successfully consumed. size_t end_position; @@ -98,9 +99,9 @@ struct ParseResult { * @return ParseResult with locations where parseable frames were found in the source buffer. */ template -ParseResult ParseFrames(message_type_t type, DataStreamBuffer* data_stream_buffer, - absl::flat_hash_map>* frames, - bool resync = false, TStateType* state = nullptr) { +ParseResult ParseFrames(message_type_t type, DataStreamBuffer* data_stream_buffer, + absl::flat_hash_map>* frames, + bool resync = false, TStateType* state = nullptr) { std::string_view buf = data_stream_buffer->Head(); size_t start_pos = 0; @@ -121,32 +122,43 @@ ParseResult ParseFrames(message_type_t type, DataStreamBuffer* data_stream_buffe buf.remove_prefix(start_pos); } - // Parse and append new frames to the frames vector. - std::deque new_frames = std::deque(); - ParseResult result = ParseFramesLoop(type, buf, &new_frames, state); + // Maintain a map of previous sizes. + absl::flat_hash_map prev_sizes; + for (const auto& [stream_id, deque] : *frames) { + prev_sizes[stream_id] = deque.size(); + } - VLOG(1) << absl::Substitute("Parsed $0 new frames", new_frames.size()); + // Parse and append new frames to the map of stream ID to deque of frames + ParseResult result = ParseFramesLoop(type, buf, frames, state); - // Match timestamps with the parsed frames. - for (size_t i = 0; i < result.frame_positions.size(); ++i) { - auto& f = result.frame_positions[i]; - f.start += start_pos; - f.end += start_pos; - - auto& msg = new_frames[i]; - StatusOr timestamp_ns_status = - data_stream_buffer->GetTimestamp(data_stream_buffer->position() + f.end); - LOG_IF(ERROR, !timestamp_ns_status.ok()) << timestamp_ns_status.ToString(); - msg.timestamp_ns = timestamp_ns_status.ValueOr(0); + // Compute the number of newly parsed frames for each stream + size_t total_new_frames = 0; + for (const auto& [stream_id, positions] : result.frame_positions) { + total_new_frames += positions.size(); + if (prev_sizes.find(stream_id) != prev_sizes.end()) { + total_new_frames -= prev_sizes[stream_id]; + } } - result.end_position += start_pos; + VLOG(1) << absl::Substitute("Parsed $0 new frames", total_new_frames); - // Parse frames into map - for (auto& frame : new_frames) { - // GetStreamID returns 0 by default if not implemented in protocol. - TKey key = GetStreamID(&frame); - (*frames)[key].push_back(std::move(frame)); + // Match timestamps with the parsed frames. + for (auto& [stream_id, positions] : result.frame_positions) { + size_t offset = prev_sizes[stream_id]; // Retrieve the initial offset for this stream_id + + for (auto& f : positions) { + f.start += start_pos; + f.end += start_pos; + + // Retrieve the message using the current offset + auto& msg = (*frames)[stream_id][offset]; + offset++; + StatusOr timestamp_ns_status = + data_stream_buffer->GetTimestamp(data_stream_buffer->position() + f.end); + LOG_IF(ERROR, !timestamp_ns_status.ok()) << timestamp_ns_status.ToString(); + msg.timestamp_ns = timestamp_ns_status.ValueOr(0); + } } + result.end_position += start_pos; return result; } @@ -164,10 +176,11 @@ ParseResult ParseFrames(message_type_t type, DataStreamBuffer* data_stream_buffe * @return ParseResult with locations where parseable frames were found in the source buffer. */ // TODO(oazizi): Convert tests to use ParseFrames() instead of ParseFramesLoop(). -template -ParseResult ParseFramesLoop(message_type_t type, std::string_view buf, - std::deque* frames, TStateType* state = nullptr) { - std::vector frame_positions; +template +ParseResult ParseFramesLoop(message_type_t type, std::string_view buf, + absl::flat_hash_map>* frames, + TStateType* state = nullptr) { + absl::flat_hash_map> frame_positions; const size_t buf_size = buf.size(); ParseState s = ParseState::kSuccess; size_t bytes_processed = 0; @@ -225,12 +238,15 @@ ParseResult ParseFramesLoop(message_type_t type, std::string_view buf, size_t end_position = bytes_processed - 1; if (push) { - frame_positions.push_back({start_position, end_position}); + // GetStreamID returns 0 by default if not implemented in protocol. + TKey key = GetStreamID(&frame); + frame_positions[key].push_back({start_position, end_position}); + (*frames)[key].push_back(std::move(frame)); frame_bytes += (end_position - start_position) + 1; - frames->push_back(std::move(frame)); } } - return ParseResult{std::move(frame_positions), bytes_processed, s, invalid_count, frame_bytes}; + return ParseResult{std::move(frame_positions), bytes_processed, s, invalid_count, + frame_bytes}; } } // namespace protocols diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser_test.cc index aba11c62691..a76bf6239de 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser_test.cc @@ -85,10 +85,11 @@ TEST_F(EventParserTest, BasicProtocolParsing) { std::vector events = CreateEvents(event_messages); AddEvents(events); - ParseResult res = ParseFrames(message_type_t::kRequest, &data_buffer_, &word_frames); + ParseResult res = ParseFrames(message_type_t::kRequest, &data_buffer_, &word_frames); EXPECT_EQ(ParseState::kSuccess, res.state); - EXPECT_THAT(res.frame_positions, + stream_id_t stream_id = 0; + EXPECT_THAT(res.frame_positions[stream_id], ElementsAre(StartEndPos{0, 7}, StartEndPos{8, 14}, StartEndPos{15, 22}, StartEndPos{23, 29}, StartEndPos{30, 35}, StartEndPos{36, 43})); EXPECT_EQ(res.end_position, 44); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/cql/parse_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/cql/parse_test.cc index e981f51e3b0..3a08179b999 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/cql/parse_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/cql/parse_test.cc @@ -19,6 +19,8 @@ #include #include +#include +#include "src/stirling/source_connectors/socket_tracer/protocols/common/test_utils.h" #include "src/stirling/source_connectors/socket_tracer/protocols/cql/parse.h" namespace px { @@ -63,29 +65,33 @@ class CQLParserTest : public ::testing::Test {}; TEST_F(CQLParserTest, Basic) { auto frame_view = CreateStringView(CharArrayStringView(kQueryFrame)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kSuccess); - ASSERT_EQ(frames.size(), 1); - EXPECT_EQ(frames[0].hdr.version & 0x80, 0); - EXPECT_EQ(frames[0].hdr.version & 0x7f, 4); - EXPECT_EQ(frames[0].hdr.flags, 0); - EXPECT_EQ(frames[0].hdr.stream, 6); - EXPECT_EQ(frames[0].hdr.opcode, Opcode::kQuery); - EXPECT_EQ(frames[0].hdr.length, 60); - EXPECT_THAT(frames[0].msg, testing::HasSubstr("SELECT * FROM system.schema_keyspaces ;")); + ASSERT_EQ(TotalDequeSize(frames), 1); + std::deque expected_stream = frames[6]; + EXPECT_EQ(expected_stream[0].hdr.version & 0x80, 0); + EXPECT_EQ(expected_stream[0].hdr.version & 0x7f, 4); + EXPECT_EQ(expected_stream[0].hdr.flags, 0); + EXPECT_EQ(expected_stream[0].hdr.stream, 6); + EXPECT_EQ(expected_stream[0].hdr.opcode, Opcode::kQuery); + EXPECT_EQ(expected_stream[0].hdr.length, 60); + EXPECT_THAT(expected_stream[0].msg, + testing::HasSubstr("SELECT * FROM system.schema_keyspaces ;")); } TEST_F(CQLParserTest, NeedsMoreData) { std::string_view frame_view = CreateStringView(CharArrayStringView(kQueryFrame)); frame_view.remove_suffix(10); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kNeedsMoreData); - ASSERT_EQ(frames.size(), 0); + ASSERT_EQ(TotalDequeSize(frames), 0); } TEST_F(CQLParserTest, BadOpcode) { @@ -95,11 +101,12 @@ TEST_F(CQLParserTest, BadOpcode) { std::string_view frame_view = CreateStringView(CharArrayStringView(kBadOpcodeFrame)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kInvalid); - ASSERT_EQ(frames.size(), 0); + ASSERT_EQ(TotalDequeSize(frames), 0); } TEST_F(CQLParserTest, LengthTooLarge) { @@ -110,11 +117,12 @@ TEST_F(CQLParserTest, LengthTooLarge) { std::string_view frame_view = CreateStringView(CharArrayStringView(kBadLengthFrame)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kInvalid); - ASSERT_EQ(frames.size(), 0); + ASSERT_EQ(TotalDequeSize(frames), 0); } TEST_F(CQLParserTest, LengthNegative) { @@ -125,11 +133,12 @@ TEST_F(CQLParserTest, LengthNegative) { std::string_view frame_view = CreateStringView(CharArrayStringView(kBadLengthFrame)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kInvalid); - ASSERT_EQ(frames.size(), 0); + ASSERT_EQ(TotalDequeSize(frames), 0); } TEST_F(CQLParserTest, VersionTooOld) { @@ -140,11 +149,12 @@ TEST_F(CQLParserTest, VersionTooOld) { std::string_view frame_view = CreateStringView(CharArrayStringView(kBadLengthFrame)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kInvalid); - ASSERT_EQ(frames.size(), 0); + ASSERT_EQ(TotalDequeSize(frames), 0); } TEST_F(CQLParserTest, VersionTooNew) { @@ -155,11 +165,12 @@ TEST_F(CQLParserTest, VersionTooNew) { std::string_view frame_view = CreateStringView(CharArrayStringView(kBadLengthFrame)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kInvalid); - ASSERT_EQ(frames.size(), 0); + ASSERT_EQ(TotalDequeSize(frames), 0); } } // namespace cass diff --git a/src/stirling/source_connectors/socket_tracer/protocols/dns/parse_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/dns/parse_test.cc index 7581d64b6e7..e5379ec7fa3 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/dns/parse_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/dns/parse_test.cc @@ -16,6 +16,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include #include @@ -250,147 +251,176 @@ class DNSParserTest : public ::testing::Test {}; TEST_F(DNSParserTest, BasicReq) { auto frame_view = CreateStringView(CharArrayStringView(kQueryFrame)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kSuccess); - ASSERT_EQ(frames.size(), 1); - EXPECT_EQ(frames[0].header.txid, 0xc6fa); - EXPECT_EQ(frames[0].header.flags, 0x0100); - EXPECT_EQ(frames[0].header.num_queries, 1); - EXPECT_EQ(frames[0].header.num_answers, 0); - EXPECT_EQ(frames[0].header.num_auth, 0); - EXPECT_EQ(frames[0].header.num_addl, 1); - - ASSERT_EQ(frames[0].records().size(), 1); - EXPECT_EQ(frames[0].records()[0].name, "intellij-experiments.appspot.com"); - EXPECT_EQ(frames[0].records()[0].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[0].addr.AddrStr(), "0.0.0.0"); + + stream_id_t only_key = + frames.begin()->first; // Grab the first (and only) key. DNS has no notion of streams. + ASSERT_EQ(frames[only_key].size(), 1); + Frame& first_frame = frames[only_key][0]; + + EXPECT_EQ(first_frame.header.txid, 0xc6fa); + EXPECT_EQ(first_frame.header.flags, 0x0100); + EXPECT_EQ(first_frame.header.num_queries, 1); + EXPECT_EQ(first_frame.header.num_answers, 0); + EXPECT_EQ(first_frame.header.num_auth, 0); + EXPECT_EQ(first_frame.header.num_addl, 1); + + ASSERT_EQ(first_frame.records().size(), 1); + EXPECT_EQ(first_frame.records()[0].name, "intellij-experiments.appspot.com"); + EXPECT_EQ(first_frame.records()[0].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[0].addr.AddrStr(), "0.0.0.0"); } TEST_F(DNSParserTest, BasicResp) { auto frame_view = CreateStringView(CharArrayStringView(kRespFrame)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kResponse, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kResponse, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kSuccess); - ASSERT_EQ(frames.size(), 1); - EXPECT_EQ(frames[0].header.txid, 0xc6fa); - EXPECT_EQ(frames[0].header.flags, 0x8180); - EXPECT_EQ(frames[0].header.num_queries, 1); - EXPECT_EQ(frames[0].header.num_answers, 1); - EXPECT_EQ(frames[0].header.num_auth, 0); - EXPECT_EQ(frames[0].header.num_addl, 1); - - ASSERT_EQ(frames[0].records().size(), 1); - EXPECT_EQ(frames[0].records()[0].name, "intellij-experiments.appspot.com"); - EXPECT_EQ(frames[0].records()[0].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[0].addr.AddrStr(), "216.58.194.180"); + + stream_id_t only_key = + frames.begin()->first; // Grab the first (and only) key. DNS has no notion of streams. + ASSERT_EQ(frames[only_key].size(), 1); + Frame& first_frame = frames[only_key][0]; + + EXPECT_EQ(first_frame.header.txid, 0xc6fa); + EXPECT_EQ(first_frame.header.flags, 0x8180); + EXPECT_EQ(first_frame.header.num_queries, 1); + EXPECT_EQ(first_frame.header.num_answers, 1); + EXPECT_EQ(first_frame.header.num_auth, 0); + EXPECT_EQ(first_frame.header.num_addl, 1); + + ASSERT_EQ(first_frame.records().size(), 1); + EXPECT_EQ(first_frame.records()[0].name, "intellij-experiments.appspot.com"); + EXPECT_EQ(first_frame.records()[0].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[0].addr.AddrStr(), "216.58.194.180"); } TEST_F(DNSParserTest, BasicReq2) { auto frame_view = CreateStringView(CharArrayStringView(kReqFrame2)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kResponse, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kResponse, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kSuccess); - ASSERT_EQ(frames.size(), 1); - EXPECT_EQ(frames[0].header.txid, 0xfeae); - EXPECT_EQ(frames[0].header.flags, 0x0100); - EXPECT_EQ(frames[0].header.num_queries, 1); - EXPECT_EQ(frames[0].header.num_answers, 0); - EXPECT_EQ(frames[0].header.num_auth, 0); - EXPECT_EQ(frames[0].header.num_addl, 0); - - ASSERT_EQ(frames[0].records().size(), 1); - EXPECT_EQ(frames[0].records()[0].name, "www.yahoo.com"); - EXPECT_EQ(frames[0].records()[0].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[0].addr.AddrStr(), "0.0.0.0"); + + stream_id_t only_key = + frames.begin()->first; // Grab the first (and only) key. DNS has no notion of streams. + ASSERT_EQ(frames[only_key].size(), 1); + Frame& first_frame = frames[only_key][0]; + + EXPECT_EQ(first_frame.header.txid, 0xfeae); + EXPECT_EQ(first_frame.header.flags, 0x0100); + EXPECT_EQ(first_frame.header.num_queries, 1); + EXPECT_EQ(first_frame.header.num_answers, 0); + EXPECT_EQ(first_frame.header.num_auth, 0); + EXPECT_EQ(first_frame.header.num_addl, 0); + + ASSERT_EQ(first_frame.records().size(), 1); + EXPECT_EQ(first_frame.records()[0].name, "www.yahoo.com"); + EXPECT_EQ(first_frame.records()[0].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[0].addr.AddrStr(), "0.0.0.0"); } TEST_F(DNSParserTest, CNameAndMultipleResponses) { auto frame_view = CreateStringView(CharArrayStringView(kRespFrame2)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kResponse, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kResponse, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kSuccess); - ASSERT_EQ(frames.size(), 1); - EXPECT_EQ(frames[0].header.txid, 0xfeae); - EXPECT_EQ(frames[0].header.flags, 0x8180); - EXPECT_EQ(frames[0].header.num_queries, 1); - EXPECT_EQ(frames[0].header.num_answers, 5); - EXPECT_EQ(frames[0].header.num_auth, 0); - EXPECT_EQ(frames[0].header.num_addl, 0); - - ASSERT_EQ(frames[0].records().size(), 5); - - EXPECT_EQ(frames[0].records()[0].name, "www.yahoo.com"); - EXPECT_EQ(frames[0].records()[0].addr.family, InetAddrFamily::kUnspecified); - EXPECT_EQ(frames[0].records()[0].cname, "new-fp-shed.wg1.b.yahoo.com"); - - EXPECT_EQ(frames[0].records()[1].name, "new-fp-shed.wg1.b.yahoo.com"); - EXPECT_EQ(frames[0].records()[1].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[1].addr.AddrStr(), "98.137.11.164"); - EXPECT_EQ(frames[0].records()[1].cname, ""); - - EXPECT_EQ(frames[0].records()[2].name, "new-fp-shed.wg1.b.yahoo.com"); - EXPECT_EQ(frames[0].records()[2].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[2].addr.AddrStr(), "74.6.231.20"); - EXPECT_EQ(frames[0].records()[2].cname, ""); - - EXPECT_EQ(frames[0].records()[3].name, "new-fp-shed.wg1.b.yahoo.com"); - EXPECT_EQ(frames[0].records()[3].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[3].addr.AddrStr(), "74.6.231.21"); - EXPECT_EQ(frames[0].records()[3].cname, ""); - - EXPECT_EQ(frames[0].records()[4].name, "new-fp-shed.wg1.b.yahoo.com"); - EXPECT_EQ(frames[0].records()[4].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[4].addr.AddrStr(), "98.137.11.163"); - EXPECT_EQ(frames[0].records()[4].cname, ""); + stream_id_t only_key = + frames.begin()->first; // Grab the first (and only) key. DNS has no notion of streams. + ASSERT_EQ(frames[only_key].size(), 1); + Frame& first_frame = frames[only_key][0]; + + EXPECT_EQ(first_frame.header.txid, 0xfeae); + EXPECT_EQ(first_frame.header.flags, 0x8180); + EXPECT_EQ(first_frame.header.num_queries, 1); + EXPECT_EQ(first_frame.header.num_answers, 5); + EXPECT_EQ(first_frame.header.num_auth, 0); + EXPECT_EQ(first_frame.header.num_addl, 0); + + ASSERT_EQ(first_frame.records().size(), 5); + + EXPECT_EQ(first_frame.records()[0].name, "www.yahoo.com"); + EXPECT_EQ(first_frame.records()[0].addr.family, InetAddrFamily::kUnspecified); + EXPECT_EQ(first_frame.records()[0].cname, "new-fp-shed.wg1.b.yahoo.com"); + + EXPECT_EQ(first_frame.records()[1].name, "new-fp-shed.wg1.b.yahoo.com"); + EXPECT_EQ(first_frame.records()[1].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[1].addr.AddrStr(), "98.137.11.164"); + EXPECT_EQ(first_frame.records()[1].cname, ""); + + EXPECT_EQ(first_frame.records()[2].name, "new-fp-shed.wg1.b.yahoo.com"); + EXPECT_EQ(first_frame.records()[2].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[2].addr.AddrStr(), "74.6.231.20"); + EXPECT_EQ(first_frame.records()[2].cname, ""); + + EXPECT_EQ(first_frame.records()[3].name, "new-fp-shed.wg1.b.yahoo.com"); + EXPECT_EQ(first_frame.records()[3].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[3].addr.AddrStr(), "74.6.231.21"); + EXPECT_EQ(first_frame.records()[3].cname, ""); + + EXPECT_EQ(first_frame.records()[4].name, "new-fp-shed.wg1.b.yahoo.com"); + EXPECT_EQ(first_frame.records()[4].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[4].addr.AddrStr(), "98.137.11.163"); + EXPECT_EQ(first_frame.records()[4].cname, ""); } TEST_F(DNSParserTest, CNameAndMultipleResponses2) { auto frame_view = CreateStringView(CharArrayStringView(kRespFrame3)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kResponse, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kResponse, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kSuccess); - ASSERT_EQ(frames.size(), 1); - EXPECT_EQ(frames[0].header.txid, 0x938f); - EXPECT_EQ(frames[0].header.flags, 0x8180); - EXPECT_EQ(frames[0].header.num_queries, 1); - EXPECT_EQ(frames[0].header.num_answers, 5); - EXPECT_EQ(frames[0].header.num_auth, 0); - EXPECT_EQ(frames[0].header.num_addl, 1); - ASSERT_EQ(frames[0].records().size(), 5); - - EXPECT_EQ(frames[0].records()[0].name, "www.reddit.com"); - EXPECT_EQ(frames[0].records()[0].addr.family, InetAddrFamily::kUnspecified); - EXPECT_EQ(frames[0].records()[0].cname, "reddit.map.fastly.net"); - - EXPECT_EQ(frames[0].records()[1].name, "reddit.map.fastly.net"); - EXPECT_EQ(frames[0].records()[1].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[1].addr.AddrStr(), "151.101.1.140"); - EXPECT_EQ(frames[0].records()[1].cname, ""); - - EXPECT_EQ(frames[0].records()[2].name, "reddit.map.fastly.net"); - EXPECT_EQ(frames[0].records()[2].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[2].addr.AddrStr(), "151.101.65.140"); - EXPECT_EQ(frames[0].records()[2].cname, ""); - - EXPECT_EQ(frames[0].records()[3].name, "reddit.map.fastly.net"); - EXPECT_EQ(frames[0].records()[3].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[3].addr.AddrStr(), "151.101.129.140"); - EXPECT_EQ(frames[0].records()[3].cname, ""); - - EXPECT_EQ(frames[0].records()[4].name, "reddit.map.fastly.net"); - EXPECT_EQ(frames[0].records()[4].addr.family, InetAddrFamily::kIPv4); - EXPECT_EQ(frames[0].records()[4].addr.AddrStr(), "151.101.193.140"); - EXPECT_EQ(frames[0].records()[4].cname, ""); + + stream_id_t only_key = + frames.begin()->first; // Grab the first (and only) key. DNS has no notion of streams. + ASSERT_EQ(frames[only_key].size(), 1); + Frame& first_frame = frames[only_key][0]; + + EXPECT_EQ(first_frame.header.txid, 0x938f); + EXPECT_EQ(first_frame.header.flags, 0x8180); + EXPECT_EQ(first_frame.header.num_queries, 1); + EXPECT_EQ(first_frame.header.num_answers, 5); + EXPECT_EQ(first_frame.header.num_auth, 0); + EXPECT_EQ(first_frame.header.num_addl, 1); + ASSERT_EQ(first_frame.records().size(), 5); + + EXPECT_EQ(first_frame.records()[0].name, "www.reddit.com"); + EXPECT_EQ(first_frame.records()[0].addr.family, InetAddrFamily::kUnspecified); + EXPECT_EQ(first_frame.records()[0].cname, "reddit.map.fastly.net"); + + EXPECT_EQ(first_frame.records()[1].name, "reddit.map.fastly.net"); + EXPECT_EQ(first_frame.records()[1].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[1].addr.AddrStr(), "151.101.1.140"); + EXPECT_EQ(first_frame.records()[1].cname, ""); + + EXPECT_EQ(first_frame.records()[2].name, "reddit.map.fastly.net"); + EXPECT_EQ(first_frame.records()[2].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[2].addr.AddrStr(), "151.101.65.140"); + EXPECT_EQ(first_frame.records()[2].cname, ""); + + EXPECT_EQ(first_frame.records()[3].name, "reddit.map.fastly.net"); + EXPECT_EQ(first_frame.records()[3].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[3].addr.AddrStr(), "151.101.129.140"); + EXPECT_EQ(first_frame.records()[3].cname, ""); + + EXPECT_EQ(first_frame.records()[4].name, "reddit.map.fastly.net"); + EXPECT_EQ(first_frame.records()[4].addr.family, InetAddrFamily::kIPv4); + EXPECT_EQ(first_frame.records()[4].addr.AddrStr(), "151.101.193.140"); + EXPECT_EQ(first_frame.records()[4].cname, ""); } TEST_F(DNSParserTest, IncompleteHeader) { @@ -398,8 +428,9 @@ TEST_F(DNSParserTest, IncompleteHeader) { 0x00, 0x00, 0x00, 0x00, 0x00}; auto frame_view = CreateStringView(CharArrayStringView(kIncompleteHeader)); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kInvalid); } @@ -411,8 +442,9 @@ TEST_F(DNSParserTest, PartialRecords) { auto frame_view = CreateStringView(CharArrayStringView(kRespFrame)); frame_view.remove_suffix(10); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kSuccess); } @@ -421,8 +453,9 @@ TEST_F(DNSParserTest, PartialRecords) { auto frame_view = CreateStringView(CharArrayStringView(kRespFrame)); frame_view.remove_suffix(20); - std::deque frames; - ParseResult parse_result = ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); + absl::flat_hash_map> frames; + ParseResult parse_result = + ParseFramesLoop(message_type_t::kRequest, frame_view, &frames); ASSERT_EQ(parse_result.state, ParseState::kInvalid); } diff --git a/src/stirling/source_connectors/socket_tracer/protocols/http/parse_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/http/parse_test.cc index 20211a84195..333130e9e36 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/http/parse_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/http/parse_test.cc @@ -288,13 +288,15 @@ TEST_F(HTTPParserTest, CompleteMessages) { std::string msg_c = HTTPRespWithSizedBody("c"); std::string buf = absl::StrCat(msg_a, msg_b, msg_c); - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); EXPECT_EQ(msg_a.size() + msg_b.size() + msg_c.size(), result.end_position); - EXPECT_THAT(parsed_messages, ElementsAre(HasBody("a"), HasBody("b"), HasBody("c"))); - EXPECT_THAT(result.frame_positions, + EXPECT_THAT(parsed_messages[0], ElementsAre(HasBody("a"), HasBody("b"), HasBody("c"))); + stream_id_t expectedStreamID = 0; + EXPECT_THAT(result.frame_positions[expectedStreamID], ElementsAre(StartEndPos{0, msg_a.size() - 1}, StartEndPos{msg_a.size(), msg_a.size() + msg_b.size() - 1}, StartEndPos{msg_a.size() + msg_b.size(), @@ -309,12 +311,13 @@ TEST_F(HTTPParserTest, PartialHeader) { "Content-Length: 40\r\n" "Content-Type:"; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); EXPECT_EQ(ParseState::kNeedsMoreData, result.state); EXPECT_EQ(0, result.end_position); - EXPECT_THAT(parsed_messages, IsEmpty()); + EXPECT_THAT(parsed_messages[0], IsEmpty()); } TEST_F(HTTPParserTest, PartialBody) { @@ -326,12 +329,13 @@ TEST_F(HTTPParserTest, PartialBody) { "\r\n" "Foo"; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); EXPECT_EQ(ParseState::kNeedsMoreData, result.state); EXPECT_EQ(0, result.end_position); - EXPECT_THAT(parsed_messages, IsEmpty()); + EXPECT_THAT(parsed_messages[0], IsEmpty()); } TEST_F(HTTPParserTest, Status101) { @@ -345,12 +349,13 @@ TEST_F(HTTPParserTest, Status101) { std::string data = absl::StrCat(switch_protocol_msg, new_protocol_data); - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, data, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, data, &parsed_messages, &state); EXPECT_EQ(ParseState::kEOS, result.state); EXPECT_EQ(switch_protocol_msg.size(), result.end_position); - EXPECT_THAT(parsed_messages, ElementsAre(HasBody(""))); + EXPECT_THAT(parsed_messages[0], ElementsAre(HasBody(""))); } TEST_F(HTTPParserTest, Status204) { @@ -359,11 +364,12 @@ TEST_F(HTTPParserTest, Status204) { "HTTP/1.1 204 No Content\r\n" "\r\n"; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(HasBody(""))); + EXPECT_THAT(parsed_messages[0], ElementsAre(HasBody(""))); } //============================================================================= @@ -404,12 +410,13 @@ TEST_F(HTTPParserTest, ParseCompleteHTTPResponseWithContentLengthHeader) { expected_message2.body = "pixielabs!"; expected_message2.body_size = 10; - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; const std::string buf = absl::StrCat(msg1, msg2); - ParseResult result = ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message1, expected_message2)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message1, expected_message2)); } TEST_F(HTTPParserTest, ParseIncompleteHTTPResponseWithContentLengthHeader) { @@ -433,30 +440,33 @@ TEST_F(HTTPParserTest, ParseIncompleteHTTPResponseWithContentLengthHeader) { expected_message1.body_size = 21; const std::string buf = absl::StrCat(msg1, msg2, msg3); - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message1)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message1)); } TEST_F(HTTPParserTest, InvalidInput) { StateWrapper state{}; const std::string_view buf = " is awesome"; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); EXPECT_EQ(ParseState::kInvalid, result.state); - EXPECT_THAT(parsed_messages, IsEmpty()); + EXPECT_THAT(parsed_messages[0], IsEmpty()); } TEST_F(HTTPParserTest, NoAppend) { StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, "", &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, "", &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, IsEmpty()); + EXPECT_THAT(parsed_messages[0], IsEmpty()); } TEST_F(HTTPParserTest, ParseCompleteChunkEncodedMessage) { @@ -475,11 +485,12 @@ TEST_F(HTTPParserTest, ParseCompleteChunkEncodedMessage) { expected_message.body = "pixielabs is awesome!"; expected_message.body_size = 21; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message)); } TEST_F(HTTPParserTest, LongChunkedMessageTruncated) { @@ -511,11 +522,12 @@ TEST_F(HTTPParserTest, LongChunkedMessageTruncated) { "3333333333333333333333333333333333333333333"; expected_message.body_size = 277; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message)); } TEST_F(HTTPParserTest, LongContentLengthBodyTruncated) { @@ -546,11 +558,12 @@ TEST_F(HTTPParserTest, LongContentLengthBodyTruncated) { "3333333333333333333333333333333333333333333333333333333333333333"; expected_message.body_size = 320; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, msg, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message)); } TEST_F(HTTPParserTest, ParseIncompleteChunks) { @@ -562,11 +575,12 @@ TEST_F(HTTPParserTest, ParseIncompleteChunks) { "9\r\n" "pixie"; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, msg1, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, msg1, &parsed_messages, &state); EXPECT_EQ(ParseState::kNeedsMoreData, result.state); - EXPECT_THAT(parsed_messages, IsEmpty()); + EXPECT_THAT(parsed_messages[0], IsEmpty()); } // Note that many other tests already use requests with no content-length, @@ -590,11 +604,12 @@ TEST_F(HTTPParserTest, ParseRequestWithoutLengthOrChunking) { expected_message.req_path = "/foo.html"; expected_message.body = ""; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message)); } // Test scenario with a HEAD response followed by a GET response. @@ -615,9 +630,9 @@ TEST_F(HTTPParserTest, ParseHeadAndGetResponse) { "\r\n" "pixie"; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, absl::StrCat(head_resp, get_resp), - &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = ParseFramesLoop( + message_type_t::kResponse, absl::StrCat(head_resp, get_resp), &parsed_messages, &state); Message expected_message1 = EmptyHTTPResp(); expected_message1.type = message_type_t::kResponse; @@ -636,7 +651,7 @@ TEST_F(HTTPParserTest, ParseHeadAndGetResponse) { expected_message2.body_size = 5; EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message1, expected_message2)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message1, expected_message2)); } // Test a HEAD response where there is no subsequent traffic, nor is there a connection close. @@ -652,12 +667,12 @@ TEST_F(HTTPParserTest, ParseHeadResponseWithNoConnClose) { "Content-Type: text/plain; charset=utf-8\r\n" "\r\n"; - std::deque parsed_messages; - ParseResult result = + absl::flat_hash_map> parsed_messages; + ParseResult result = ParseFramesLoop(message_type_t::kResponse, absl::StrCat(head_resp), &parsed_messages, &state); EXPECT_EQ(ParseState::kNeedsMoreData, result.state); - EXPECT_THAT(parsed_messages, ElementsAre()); + EXPECT_THAT(parsed_messages[0], ElementsAre()); } // Test a HEAD response followed by a connection close. @@ -674,8 +689,8 @@ TEST_F(HTTPParserTest, ParseHeadResponseWithConnClose) { "Content-Type: text/plain; charset=utf-8\r\n" "\r\n"; - std::deque parsed_messages; - ParseResult result = + absl::flat_hash_map> parsed_messages; + ParseResult result = ParseFramesLoop(message_type_t::kResponse, absl::StrCat(head_resp), &parsed_messages, &state); Message expected_message1 = EmptyHTTPResp(); @@ -686,7 +701,7 @@ TEST_F(HTTPParserTest, ParseHeadResponseWithConnClose) { expected_message1.body = ""; EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message1)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message1)); } // When a response has no content-length or transfer-encoding, @@ -703,14 +718,15 @@ TEST_F(HTTPParserTest, ParseResponseWithoutLengthOrChunking) { Message expected_message = EmptyHTTPResp(); expected_message.body = "pixielabs is aweso"; - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; state.global.conn_closed = false; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, msg1, &parsed_messages, &state); + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, msg1, &parsed_messages, &state); EXPECT_EQ(ParseState::kNeedsMoreData, result.state); state.global.conn_closed = true; result = ParseFramesLoop(message_type_t::kResponse, msg1, &parsed_messages, &state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message)); } TEST_F(HTTPParserTest, MessagePartialHeaders) { @@ -719,11 +735,12 @@ TEST_F(HTTPParserTest, MessagePartialHeaders) { "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain"; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, msg1, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, msg1, &parsed_messages, &state); EXPECT_EQ(ParseState::kNeedsMoreData, result.state); - EXPECT_THAT(parsed_messages, IsEmpty()); + EXPECT_THAT(parsed_messages[0], IsEmpty()); } TEST_F(HTTPParserTest, PartialMessageInTheMiddleOfStream) { @@ -735,11 +752,12 @@ TEST_F(HTTPParserTest, PartialMessageInTheMiddleOfStream) { std::string msg4 = HTTPChunk(""); const std::string buf = absl::StrCat(msg0, msg1, msg2, msg3, msg4); - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(HasBody("foobar"), HasBody("pixielabs rocks!"))); + EXPECT_THAT(parsed_messages[0], ElementsAre(HasBody("foobar"), HasBody("pixielabs rocks!"))); } //============================================================================= @@ -748,22 +766,23 @@ TEST_F(HTTPParserTest, PartialMessageInTheMiddleOfStream) { TEST_F(HTTPParserTest, ParseHTTPRequestSingle) { StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = + absl::flat_hash_map> parsed_messages; + ParseResult result = ParseFramesLoop(message_type_t::kRequest, kHTTPGetReq0, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(HTTPGetReq0ExpectedMessage())); + EXPECT_THAT(parsed_messages[0], ElementsAre(HTTPGetReq0ExpectedMessage())); } TEST_F(HTTPParserTest, ParseHTTPRequestMultiple) { StateWrapper state{}; const std::string buf = absl::StrCat(kHTTPGetReq0, kHTTPPostReq0); - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, + EXPECT_THAT(parsed_messages[0], ElementsAre(HTTPGetReq0ExpectedMessage(), HTTPPostReq0ExpectedMessage())); } @@ -793,8 +812,9 @@ TEST_P(HTTPParserTest, ParseHTTPRequestsRepeatedly) { AddEvent(events[2]); absl::flat_hash_map> parsed_messages; - ParseResult result = ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, - /* resync */ false, &state); + ParseResult result = + ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, + /* resync */ false, &state); data_buffer_.RemovePrefix(result.end_position); ASSERT_EQ(ParseState::kSuccess, result.state); @@ -830,8 +850,9 @@ TEST_P(HTTPParserTest, ParseHTTPResponsesRepeatedly) { AddEvent(events[2]); absl::flat_hash_map> parsed_messages; - ParseResult result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, - /* resync */ false, &state); + ParseResult result = + ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, + /* resync */ false, &state); data_buffer_.RemovePrefix(result.end_position); ASSERT_EQ(ParseState::kSuccess, result.state); @@ -863,8 +884,9 @@ TEST_F(HTTPParserTest, ParseHTTPResponsesWithLeftover) { // Don't append last split, yet. absl::flat_hash_map> parsed_messages; - ParseResult result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, - /* resync */ false, &state); + ParseResult result = + ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, + /* resync */ false, &state); ASSERT_EQ(ParseState::kNeedsMoreData, result.state); ASSERT_THAT(parsed_messages[0], @@ -912,15 +934,17 @@ TEST_P(HTTPParserTest, ParseHTTPResponsesWithLeftoverRepeatedly) { AddEvent(events[1]); absl::flat_hash_map> parsed_messages; - ParseResult result1 = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, - /* resync */ false, &state); + ParseResult result1 = + ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, + /* resync */ false, &state); data_buffer_.RemovePrefix(result1.end_position); // Now add msg_splits[2]. AddEvent(events[2]); - ParseResult result2 = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, - /* resync */ false, &state); + ParseResult result2 = + ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, + /* resync */ false, &state); ASSERT_EQ(ParseState::kSuccess, result2.state); ASSERT_THAT(parsed_messages[0], @@ -1056,8 +1080,9 @@ TEST_F(HTTPParserTest, ParseReqWithPartialFirstMessage) { AddEvents(events); absl::flat_hash_map> parsed_messages; - ParseResult result = ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, - /* resync */ true, &state); + ParseResult result = + ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, + /* resync */ true, &state); // CreateEvents creates chunks starting at 0. // When the test loops, we end up with overlapping chunks. @@ -1080,8 +1105,9 @@ TEST_F(HTTPParserTest, ParseRespWithPartialFirstMessage) { AddEvents(events); absl::flat_hash_map> parsed_messages; - ParseResult result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, - /* resync */ true, &state); + ParseResult result = + ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, + /* resync */ true, &state); // CreateEvents creates chunks starting at 0. // When the test loops, we end up with overlapping chunks. @@ -1106,8 +1132,9 @@ TEST_F(HTTPParserTest, ParseReqWithPartialFirstMessageNoSync) { AddEvents(events); absl::flat_hash_map> parsed_messages; - ParseResult result = ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, - /* resync */ false, &state); + ParseResult result = + ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, + /* resync */ false, &state); EXPECT_EQ(ParseState::kSuccess, result.state); EXPECT_THAT(parsed_messages[0], @@ -1123,8 +1150,9 @@ TEST_F(HTTPParserTest, ParseRespWithPartialFirstMessageNoSync) { AddEvents(events); absl::flat_hash_map> parsed_messages; - ParseResult result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, - /* resync */ false, &state); + ParseResult result = + ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, + /* resync */ false, &state); EXPECT_EQ(ParseState::kSuccess, result.state); EXPECT_THAT(parsed_messages[0], @@ -1150,7 +1178,7 @@ TEST_F(HTTPParserTest, ParseReqWithPartialFirstMessageWithSync) { AddEvents(events); absl::flat_hash_map> parsed_messages; - ParseResult result; + ParseResult result; result = ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, /* resync */ false, &state); @@ -1180,7 +1208,7 @@ TEST_F(HTTPParserTest, ParseRespWithPartialFirstMessageWithSync) { AddEvents(events); absl::flat_hash_map> parsed_messages; - ParseResult result; + ParseResult result; result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, /* resync */ false, &state); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/kafka/parse_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/kafka/parse_test.cc index e4adbc6d475..20aca560532 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/kafka/parse_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/kafka/parse_test.cc @@ -16,6 +16,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include #include @@ -78,12 +79,13 @@ TEST(KafkaParserTest, ParseMultipleRequests) { const std::string buf = absl::StrCat(produce_frame_view, metadata_frame_view); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; StateWrapper state; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message1, expected_message2)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message1, expected_message2)); EXPECT_TRUE(state.global.seen_correlation_ids.contains(expected_message1.correlation_id)); EXPECT_TRUE(state.global.seen_correlation_ids.contains(expected_message2.correlation_id)); } @@ -104,10 +106,11 @@ TEST(KafkaParserTest, ParseMultipleResponses) { const std::string buf = absl::StrCat(produce_frame_view, metadata_frame_view); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; StateWrapper state{.global = {{1, 4}}, .send = {}, .recv = {}}; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message1, expected_message2)); + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message1, expected_message2)); } TEST(KafkaParserTest, ParseIncompleteRequest) { @@ -115,22 +118,23 @@ TEST(KafkaParserTest, ParseIncompleteRequest) { CreateStringView(CharArrayStringView(testdata::kProduceRequest)); auto truncated_produce_frame = produce_frame_view.substr(0, produce_frame_view.size() - 1); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; StateWrapper state; - ParseResult result = + ParseResult result = ParseFramesLoop(message_type_t::kRequest, truncated_produce_frame, &parsed_messages, &state); EXPECT_EQ(ParseState::kNeedsMoreData, result.state); - EXPECT_THAT(parsed_messages, ElementsAre()); + EXPECT_THAT(parsed_messages[0], ElementsAre()); EXPECT_TRUE(state.global.seen_correlation_ids.empty()); } TEST(KafkaParserTest, ParseInvalidInput) { std::string msg1("\x00\x00\x18\x00\x03SELECT name FROM users;", 28); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; StateWrapper state; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); EXPECT_EQ(ParseState::kInvalid, result.state); EXPECT_THAT(parsed_messages, ElementsAre()); EXPECT_TRUE(state.global.seen_correlation_ids.empty()); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mysql/parse_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/mysql/parse_test.cc index 64565eb76c2..60dd63f55f3 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/mysql/parse_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/mysql/parse_test.cc @@ -16,6 +16,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include #include @@ -82,8 +83,9 @@ TEST_F(MySQLParserTest, ParseRaw) { testutils::GenRawPacket(1, "\x03SELECT bar")); StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); Packet expected_message0; expected_message0.msg = "\x03SELECT foo"; @@ -94,7 +96,7 @@ TEST_F(MySQLParserTest, ParseRaw) { expected_message1.sequence_id = 1; EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message0, expected_message1)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message0, expected_message1)); } TEST_F(MySQLParserTest, ParseComStmtPrepare) { @@ -116,11 +118,12 @@ TEST_F(MySQLParserTest, ParseComStmtPrepare) { const std::string buf = absl::StrCat(msg1, msg2); StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message1, expected_message2)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message1, expected_message2)); } TEST_F(MySQLParserTest, ParseComStmtExecute) { @@ -135,11 +138,12 @@ TEST_F(MySQLParserTest, ParseComStmtExecute) { expected_message1.msg = absl::StrCat(CommandToString(Command::kStmtExecute), body); expected_message1.sequence_id = 0; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message1)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message1)); } TEST_F(MySQLParserTest, ParseComStmtClose) { @@ -147,11 +151,12 @@ TEST_F(MySQLParserTest, ParseComStmtClose) { std::string msg = testutils::GenRawPacket(expected_packet); StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, msg, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, msg, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_packet)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_packet)); } TEST_F(MySQLParserTest, ParseComQuery) { @@ -169,11 +174,12 @@ TEST_F(MySQLParserTest, ParseComQuery) { const std::string buf = absl::StrCat(msg1, msg2); StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, buf, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(expected_message1, expected_message2)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_message1, expected_message2)); } TEST_F(MySQLParserTest, ParseResponse) { @@ -197,9 +203,9 @@ TEST_F(MySQLParserTest, ParseResponse) { Command::kStmtPrepare}; StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, kMySQLStmtPrepareMessage.response, - &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = ParseFramesLoop( + message_type_t::kResponse, kMySQLStmtPrepareMessage.response, &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); Packet expected_header; @@ -217,32 +223,37 @@ TEST_F(MySQLParserTest, ParseResponse) { expected_eof.msg = ConstStringView("\xfe\x00\x00\x02\x00"); expected_eof.sequence_id = 3; - EXPECT_THAT(parsed_messages, ElementsAre(expected_header, expected_col_def, expected_eof)); + EXPECT_THAT(parsed_messages[0], ElementsAre(expected_header, expected_col_def, expected_eof)); } TEST_F(MySQLParserTest, ParseMultipleRawPackets) { - std::deque prepare_resp_packets = + connection_id_t conn_id = 0; + std::deque prepare_resp_packets_deque = testutils::GenStmtPrepareOKResponse(testdata::kStmtPrepareResponse); - std::deque execute_resp_packets = + absl::flat_hash_map> prepare_resp_packets; + prepare_resp_packets[conn_id] = prepare_resp_packets_deque; + std::deque execute_resp_packets_deque = testutils::GenResultset(testdata::kStmtExecuteResultset); + absl::flat_hash_map> execute_resp_packets; + execute_resp_packets[conn_id] = execute_resp_packets_deque; // Splitting packets from 2 responses into 3 different raw packet chunks. std::vector packets1; for (size_t i = 0; i < 3; ++i) { - packets1.push_back(testutils::GenRawPacket(prepare_resp_packets[i])); + packets1.push_back(testutils::GenRawPacket(prepare_resp_packets[conn_id][i])); } std::vector packets2; - for (size_t i = 3; i < prepare_resp_packets.size(); ++i) { - packets2.push_back(testutils::GenRawPacket(prepare_resp_packets[i])); + for (size_t i = 3; i < prepare_resp_packets[conn_id].size(); ++i) { + packets2.push_back(testutils::GenRawPacket(prepare_resp_packets[conn_id][i])); } for (size_t i = 0; i < 2; ++i) { - packets2.push_back(testutils::GenRawPacket(execute_resp_packets[i])); + packets2.push_back(testutils::GenRawPacket(execute_resp_packets[conn_id][i])); } std::vector packets3; - for (size_t i = 2; i < execute_resp_packets.size(); ++i) { - packets3.push_back(testutils::GenRawPacket(execute_resp_packets[i])); + for (size_t i = 2; i < execute_resp_packets[conn_id].size(); ++i) { + packets3.push_back(testutils::GenRawPacket(execute_resp_packets[conn_id][i])); } std::string chunk1 = absl::StrJoin(packets1, ""); @@ -252,19 +263,20 @@ TEST_F(MySQLParserTest, ParseMultipleRawPackets) { const std::string buf = absl::StrCat(chunk1, chunk2, chunk3); StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, buf, &parsed_messages, &state); - std::deque expected_packets; - for (Packet p : prepare_resp_packets) { - expected_packets.push_back(p); + absl::flat_hash_map> expected_packets; + for (const Packet& p : prepare_resp_packets[conn_id]) { + expected_packets[conn_id].push_back(p); } - for (Packet p : execute_resp_packets) { - expected_packets.push_back(p); + for (const Packet& p : execute_resp_packets[conn_id]) { + expected_packets[conn_id].push_back(p); } - EXPECT_EQ(expected_packets.size(), parsed_messages.size()); - EXPECT_THAT(parsed_messages, ElementsAreArray(expected_packets)); + EXPECT_EQ(expected_packets[conn_id].size(), parsed_messages[conn_id].size()); + EXPECT_THAT(parsed_messages[conn_id], ::testing::ElementsAreArray(expected_packets[conn_id])); } TEST_F(MySQLParserTest, ParseIncompleteRequest) { @@ -274,30 +286,33 @@ TEST_F(MySQLParserTest, ParseIncompleteRequest) { msg1[0] = '\x24'; StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); EXPECT_EQ(ParseState::kNeedsMoreData, result.state); - EXPECT_THAT(parsed_messages, ElementsAre()); + EXPECT_THAT(parsed_messages[0], ElementsAre()); } TEST_F(MySQLParserTest, ParseInvalidInput) { std::string msg1 = "hello world"; StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kRequest, msg1, &parsed_messages, &state); EXPECT_EQ(ParseState::kInvalid, result.state); - EXPECT_THAT(parsed_messages, ElementsAre()); + EXPECT_THAT(parsed_messages[0], ElementsAre()); } TEST_F(MySQLParserTest, Empty) { StateWrapper state{}; - std::deque parsed_messages; - ParseResult result = ParseFramesLoop(message_type_t::kResponse, "", &parsed_messages, &state); + absl::flat_hash_map> parsed_messages; + ParseResult result = + ParseFramesLoop(message_type_t::kResponse, "", &parsed_messages, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre()); + EXPECT_THAT(parsed_messages[0], ElementsAre()); } //=============================================================================