Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefer close() to shutdown(), it waits for finish #98

Merged
merged 2 commits into from
Aug 9, 2023

Conversation

carver
Copy link
Contributor

@carver carver commented Jul 27, 2023

Fix #89

stream.close().await will not return until the connection exits, either from an error, or from all data being written and ACKed.

To simplify, shutdown() is no longer part of the public API.

@carver carver requested a review from jacobkaufmann July 31, 2023 02:42
@carver carver self-assigned this Jul 31, 2023
Copy link
Collaborator

@jacobkaufmann jacobkaufmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

took an initial pass. need to look more at the mock stuff, but I think it's in the right general direction!

src/testutils.rs Outdated
Comment on lines 53 to 57
let result = self.outbound.send(buf.to_vec());
if result.is_err() {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"channel closed: {result:?}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let result = self.outbound.send(buf.to_vec());
if result.is_err() {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"channel closed: {result:?}",
if let Err(err) = self.outbound.send(buf.to_vec()) {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"channel closed: {err}",

@carver carver force-pushed the blocking-close-api branch from 659adf4 to 159d80d Compare August 3, 2023 17:58
@carver carver requested a review from jacobkaufmann August 3, 2023 17:59
Copy link
Collaborator

@jacobkaufmann jacobkaufmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that once we resolve these comments we will be ready to merge

tests/stream.rs Outdated

// Test that close() returns a timeout, if recipient is not ACKing (after a successful connection)
#[tokio::test]
async fn that_close_errors_if_all_packets_dropped() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async fn that_close_errors_if_all_packets_dropped() {
async fn close_errors_if_all_packets_dropped() {

Comment on lines +108 to +106
socket_a: &MockUdpSocket,
socket_b: &MockUdpSocket,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's nothing here that enforces that A's only peer is B and vice versa. if that doesn't hold, then the connection ID pair doesn't seem to be a "pair"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code that this comment is referring to is intended to enforce the pair: https://github.com/ethereum/utp/pull/98/files#r1285253920

So I think "pair" still works.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above, but I think it's better to give this back to the caller along with the socket upon creation of the link

src/stream.rs Outdated
@@ -101,10 +104,21 @@ where
Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err:?}"))),
}
}

/// Close this connection, after finishing the pending writes
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
/// Close this connection, after finishing the pending writes
/// Closes the stream gracefully.
/// Completes when the remote peer acknowledges all sent data.

src/testutils.rs Outdated
Comment on lines 53 to 54
let result = self.outbound.send(buf.to_vec());
if let Err(err) = result {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let result = self.outbound.send(buf.to_vec());
if let Err(err) = result {
if let Err(err) = self.outbound.send(buf.to_vec()) {

src/testutils.rs Outdated
pub only_peer: char,
/// Watch for signal that the link is up. If not up, link will SILENTLY drop all sent packets.
/// If up_status is None, the link is always up.
up_status: Option<watch::Receiver<bool>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use something like an AtomicBool here? it seems unnecessary to have to worry about the None variant here, and that we just want a "switch" to open and close the link.

tests/stream.rs Outdated
Comment on lines 153 to 154
// After adding some mechanism that sends a RESET packet, add the following test:
// Test that close() returns ConnectionReset if recipient resets during transfer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move this to an issue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻 #102

src/testutils.rs Outdated
Comment on lines 81 to 79
pub fn build_link_pair() -> (MockUdpSocket, MockUdpSocket) {
let (peer_a, peer_b): (char, char) = ('A', 'B');
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would prefer something more in line with the (tx, rx) = channel() semantics

Suggested change
pub fn build_link_pair() -> (MockUdpSocket, MockUdpSocket) {
let (peer_a, peer_b): (char, char) = ('A', 'B');
pub fn mock_link((a, b): (char, char)) -> (MockUdpSocket, MockUdpSocket) {

and ideally the connection ID stuff would also be taken care of here, since there is only ever one connection per socket. if that can't be done easily here, then we can come back to it, but I think it would be nice to unify the two steps to enforce consistency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would prefer something more in line with the (tx, rx) = channel() semantics

Hm, are you talking about the refactor I mentioned in #100 ?

one natural path is to split the socket into a sender and receiver (much how creating a channel returns a split tx and rx object)

That refactor is interesting, but feels significant and out of scope. I'm not really sure what it would look like without changing how the socket trait is defined.

and ideally the connection ID stuff would also be taken care of here, since there is only ever one connection per socket. if that can't be done easily here, then we can come back to it, but I think it would be nice to unify the two steps to enforce consistency.

You're thinking like a single method that calls both build_link_pair() and build_connection_id_pair() and then returns the 4 resulting values?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the last bit is what I'm referring to yes, returning a pair of pairs. and then those other functions don't need to be pub.

.await
.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "channel closed"))?;
if buf.len() < packet.len() {
panic!("buffer too small for perfect link");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to panic here?

Copy link
Contributor Author

@carver carver Aug 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the other options I see:

  • Return n=0, which gives the tester a warning log "unable to decode uTP packet"
  • Put the first part of the packet, up to the buffer size, into the buffer. again, giving an "unable to decode uTP packet"
  • Return an io::Error, maybe an InvalidInput. Note that src/socket seems to drop errors on the floor without a log

Any other suggestions?

Since this is a utility aimed at testing instead of production, panicking with a clear reason is more important than staying alive. None of these other options seem to give the tester much information about why it failed, and some will fail quietly, which means a long timeout/retry loop that's hard to debug.

So I don't see a better option than panicking, yet.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough. I'm okay with keeping the panic, but we should document it as we did above for send_to.

@carver
Copy link
Contributor Author

carver commented Aug 8, 2023

Ok, I've finished this round of changes. The only change here that isn't an implementation from your feedback is 0d2a6da

I wanted a way to test the close() timeout completely, but did not want to wait the whole wall-clock time for the test to complete. Luckily, I found tokio's pause, which handles it beautifully. (even though I find it awkwardly named)

@carver carver requested a review from jacobkaufmann August 8, 2023 00:18
carver added a commit to carver/utp that referenced this pull request Aug 8, 2023
Close errors will be used more, when ethereum#98 lands, which returns the close
error to clients who wait for the stream to exit.
stream.close().await will not return until the connection exits, either
from an error, or from all data being written and ACKed.
@carver carver force-pushed the blocking-close-api branch from 14ddfd0 to 0d2a6da Compare August 8, 2023 17:10
@carver
Copy link
Contributor Author

carver commented Aug 8, 2023

Just rebased on #101

Copy link
Collaborator

@jacobkaufmann jacobkaufmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a few minor remaining things. up to you whether to address the nits.

.await
.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "channel closed"))?;
if buf.len() < packet.len() {
panic!("buffer too small for perfect link");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough. I'm okay with keeping the panic, but we should document it as we did above for send_to.

src/testutils.rs Outdated
Comment on lines 81 to 79
pub fn build_link_pair() -> (MockUdpSocket, MockUdpSocket) {
let (peer_a, peer_b): (char, char) = ('A', 'B');
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the last bit is what I'm referring to yes, returning a pair of pairs. and then those other functions don't need to be pub.

Comment on lines +108 to +106
socket_a: &MockUdpSocket,
socket_b: &MockUdpSocket,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above, but I think it's better to give this back to the caller along with the socket upon creation of the link

src/testutils.rs Outdated
impl MockUdpSocket {
/// Grab a status controller for this socket.
/// Set the status to false to simulate a down link.
pub fn get_status_controller(&mut self) -> Arc<AtomicBool> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
pub fn get_status_controller(&mut self) -> Arc<AtomicBool> {
pub fn up_status(&mut self) -> Arc<AtomicBool> {

src/testutils.rs Outdated
}

/// Should the link send packets?
fn active(&self) -> bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
fn active(&self) -> bool {
fn is_active(&self) -> bool {

Use the new mock socket to:
- remove chance of real dropped packets on UDP during test
- allow parallel tests without fiddling with UDP ports
- convenience method to launch connected peers
- test what happens when network goes down

Add test that close() waits for writes to ACK and eventually returns a
timeout, by disabling the network after connecting to a peer. Closing
the connection will time out, because the sender is waiting for ACKs
that the recipient will never send. (Because the recipient never
recieves the data)

Why does this add the tokio test-util and start_paused = true?

We don't want the test to actually wait the timeout length while
running, so we use tokio's pause feature to skip over the timeout
without interrupting anything else that would happen in real-time:
https://docs.rs/tokio/latest/tokio/time/fn.pause.html
@carver carver force-pushed the blocking-close-api branch from 0d2a6da to 7a3fb10 Compare August 9, 2023 01:38
@carver
Copy link
Contributor Author

carver commented Aug 9, 2023

Thanks, that's a wrap!

@carver carver merged commit 750bd14 into ethereum:master Aug 9, 2023
@carver carver deleted the blocking-close-api branch August 9, 2023 01:41
carver added a commit to carver/utp that referenced this pull request Aug 9, 2023
Close errors will be used more, when ethereum#98 lands, which returns the close
error to clients who wait for the stream to exit.
carver added a commit to carver/utp that referenced this pull request Aug 9, 2023
Close errors will be used more, when ethereum#98 lands, which returns the close
error to clients who wait for the stream to exit.
carver added a commit to carver/utp that referenced this pull request Aug 9, 2023
Close errors will be used more, when ethereum#98 lands, which returns the close
error to clients who wait for the stream to exit.
carver added a commit to carver/utp that referenced this pull request Aug 9, 2023
Close errors will be used more, when ethereum#98 lands, which returns the close
error to clients who wait for the stream to exit.
carver added a commit to carver/utp that referenced this pull request Aug 9, 2023
Close errors will be used more, when ethereum#98 lands, which returns the close
error to clients who wait for the stream to exit.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Change shutdown() to block until the write data is sent & acked
2 participants