Fix/dbus call issue #2838

Merged
merged 2 commits into from
Jul 9, 2024
70 changes: 52 additions & 18 deletions crates/libcgroups/src/systemd/dbus_native/dbus.rs
@@ -4,6 +4,7 @@ use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};

use nix::errno::Errno;
use nix::sys::socket;

use super::client::SystemdClient;
@@ -228,9 +229,10 @@ impl DbusConnection {
.filter(|m| m.preamble.mtype == MessageType::MethodReturn)
.collect();

let res = res.first().ok_or(DbusError::MethodCallErr(
"expected method call to have reply, found no reply message".into(),
))?;
let res = res.first().ok_or(DbusError::AuthenticationErr(format!(
"expected Hello call to have reply, found no reply message, got {:?} instead",
res
)))?;
let mut ctr = 0;
let id = String::deserialize(&res.body, &mut ctr)?;
self.id = Some(id);
@@ -247,13 +249,18 @@ impl DbusConnection {
let mut reply: [u8; REPLY_BUF_SIZE] = [0_u8; REPLY_BUF_SIZE];
let mut reply_buffer = [IoSliceMut::new(&mut reply[0..])];

let reply_rcvd = socket::recvmsg::<()>(
let reply_res = socket::recvmsg::<()>(
self.socket,
&mut reply_buffer,
None,
socket::MsgFlags::empty(),
)?;
);

let reply_rcvd = match reply_res {
Collaborator

👍

Ok(msg) => msg,
Err(Errno::EAGAIN) => continue,
Err(e) => return Err(e.into()),
};
let received_byte_count = reply_rcvd.bytes;

ret.extend_from_slice(&reply[0..received_byte_count]);
@@ -296,20 +303,47 @@ impl DbusConnection {
None,
)?;

let reply = self.receive_complete_response()?;

// note that a single received response can contain multiple
// messages, so we must deserialize it piece by piece
let mut ret = Vec::new();
let mut buf = &reply[..];

while !buf.is_empty() {
let mut ctr = 0;
let msg = Message::deserialize(&buf[ctr..], &mut ctr)?;
// we reset the buf, because I couldn't figure out how the adjust_counter function
// should be changed to work correctly with non-zero start counter, and this solved that issue
buf = &buf[ctr..];
ret.push(msg);

// it is possible that while receiving messages, we get some extra/previous message
Collaborator

Do we know why we get these extra messages?

Collaborator Author

As of now, the only extra messages we have observed are of type signal, which broadcast an event to all connected clients. We used to get these before as well, but they would usually be consumed into the buffers of other method calls and ignored. In a few cases that did not happen, which led to the dbus error in question. To solve this, we now keep consuming messages until we get a method return or error message (when the message we sent was a method call).
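The consume-until-reply idea described in this comment can be sketched in isolation. This is only an illustration, not the code from this PR: `MessageType` here is a cut-down stand-in for the enum in `message.rs`, and `collect_until_reply` and its `next_batch` closure are hypothetical names standing in for repeated reads from the socket.

```rust
/// Cut-down stand-in for the real `MessageType` enum (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageType {
    MethodReturn,
    Error,
    Signal,
}

/// Hypothetical helper: keeps pulling batches of messages until the
/// collected set contains a method return or an error, then stops.
/// `next_batch` stands in for "read and deserialize one response".
fn collect_until_reply<F>(mut next_batch: F) -> Vec<MessageType>
where
    F: FnMut() -> Option<Vec<MessageType>>,
{
    let mut collected = Vec::new();
    while let Some(batch) = next_batch() {
        collected.extend(batch);
        // Signals alone are not a reply, so keep reading past them.
        let has_reply = collected
            .iter()
            .any(|m| matches!(m, MessageType::MethodReturn | MessageType::Error));
        if has_reply {
            break;
        }
    }
    collected
}
```

With batches `[Signal]`, `[Signal, MethodReturn]`, `[Signal]`, the helper stops after the second batch and leaves the third unread, which mirrors how stray signals are skipped until the actual reply shows up.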

// for method calls, we need to have an error or method return type message, so
// we keep looping until we get either of these. see https://github.com/containers/youki/issues/2826
// for more detailed analysis.
loop {
Collaborator

I am not sure I understand why this loop is necessary. Shouldn't receive_complete_response have read everything from the socket the first time?

Collaborator Author

The way receive_complete_response is implemented, it loops internally, reading from the socket into a buffer, and returns the data once read_bytes < buffer_size. This is because dbus uses the same socket for all communication, and a read will block if there is no further data to read. Hence we return once the received data is smaller than the buffer.
Unfortunately, dbus messages can arrive in multiple bursts: two messages may be delivered together, or they may be a few milliseconds apart, so a single read call might return only one message that is smaller than the buffer. We need to loop here to keep consuming messages until we find the one we are looking for, which in this case is of type method_return or error.
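A minimal sketch of the short-read behaviour described above, assuming a generic `std::io::Read` source instead of the real socket. `read_until_short`, `Bursts`, and the tiny `REPLY_BUF_SIZE` value are hypothetical and only for illustration; they are not youki's implementation.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// Deliberately tiny buffer for the example (not the real value).
const REPLY_BUF_SIZE: usize = 8;

/// Reads chunks until one read returns fewer bytes than the buffer
/// size, which is taken to mean "nothing more is available right now".
fn read_until_short<R: Read>(src: &mut R) -> io::Result<Vec<u8>> {
    let mut ret = Vec::new();
    loop {
        let mut chunk = [0_u8; REPLY_BUF_SIZE];
        let n = src.read(&mut chunk)?;
        ret.extend_from_slice(&chunk[..n]);
        if n < REPLY_BUF_SIZE {
            break;
        }
    }
    Ok(ret)
}

/// Fake source that hands out data one burst at a time, the way
/// messages may arrive a few milliseconds apart on a socket.
struct Bursts(VecDeque<Vec<u8>>);

impl Read for Bursts {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let Some(mut burst) = self.0.pop_front() else {
            return Ok(0);
        };
        let n = burst.len().min(buf.len());
        buf[..n].copy_from_slice(&burst[..n]);
        if n < burst.len() {
            // Keep the unread remainder for the next read call.
            self.0.push_front(burst.split_off(n));
        }
        Ok(n)
    }
}
```

With a 5-byte burst followed by a 10-byte burst, the first call returns only the 5 bytes and the second burst is seen only on a later call. That is why the caller has to loop until the message it is waiting for actually appears.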

let reply = self.receive_complete_response()?;

// note that a single received response can contain multiple
// messages, so we must deserialize it piece by piece
let mut buf = &reply[..];

while !buf.is_empty() {
let mut ctr = 0;
let msg = Message::deserialize(&buf[ctr..], &mut ctr)?;
// we reset the buf, because I couldn't figure out how the adjust_counter function
// should be changed to work correctly with non-zero start counter, and this solved that issue
buf = &buf[ctr..];
ret.push(msg);
}

// in Youki, the only messages we send apart from the initial auth are method calls.
// if this message is not a method call, there is no specific reply to look
// out for, so we take what we have received and break
if mtype != MessageType::MethodCall {
break;
}

// check if any of the received message is method return or error type
let return_message_count = ret
.iter()
.filter(|m| {
m.preamble.mtype == MessageType::MethodReturn
|| m.preamble.mtype == MessageType::Error
})
.count();

if return_message_count > 0 {
break;
}
Comment on lines +328 to +346
Collaborator Author

I'd like suggestions for other ways to implement this. Ideally, I feel that the send_message function should not have to deal with the specific message type, but since we collect the response here, we need to account for it. The other way I can think of is to separate the send message and get response functions, so that the higher-level calling functions can choose to loop over the get response call. But that means we would have to do two function calls at each place we are doing a send message call. Would that be a better way?

Collaborator

> But that means we would have to do two function calls at each place we are doing a send message call. Would that be a better way?

I do not think it is. The complexity should be contained in send_message. The caller should not have to think about it.

}
Ok(ret)
}
2 changes: 1 addition & 1 deletion crates/libcgroups/src/systemd/dbus_native/message.rs
@@ -54,7 +54,7 @@ impl HeaderSignature {
}

/// Type of message
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum MessageType {
MethodCall,
MethodReturn,
8 changes: 5 additions & 3 deletions crates/libcgroups/src/systemd/dbus_native/proxy.rs
@@ -47,6 +47,7 @@ impl<'conn> Proxy<'conn> {
member: &str,
body: Option<Body>,
) -> Result<Output> {
tracing::trace!("dbus call at interface {} member {}", interface, member);
let mut headers = Vec::with_capacity(4);

// create necessary headers
@@ -114,9 +115,10 @@ impl<'conn> Proxy<'conn> {
// we are only going to consider first reply, cause... so.
// realistically there should only be at most one method return type of message
// for a method call
let reply = reply.first().ok_or(DbusError::MethodCallErr(
"expected to get a reply for method call, didn't get any".into(),
))?;
let reply = reply.first().ok_or(DbusError::MethodCallErr(format!(
"expected to get a reply for method call, got {:?} instead",
reply_messages
)))?;

let headers = &reply.headers;
let expected_signature = Output::get_signature();
2 changes: 1 addition & 1 deletion crates/libcontainer/src/container/builder_impl.rs
@@ -159,7 +159,7 @@ impl ContainerBuilderImpl {
let (init_pid, need_to_clean_up_intel_rdt_dir) =
process::container_main_process::container_main_process(&container_args).map_err(
|err| {
tracing::error!(?err, "failed to run container process");
tracing::error!("failed to run container process {}", err);
LibcontainerError::MainProcess(err)
},
)?;
8 changes: 8 additions & 0 deletions crates/libcontainer/src/process/channel.rs
@@ -24,6 +24,8 @@ pub enum ChannelError {
MissingSeccompFds,
#[error("exec process failed with error {0}")]
ExecError(String),
#[error("intermediate process error {0}")]
OtherError(String),
}

/// Channel Design
@@ -83,6 +85,11 @@ impl MainSender {
Ok(())
}

pub fn send_error(&mut self, err: String) -> Result<(), ChannelError> {
self.sender.send(Message::OtherError(err))?;
Ok(())
}

pub fn close(&self) -> Result<(), ChannelError> {
self.sender.close()?;

@@ -110,6 +117,7 @@ impl MainReceiver {
match msg {
Message::IntermediateReady(pid) => Ok(Pid::from_raw(pid)),
Message::ExecFailed(err) => Err(ChannelError::ExecError(err)),
Message::OtherError(err) => Err(ChannelError::OtherError(err)),
msg => Err(ChannelError::UnexpectedMessage {
expected: Message::IntermediateReady(0),
received: msg,
12 changes: 11 additions & 1 deletion crates/libcontainer/src/process/container_main_process.rs
@@ -62,7 +62,17 @@ pub fn container_main_process(container_args: &ContainerArgs) -> Result<(Pid, bo
) {
Ok(_) => 0,
Err(err) => {
tracing::error!(?err, "failed to run intermediate process");
tracing::error!("failed to run intermediate process {}", err);
match main_sender.send_error(err.to_string()) {
Ok(_) => {}
Err(e) => {
tracing::error!(
"error in sending intermediate error message {} to main: {}",
err,
e
)
}
}
-1
}
}
2 changes: 2 additions & 0 deletions crates/libcontainer/src/process/message.rs
@@ -12,6 +12,7 @@ pub enum Message {
SeccompNotify,
SeccompNotifyDone,
ExecFailed(String),
OtherError(String),
}

impl fmt::Display for Message {
@@ -24,6 +25,7 @@ impl fmt::Display for Message {
Message::SeccompNotify => write!(f, "SeccompNotify"),
Message::SeccompNotifyDone => write!(f, "SeccompNotifyDone"),
Message::ExecFailed(s) => write!(f, "ExecFailed({})", s),
Message::OtherError(s) => write!(f, "OtherError({})", s),
}
}
}