Skip to content

Commit

Permalink
[HTTP2] Test HTTP2 max streams setting is respected. (#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianfett authored Nov 3, 2021
1 parent 18a58bb commit b6fb33b
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests {
("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections),
("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration),
("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires),
("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected),
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -943,4 +943,149 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {
XCTAssertEqual(action3.request, .none)
XCTAssertEqual(action3.connection, .none)
}

func testMaxConcurrentStreamsIsRespected() {
let elg = EmbeddedEventLoopGroup(loops: 4)
defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) }

guard var (connections, state) = try? MockConnectionPool.http2(elg: elg, maxConcurrentStreams: 100) else {
return XCTFail("Test setup failed")
}

let generalPurposeConnection = connections.randomParkedConnection()!
var queuer = MockRequestQueuer()

// schedule 1000 requests on the pool. The first 100 will be executed right away. All others
// shall be queued.
for i in 0..<1000 {
let requestEL = elg.next()
let mockRequest = MockHTTPRequest(eventLoop: requestEL)
let request = HTTPConnectionPool.Request(mockRequest)

let executeAction = state.executeRequest(request)
switch i {
case 0:
XCTAssertEqual(executeAction.connection, .cancelTimeoutTimer(generalPurposeConnection.id))
XCTAssertNoThrow(try connections.activateConnection(generalPurposeConnection.id))
XCTAssertEqual(executeAction.request, .executeRequest(request, generalPurposeConnection, cancelTimeout: false))
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
case 1..<100:
XCTAssertEqual(executeAction.request, .executeRequest(request, generalPurposeConnection, cancelTimeout: false))
XCTAssertEqual(executeAction.connection, .none)
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
case 100..<1000:
XCTAssertEqual(executeAction.request, .scheduleRequestTimeout(for: request, on: requestEL))
XCTAssertEqual(executeAction.connection, .none)
XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id))
default:
XCTFail("Unexpected")
}
}

// let's end processing 500 requests. For every finished request, we will execute another one
// right away
while queuer.count > 500 {
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
XCTAssertEqual(finishAction.connection, .none)
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else {
return XCTFail("Unexpected request action: \(finishAction.request)")
}
guard requests.count == 1, let request = requests.first else {
return XCTFail("Expected to get exactly one request!")
}
let mockRequest = request.__testOnly_wrapped_request()
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
}

XCTAssertEqual(queuer.count, 500)

// Next the server allows for more concurrent streams
let newMaxStreams = 200
XCTAssertNoThrow(try connections.newHTTP2ConnectionSettingsReceived(generalPurposeConnection.id, maxConcurrentStreams: newMaxStreams))
let newMaxStreamsAction = state.newHTTP2MaxConcurrentStreamsReceived(generalPurposeConnection.id, newMaxStreams: newMaxStreams)
XCTAssertEqual(newMaxStreamsAction.connection, .none)
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = newMaxStreamsAction.request else {
return XCTFail("Unexpected request action after new max concurrent stream setting: \(newMaxStreamsAction.request)")
}
XCTAssertEqual(requests.count, 100, "Expected to execute 100 more requests")
for request in requests {
let mockRequest = request.__testOnly_wrapped_request()
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
}

XCTAssertEqual(queuer.count, 400)

// let's end processing 100 requests. For every finished request, we will execute another one
// right away
while queuer.count > 300 {
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
XCTAssertEqual(finishAction.connection, .none)
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else {
return XCTFail("Unexpected request action: \(finishAction.request)")
}
guard requests.count == 1, let request = requests.first else {
return XCTFail("Expected to get exactly one request!")
}
let mockRequest = request.__testOnly_wrapped_request()
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
}

// Next the server allows for fewer concurrent streams
let fewerMaxStreams = 50
XCTAssertNoThrow(try connections.newHTTP2ConnectionSettingsReceived(generalPurposeConnection.id, maxConcurrentStreams: fewerMaxStreams))
let fewerMaxStreamsAction = state.newHTTP2MaxConcurrentStreamsReceived(generalPurposeConnection.id, newMaxStreams: fewerMaxStreams)
XCTAssertEqual(fewerMaxStreamsAction.connection, .none)
XCTAssertEqual(fewerMaxStreamsAction.request, .none)

// for the next 150 requests that are finished, no new request must be executed.
for _ in 0..<150 {
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
XCTAssertEqual(state.http2ConnectionStreamClosed(generalPurposeConnection.id), .none)
}

XCTAssertEqual(queuer.count, 300)

// let's end all remaining requests. For every finished request, we will execute another one
// right away
while queuer.count > 0 {
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
XCTAssertEqual(finishAction.connection, .none)
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else {
return XCTFail("Unexpected request action: \(finishAction.request)")
}
guard requests.count == 1, let request = requests.first else {
return XCTFail("Expected to get exactly one request!")
}
let mockRequest = request.__testOnly_wrapped_request()
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
}

// Now we only need to drain the remaining 50 requests on the connection
var timeoutTimerScheduled = false
for remaining in stride(from: 50, through: 1, by: -1) {
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
XCTAssertEqual(finishAction.request, .none)
switch remaining {
case 1:
timeoutTimerScheduled = true
XCTAssertEqual(finishAction.connection, .scheduleTimeoutTimer(generalPurposeConnection.id, on: generalPurposeConnection.eventLoop))
XCTAssertNoThrow(try connections.parkConnection(generalPurposeConnection.id))
case 2...50:
XCTAssertEqual(finishAction.connection, .none)
default:
XCTFail("Unexpected value: \(remaining)")
}
}
XCTAssertTrue(timeoutTimerScheduled)
XCTAssertNotNil(connections.randomParkedConnection())
XCTAssertEqual(connections.count, 1)
}
}
99 changes: 99 additions & 0 deletions Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ struct MockConnectionPool {
case connectionIsNotStarting
case connectionIsNotExecuting
case connectionDoesNotFulfillEventLoopRequirement
case connectionIsNotActive
case connectionIsNotHTTP2Connection
case connectionDoesNotHaveHTTP2StreamAvailable
case connectionBackoffTimerExists
case connectionBackoffTimerNotFound
Expand Down Expand Up @@ -256,6 +258,25 @@ struct MockConnectionPool {
}
}

mutating func newHTTP2SettingsReceived(maxConcurrentStreams newMaxStream: Int) throws {
switch self.state {
case .starting:
throw Errors.connectionIsNotActive

case .http1:
throw Errors.connectionIsNotHTTP2Connection

case .http2(.inUse(_, let used)):
self.state = .http2(.inUse(maxConcurrentStreams: newMaxStream, used: used))

case .http2(.idle(_, let parked, let lastIdle)):
self.state = .http2(.idle(maxConcurrentStreams: newMaxStream, parked: parked, lastIdle: lastIdle))

case .closed:
throw Errors.connectionIsClosed
}
}

mutating func close() throws {
switch self.state {
case .starting:
Expand Down Expand Up @@ -378,6 +399,19 @@ struct MockConnectionPool {
self.backoff.insert(connectionID)
}

mutating func newHTTP2ConnectionSettingsReceived(
_ connectionID: Connection.ID,
maxConcurrentStreams: Int
) throws -> Connection {
guard var connection = self.connections[connectionID] else {
throw Errors.connectionNotFound
}

try connection.newHTTP2SettingsReceived(maxConcurrentStreams: maxConcurrentStreams)
self.connections[connection.id] = connection
return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop)
}

mutating func connectionBackoffTimerDone(_ connectionID: Connection.ID) throws {
guard self.backoff.remove(connectionID) != nil else {
throw Errors.connectionBackoffTimerNotFound
Expand Down Expand Up @@ -561,6 +595,71 @@ extension MockConnectionPool {

return (connections, state)
}

/// Sets up a MockConnectionPool with one established http2 connection
static func http2(
elg: EventLoopGroup,
on eventLoop: EventLoop? = nil,
maxConcurrentStreams: Int = 100
) throws -> (Self, HTTPConnectionPool.StateMachine) {
var state = HTTPConnectionPool.StateMachine(
idGenerator: .init(),
maximumConcurrentHTTP1Connections: 8
)
var connections = MockConnectionPool()
var queuer = MockRequestQueuer()

// 1. Schedule one request to create a connection

let mockRequest = MockHTTPRequest(eventLoop: eventLoop ?? elg.next())
let request = HTTPConnectionPool.Request(mockRequest)
let executeAction = state.executeRequest(request)

guard case .scheduleRequestTimeout(request, on: let waitEL) = executeAction.request, mockRequest.eventLoop === waitEL else {
throw SetupError.expectedRequestToBeAddedToQueue
}

guard case .createConnection(let connectionID, on: let eventLoop) = executeAction.connection else {
throw SetupError.expectedConnectionToBeCreated
}

try connections.createConnection(connectionID, on: eventLoop)
try queuer.queue(mockRequest, id: request.id)

// 2. the connection becomes available

let newConnection = try connections.succeedConnectionCreationHTTP2(connectionID, maxConcurrentStreams: maxConcurrentStreams)
let action = state.newHTTP2ConnectionCreated(newConnection, maxConcurrentStreams: maxConcurrentStreams)

guard case .executeRequestsAndCancelTimeouts([request], newConnection) = action.request else {
throw SetupError.expectedPreviouslyQueuedRequestToBeRunNow
}

guard case .migration(createConnections: let create, closeConnections: [], scheduleTimeout: nil) = action.connection, create.isEmpty else {
throw SetupError.expectedNoConnectionAction
}

guard try queuer.get(request.id, request: request.__testOnly_wrapped_request()) === mockRequest else {
throw SetupError.expectedPreviouslyQueuedRequestToBeRunNow
}
try connections.execute(mockRequest, on: newConnection)

// 3. park connection

try connections.finishExecution(newConnection.id)

let expected: HTTPConnectionPool.StateMachine.ConnectionAction = .scheduleTimeoutTimer(
newConnection.id,
on: newConnection.eventLoop
)
guard state.http2ConnectionStreamClosed(newConnection.id) == .init(request: .none, connection: expected) else {
throw SetupError.expectedConnectionToBeParked
}

try connections.parkConnection(newConnection.id)

return (connections, state)
}
}

/// A request that can be used when testing the `HTTPConnectionPool.StateMachine`
Expand Down

0 comments on commit b6fb33b

Please sign in to comment.