diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index 2c335779e..8436d1478 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -40,6 +40,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections), ("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration), ("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires), + ("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index e8a4edbbe..6f2e89142 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -943,4 +943,149 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(action3.request, .none) XCTAssertEqual(action3.connection, .none) } + + func testMaxConcurrentStreamsIsRespected() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + guard var (connections, state) = try? MockConnectionPool.http2(elg: elg, maxConcurrentStreams: 100) else { + return XCTFail("Test setup failed") + } + + let generalPurposeConnection = connections.randomParkedConnection()! + var queuer = MockRequestQueuer() + + // schedule 1000 requests on the pool. The first 100 will be executed right away. All others + // shall be queued. + for i in 0..<1000 { + let requestEL = elg.next() + let mockRequest = MockHTTPRequest(eventLoop: requestEL) + let request = HTTPConnectionPool.Request(mockRequest) + + let executeAction = state.executeRequest(request) + switch i { + case 0: + XCTAssertEqual(executeAction.connection, .cancelTimeoutTimer(generalPurposeConnection.id)) + XCTAssertNoThrow(try connections.activateConnection(generalPurposeConnection.id)) + XCTAssertEqual(executeAction.request, .executeRequest(request, generalPurposeConnection, cancelTimeout: false)) + XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection)) + case 1..<100: + XCTAssertEqual(executeAction.request, .executeRequest(request, generalPurposeConnection, cancelTimeout: false)) + XCTAssertEqual(executeAction.connection, .none) + XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection)) + case 100..<1000: + XCTAssertEqual(executeAction.request, .scheduleRequestTimeout(for: request, on: requestEL)) + XCTAssertEqual(executeAction.connection, .none) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + default: + XCTFail("Unexpected") + } + } + + // let's end processing 500 requests. For every finished request, we will execute another one + // right away + while queuer.count > 500 { + XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id)) + let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id) + XCTAssertEqual(finishAction.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else { + return XCTFail("Unexpected request action: \(finishAction.request)") + } + guard requests.count == 1, let request = requests.first else { + return XCTFail("Expected to get exactly one request!") + } + let mockRequest = request.__testOnly_wrapped_request() + XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest)) + XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection)) + } + + XCTAssertEqual(queuer.count, 500) + + // Next the server allows for more concurrent streams + let newMaxStreams = 200 + XCTAssertNoThrow(try connections.newHTTP2ConnectionSettingsReceived(generalPurposeConnection.id, maxConcurrentStreams: newMaxStreams)) + let newMaxStreamsAction = state.newHTTP2MaxConcurrentStreamsReceived(generalPurposeConnection.id, newMaxStreams: newMaxStreams) + XCTAssertEqual(newMaxStreamsAction.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = newMaxStreamsAction.request else { + return XCTFail("Unexpected request action after new max concurrent stream setting: \(newMaxStreamsAction.request)") + } + XCTAssertEqual(requests.count, 100, "Expected to execute 100 more requests") + for request in requests { + let mockRequest = request.__testOnly_wrapped_request() + XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection)) + XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest)) + } + + XCTAssertEqual(queuer.count, 400) + + // let's end processing 100 requests. For every finished request, we will execute another one + // right away + while queuer.count > 300 { + XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id)) + let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id) + XCTAssertEqual(finishAction.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else { + return XCTFail("Unexpected request action: \(finishAction.request)") + } + guard requests.count == 1, let request = requests.first else { + return XCTFail("Expected to get exactly one request!") + } + let mockRequest = request.__testOnly_wrapped_request() + XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest)) + XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection)) + } + + // Next the server allows for fewer concurrent streams + let fewerMaxStreams = 50 + XCTAssertNoThrow(try connections.newHTTP2ConnectionSettingsReceived(generalPurposeConnection.id, maxConcurrentStreams: fewerMaxStreams)) + let fewerMaxStreamsAction = state.newHTTP2MaxConcurrentStreamsReceived(generalPurposeConnection.id, newMaxStreams: fewerMaxStreams) + XCTAssertEqual(fewerMaxStreamsAction.connection, .none) + XCTAssertEqual(fewerMaxStreamsAction.request, .none) + + // for the next 150 requests that are finished, no new request must be executed. + for _ in 0..<150 { + XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id)) + XCTAssertEqual(state.http2ConnectionStreamClosed(generalPurposeConnection.id), .none) + } + + XCTAssertEqual(queuer.count, 300) + + // let's end all remaining requests. For every finished request, we will execute another one + // right away + while queuer.count > 0 { + XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id)) + let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id) + XCTAssertEqual(finishAction.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else { + return XCTFail("Unexpected request action: \(finishAction.request)") + } + guard requests.count == 1, let request = requests.first else { + return XCTFail("Expected to get exactly one request!") + } + let mockRequest = request.__testOnly_wrapped_request() + XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest)) + XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection)) + } + + // Now we only need to drain the remaining 50 requests on the connection + var timeoutTimerScheduled = false + for remaining in stride(from: 50, through: 1, by: -1) { + XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id)) + let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id) + XCTAssertEqual(finishAction.request, .none) + switch remaining { + case 1: + timeoutTimerScheduled = true + XCTAssertEqual(finishAction.connection, .scheduleTimeoutTimer(generalPurposeConnection.id, on: generalPurposeConnection.eventLoop)) + XCTAssertNoThrow(try connections.parkConnection(generalPurposeConnection.id)) + case 2...50: + XCTAssertEqual(finishAction.connection, .none) + default: + XCTFail("Unexpected value: \(remaining)") + } + } + XCTAssertTrue(timeoutTimerScheduled) + XCTAssertNotNil(connections.randomParkedConnection()) + XCTAssertEqual(connections.count, 1) + } } diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift index db770726d..eedc499ad 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift @@ -34,6 +34,8 @@ struct MockConnectionPool { case connectionIsNotStarting case connectionIsNotExecuting case connectionDoesNotFulfillEventLoopRequirement + case connectionIsNotActive + case connectionIsNotHTTP2Connection case connectionDoesNotHaveHTTP2StreamAvailable case connectionBackoffTimerExists case connectionBackoffTimerNotFound @@ -256,6 +258,25 @@ struct MockConnectionPool { } } + mutating func newHTTP2SettingsReceived(maxConcurrentStreams newMaxStream: Int) throws { + switch self.state { + case .starting: + throw Errors.connectionIsNotActive + + case .http1: + throw Errors.connectionIsNotHTTP2Connection + + case .http2(.inUse(_, let used)): + self.state = .http2(.inUse(maxConcurrentStreams: newMaxStream, used: used)) + + case .http2(.idle(_, let parked, let lastIdle)): + self.state = .http2(.idle(maxConcurrentStreams: newMaxStream, parked: parked, lastIdle: lastIdle)) + + case .closed: + throw Errors.connectionIsClosed + } + } + mutating func close() throws { switch self.state { case .starting: @@ -378,6 +399,19 @@ struct MockConnectionPool { self.backoff.insert(connectionID) } + mutating func newHTTP2ConnectionSettingsReceived( + _ connectionID: Connection.ID, + maxConcurrentStreams: Int + ) throws -> Connection { + guard var connection = self.connections[connectionID] else { + throw Errors.connectionNotFound + } + + try connection.newHTTP2SettingsReceived(maxConcurrentStreams: maxConcurrentStreams) + self.connections[connection.id] = connection + return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop) + } + mutating func connectionBackoffTimerDone(_ connectionID: Connection.ID) throws { guard self.backoff.remove(connectionID) != nil else { throw Errors.connectionBackoffTimerNotFound @@ -561,6 +595,71 @@ extension MockConnectionPool { return (connections, state) } + + /// Sets up a MockConnectionPool with one established http2 connection + static func http2( + elg: EventLoopGroup, + on eventLoop: EventLoop? = nil, + maxConcurrentStreams: Int = 100 + ) throws -> (Self, HTTPConnectionPool.StateMachine) { + var state = HTTPConnectionPool.StateMachine( + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 8 + ) + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + + // 1. Schedule one request to create a connection + + let mockRequest = MockHTTPRequest(eventLoop: eventLoop ?? elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + let executeAction = state.executeRequest(request) + + guard case .scheduleRequestTimeout(request, on: let waitEL) = executeAction.request, mockRequest.eventLoop === waitEL else { + throw SetupError.expectedRequestToBeAddedToQueue + } + + guard case .createConnection(let connectionID, on: let eventLoop) = executeAction.connection else { + throw SetupError.expectedConnectionToBeCreated + } + + try connections.createConnection(connectionID, on: eventLoop) + try queuer.queue(mockRequest, id: request.id) + + // 2. the connection becomes available + + let newConnection = try connections.succeedConnectionCreationHTTP2(connectionID, maxConcurrentStreams: maxConcurrentStreams) + let action = state.newHTTP2ConnectionCreated(newConnection, maxConcurrentStreams: maxConcurrentStreams) + + guard case .executeRequestsAndCancelTimeouts([request], newConnection) = action.request else { + throw SetupError.expectedPreviouslyQueuedRequestToBeRunNow + } + + guard case .migration(createConnections: let create, closeConnections: [], scheduleTimeout: nil) = action.connection, create.isEmpty else { + throw SetupError.expectedNoConnectionAction + } + + guard try queuer.get(request.id, request: request.__testOnly_wrapped_request()) === mockRequest else { + throw SetupError.expectedPreviouslyQueuedRequestToBeRunNow + } + try connections.execute(mockRequest, on: newConnection) + + // 3. park connection + + try connections.finishExecution(newConnection.id) + + let expected: HTTPConnectionPool.StateMachine.ConnectionAction = .scheduleTimeoutTimer( + newConnection.id, + on: newConnection.eventLoop + ) + guard state.http2ConnectionStreamClosed(newConnection.id) == .init(request: .none, connection: expected) else { + throw SetupError.expectedConnectionToBeParked + } + + try connections.parkConnection(newConnection.id) + + return (connections, state) + } } /// A request that can be used when testing the `HTTPConnectionPool.StateMachine`