Skip to content

Commit

Permalink
Address compilation issues with auto generated amqp files (#1820)
Browse files Browse the repository at this point in the history
Summary: Address compilation issues with auto generated amqp files

When reviewing #1816, I realized that there was drift from the amqp
template files and the resulting code. The updated stitcher interface
was propagated to the [auto generated
files](e04a764#diff-cece0214fed75080ef2a368fa37e9addb7c1c407c0463038c18c7c48af297ce7R210),
but not the template itself. As for the other changes, I wasn't able to
track down why they didn't exist in the template file but these changes
were necessary to get the code compiling again.

Once this is merged, #1816 should be easy to merge afterwards.

Relevant Issues: #1816

Type of change: /kind cleanup

Test Plan: amqp trace bpf test passes

Signed-off-by: Dom Del Nano <[email protected]>
  • Loading branch information
ddelnano authored Jan 23, 2024
1 parent 892cfc6 commit 4018018
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ Bazel cmds:
```
wget "https://www.rabbitmq.com/resources/specs/amqp0-9-1.xml"
bazel run //src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator:amqp_code_gen_main -- run
cp generated_files/decode.cc ../
cp generated_files/decode.h ../
cp generated_files/types_gen.h ../
cp generated_files/{decode.h,decode.cc,types_gen.h} ../
cp generated_files/amqp.h src/carnot/funcs/protocols/amqp.h
arc lint
```


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def gen_json_builder(self):
The field name is used as json key to be close to the key used in spec
"""
if self.field_type == FieldType.table:
return f"// TODO: support KV for {self.field_name} field table type"
return f"// TODO(vsrivatsa): support KV for {self.field_name} field table type"
return f'builder->WriteKV("{self.c_field_name}", {self.c_field_name});'

def gen_buffer_extract(self):
Expand Down Expand Up @@ -694,7 +694,7 @@ def gen_process_frame_type(self):
AMQPFrameTypes amqp_frame_type = static_cast<AMQPFrameTypes>(req->frame_type);
switch (amqp_frame_type) {
case AMQPFrameTypes::kFrameHeader:
return ProcessContentHeader(&decoder, req);
return ProcessContentHeader(decoder, req);
case AMQPFrameTypes::kFrameBody: {
req->msg = "";
auto status = decoder->ExtractBufIgnore(req->payload_size);
Expand All @@ -707,7 +707,7 @@ def gen_process_frame_type(self):
req->msg = "";
break; // Heartbeat frames have no body or length
case AMQPFrameTypes::kFrameMethod:
return ProcessFrameMethod(&decoder, req);
return ProcessFrameMethod(decoder, req);
default:
VLOG(1) << absl::Substitute("Unparsed frame $0", req->frame_type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ namespace amqp {

{{ struct_declr }}

// TODO combine with kafka ToString function
template <typename T>
std::string ToString(T obj) {
utils::JSONObjectBuilder json_object_builder;
obj.ToJSON(&json_object_builder);
return json_object_builder.GetString();
}
// TODO(vsrivatsa) combine with kafka ToString function
Status ProcessPayload(Frame* req, BinaryDecoder* decoder);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ enum class AMQPFrameTypes : uint8_t {
kFrameBody = 3,
kFrameHeartbeat = 8,
};
const char kFrameEnd = 0xCE;
const uint8_t kFrameEnd = 0xCE;
const uint8_t kMinFrameLength = 8;

// Represents a generic AMQP message.
Expand Down Expand Up @@ -83,6 +83,11 @@ struct Frame : public FrameBase {
uint16_t method_id = 0;

size_t ByteSize() const override { return sizeof(Frame) + msg.size(); }

std::string ToString() const override {
return absl::Substitute("frame_type=[$0] channel=[$1] payload_size=[$2] msg=[$3]", frame_type,
channel, payload_size, msg);
}
};

struct Record {
Expand All @@ -96,14 +101,16 @@ struct Record {
std::string px_info = "";

std::string ToString() const {
return absl::Substitute("req=[$0] resp=[$1]", req->ToString(), resp->ToString());
return absl::Substitute("req=[$0] resp=[$1]", req.ToString(), resp.ToString());
}
};

using channel_id = uint16_t;
struct ProtocolTraits : public BaseProtocolTraits<Record> {
using frame_type = Frame;
using record_type = Record;
using state_type = NoState;
using key_type = channel_id;
};

} // namespace amqp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
// Code generated by AMQP protocol generator. DO NOT EDIT.
#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h"

#include <map>
#include <stack>
#include <string>
#include <utility>
#include <vector>

#include "src/common/base/base.h"
#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h"
Expand All @@ -44,6 +40,7 @@ StatusOr<std::string> ExtractLongString(BinaryDecoder* decoder) {
PX_ASSIGN_OR_RETURN(uint32_t len, decoder->ExtractBEInt<uint32_t>());
return decoder->ExtractString(len);
}

StatusOr<bool> ExtractNthBit(BinaryDecoder* decoder, int n) {
// Extract Value at Nth bit
return decoder->Buf()[0] >> n & 1;
Expand Down Expand Up @@ -726,7 +723,7 @@ Status ProcessContentHeader(BinaryDecoder* decoder, Frame* req) {
return ExtractAMQPTxContentHeader(decoder, req);

default:
VLOG(1) << absl::Substitute("Unparsed frame method class $0", class_id);
VLOG(1) << absl::Substitute("Unparsed content header class $0", class_id);
}
return Status::OK();
}
Expand Down Expand Up @@ -967,6 +964,7 @@ Status ProcessFrameMethod(BinaryDecoder* decoder, Frame* req) {
default:
VLOG(1) << absl::Substitute("Unparsed frame method class $0 method $1", class_id, method_id);
}

return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#pragma once

#include <string>
#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h"

#include "src/common/base/base.h"
#include "src/common/json/json.h"
#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h"
#include "src/stirling/utils/binary_decoder.h"

namespace px {
Expand All @@ -41,7 +41,7 @@ struct AMQPConnectionStart {
void ToJSON(utils::JSONObjectBuilder* builder) const {
builder->WriteKV("version_major", version_major);
builder->WriteKV("version_minor", version_minor);
// TODO(vsrivatsa): support KV for server_properties field table type
// TODO(vsrivatsa): support KV for server-properties field table type
builder->WriteKV("mechanisms", mechanisms);
builder->WriteKV("locales", locales);
}
Expand All @@ -55,7 +55,7 @@ struct AMQPConnectionStartOk {
bool synchronous = 1;

void ToJSON(utils::JSONObjectBuilder* builder) const {
// TODO(vsrivatsa): support KV for client_properties field table type
// TODO(vsrivatsa): support KV for client-properties field table type
builder->WriteKV("mechanism", mechanism);
builder->WriteKV("response", response);
builder->WriteKV("locale", locale);
Expand Down Expand Up @@ -719,6 +719,7 @@ std::string ToString(T obj) {
obj.ToJSON(&json_object_builder);
return json_object_builder.GetString();
}
// TODO(vsrivatsa) combine with kafka ToString function
Status ProcessPayload(Frame* req, BinaryDecoder* decoder);

} // namespace amqp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
// Code generated by AMQP protocol generator. DO NOT EDIT.
#pragma once

#include <string>

#include "src/common/base/base.h"
Expand Down Expand Up @@ -156,30 +157,26 @@ enum class AMQPFrameTypes : uint8_t {
};
const uint8_t kFrameEnd = 0xCE;
const uint8_t kMinFrameLength = 8;
constexpr uint8_t kEndByteSize = 1;
const uint8_t kMinFrameWithoutEnd = 7;

// Represents a generic AMQP message.
struct Frame : public FrameBase {
// Marks end of the frame by hexadecimal value %xCE

uint8_t frame_type = 0;
uint8_t frame_type;

// Communication channel to be used
uint16_t channel = 0;
uint16_t channel;

// Defines the length of message upcoming
uint32_t payload_size = 0;
uint32_t payload_size;

// Actual body content to be used
std::string msg = "";
std::string msg;

// sync value only known after full body parsing
bool synchronous = false;

// `consumed` is used to mark if a request packet has been matched to a
// `consumed` is used to mark if a request frame has been matched to a
// response in StitchFrames. This is an optimization to efficiently remove all
// matched packets from the front of the deque.
// matched frames from the front of the deque.
bool consumed = false;

// if full body parsing already done
Expand All @@ -197,11 +194,15 @@ struct Frame : public FrameBase {
};

struct Record {
// AMQP record can support both sync and async frames.
// async frames have either req/resp set
// sync frames have both req & resp set
Frame req;
Frame resp;

// Debug information.
std::string px_info = "";

std::string ToString() const {
return absl::Substitute("req=[$0] resp=[$1]", req.ToString(), resp.ToString());
}
Expand Down

0 comments on commit 4018018

Please sign in to comment.