Skip to content

Commit

Permalink
Crash fix: HTTP2Connections emit events after the pool has closed the…
Browse files Browse the repository at this point in the history
…m. (#481)
  • Loading branch information
fabianfett authored Nov 19, 2021
1 parent 1f3f141 commit 2fe3f42
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ extension HTTPConnectionPool {
)
}

// MARK: - Events
// MARK: - Events -

mutating func executeRequest(_ request: Request) -> Action {
switch self.state {
Expand Down Expand Up @@ -519,16 +519,20 @@ extension HTTPConnectionPool {
// MARK: HTTP2

mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
_ = self.http2Connections!.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
// The `http2Connections` are optional here:
// Connections report events back to us, if they are in a shutdown that was
// initiated by the state machine. For this reason this callback might be invoked
// even though all references to HTTP2Connections have already been cleared.
_ = self.http2Connections?.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
return .none
}

mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
_ = self.http2Connections!.goAwayReceived(connectionID)
// The `http2Connections` are optional here:
// Connections report events back to us, if they are in a shutdown that was
// initiated by the state machine. For this reason this callback might be invoked
// even though all references to HTTP2Connections have already been cleared.
_ = self.http2Connections?.goAwayReceived(connectionID)
return .none
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,14 @@ extension HTTPConnectionPool {
/// Sets the connection with the given `connectionId` to the draining state.
/// - Returns: the `EventLoop` to create a new connection on if applicable
/// - Precondition: connection with given `connectionId` must be either `.active` or already in the `.draining` state
mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext {
mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("go away recieved for a connection that does not exists")
// When a connection close is initiated by the connection pool (e.g. because the
// connection was idle for too long), the connection will still report further
// events to the state machine even though we don't care about its state anymore.
//
// This is because the HTTP2Connection has a strong let reference to its delegate.
return nil
}
let eventLoop = self.connections[index].goAwayReceived()
return GoAwayContext(eventLoop: eventLoop)
Expand All @@ -540,9 +545,13 @@ extension HTTPConnectionPool {
mutating func newHTTP2MaxConcurrentStreamsReceived(
_ connectionID: Connection.ID,
newMaxStreams: Int
) -> (Int, EstablishedConnectionContext) {
) -> (Int, EstablishedConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("We tried to update the maximum number of concurrent streams for a connection that does not exists")
// When a connection close is initiated by the connection pool (e.g. because the
// connection was idle for too long), the connection will still report its events to
// the state machine and hence to this `HTTP2Connections` struct. In those cases we
// must ignore the event.
return nil
}
let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams)
let context = EstablishedConnectionContext(
Expand Down Expand Up @@ -661,8 +670,10 @@ extension HTTPConnectionPool {

mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
/// When a connection close is initiated by the connection pool (e.g. because the connection was idle for too long), the connection will
/// still report its close to the state machine and then to us. In those cases we must ignore the event.
// When a connection close is initiated by the connection pool (e.g. because the
// connection was idle for too long), the connection will still report its close to
// the state machine and then to this `HTTP2Connections` struct. In those cases we
// must ignore the event.
return nil
}
self.connections[index].fail()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,22 @@ extension HTTPConnectionPool {
}

mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action {
let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
guard let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) else {
// When a connection close is initiated by the connection pool, the connection will
// still report further events (like newMaxConcurrentStreamsReceived) to the state
// machine. In those cases we must ignore the event.
return .none
}
return .init(self.nextActionForAvailableConnection(at: index, context: context))
}

mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action {
let context = self.connections.goAwayReceived(connectionID)
guard let context = self.connections.goAwayReceived(connectionID) else {
// When a connection close is initiated by the connection pool, the connection will
// still report further events (like GOAWAY received) to the state machine. In those
// cases we must ignore the event.
return .none
}
return self.nextActionForClosingConnection(on: context.eventLoop)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ extension HTTPConnectionPool_HTTP2ConnectionsTests {
("testLeasingAllConnections", testLeasingAllConnections),
("testGoAway", testGoAway),
("testNewMaxConcurrentStreamsSetting", testNewMaxConcurrentStreamsSetting),
("testEventsAfterConnectionIsClosed", testEventsAfterConnectionIsClosed),
("testLeaseOnPreferredEventLoopWithoutAnyAvailable", testLeaseOnPreferredEventLoopWithoutAnyAvailable),
("testMigrationFromHTTP1", testMigrationFromHTTP1),
("testMigrationToHTTP1", testMigrationToHTTP1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
XCTAssertEqual(leasedConn1, conn1)
XCTAssertEqual(leasdConnContext1.wasIdle, true)

XCTAssertTrue(connections.goAwayReceived(conn1ID).eventLoop === el1)
XCTAssertTrue(connections.goAwayReceived(conn1ID)?.eventLoop === el1)

XCTAssertEqual(
connections.stats,
Expand All @@ -389,7 +389,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
XCTAssertNil(connections.leaseStream(onRequired: el1), "we should not be able to lease a stream because the connection is draining")

// a server can potentially send more than one connection go away and we should not crash
XCTAssertTrue(connections.goAwayReceived(conn1ID).eventLoop === el1)
XCTAssertTrue(connections.goAwayReceived(conn1ID)?.eventLoop === el1)
XCTAssertEqual(
connections.stats,
.init(
Expand Down Expand Up @@ -454,7 +454,9 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {

XCTAssertNil(connections.leaseStream(onRequired: el1), "all streams are in use")

let (_, newSettingsContext1) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2)
guard let (_, newSettingsContext1) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2) else {
return XCTFail("Expected to get a new settings context")
}
XCTAssertEqual(newSettingsContext1.availableStreams, 1)
XCTAssertTrue(newSettingsContext1.eventLoop === el1)
XCTAssertFalse(newSettingsContext1.isIdle)
Expand All @@ -465,7 +467,9 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
XCTAssertEqual(leasedConn2, conn1)
XCTAssertEqual(leaseContext2.wasIdle, false)

let (_, newSettingsContext2) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 1)
guard let (_, newSettingsContext2) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 1) else {
return XCTFail("Expected to get a new settings context")
}
XCTAssertEqual(newSettingsContext2.availableStreams, 0)
XCTAssertTrue(newSettingsContext2.eventLoop === el1)
XCTAssertFalse(newSettingsContext2.isIdle)
Expand All @@ -489,6 +493,41 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
XCTAssertEqual(leaseContext3.wasIdle, true)
}

func testEventsAfterConnectionIsClosed() {
let elg = EmbeddedEventLoopGroup(loops: 2)
var connections = HTTPConnectionPool.HTTP2Connections(generator: .init())
let el1 = elg.next()

let conn1ID = connections.createNewConnection(on: el1)
let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1)
let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 1)
XCTAssertEqual(conn1CreatedContext.availableStreams, 1)

let (leasedConn1, leasdConnContext1) = connections.leaseStreams(at: conn1Index, count: 1)
XCTAssertEqual(leasedConn1, conn1)
XCTAssertEqual(leasdConnContext1.wasIdle, true)

XCTAssertNil(connections.leaseStream(onRequired: el1), "all streams are in use")

let (_, releaseContext) = connections.releaseStream(conn1ID)
XCTAssertTrue(releaseContext.eventLoop === el1)
XCTAssertEqual(releaseContext.availableStreams, 1)
XCTAssertEqual(releaseContext.connectionID, conn1ID)
XCTAssertEqual(releaseContext.isIdle, true)

// schedule timeout... this should remove the connection from http2Connections

XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), conn1)

// events race with the complete shutdown

XCTAssertNil(connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2))
XCTAssertNil(connections.goAwayReceived(conn1ID))

// finally close event
XCTAssertNil(connections.failConnection(conn1ID))
}

func testLeaseOnPreferredEventLoopWithoutAnyAvailable() {
let elg = EmbeddedEventLoopGroup(loops: 4)
var connections = HTTPConnectionPool.HTTP2Connections(generator: .init())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests {
("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration),
("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires),
("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected),
("testEventsAfterConnectionIsClosed", testEventsAfterConnectionIsClosed),
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1088,4 +1088,23 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {
XCTAssertNotNil(connections.randomParkedConnection())
XCTAssertEqual(connections.count, 1)
}

func testEventsAfterConnectionIsClosed() {
let elg = EmbeddedEventLoopGroup(loops: 2)
guard var (connections, state) = try? MockConnectionPool.http2(elg: elg, maxConcurrentStreams: 100) else {
return XCTFail("Test setup failed")
}

let connection = connections.randomParkedConnection()!
XCTAssertNoThrow(try connections.closeConnection(connection))

let idleTimeoutAction = state.connectionIdleTimeout(connection.id)
XCTAssertEqual(idleTimeoutAction.connection, .closeConnection(connection, isShutdown: .no))
XCTAssertEqual(idleTimeoutAction.request, .none)

XCTAssertEqual(state.newHTTP2MaxConcurrentStreamsReceived(connection.id, newMaxStreams: 50), .none)
XCTAssertEqual(state.http2ConnectionGoAwayReceived(connection.id), .none)

XCTAssertEqual(state.http2ConnectionClosed(connection.id), .none)
}
}

0 comments on commit 2fe3f42

Please sign in to comment.