diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index 2fefd0420..36d8328a0 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -108,7 +108,7 @@ extension HTTPConnectionPool { ) } - // MARK: - Events + // MARK: - Events - mutating func executeRequest(_ request: Request) -> Action { switch self.state { @@ -519,16 +519,20 @@ extension HTTPConnectionPool { // MARK: HTTP2 mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action { - // It is save to bang the http2Connections here. If we get this callback but we don't have - // http2 connections something has gone terribly wrong. - _ = self.http2Connections!.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) + // The `http2Connections` are optional here: + // Connections report events back to us, if they are in a shutdown that was + // initiated by the state machine. For this reason this callback might be invoked + // even though all references to HTTP2Connections have already been cleared. + _ = self.http2Connections?.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) return .none } mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action { - // It is save to bang the http2Connections here. If we get this callback but we don't have - // http2 connections something has gone terribly wrong. - _ = self.http2Connections!.goAwayReceived(connectionID) + // The `http2Connections` are optional here: + // Connections report events back to us, if they are in a shutdown that was + // initiated by the state machine. For this reason this callback might be invoked + // even though all references to HTTP2Connections have already been cleared. + _ = self.http2Connections?.goAwayReceived(connectionID) return .none } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index e2ef07065..7aa504d03 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -523,9 +523,14 @@ extension HTTPConnectionPool { /// Sets the connection with the given `connectionId` to the draining state. /// - Returns: the `EventLoop` to create a new connection on if applicable /// - Precondition: connection with given `connectionId` must be either `.active` or already in the `.draining` state - mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext { + mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext? { guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { - preconditionFailure("go away recieved for a connection that does not exists") + // When a connection close is initiated by the connection pool (e.g. because the + // connection was idle for too long), the connection will still report further + // events to the state machine even though we don't care about its state anymore. + // + // This is because the HTTP2Connection has a strong let reference to its delegate. + return nil } let eventLoop = self.connections[index].goAwayReceived() return GoAwayContext(eventLoop: eventLoop) @@ -540,9 +545,13 @@ extension HTTPConnectionPool { mutating func newHTTP2MaxConcurrentStreamsReceived( _ connectionID: Connection.ID, newMaxStreams: Int - ) -> (Int, EstablishedConnectionContext) { + ) -> (Int, EstablishedConnectionContext)? { guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { - preconditionFailure("We tried to update the maximum number of concurrent streams for a connection that does not exists") + // When a connection close is initiated by the connection pool (e.g. because the + // connection was idle for too long), the connection will still report its events to + // the state machine and hence to this `HTTP2Connections` struct. In those cases we + // must ignore the event. + return nil } let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams) let context = EstablishedConnectionContext( @@ -661,8 +670,10 @@ extension HTTPConnectionPool { mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? { guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { - /// When a connection close is initiated by the connection pool (e.g. because the connection was idle for too long), the connection will - /// still report its close to the state machine and then to us. In those cases we must ignore the event. + // When a connection close is initiated by the connection pool (e.g. because the + // connection was idle for too long), the connection will still report its close to + // the state machine and then to this `HTTP2Connections` struct. In those cases we + // must ignore the event. return nil } self.connections[index].fail() diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 7a9f4dc7c..932175696 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -293,12 +293,22 @@ extension HTTPConnectionPool { } mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action { - let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) + guard let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) else { + // When a connection close is initiated by the connection pool, the connection will + // still report further events (like newMaxConcurrentStreamsReceived) to the state + // machine. In those cases we must ignore the event. + return .none + } return .init(self.nextActionForAvailableConnection(at: index, context: context)) } mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action { - let context = self.connections.goAwayReceived(connectionID) + guard let context = self.connections.goAwayReceived(connectionID) else { + // When a connection close is initiated by the connection pool, the connection will + // still report further events (like GOAWAY received) to the state machine. In those + // cases we must ignore the event. + return .none + } return self.nextActionForClosingConnection(on: context.eventLoop) } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift index f9afa713e..95cade669 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift @@ -38,6 +38,7 @@ extension HTTPConnectionPool_HTTP2ConnectionsTests { ("testLeasingAllConnections", testLeasingAllConnections), ("testGoAway", testGoAway), ("testNewMaxConcurrentStreamsSetting", testNewMaxConcurrentStreamsSetting), + ("testEventsAfterConnectionIsClosed", testEventsAfterConnectionIsClosed), ("testLeaseOnPreferredEventLoopWithoutAnyAvailable", testLeaseOnPreferredEventLoopWithoutAnyAvailable), ("testMigrationFromHTTP1", testMigrationFromHTTP1), ("testMigrationToHTTP1", testMigrationToHTTP1), diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift index 804707959..9e9ca1df6 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift @@ -371,7 +371,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssertEqual(leasedConn1, conn1) XCTAssertEqual(leasdConnContext1.wasIdle, true) - XCTAssertTrue(connections.goAwayReceived(conn1ID).eventLoop === el1) + XCTAssertTrue(connections.goAwayReceived(conn1ID)?.eventLoop === el1) XCTAssertEqual( connections.stats, @@ -389,7 +389,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssertNil(connections.leaseStream(onRequired: el1), "we should not be able to lease a stream because the connection is draining") // a server can potentially send more than one connection go away and we should not crash - XCTAssertTrue(connections.goAwayReceived(conn1ID).eventLoop === el1) + XCTAssertTrue(connections.goAwayReceived(conn1ID)?.eventLoop === el1) XCTAssertEqual( connections.stats, .init( @@ -454,7 +454,9 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssertNil(connections.leaseStream(onRequired: el1), "all streams are in use") - let (_, newSettingsContext1) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2) + guard let (_, newSettingsContext1) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2) else { + return XCTFail("Expected to get a new settings context") + } XCTAssertEqual(newSettingsContext1.availableStreams, 1) XCTAssertTrue(newSettingsContext1.eventLoop === el1) XCTAssertFalse(newSettingsContext1.isIdle) @@ -465,7 +467,9 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssertEqual(leasedConn2, conn1) XCTAssertEqual(leaseContext2.wasIdle, false) - let (_, newSettingsContext2) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 1) + guard let (_, newSettingsContext2) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 1) else { + return XCTFail("Expected to get a new settings context") + } XCTAssertEqual(newSettingsContext2.availableStreams, 0) XCTAssertTrue(newSettingsContext2.eventLoop === el1) XCTAssertFalse(newSettingsContext2.isIdle) @@ -489,6 +493,41 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssertEqual(leaseContext3.wasIdle, true) } + func testEventsAfterConnectionIsClosed() { + let elg = EmbeddedEventLoopGroup(loops: 2) + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + let el1 = elg.next() + + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 1) + XCTAssertEqual(conn1CreatedContext.availableStreams, 1) + + let (leasedConn1, leasdConnContext1) = connections.leaseStreams(at: conn1Index, count: 1) + XCTAssertEqual(leasedConn1, conn1) + XCTAssertEqual(leasdConnContext1.wasIdle, true) + + XCTAssertNil(connections.leaseStream(onRequired: el1), "all streams are in use") + + let (_, releaseContext) = connections.releaseStream(conn1ID) + XCTAssertTrue(releaseContext.eventLoop === el1) + XCTAssertEqual(releaseContext.availableStreams, 1) + XCTAssertEqual(releaseContext.connectionID, conn1ID) + XCTAssertEqual(releaseContext.isIdle, true) + + // schedule timeout... this should remove the connection from http2Connections + + XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), conn1) + + // events race with the complete shutdown + + XCTAssertNil(connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2)) + XCTAssertNil(connections.goAwayReceived(conn1ID)) + + // finally close event + XCTAssertNil(connections.failConnection(conn1ID)) + } + func testLeaseOnPreferredEventLoopWithoutAnyAvailable() { let elg = EmbeddedEventLoopGroup(loops: 4) var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index 8436d1478..ef1ed1f04 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -41,6 +41,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration), ("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires), ("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected), + ("testEventsAfterConnectionIsClosed", testEventsAfterConnectionIsClosed), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 6f2e89142..049d18f7f 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -1088,4 +1088,23 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertNotNil(connections.randomParkedConnection()) XCTAssertEqual(connections.count, 1) } + + func testEventsAfterConnectionIsClosed() { + let elg = EmbeddedEventLoopGroup(loops: 2) + guard var (connections, state) = try? MockConnectionPool.http2(elg: elg, maxConcurrentStreams: 100) else { + return XCTFail("Test setup failed") + } + + let connection = connections.randomParkedConnection()! + XCTAssertNoThrow(try connections.closeConnection(connection)) + + let idleTimeoutAction = state.connectionIdleTimeout(connection.id) + XCTAssertEqual(idleTimeoutAction.connection, .closeConnection(connection, isShutdown: .no)) + XCTAssertEqual(idleTimeoutAction.request, .none) + + XCTAssertEqual(state.newHTTP2MaxConcurrentStreamsReceived(connection.id, newMaxStreams: 50), .none) + XCTAssertEqual(state.http2ConnectionGoAwayReceived(connection.id), .none) + + XCTAssertEqual(state.http2ConnectionClosed(connection.id), .none) + } }