Skip to content

Commit

Permalink
[HTTP2] Prepare migration actions (#456)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnadoba authored Oct 13, 2021
1 parent 1081b0b commit c1a60d8
Show file tree
Hide file tree
Showing 12 changed files with 547 additions and 93 deletions.
2 changes: 1 addition & 1 deletion .swiftformat
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
--patternlet inline
--stripunusedargs unnamed-only
--ranges nospace
--disable typeSugar # https://github.com/nicklockwood/SwiftFormat/issues/636
--disable typeSugar, andOperator # typeSugar: https://github.com/nicklockwood/SwiftFormat/issues/636

# rules
26 changes: 25 additions & 1 deletion Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ final class HTTPConnectionPool {
self.idleConnectionTimeout = clientConfiguration.connectionPool.idleTimeout

self._state = StateMachine(
eventLoopGroup: eventLoopGroup,
idGenerator: idGenerator,
maximumConcurrentHTTP1Connections: clientConfiguration.connectionPool.concurrentHTTP1ConnectionsPerHostSoftLimit
)
Expand Down Expand Up @@ -97,6 +96,10 @@ final class HTTPConnectionPool {
case createConnection(Connection.ID, on: EventLoop)
case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown)
case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown)
case migration(
createConnections: [(Connection.ID, EventLoop)],
closeConnections: [Connection]
)
case none
}

Expand Down Expand Up @@ -184,6 +187,18 @@ final class HTTPConnectionPool {
self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff)
cleanupContext.connectBackoff = []
self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown)
case .migration(
let createConnections,
let closeConnections,
let scheduleTimeout
):
if let (connectionID, eventLoop) = scheduleTimeout {
self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop)
}
self.unlocked.connection = .migration(
createConnections: createConnections,
closeConnections: closeConnections
)
case .none:
break
}
Expand Down Expand Up @@ -279,6 +294,15 @@ final class HTTPConnectionPool {
self.delegate.connectionPoolDidShutdown(self, unclean: unclean)
}

case .migration(let createConnections, let closeConnections):
for connection in closeConnections {
connection.close(promise: nil)
}

for (connectionID, eventLoop) in createConnections {
self.createConnection(connectionID, on: eventLoop)
}

case .none:
break
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,28 @@ extension HTTPConnectionPool {
return migrationContext
}

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
mutating func migrateFromHTTP2(
starting: [(Connection.ID, EventLoop)],
backingOff: [(Connection.ID, EventLoop)]
) {
for (connectionID, eventLoop) in starting {
let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
self.connections.append(newConnection)
}

for (connectionID, eventLoop) in backingOff {
var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
backingOffConnection.failedToConnect()
self.connections.append(backingOffConnection)
}
}

// MARK: Shutdown

mutating func shutdown() -> CleanupContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ extension HTTPConnectionPool {
}

typealias Action = HTTPConnectionPool.StateMachine.Action
typealias ConnectionMigrationAction = HTTPConnectionPool.StateMachine.ConnectionMigrationAction
typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction
typealias EstablishedConnectionAction = HTTPConnectionPool.StateMachine.EstablishedConnectionAction

private(set) var connections: HTTP1Connections
private(set) var http2Connections: HTTP2Connections?
private var failedConsecutiveConnectionAttempts: Int = 0
/// the error from the last connection creation
private var lastConnectFailure: Error?
Expand All @@ -41,6 +45,73 @@ extension HTTPConnectionPool {
self.requests = RequestQueue()
}

mutating func migrateFromHTTP2(
http2State: HTTP2StateMachine,
newHTTP1Connection: Connection
) -> Action {
self.migrateFromHTTP2(
http1Connections: http2State.http1Connections,
http2Connections: http2State.connections,
requests: http2State.requests,
newHTTP1Connection: newHTTP1Connection
)
}

mutating func migrateFromHTTP2(
http1Connections: HTTP1Connections? = nil,
http2Connections: HTTP2Connections,
requests: RequestQueue,
newHTTP1Connection: Connection
) -> Action {
let migrationAction = self.migrateConnectionsAndRequestsFromHTTP2(
http1Connections: http1Connections,
http2Connections: http2Connections,
requests: requests
)

let newConnectionAction = self._newHTTP1ConnectionEstablished(
newHTTP1Connection
)

return .init(
request: newConnectionAction.request,
connection: .combined(migrationAction, newConnectionAction.connection)
)
}

private mutating func migrateConnectionsAndRequestsFromHTTP2(
http1Connections: HTTP1Connections?,
http2Connections: HTTP2Connections,
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.connections.isEmpty)
precondition(self.http2Connections == nil)
precondition(self.requests.isEmpty)

if let http1Connections = http1Connections {
self.connections = http1Connections
}

var http2Connections = http2Connections
let migration = http2Connections.migrateToHTTP1()
self.connections.migrateFromHTTP2(
starting: migration.starting,
backingOff: migration.backingOff
)

if !http2Connections.isEmpty {
self.http2Connections = http2Connections
}

// TODO: Close all idle connections from context.close
// TODO: Start new http1 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

self.requests = requests

return .init(closeConnections: [], createConnections: [])
}

// MARK: - Events

mutating func executeRequest(_ request: Request) -> Action {
Expand Down Expand Up @@ -137,6 +208,10 @@ extension HTTPConnectionPool {
}

mutating func newHTTP1ConnectionEstablished(_ connection: Connection) -> Action {
.init(self._newHTTP1ConnectionEstablished(connection))
}

private mutating func _newHTTP1ConnectionEstablished(_ connection: Connection) -> EstablishedAction {
self.failedConsecutiveConnectionAttempts = 0
self.lastConnectFailure = nil
let (index, context) = self.connections.newHTTP1ConnectionEstablished(connection)
Expand Down Expand Up @@ -210,7 +285,7 @@ extension HTTPConnectionPool {

mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action {
let (index, context) = self.connections.releaseConnection(connectionID)
return self.nextActionForIdleConnection(at: index, context: context)
return .init(self.nextActionForIdleConnection(at: index, context: context))
}

/// A connection has been unexpectedly closed
Expand Down Expand Up @@ -278,7 +353,7 @@ extension HTTPConnectionPool {
// If there aren't any more connections, everything is shutdown
let isShutdown: StateMachine.ConnectionAction.IsShutdown
let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty)
if self.connections.isEmpty {
if self.connections.isEmpty && self.http2Connections == nil {
self.state = .shutDown
isShutdown = .yes(unclean: unclean)
} else {
Expand All @@ -299,7 +374,7 @@ extension HTTPConnectionPool {
private mutating func nextActionForIdleConnection(
at index: Int,
context: HTTP1Connections.IdleConnectionContext
) -> Action {
) -> EstablishedAction {
switch self.state {
case .running:
switch context.use {
Expand All @@ -311,7 +386,7 @@ extension HTTPConnectionPool {
case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
let connection = self.connections.closeConnection(at: index)
if self.connections.isEmpty {
if self.connections.isEmpty && self.http2Connections == nil {
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))
Expand All @@ -330,7 +405,7 @@ extension HTTPConnectionPool {
private mutating func nextActionForIdleGeneralPurposeConnection(
at index: Int,
context: HTTP1Connections.IdleConnectionContext
) -> Action {
) -> EstablishedAction {
// 1. Check if there are waiting requests in the general purpose queue
if let request = self.requests.popFirst(for: nil) {
return .init(
Expand Down Expand Up @@ -359,7 +434,7 @@ extension HTTPConnectionPool {
private mutating func nextActionForIdleEventLoopConnection(
at index: Int,
context: HTTP1Connections.IdleConnectionContext
) -> Action {
) -> EstablishedAction {
// Check if there are waiting requests in the matching eventLoop queue
if let request = self.requests.popFirst(for: context.eventLoop) {
return .init(
Expand Down Expand Up @@ -398,7 +473,7 @@ extension HTTPConnectionPool {
case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
self.connections.removeConnection(at: index)
if self.connections.isEmpty {
if self.connections.isEmpty && self.http2Connections == nil {
return .init(
request: .none,
connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean))
Expand Down Expand Up @@ -444,6 +519,99 @@ extension HTTPConnectionPool {
self.connections.removeConnection(at: index)
return .none
}

// MARK: HTTP2

mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
_ = self.http2Connections!.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
return .none
}

mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
_ = self.http2Connections!.goAwayReceived(connectionID)
return .none
}

mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .running:
_ = self.http2Connections?.failConnection(connectionID)
if self.http2Connections?.isEmpty == true {
self.http2Connections = nil
}
return .none

case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
_ = self.http2Connections?.failConnection(connectionID)
if self.http2Connections?.isEmpty == true {
self.http2Connections = nil
}
if self.connections.isEmpty && self.http2Connections == nil {
return .init(
request: .none,
connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean))
)
}
return .init(
request: .none,
connection: .none
)

case .shutDown:
preconditionFailure("It the pool is already shutdown, all connections must have been torn down.")
}
}

mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
switch self.state {
case .running:
let (index, context) = self.http2Connections!.releaseStream(connectionID)
guard context.isIdle else {
return .none
}

let connection = self.http2Connections!.closeConnection(at: index)
if self.http2Connections!.isEmpty {
self.http2Connections = nil
}
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .no)
)

case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
let (index, context) = self.http2Connections!.releaseStream(connectionID)
guard context.isIdle else {
return .none
}

let connection = self.http2Connections!.closeConnection(at: index)
if self.http2Connections!.isEmpty {
self.http2Connections = nil
}
if self.connections.isEmpty && self.http2Connections == nil {
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))
)
}
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .no)
)

case .shutDown:
preconditionFailure("It the pool is already shutdown, all connections must have been torn down.")
}
}
}
}

Expand Down
Loading

0 comments on commit c1a60d8

Please sign in to comment.