diff --git a/.swiftformat b/.swiftformat index 3ac6aa632..5fd782414 100644 --- a/.swiftformat +++ b/.swiftformat @@ -9,6 +9,6 @@ --patternlet inline --stripunusedargs unnamed-only --ranges nospace ---disable typeSugar # https://github.com/nicklockwood/SwiftFormat/issues/636 +--disable typeSugar, andOperator # typeSugar: https://github.com/nicklockwood/SwiftFormat/issues/636 # rules diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index 9da353802..b392fc4bb 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -69,7 +69,6 @@ final class HTTPConnectionPool { self.idleConnectionTimeout = clientConfiguration.connectionPool.idleTimeout self._state = StateMachine( - eventLoopGroup: eventLoopGroup, idGenerator: idGenerator, maximumConcurrentHTTP1Connections: clientConfiguration.connectionPool.concurrentHTTP1ConnectionsPerHostSoftLimit ) @@ -97,6 +96,10 @@ final class HTTPConnectionPool { case createConnection(Connection.ID, on: EventLoop) case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown) case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown) + case migration( + createConnections: [(Connection.ID, EventLoop)], + closeConnections: [Connection] + ) case none } @@ -184,6 +187,18 @@ final class HTTPConnectionPool { self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff) cleanupContext.connectBackoff = [] self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown) + case .migration( + let createConnections, + let closeConnections, + let scheduleTimeout + ): + if let (connectionID, eventLoop) = scheduleTimeout { + self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop) + } + self.unlocked.connection = .migration( + createConnections: createConnections, + closeConnections: closeConnections + ) case .none: break } @@ -279,6 +294,15 @@ final class HTTPConnectionPool { self.delegate.connectionPoolDidShutdown(self, unclean: unclean) } + case .migration(let createConnections, let closeConnections): + for connection in closeConnections { + connection.close(promise: nil) + } + + for (connectionID, eventLoop) in createConnections { + self.createConnection(connectionID, on: eventLoop) + } + case .none: break } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index b0317eef4..fffd283c0 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -530,6 +530,28 @@ extension HTTPConnectionPool { return migrationContext } + /// we only handle starting and backing off connection here. + /// All running connections must be handled by the enclosing state machine + /// - Parameters: + /// - starting: starting HTTP connections from previous state machine + /// - backingOff: backing off HTTP connections from previous state machine + mutating func migrateFromHTTP2( + starting: [(Connection.ID, EventLoop)], + backingOff: [(Connection.ID, EventLoop)] + ) { + for (connectionID, eventLoop) in starting { + let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop) + self.connections.append(newConnection) + } + + for (connectionID, eventLoop) in backingOff { + var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop) + // TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState + backingOffConnection.failedToConnect() + self.connections.append(backingOffConnection) + } + } + // MARK: Shutdown mutating func shutdown() -> CleanupContext { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index 0790f70db..42bad981b 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -23,8 +23,12 @@ extension HTTPConnectionPool { } typealias Action = HTTPConnectionPool.StateMachine.Action + typealias ConnectionMigrationAction = HTTPConnectionPool.StateMachine.ConnectionMigrationAction + typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction + typealias EstablishedConnectionAction = HTTPConnectionPool.StateMachine.EstablishedConnectionAction private(set) var connections: HTTP1Connections + private(set) var http2Connections: HTTP2Connections? private var failedConsecutiveConnectionAttempts: Int = 0 /// the error from the last connection creation private var lastConnectFailure: Error? @@ -41,6 +45,73 @@ extension HTTPConnectionPool { self.requests = RequestQueue() } + mutating func migrateFromHTTP2( + http2State: HTTP2StateMachine, + newHTTP1Connection: Connection + ) -> Action { + self.migrateFromHTTP2( + http1Connections: http2State.http1Connections, + http2Connections: http2State.connections, + requests: http2State.requests, + newHTTP1Connection: newHTTP1Connection + ) + } + + mutating func migrateFromHTTP2( + http1Connections: HTTP1Connections? = nil, + http2Connections: HTTP2Connections, + requests: RequestQueue, + newHTTP1Connection: Connection + ) -> Action { + let migrationAction = self.migrateConnectionsAndRequestsFromHTTP2( + http1Connections: http1Connections, + http2Connections: http2Connections, + requests: requests + ) + + let newConnectionAction = self._newHTTP1ConnectionEstablished( + newHTTP1Connection + ) + + return .init( + request: newConnectionAction.request, + connection: .combined(migrationAction, newConnectionAction.connection) + ) + } + + private mutating func migrateConnectionsAndRequestsFromHTTP2( + http1Connections: HTTP1Connections?, + http2Connections: HTTP2Connections, + requests: RequestQueue + ) -> ConnectionMigrationAction { + precondition(self.connections.isEmpty) + precondition(self.http2Connections == nil) + precondition(self.requests.isEmpty) + + if let http1Connections = http1Connections { + self.connections = http1Connections + } + + var http2Connections = http2Connections + let migration = http2Connections.migrateToHTTP1() + self.connections.migrateFromHTTP2( + starting: migration.starting, + backingOff: migration.backingOff + ) + + if !http2Connections.isEmpty { + self.http2Connections = http2Connections + } + + // TODO: Close all idle connections from context.close + // TODO: Start new http1 connections for pending requests + // TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap) + + self.requests = requests + + return .init(closeConnections: [], createConnections: []) + } + // MARK: - Events mutating func executeRequest(_ request: Request) -> Action { @@ -137,6 +208,10 @@ extension HTTPConnectionPool { } mutating func newHTTP1ConnectionEstablished(_ connection: Connection) -> Action { + .init(self._newHTTP1ConnectionEstablished(connection)) + } + + private mutating func _newHTTP1ConnectionEstablished(_ connection: Connection) -> EstablishedAction { self.failedConsecutiveConnectionAttempts = 0 self.lastConnectFailure = nil let (index, context) = self.connections.newHTTP1ConnectionEstablished(connection) @@ -210,7 +285,7 @@ extension HTTPConnectionPool { mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action { let (index, context) = self.connections.releaseConnection(connectionID) - return self.nextActionForIdleConnection(at: index, context: context) + return .init(self.nextActionForIdleConnection(at: index, context: context)) } /// A connection has been unexpectedly closed @@ -278,7 +353,7 @@ extension HTTPConnectionPool { // If there aren't any more connections, everything is shutdown let isShutdown: StateMachine.ConnectionAction.IsShutdown let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty) - if self.connections.isEmpty { + if self.connections.isEmpty && self.http2Connections == nil { self.state = .shutDown isShutdown = .yes(unclean: unclean) } else { @@ -299,7 +374,7 @@ extension HTTPConnectionPool { private mutating func nextActionForIdleConnection( at index: Int, context: HTTP1Connections.IdleConnectionContext - ) -> Action { + ) -> EstablishedAction { switch self.state { case .running: switch context.use { @@ -311,7 +386,7 @@ extension HTTPConnectionPool { case .shuttingDown(let unclean): assert(self.requests.isEmpty) let connection = self.connections.closeConnection(at: index) - if self.connections.isEmpty { + if self.connections.isEmpty && self.http2Connections == nil { return .init( request: .none, connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean)) @@ -330,7 +405,7 @@ extension HTTPConnectionPool { private mutating func nextActionForIdleGeneralPurposeConnection( at index: Int, context: HTTP1Connections.IdleConnectionContext - ) -> Action { + ) -> EstablishedAction { // 1. Check if there are waiting requests in the general purpose queue if let request = self.requests.popFirst(for: nil) { return .init( @@ -359,7 +434,7 @@ extension HTTPConnectionPool { private mutating func nextActionForIdleEventLoopConnection( at index: Int, context: HTTP1Connections.IdleConnectionContext - ) -> Action { + ) -> EstablishedAction { // Check if there are waiting requests in the matching eventLoop queue if let request = self.requests.popFirst(for: context.eventLoop) { return .init( @@ -398,7 +473,7 @@ extension HTTPConnectionPool { case .shuttingDown(let unclean): assert(self.requests.isEmpty) self.connections.removeConnection(at: index) - if self.connections.isEmpty { + if self.connections.isEmpty && self.http2Connections == nil { return .init( request: .none, connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean)) @@ -444,6 +519,99 @@ extension HTTPConnectionPool { self.connections.removeConnection(at: index) return .none } + + // MARK: HTTP2 + + mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action { + // It is save to bang the http2Connections here. If we get this callback but we don't have + // http2 connections something has gone terribly wrong. + _ = self.http2Connections!.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) + return .none + } + + mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action { + // It is save to bang the http2Connections here. If we get this callback but we don't have + // http2 connections something has gone terribly wrong. + _ = self.http2Connections!.goAwayReceived(connectionID) + return .none + } + + mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action { + switch self.state { + case .running: + _ = self.http2Connections?.failConnection(connectionID) + if self.http2Connections?.isEmpty == true { + self.http2Connections = nil + } + return .none + + case .shuttingDown(let unclean): + assert(self.requests.isEmpty) + _ = self.http2Connections?.failConnection(connectionID) + if self.http2Connections?.isEmpty == true { + self.http2Connections = nil + } + if self.connections.isEmpty && self.http2Connections == nil { + return .init( + request: .none, + connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean)) + ) + } + return .init( + request: .none, + connection: .none + ) + + case .shutDown: + preconditionFailure("It the pool is already shutdown, all connections must have been torn down.") + } + } + + mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action { + // It is save to bang the http2Connections here. If we get this callback but we don't have + // http2 connections something has gone terribly wrong. + switch self.state { + case .running: + let (index, context) = self.http2Connections!.releaseStream(connectionID) + guard context.isIdle else { + return .none + } + + let connection = self.http2Connections!.closeConnection(at: index) + if self.http2Connections!.isEmpty { + self.http2Connections = nil + } + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + + case .shuttingDown(let unclean): + assert(self.requests.isEmpty) + let (index, context) = self.http2Connections!.releaseStream(connectionID) + guard context.isIdle else { + return .none + } + + let connection = self.http2Connections!.closeConnection(at: index) + if self.http2Connections!.isEmpty { + self.http2Connections = nil + } + if self.connections.isEmpty && self.http2Connections == nil { + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean)) + ) + } + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + + case .shutDown: + preconditionFailure("It the pool is already shutdown, all connections must have been torn down.") + } + } } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 28b9517b6..33c747eb1 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -282,6 +282,40 @@ extension HTTPConnectionPool { } } + enum MigrateAction { + case removeConnection + case keepConnection + } + + func migrateToHTTP1( + context: inout HTTP2Connections.HTTP2ToHTTP1MigrationContext + ) -> MigrateAction { + switch self.state { + case .starting: + context.starting.append((self.connectionID, self.eventLoop)) + return .removeConnection + + case .active(let connection, _, let usedStreams, _): + precondition(usedStreams >= 0) + if usedStreams == 0 { + context.close.append(connection) + return .removeConnection + } else { + return .keepConnection + } + + case .draining: + return .keepConnection + + case .backingOff: + context.backingOff.append((self.connectionID, self.eventLoop)) + return .removeConnection + + case .closed: + preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)") + } + } + init(connectionID: Connection.ID, eventLoop: EventLoop) { self.connectionID = connectionID self.eventLoop = eventLoop @@ -317,7 +351,7 @@ extension HTTPConnectionPool { /// - Parameters: /// - starting: starting HTTP connections from previous state machine /// - backingOff: backing off HTTP connections from previous state machine - mutating func migrateConnections( + mutating func migrateFromHTTP1( starting: [(Connection.ID, EventLoop)], backingOff: [(Connection.ID, EventLoop)] ) { @@ -334,6 +368,25 @@ extension HTTPConnectionPool { } } + struct HTTP2ToHTTP1MigrationContext { + var backingOff: [(Connection.ID, EventLoop)] = [] + var starting: [(Connection.ID, EventLoop)] = [] + var close: [Connection] = [] + } + + mutating func migrateToHTTP1() -> HTTP2ToHTTP1MigrationContext { + var context = HTTP2ToHTTP1MigrationContext() + self.connections.removeAll { connection in + switch connection.migrateToHTTP1(context: &context) { + case .removeConnection: + return false + case .keepConnection: + return true + } + } + return context + } + // MARK: Connection creation /// true if one ore more connections are active diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 355c1e7be..a314c0861 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -18,8 +18,9 @@ import NIOHTTP2 extension HTTPConnectionPool { struct HTTP2StateMachine { typealias Action = HTTPConnectionPool.StateMachine.Action - typealias RequestAction = HTTPConnectionPool.StateMachine.RequestAction - typealias ConnectionAction = HTTPConnectionPool.StateMachine.ConnectionAction + typealias ConnectionMigrationAction = HTTPConnectionPool.StateMachine.ConnectionMigrationAction + typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction + typealias EstablishedConnectionAction = HTTPConnectionPool.StateMachine.EstablishedConnectionAction private enum State: Equatable { case running @@ -30,10 +31,10 @@ extension HTTPConnectionPool { private var lastConnectFailure: Error? private var failedConsecutiveConnectionAttempts = 0 - private var connections: HTTP2Connections - private var http1Connections: HTTP1Connections? + private(set) var connections: HTTP2Connections + private(set) var http1Connections: HTTP1Connections? - private var requests: RequestQueue + private(set) var requests: RequestQueue private let idGenerator: Connection.ID.Generator @@ -48,17 +49,60 @@ extension HTTPConnectionPool { self.connections = HTTP2Connections(generator: idGenerator) } - mutating func migrateConnectionsFromHTTP1( - connections http1Connections: HTTP1Connections, - requests: RequestQueue + mutating func migrateFromHTTP1( + http1State: HTTP1StateMachine, + newHTTP2Connection: Connection, + maxConcurrentStreams: Int ) -> Action { + self.migrateFromHTTP1( + http1Connections: http1State.connections, + http2Connections: http1State.http2Connections, + requests: http1State.requests, + newHTTP2Connection: newHTTP2Connection, + maxConcurrentStreams: maxConcurrentStreams + ) + } + + mutating func migrateFromHTTP1( + http1Connections: HTTP1Connections, + http2Connections: HTTP2Connections? = nil, + requests: RequestQueue, + newHTTP2Connection: Connection, + maxConcurrentStreams: Int + ) -> Action { + let migrationAction = self.migrateConnectionsAndRequestsFromHTTP1( + http1Connections: http1Connections, + http2Connections: http2Connections, + requests: requests + ) + + let newConnectionAction = self._newHTTP2ConnectionEstablished( + newHTTP2Connection, + maxConcurrentStreams: maxConcurrentStreams + ) + + return .init( + request: newConnectionAction.request, + connection: .combined(migrationAction, newConnectionAction.connection) + ) + } + + private mutating func migrateConnectionsAndRequestsFromHTTP1( + http1Connections: HTTP1Connections, + http2Connections: HTTP2Connections?, + requests: RequestQueue + ) -> ConnectionMigrationAction { precondition(self.http1Connections == nil) precondition(self.connections.isEmpty) precondition(self.requests.isEmpty) + if let http2Connections = http2Connections { + self.connections = http2Connections + } + var http1Connections = http1Connections // make http1Connections mutable let context = http1Connections.migrateToHTTP2() - self.connections.migrateConnections( + self.connections.migrateFromHTTP1( starting: context.starting, backingOff: context.backingOff ) @@ -70,10 +114,10 @@ extension HTTPConnectionPool { self.requests = requests // TODO: Close all idle connections from context.close - // TODO: Start new http2 connections for + // TODO: Start new http2 connections for pending requests // TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap) - return .none + return .init(closeConnections: [], createConnections: []) } mutating func executeRequest(_ request: Request) -> Action { @@ -173,6 +217,10 @@ extension HTTPConnectionPool { } mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> Action { + .init(self._newHTTP2ConnectionEstablished(connection, maxConcurrentStreams: maxConcurrentStreams)) + } + + private mutating func _newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> EstablishedAction { self.failedConsecutiveConnectionAttempts = 0 self.lastConnectFailure = nil let (index, context) = self.connections.newHTTP2ConnectionEstablished( @@ -185,7 +233,7 @@ extension HTTPConnectionPool { private mutating func nextActionForAvailableConnection( at index: Int, context: HTTP2Connections.EstablishedConnectionContext - ) -> Action { + ) -> EstablishedAction { switch self.state { case .running: // We prioritise requests with a required event loop over those without a requirement. @@ -197,7 +245,7 @@ extension HTTPConnectionPool { // use the remaining available streams for requests without a required event loop requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil) - let requestAction = { () -> RequestAction in + let requestAction = { () -> HTTPConnectionPool.StateMachine.RequestAction in if requestsToExecute.isEmpty { return .none } else { @@ -209,7 +257,7 @@ extension HTTPConnectionPool { } }() - let connectionAction = { () -> ConnectionAction in + let connectionAction = { () -> EstablishedConnectionAction in if context.isIdle, requestsToExecute.isEmpty { return .scheduleTimeoutTimer(context.connectionID, on: context.eventLoop) } else { @@ -227,7 +275,7 @@ extension HTTPConnectionPool { } let connection = self.connections.closeConnection(at: index) - if self.connections.isEmpty { + if self.http1Connections == nil, self.connections.isEmpty { return .init( request: .none, connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean)) @@ -244,7 +292,7 @@ extension HTTPConnectionPool { mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action { let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) - return self.nextActionForAvailableConnection(at: index, context: context) + return .init(self.nextActionForAvailableConnection(at: index, context: context)) } mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action { @@ -314,7 +362,7 @@ extension HTTPConnectionPool { mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action { let (index, context) = self.connections.releaseStream(connectionID) - return self.nextActionForAvailableConnection(at: index, context: context) + return .init(self.nextActionForAvailableConnection(at: index, context: context)) } mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { @@ -457,7 +505,7 @@ extension HTTPConnectionPool { // If there aren't any more connections, everything is shutdown let isShutdown: StateMachine.ConnectionAction.IsShutdown let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty && self.http1Connections == nil) - if self.connections.isEmpty, self.http1Connections == nil { + if self.connections.isEmpty && self.http1Connections == nil { isShutdown = .yes(unclean: unclean) self.state = .shutDown } else { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift index e462d7716..dab1354c9 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift @@ -44,6 +44,12 @@ extension HTTPConnectionPool { case closeConnection(Connection, isShutdown: IsShutdown) case cleanupConnections(CleanupContext, isShutdown: IsShutdown) + case migration( + createConnections: [(Connection.ID, EventLoop)], + closeConnections: [Connection], + scheduleTimeout: (Connection.ID, EventLoop)? + ) + case none } @@ -67,17 +73,17 @@ extension HTTPConnectionPool { var state: HTTPVersionState var isShuttingDown: Bool = false - let eventLoopGroup: EventLoopGroup + let idGenerator: Connection.ID.Generator let maximumConcurrentHTTP1Connections: Int - init(eventLoopGroup: EventLoopGroup, idGenerator: Connection.ID.Generator, maximumConcurrentHTTP1Connections: Int) { + init(idGenerator: Connection.ID.Generator, maximumConcurrentHTTP1Connections: Int) { self.maximumConcurrentHTTP1Connections = maximumConcurrentHTTP1Connections + self.idGenerator = idGenerator let http1State = HTTP1StateMachine( idGenerator: idGenerator, maximumConcurrentConnections: maximumConcurrentHTTP1Connections ) self.state = .http1(http1State) - self.eventLoopGroup = eventLoopGroup } mutating func executeRequest(_ request: Request) -> Action { @@ -218,3 +224,82 @@ extension HTTPConnectionPool.StateMachine: CustomStringConvertible { } } } + +extension HTTPConnectionPool.StateMachine { + struct ConnectionMigrationAction { + var closeConnections: [HTTPConnectionPool.Connection] + var createConnections: [(HTTPConnectionPool.Connection.ID, EventLoop)] + } + + struct EstablishedAction { + static let none: Self = .init(request: .none, connection: .none) + let request: HTTPConnectionPool.StateMachine.RequestAction + let connection: EstablishedConnectionAction + } + + enum EstablishedConnectionAction { + case none + case scheduleTimeoutTimer(HTTPConnectionPool.Connection.ID, on: EventLoop) + case closeConnection(HTTPConnectionPool.Connection, isShutdown: HTTPConnectionPool.StateMachine.ConnectionAction.IsShutdown) + } +} + +extension HTTPConnectionPool.StateMachine.Action { + init(_ action: HTTPConnectionPool.StateMachine.EstablishedAction) { + self.init( + request: action.request, + connection: .init(action.connection) + ) + } +} + +extension HTTPConnectionPool.StateMachine.ConnectionAction { + init(_ action: HTTPConnectionPool.StateMachine.EstablishedConnectionAction) { + switch action { + case .none: + self = .none + case .scheduleTimeoutTimer(let connectionID, let eventLoop): + self = .scheduleTimeoutTimer(connectionID, on: eventLoop) + case .closeConnection(let connection, let isShutdown): + self = .closeConnection(connection, isShutdown: isShutdown) + } + } +} + +extension HTTPConnectionPool.StateMachine.ConnectionAction { + static func combined( + _ migrationAction: HTTPConnectionPool.StateMachine.ConnectionMigrationAction, + _ establishedAction: HTTPConnectionPool.StateMachine.EstablishedConnectionAction + ) -> Self { + switch establishedAction { + case .none: + return .migration( + createConnections: migrationAction.createConnections, + closeConnections: migrationAction.closeConnections, + scheduleTimeout: nil + ) + case .closeConnection(let connection, let isShutdown): + guard isShutdown == .no else { + precondition( + migrationAction.closeConnections.isEmpty && + migrationAction.createConnections.isEmpty, + "migration actions are not supported during shutdown" + ) + return .closeConnection(connection, isShutdown: isShutdown) + } + var closeConnections = migrationAction.closeConnections + closeConnections.append(connection) + return .migration( + createConnections: migrationAction.createConnections, + closeConnections: closeConnections, + scheduleTimeout: nil + ) + case .scheduleTimeoutTimer(let connectionID, let eventLoop): + return .migration( + createConnections: migrationAction.createConnections, + closeConnections: migrationAction.closeConnections, + scheduleTimeout: (connectionID, eventLoop) + ) + } + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift index b100a0b54..49a6fb574 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift @@ -25,7 +25,6 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.StateMachine( - eventLoopGroup: elg, idGenerator: .init(), maximumConcurrentHTTP1Connections: 8 ) @@ -109,7 +108,6 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.StateMachine( - eventLoopGroup: elg, idGenerator: .init(), maximumConcurrentHTTP1Connections: 2 ) @@ -167,7 +165,6 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.StateMachine( - eventLoopGroup: elg, idGenerator: .init(), maximumConcurrentHTTP1Connections: 2 ) @@ -204,7 +201,6 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.StateMachine( - eventLoopGroup: elg, idGenerator: .init(), maximumConcurrentHTTP1Connections: 2 ) @@ -595,7 +591,6 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.StateMachine( - eventLoopGroup: elg, idGenerator: .init(), maximumConcurrentHTTP1Connections: 6 ) @@ -634,7 +629,6 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.StateMachine( - eventLoopGroup: elg, idGenerator: .init(), maximumConcurrentHTTP1Connections: 6 ) @@ -666,7 +660,6 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try eventLoop.syncShutdownGracefully()) } var state = HTTPConnectionPool.StateMachine( - eventLoopGroup: eventLoop, idGenerator: .init(), maximumConcurrentHTTP1Connections: 6 ) @@ -690,7 +683,6 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.StateMachine( - eventLoopGroup: elg, idGenerator: .init(), maximumConcurrentHTTP1Connections: 6 ) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift index 9198df0b0..c361c5c25 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift @@ -513,7 +513,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { let conn1ID: HTTPConnectionPool.Connection.ID = 1 let conn2ID: HTTPConnectionPool.Connection.ID = 2 - connections.migrateConnections( + connections.migrateFromHTTP1( starting: [(conn1ID, el1)], backingOff: [(conn2ID, el2)] ) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 5bcc5fb02..ce728147f 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -312,14 +312,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) var http2State = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let migrationAction = http2State.migrateConnectionsFromHTTP1( - connections: http1State.connections, - requests: http1State.requests - ) - XCTAssertEqual(migrationAction, .none) - - let http2ConnectAction = http2State.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 100) - XCTAssertEqual(http2ConnectAction.connection, .none) + let http2ConnectAction = http2State.migrateFromHTTP1(http1State: http1State, newHTTP2Connection: conn2, maxConcurrentStreams: 100) + XCTAssertEqual(http2ConnectAction.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) guard case .executeRequestsAndCancelTimeouts([request2], conn2) = http2ConnectAction.request else { return XCTFail("Unexpected request action \(http2ConnectAction.request)") } @@ -351,15 +345,16 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let migrationAction = state.migrateConnectionsFromHTTP1( - connections: http1Conns, - requests: HTTPConnectionPool.RequestQueue() - ) - XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) - let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + let connectAction = state.migrateFromHTTP1(http1Connections: http1Conns, requests: .init(), newHTTP2Connection: conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connectAction.request, .none) - XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + XCTAssertEqual(connectAction.connection, .migration( + createConnections: [], + closeConnections: [], + scheduleTimeout: (conn1ID, el1) + )) // execute request on idle connection let mockRequest1 = MockHTTPRequest(eventLoop: el1) @@ -395,15 +390,15 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let migrationAction = state.migrateConnectionsFromHTTP1( - connections: http1Conns, - requests: HTTPConnectionPool.RequestQueue() - ) - XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) - let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + let connectAction = state.migrateFromHTTP1(http1Connections: http1Conns, requests: .init(), newHTTP2Connection: conn1, maxConcurrentStreams: 100) XCTAssertEqual(connectAction.request, .none) - XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + XCTAssertEqual(connectAction.connection, .migration( + createConnections: [], + closeConnections: [], + scheduleTimeout: (conn1ID, el1) + )) // let the connection timeout let timeoutAction = state.connectionIdleTimeout(conn1ID) @@ -414,26 +409,39 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { func testConnectionEstablishmentFailure() { struct SomeError: Error, Equatable {} - let elg = EmbeddedEventLoopGroup(loops: 1) + let elg = EmbeddedEventLoopGroup(loops: 2) let el1 = elg.next() + let el2 = elg.next() // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let migrationAction = state.migrateConnectionsFromHTTP1( - connections: http1Conns, - requests: HTTPConnectionPool.RequestQueue() - ) - XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) + let connectAction = state.migrateFromHTTP1(http1Connections: http1Conns, requests: .init(), newHTTP2Connection: conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connectAction.request, .none) + XCTAssertEqual(connectAction.connection, .migration( + createConnections: [], + closeConnections: [], + scheduleTimeout: (conn1ID, el1) + )) + + // create new http2 connection + let mockRequest1 = MockHTTPRequest(eventLoop: el2, requiresEventLoopForChannel: true) + let request1 = HTTPConnectionPool.Request(mockRequest1) + let executeAction = state.executeRequest(request1) + XCTAssertEqual(executeAction.request, .scheduleRequestTimeout(for: request1, on: el2)) + guard case .createConnection(let conn2ID, _) = executeAction.connection else { + return XCTFail("unexpected connection action \(executeAction.connection)") + } - let action = state.failedToCreateNewConnection(SomeError(), connectionID: conn1ID) + let action = state.failedToCreateNewConnection(SomeError(), connectionID: conn2ID) XCTAssertEqual(action.request, .none) - guard case .scheduleBackoffTimer(conn1ID, _, let eventLoop) = action.connection else { + guard case .scheduleBackoffTimer(conn2ID, _, let eventLoop) = action.connection else { return XCTFail("unexpected connection action \(action.connection)") } - XCTAssertEqual(eventLoop.id, el1.id) + XCTAssertEqual(eventLoop.id, el2.id) } func testGoAwayOnIdleConnection() { @@ -445,15 +453,21 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let migrationAction = state.migrateConnectionsFromHTTP1( - connections: http1Conns, - requests: HTTPConnectionPool.RequestQueue() - ) - XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) - let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + + let connectAction = state.migrateFromHTTP1( + http1Connections: http1Conns, + requests: .init(), + newHTTP2Connection: conn1, + maxConcurrentStreams: 100 + ) XCTAssertEqual(connectAction.request, .none) - XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + XCTAssertEqual(connectAction.connection, .migration( + createConnections: [], + closeConnections: [], + scheduleTimeout: (conn1ID, el1) + )) let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID) XCTAssertEqual(goAwayAction.request, .none) @@ -469,15 +483,20 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let migrationAction = state.migrateConnectionsFromHTTP1( - connections: http1Conns, - requests: HTTPConnectionPool.RequestQueue() - ) - XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) - let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + let connectAction = state.migrateFromHTTP1( + http1Connections: http1Conns, + requests: .init(), + newHTTP2Connection: conn1, + maxConcurrentStreams: 100 + ) XCTAssertEqual(connectAction.request, .none) - XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + XCTAssertEqual(connectAction.connection, .migration( + createConnections: [], + closeConnections: [], + scheduleTimeout: (conn1ID, el1) + )) // execute request on idle connection let mockRequest1 = MockHTTPRequest(eventLoop: el1) @@ -505,15 +524,20 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let migrationAction = state.migrateConnectionsFromHTTP1( - connections: http1Conns, - requests: HTTPConnectionPool.RequestQueue() - ) - XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) - let connectAction1 = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 1) + let connectAction1 = state.migrateFromHTTP1( + http1Connections: http1Conns, + requests: .init(), + newHTTP2Connection: conn1, + maxConcurrentStreams: 1 + ) XCTAssertEqual(connectAction1.request, .none) - XCTAssertEqual(connectAction1.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + XCTAssertEqual(connectAction1.connection, .migration( + createConnections: [], + closeConnections: [], + scheduleTimeout: (conn1ID, el1) + )) // execute request let mockRequest1 = MockHTTPRequest(eventLoop: el1) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift index 716a13358..cb67837d7 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift @@ -87,6 +87,24 @@ extension HTTPConnectionPool.StateMachine.ConnectionAction: Equatable { return lhsConn == rhsConn && lhsShut == rhsShut case (.cleanupConnections(let lhsContext, isShutdown: let lhsShut), .cleanupConnections(let rhsContext, isShutdown: let rhsShut)): return lhsContext == rhsContext && lhsShut == rhsShut + case ( + .migration( + let lhsCreateConnections, + let lhsCloseConnections, + let lhsScheduleTimeout + ), + .migration( + let rhsCreateConnections, + let rhsCloseConnections, + let rhsScheduleTimeout + ) + ): + return lhsCreateConnections.elementsEqual(rhsCreateConnections, by: { + $0.0 == $1.0 && $0.1 === $1.1 + }) && + lhsCloseConnections == rhsCloseConnections && + lhsScheduleTimeout?.0 == rhsScheduleTimeout?.0 && + lhsScheduleTimeout?.1 === rhsScheduleTimeout?.1 case (.none, .none): return true default: @@ -123,3 +141,24 @@ extension HTTPConnectionPool.StateMachine.Action: Equatable { lhs.connection == rhs.connection && lhs.request == rhs.request } } + +extension HTTPConnectionPool.HTTP2StateMachine.EstablishedConnectionAction: Equatable { + public static func == (lhs: Self, rhs: Self) -> Bool { + switch (lhs, rhs) { + case (.scheduleTimeoutTimer(let lhsConnID, on: let lhsEL), .scheduleTimeoutTimer(let rhsConnID, on: let rhsEL)): + return lhsConnID == rhsConnID && lhsEL === rhsEL + case (.closeConnection(let lhsConn, isShutdown: let lhsShut), .closeConnection(let rhsConn, isShutdown: let rhsShut)): + return lhsConn == rhsConn && lhsShut == rhsShut + case (.none, .none): + return true + default: + return false + } + } +} + +extension HTTPConnectionPool.HTTP2StateMachine.EstablishedAction: Equatable { + public static func == (lhs: Self, rhs: Self) -> Bool { + lhs.connection == rhs.connection && lhs.request == rhs.request + } +} diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift index 009caa922..db770726d 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift @@ -506,7 +506,6 @@ extension MockConnectionPool { maxNumberOfConnections: Int = 8 ) throws -> (Self, HTTPConnectionPool.StateMachine) { var state = HTTPConnectionPool.StateMachine( - eventLoopGroup: elg, idGenerator: .init(), maximumConcurrentHTTP1Connections: maxNumberOfConnections )