Skip to content

Commit

Permalink
Call didSendRequestPart after the write has hit the socket (#566)
Browse files Browse the repository at this point in the history
### Motivation

Today `didSendRequestPart` is called after a request body part has been passed to the executor. However, this does not mean that the write hit the socket. Users may depend on this behavior to implement back-pressure. For this reason, we should only call this `didSendRequestPart` once the write was successful.

### Modification

Pass a promise to the actual channel write and only call the delegate once that promise succeeds.

### Result

The delegate method `didSendRequestPart` is only called after the write was successful. Fixes #565.

Co-authored-by: Fabian Fett <[email protected]>
  • Loading branch information
FranzBusch and fabianfett authored Apr 26, 2022
1 parent a586fba commit 2442598
Show file tree
Hide file tree
Showing 11 changed files with 327 additions and 165 deletions.
8 changes: 4 additions & 4 deletions Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ final class Transaction: @unchecked Sendable {

switch writeAction {
case .writeAndWait(let executor), .writeAndContinue(let executor):
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self)
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self, promise: nil)

case .fail:
// an error/cancellation has happened. we don't need to continue here
Expand Down Expand Up @@ -105,14 +105,14 @@ final class Transaction: @unchecked Sendable {
switch self.state.writeNextRequestPart() {
case .writeAndContinue(let executor):
self.stateLock.unlock()
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)

case .writeAndWait(let executor):
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
self.state.waitForRequestBodyDemand(continuation: continuation)
self.stateLock.unlock()

executor.writeRequestBodyPart(.byteBuffer(part), request: self)
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
}

case .fail:
Expand All @@ -132,7 +132,7 @@ final class Transaction: @unchecked Sendable {
break

case .forwardStreamFinished(let executor, let succeedContinuation):
executor.finishRequestBodyStream(self)
executor.finishRequestBodyStream(self, promise: nil)
succeedContinuation?.resume(returning: nil)
}
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
case .sendRequestHead(let head, startBody: let startBody):
self.sendRequestHead(head, startBody: startBody, context: context)

case .sendBodyPart(let part):
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
case .sendBodyPart(let part, let writePromise):
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)

case .sendRequestEnd:
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
case .sendRequestEnd(let writePromise):
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)

if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
self.runTimeoutAction(timeoutAction, context: context)
Expand Down Expand Up @@ -260,34 +260,51 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
switch finalAction {
case .close:
context.close(promise: nil)
case .sendRequestEnd:
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
oldRequest.succeedRequest(buffer)
case .sendRequestEnd(let writePromise):
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues
writePromise.futureResult.whenComplete { result in
switch result {
case .success:
oldRequest.succeedRequest(buffer)
case .failure(let error):
oldRequest.fail(error)
}
}

context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
case .informConnectionIsIdle:
self.connection.taskCompleted()
case .none:
break
oldRequest.succeedRequest(buffer)
}

oldRequest.succeedRequest(buffer)

case .failRequest(let error, let finalAction):
// see comment in the `succeedRequest` case.
let oldRequest = self.request!
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)

switch finalAction {
case .close:
case .close(let writePromise):
context.close(promise: nil)
case .sendRequestEnd:
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
writePromise?.fail(error)
oldRequest.fail(error)

case .informConnectionIsIdle:
self.connection.taskCompleted()
oldRequest.fail(error)

case .failWritePromise(let writePromise):
writePromise?.fail(error)
oldRequest.fail(error)

case .none:
break
oldRequest.fail(error)
}

oldRequest.fail(error)
case .failSendBodyPart(let error, let writePromise), .failSendStreamFinished(let error, let writePromise):
writePromise?.fail(error)
}
}

Expand Down Expand Up @@ -355,27 +372,29 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {

// MARK: Private HTTPRequestExecutor

private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
guard self.request === request, let context = self.channelContext else {
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
// the request has been popped by the state machine or the ChannelHandler has been
// removed from the Channel pipeline. This is a normal threading issue, noone has
// screwed up.
promise?.fail(HTTPClientError.requestStreamCancelled)
return
}

let action = self.state.requestStreamPartReceived(data)
let action = self.state.requestStreamPartReceived(data, promise: promise)
self.run(action, context: context)
}

private func finishRequestBodyStream0(_ request: HTTPExecutableRequest) {
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
guard self.request === request, let context = self.channelContext else {
// See code comment in `writeRequestBodyPart0`
promise?.fail(HTTPClientError.requestStreamCancelled)
return
}

let action = self.state.requestStreamFinished()
let action = self.state.requestStreamFinished(promise: promise)
self.run(action, context: context)
}

Expand Down Expand Up @@ -405,22 +424,22 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
}

extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest) {
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.writeRequestBodyPart0(data, request: request)
self.writeRequestBodyPart0(data, request: request, promise: promise)
} else {
self.eventLoop.execute {
self.writeRequestBodyPart0(data, request: request)
self.writeRequestBodyPart0(data, request: request, promise: promise)
}
}
}

func finishRequestBodyStream(_ request: HTTPExecutableRequest) {
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.finishRequestBodyStream0(request)
self.finishRequestBodyStream0(request, promise: promise)
} else {
self.eventLoop.execute {
self.finishRequestBodyStream0(request)
self.finishRequestBodyStream0(request, promise: promise)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,46 @@ struct HTTP1ConnectionStateMachine {

enum Action {
/// A action to execute, when we consider a request "done".
enum FinalStreamAction {
enum FinalSuccessfulStreamAction {
/// Close the connection
case close
/// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded,
/// as soon as we wrote the request end onto the wire.
case sendRequestEnd
///
/// The promise is an optional write promise.
case sendRequestEnd(EventLoopPromise<Void>?)
/// Inform an observer that the connection has become idle
case informConnectionIsIdle
}

/// A action to execute, when we consider a request "done".
enum FinalFailedStreamAction {
/// Close the connection
///
/// The promise is an optional write promise.
case close(EventLoopPromise<Void>?)
/// Inform an observer that the connection has become idle
case informConnectionIsIdle
/// Fail the write promise
case failWritePromise(EventLoopPromise<Void>?)
/// Do nothing.
case none
}

case sendRequestHead(HTTPRequestHead, startBody: Bool)
case sendBodyPart(IOData)
case sendRequestEnd
case sendBodyPart(IOData, EventLoopPromise<Void>?)
case sendRequestEnd(EventLoopPromise<Void>?)
case failSendBodyPart(Error, EventLoopPromise<Void>?)
case failSendStreamFinished(Error, EventLoopPromise<Void>?)

case pauseRequestBodyStream
case resumeRequestBodyStream

case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool)
case forwardResponseBodyParts(CircularBuffer<ByteBuffer>)

case failRequest(Error, FinalStreamAction)
case succeedRequest(FinalStreamAction, CircularBuffer<ByteBuffer>)
case failRequest(Error, FinalFailedStreamAction)
case succeedRequest(FinalSuccessfulStreamAction, CircularBuffer<ByteBuffer>)

case read
case close
Expand Down Expand Up @@ -189,25 +205,25 @@ struct HTTP1ConnectionStateMachine {
}
}

mutating func requestStreamPartReceived(_ part: IOData) -> Action {
mutating func requestStreamPartReceived(_ part: IOData, promise: EventLoopPromise<Void>?) -> Action {
guard case .inRequest(var requestStateMachine, let close) = self.state else {
preconditionFailure("Invalid state: \(self.state)")
}

return self.avoidingStateMachineCoW { state -> Action in
let action = requestStateMachine.requestStreamPartReceived(part)
let action = requestStateMachine.requestStreamPartReceived(part, promise: promise)
state = .inRequest(requestStateMachine, close: close)
return state.modify(with: action)
}
}

mutating func requestStreamFinished() -> Action {
mutating func requestStreamFinished(promise: EventLoopPromise<Void>?) -> Action {
guard case .inRequest(var requestStateMachine, let close) = self.state else {
preconditionFailure("Invalid state: \(self.state)")
}

return self.avoidingStateMachineCoW { state -> Action in
let action = requestStateMachine.requestStreamFinished()
let action = requestStateMachine.requestStreamFinished(promise: promise)
state = .inRequest(requestStateMachine, close: close)
return state.modify(with: action)
}
Expand Down Expand Up @@ -377,10 +393,10 @@ extension HTTP1ConnectionStateMachine.State {
return .pauseRequestBodyStream
case .resumeRequestBodyStream:
return .resumeRequestBodyStream
case .sendBodyPart(let part):
return .sendBodyPart(part)
case .sendRequestEnd:
return .sendRequestEnd
case .sendBodyPart(let part, let writePromise):
return .sendBodyPart(part, writePromise)
case .sendRequestEnd(let writePromise):
return .sendRequestEnd(writePromise)
case .forwardResponseHead(let head, let pauseRequestBodyStream):
return .forwardResponseHead(head, pauseRequestBodyStream: pauseRequestBodyStream)
case .forwardResponseBodyParts(let parts):
Expand All @@ -390,13 +406,13 @@ extension HTTP1ConnectionStateMachine.State {
preconditionFailure("Invalid state: \(self)")
}

let newFinalAction: HTTP1ConnectionStateMachine.Action.FinalStreamAction
let newFinalAction: HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction
switch finalAction {
case .close:
self = .closing
newFinalAction = .close
case .sendRequestEnd:
newFinalAction = .sendRequestEnd
case .sendRequestEnd(let writePromise):
newFinalAction = .sendRequestEnd(writePromise)
case .none:
self = .idle
newFinalAction = close ? .close : .informConnectionIsIdle
Expand All @@ -410,9 +426,12 @@ extension HTTP1ConnectionStateMachine.State {
case .idle:
preconditionFailure("How can we fail a task, if we are idle")
case .inRequest(_, close: let close):
if close || finalAction == .close {
if case .close(let promise) = finalAction {
self = .closing
return .failRequest(error, .close(promise))
} else if close {
self = .closing
return .failRequest(error, .close)
return .failRequest(error, .close(nil))
} else {
self = .idle
return .failRequest(error, .informConnectionIsIdle)
Expand All @@ -433,6 +452,12 @@ extension HTTP1ConnectionStateMachine.State {

case .wait:
return .wait

case .failSendBodyPart(let error, let writePromise):
return .failSendBodyPart(error, writePromise)

case .failSendStreamFinished(let error, let writePromise):
return .failSendStreamFinished(error, writePromise)
}
}
}
Expand Down
Loading

0 comments on commit 2442598

Please sign in to comment.