Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tcp_stats] Use kfuncs to probe the tcp_sendmsg function #2072

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/stirling/bpf_tools/bcc_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,25 @@ Status BCCWrapperImpl::AttachSamplingProbe(const SamplingProbeSpec& probe) {
return AttachPerfEvent(perf_event_spec);
}

Status BCCWrapperImpl::AttachKFunc(const KFuncSpec& probe) {
VLOG(1) << absl::Substitute("Deploying kfunc $0", probe.ToString());

int prog_fd;
auto res = bpf_.load_func(probe.kfunc, BPF_PROG_TYPE_TRACING, prog_fd);
LOG_IF(ERROR, !res.ok()) << res.msg();

int attach_fd = bpf_attach_kfunc(prog_fd);
if (attach_fd < 0) {
bpf_.unload_func(probe.kfunc);
return error::Internal("Unable to attach kfunc");
}

kfuncs_.push_back(probe);
++num_attached_kfuncs_;

return Status::OK();
}

Status BCCWrapperImpl::AttachKProbes(const ArrayView<KProbeSpec>& probes) {
for (const KProbeSpec& p : probes) {
PX_RETURN_IF_ERROR(AttachKProbe(p));
Expand Down Expand Up @@ -271,6 +290,13 @@ Status BCCWrapperImpl::AttachSamplingProbes(const ArrayView<SamplingProbeSpec>&
return Status::OK();
}

Status BCCWrapperImpl::AttachKFuncs(const ArrayView<KFuncSpec>& probes) {
for (const KFuncSpec& p : probes) {
PX_RETURN_IF_ERROR(AttachKFunc(p));
}
return Status::OK();
}

// This will replace the XDP program previously-attached on the the same device.
// Newer kernel allows attaching multiple XDP programs on the same device:
// https://lwn.net/Articles/801478/
Expand Down Expand Up @@ -323,6 +349,15 @@ Status BCCWrapperImpl::DetachTracepoint(const TracepointSpec& probe) {
return Status::OK();
}

Status BCCWrapperImpl::UnloadKFunc(const KFuncSpec& probe) {
VLOG(1) << absl::Substitute("Unloading kfunc $0", probe.ToString());

PX_RETURN_IF_ERROR(bpf_.unload_func(probe.kfunc));

--num_attached_kfuncs_;
return Status::OK();
}

void BCCWrapperImpl::DetachKProbes() {
for (const auto& p : kprobes_) {
auto res = DetachKProbe(p);
Expand All @@ -347,6 +382,14 @@ void BCCWrapperImpl::DetachTracepoints() {
tracepoints_.clear();
}

void BCCWrapperImpl::UnloadKFuncs() {
for (const auto& p : kfuncs_) {
auto res = UnloadKFunc(p);
LOG_IF(ERROR, !res.ok()) << res.msg();
}
kfuncs_.clear();
}

int BCCWrapperImpl::CommonPerfBufferSetup(const PerfBufferSpec& perf_buffer_spec) {
DCHECK(perf_buffer_spec.cb_cookie != nullptr) << "perf_buffer_spec.cb_cookie must be non-null.";
DCHECK(perf_buffer_spec.size_bytes > 0) << "perf_buffer_spec.cb_cookie must greater than zero.";
Expand Down Expand Up @@ -465,6 +508,7 @@ void BCCWrapperImpl::Close() {
DetachKProbes();
DetachUProbes();
DetachTracepoints();
UnloadKFuncs();
}

Status RecordingBCCWrapperImpl::OpenPerfBuffer(const PerfBufferSpec& perf_buffer_spec) {
Expand Down
24 changes: 23 additions & 1 deletion src/stirling/bpf_tools/bcc_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ class BCCWrapper {
*/
virtual Status AttachSamplingProbe(const SamplingProbeSpec& probe) = 0;

/**
* Attach a single kfunc.
* @param probe Specifications of the kfunc.
* @return Error if probe fails to attach.
*/
virtual Status AttachKFunc(const KFuncSpec& probe) = 0;

/**
* Open a perf buffer for reading events.
* @param perf_buff Specifications of the perf buffer (name, callback function, etc.).
Expand Down Expand Up @@ -182,6 +189,13 @@ class BCCWrapper {
*/
virtual Status AttachSamplingProbes(const ArrayView<SamplingProbeSpec>& probes) = 0;

/**
* Convenience function that attaches multiple kfuncs.
* @param probes Vector of probes.
* @return Error of first probe to fail to attach (remaining probe attachments are not attempted).
*/
virtual Status AttachKFuncs(const ArrayView<KFuncSpec>& probes) = 0;

/**
* Convenience function that attaches a XDP program.
*/
Expand Down Expand Up @@ -260,6 +274,7 @@ class BCCWrapper {
inline static size_t num_attached_tracepoints_;
inline static size_t num_open_perf_buffers_;
inline static size_t num_attached_perf_events_;
inline static size_t num_attached_kfuncs_;

private:
// This is shared by all source connectors that uses BCCWrapper.
Expand Down Expand Up @@ -289,12 +304,14 @@ class BCCWrapperImpl : public BCCWrapper {
Status AttachUProbe(const UProbeSpec& probe) override;
Status AttachTracepoint(const TracepointSpec& probe) override;
Status AttachSamplingProbe(const SamplingProbeSpec& probe) override;
Status AttachKFunc(const KFuncSpec& probe) override;
Status OpenPerfBuffer(const PerfBufferSpec& perf_buffer) override;
Status AttachPerfEvent(const PerfEventSpec& perf_event) override;
Status AttachKProbes(const ArrayView<KProbeSpec>& probes) override;
Status AttachTracepoints(const ArrayView<TracepointSpec>& probes) override;
Status AttachUProbes(const ArrayView<UProbeSpec>& uprobes) override;
Status AttachSamplingProbes(const ArrayView<SamplingProbeSpec>& probes) override;
Status AttachKFuncs(const ArrayView<KFuncSpec>& probes) override;
Status AttachXDP(const std::string& dev_name, const std::string& fn_name) override;
Status OpenPerfBuffers(const ArrayView<PerfBufferSpec>& perf_buffers) override;
Status AttachPerfEvents(const ArrayView<PerfEventSpec>& perf_events) override;
Expand All @@ -316,14 +333,16 @@ class BCCWrapperImpl : public BCCWrapper {
Status DetachUProbe(const UProbeSpec& probe);
Status DetachTracepoint(const TracepointSpec& probe);
Status DetachPerfEvent(const PerfEventSpec& perf_event);
Status UnloadKFunc(const KFuncSpec& probe);

// Detaches all kprobes/uprobes/perf buffers/perf events that were attached by the wrapper.
// Detaches all kprobes/uprobes/perf buffers/perf events/kfuncs that were attached by the wrapper.
// If any fails to detach, an error is logged, and the function continues.
void DetachKProbes();
void DetachUProbes();
void DetachTracepoints();
void ClosePerfBuffers();
void DetachPerfEvents();
void UnloadKFuncs();

// Returns the name that identifies the target to attach this k-probe.
std::string GetKProbeTargetName(const KProbeSpec& probe);
Expand All @@ -332,6 +351,7 @@ class BCCWrapperImpl : public BCCWrapper {
std::vector<UProbeSpec> uprobes_;
std::vector<TracepointSpec> tracepoints_;
std::vector<PerfEventSpec> perf_events_;
std::vector<KFuncSpec> kfuncs_;

protected:
std::vector<PerfBufferSpec> perf_buffer_specs_;
Expand Down Expand Up @@ -395,11 +415,13 @@ class ReplayingBCCWrapperImpl : public BCCWrapper {
Status AttachUProbe(const UProbeSpec&) override { return Status::OK(); }
Status AttachTracepoint(const TracepointSpec&) override { return Status::OK(); }
Status AttachSamplingProbe(const SamplingProbeSpec&) override { return Status::OK(); }
Status AttachKFunc(const KFuncSpec&) override { return Status::OK(); }
Status AttachPerfEvent(const PerfEventSpec&) override { return Status::OK(); }
Status AttachKProbes(const ArrayView<KProbeSpec>&) override { return Status::OK(); }
Status AttachTracepoints(const ArrayView<TracepointSpec>&) override { return Status::OK(); }
Status AttachUProbes(const ArrayView<UProbeSpec>&) override { return Status::OK(); }
Status AttachSamplingProbes(const ArrayView<SamplingProbeSpec>&) override { return Status::OK(); }
Status AttachKFuncs(const ArrayView<KFuncSpec>&) override { return Status::OK(); }
Status AttachXDP(const std::string&, const std::string&) override { return Status::OK(); }
Status AttachPerfEvents(const ArrayView<PerfEventSpec>&) override { return Status::OK(); }
Status PopulateBPFPerfArray(const std::string&, const uint32_t, const uint64_t) override {
Expand Down
9 changes: 9 additions & 0 deletions src/stirling/bpf_tools/probe_specs/probe_specs.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ struct TracepointSpec {
}
};

/**
* Describes a probe on a kernel function.
*/
struct KFuncSpec {
std::string kfunc;

std::string ToString() const { return absl::Substitute("[kfunc=$0]", kfunc); }
};

/**
* Describes a sampling probe that triggers according to a time period.
* This is in contrast to KProbes and UProbes, which trigger based on
Expand Down
27 changes: 12 additions & 15 deletions src/stirling/source_connectors/tcp_stats/bcc_bpf/tcp_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,6 @@ static int tcp_sendstat(struct pt_regs* ctx, uint32_t tgid, uint32_t id, int siz
return 0;
}

int probe_ret_tcp_sendmsg(struct pt_regs* ctx) {
int size = PT_REGS_RC(ctx);
if (size > 0) {
uint64_t id = bpf_get_current_pid_tgid();
uint32_t tgid = id >> 32;
return tcp_sendstat(ctx, tgid, id, size);
} else {
return 0;
}
}

int probe_ret_tcp_sendpage(struct pt_regs* ctx) {
int size = PT_REGS_RC(ctx);
if (size > 0) {
Expand All @@ -131,10 +120,6 @@ int probe_entry_tcp_sendpage(struct pt_regs* ctx, struct sock* sk, struct page*
return tcp_send_entry(sk);
}

int probe_entry_tcp_sendmsg(struct pt_regs* ctx, struct sock* sk, struct msghdr* msg, size_t size) {
return tcp_send_entry(sk);
}

int probe_entry_tcp_cleanup_rbuf(struct pt_regs* ctx, struct sock* sk, int copied) {
if (copied <= 0) {
return 0;
Expand Down Expand Up @@ -226,3 +211,15 @@ int probe_entry_tcp_retransmit_skb(struct pt_regs* ctx, struct sock* skp, struct
tcp_events.perf_submit(ctx, &event, sizeof(event));
return 0;
}

KFUNC_PROBE(tcp_sendmsg, struct sock* sk) { return tcp_send_entry(sk); }

KRETFUNC_PROBE(tcp_sendmsg, struct sock* sk, struct msghdr* msg, size_t size, int ret) {
if (ret > 0) {
uint64_t id = bpf_get_current_pid_tgid();
uint32_t tgid = id >> 32;
return tcp_sendstat((struct pt_regs*)(ctx), tgid, id, ret);
} else {
return 0;
}
}
22 changes: 19 additions & 3 deletions src/stirling/source_connectors/tcp_stats/tcp_stats_connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ constexpr uint32_t kPerfBufferPerCPUSizeBytes = 50 * 1024 * 1024;
using ProbeType = bpf_tools::BPFProbeAttachType;

const auto kProbeSpecs = MakeArray<bpf_tools::KProbeSpec>(
{{"tcp_sendmsg", ProbeType::kEntry, "probe_entry_tcp_sendmsg", /*is_syscall*/ false},
{"tcp_sendmsg", ProbeType::kReturn, "probe_ret_tcp_sendmsg", /*is_syscall*/ false},
{"tcp_cleanup_rbuf", ProbeType::kEntry, "probe_entry_tcp_cleanup_rbuf", /*is_syscall*/ false},
{{"tcp_cleanup_rbuf", ProbeType::kEntry, "probe_entry_tcp_cleanup_rbuf", /*is_syscall*/ false},
{"tcp_retransmit_skb", ProbeType::kEntry, "probe_entry_tcp_retransmit_skb",
/*is_syscall*/ false}});

const auto kSendPageProbeSpecs = MakeArray<bpf_tools::KProbeSpec>(
{{"tcp_sendpage", ProbeType::kEntry, "probe_entry_tcp_sendpage", /*is_syscall*/ false},
{"tcp_sendpage", ProbeType::kReturn, "probe_ret_tcp_sendpage", /*is_syscall*/ false}});

const auto kFuncSpecs = MakeArray<bpf_tools::KFuncSpec>(
{{"kfunc__vmlinux__tcp_sendmsg"}, {"kretfunc__vmlinux__tcp_sendmsg"}});

void HandleTcpEvent(void* cb_cookie, void* data, int /*data_size*/) {
auto* connector = reinterpret_cast<TCPStatsConnector*>(cb_cookie);
auto* event = reinterpret_cast<struct tcp_event_t*>(data);
Expand Down Expand Up @@ -82,8 +83,23 @@ Status TCPStatsConnector::InitImpl() {
"was removed in Kernel 6.5.",
sendpage_attach_status.msg(), kernel_version.ToString());
}

// We deploy kfunc probes for the tcp_sendmsg function to side step a probe insertion conflict
// with the socket tracer since it already deploys kprobes for the tcp_sendmsg function.
const auto kfunc_attach_status = bcc_->AttachKFuncs(kFuncSpecs);
if (!kfunc_attach_status.ok()) {
const auto kernel_version = system::GetCachedKernelVersion();
LOG(INFO) << absl::Substitute(
"Could not attach kfunc probes for tcp_sendmsg: $0, detected kernel version: $1. Note: "
"kfunc probes are supported on kernel version 5.5 and newer.",
kfunc_attach_status.msg(), kernel_version.ToString());
} else {
LOG(INFO) << absl::Substitute("Successfully deployed $0 kfunc probes.", kFuncSpecs.size());
}

PX_RETURN_IF_ERROR(bcc_->OpenPerfBuffers(perf_buffer_specs));
LOG(INFO) << absl::Substitute("Successfully deployed $0 kprobes.", kProbeSpecs.size());

return Status::OK();
}

Expand Down
Loading