Skip to content

Commit

Permalink
[HTTP2] Create new connections during migration if needed (#459)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnadoba authored Oct 27, 2021
1 parent 1361ecc commit 4147fd6
Show file tree
Hide file tree
Showing 9 changed files with 504 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ extension HTTPConnectionPool {
}
}

var canOrWillBeAbleToExecuteRequests: Bool {
switch self.state {
case .leased, .backingOff, .idle, .starting:
return true
case .closed:
return false
}
}

var isLeased: Bool {
switch self.state {
case .leased:
Expand Down Expand Up @@ -281,6 +290,10 @@ extension HTTPConnectionPool {
return connecting
}

private var maximumAdditionalGeneralPurposeConnections: Int {
self.maximumConcurrentConnections - (self.overflowIndex - 1)
}

/// Is there at least one connection that is able to run requests
var hasActiveConnections: Bool {
self.connections.contains(where: { $0.isIdle || $0.isLeased })
Expand Down Expand Up @@ -530,8 +543,8 @@ extension HTTPConnectionPool {
return migrationContext
}

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
Expand All @@ -541,17 +554,96 @@ extension HTTPConnectionPool {
) {
for (connectionID, eventLoop) in starting {
let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
self.connections.append(newConnection)
self.connections.insert(newConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}

for (connectionID, eventLoop) in backingOff {
var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
backingOffConnection.failedToConnect()
self.connections.append(backingOffConnection)
self.connections.insert(backingOffConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}
}

/// We will create new connections for each `requiredEventLoopOfPendingRequests`
/// In addition, we also create more general purpose connections if we do not have enough to execute
/// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
/// until we reach `maximumConcurrentConnections`
/// - Parameters:
/// - requiredEventLoopsForPendingRequests:
/// event loops for which we have requests with a required event loop.
/// Duplicates are not allowed.
/// - generalPurposeRequestCountPerPreferredEventLoop:
/// request count with no required event loop,
/// grouped by preferred event loop and ordered descending by number of requests
/// - Returns: new connections that must be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [(EventLoop, Int)],
generalPurposeRequestCountGroupedByPreferredEventLoop: [(EventLoop, Int)]
) -> [(Connection.ID, EventLoop)] {
// create new connections for requests with a required event loop

// we may already start connections for those requests and do not want to start to many
let startingRequiredEventLoopConnectionCount = Dictionary(
self.connections[self.overflowIndex..<self.connections.endIndex].lazy.map {
($0.eventLoop.id, 1)
},
uniquingKeysWith: +
)
var connectionToCreate = requiredEventLoopOfPendingRequests
.flatMap { (eventLoop, requestCount) -> [(Connection.ID, EventLoop)] in
// We need a connection for each queued request with a required event loop.
// Therefore, we look how many request we have queued for a given `eventLoop` and
// how many connections we are already starting on the given `eventLoop`.
// If we have not enough, we will create additional connections to have at least
// on connection per request.
let connectionsToStart = requestCount - startingRequiredEventLoopConnectionCount[eventLoop.id, default: 0]
return stride(from: 0, to: connectionsToStart, by: 1).lazy.map { _ in
(self.createNewOverflowConnection(on: eventLoop), eventLoop)
}
}

// create new connections for requests without a required event loop

// TODO: improve algorithm to create connections uniformly across all preferred event loops
// while paying attention to the number of queued request per event loop
// Currently we start by creating new connections on the event loop with the most queued
// requests. If we have create a enough connections to cover all requests for the given
// event loop we will continue with the event loop with the second most queued requests
// and so on and so forth. We do not need to sort the array because
let newGeneralPurposeConnections: [(Connection.ID, EventLoop)] = generalPurposeRequestCountGroupedByPreferredEventLoop
// we do not want to allocated intermediate arrays.
.lazy
// we flatten the grouped list of event loops by lazily repeating the event loop
// for each request.
// As a result we get one event loop per request (`[EventLoop]`).
.flatMap { eventLoop, requestCount in
repeatElement(eventLoop, count: requestCount)
}
// we may already start connections and do not want to start too many
.dropLast(self.startingGeneralPurposeConnections)
// we need to respect the used defined `maximumConcurrentConnections`
.prefix(self.maximumAdditionalGeneralPurposeConnections)
// we now create a connection for each remaining event loop
.map { eventLoop in
(self.createNewConnection(on: eventLoop), eventLoop)
}

connectionToCreate.append(contentsOf: newGeneralPurposeConnections)

return connectionToCreate
}

// MARK: Shutdown

mutating func shutdown() -> CleanupContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,32 +84,40 @@ extension HTTPConnectionPool {
http2Connections: HTTP2Connections,
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.connections.isEmpty)
precondition(self.http2Connections == nil)
precondition(self.requests.isEmpty)
precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty")
precondition(self.http2Connections == nil, "expected an empty state machine but http2Connections are not nil")
precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty")

self.requests = requests

// we may have remaining open http1 connections from a pervious migration to http2
if let http1Connections = http1Connections {
self.connections = http1Connections
}

var http2Connections = http2Connections
let migration = http2Connections.migrateToHTTP1()

self.connections.migrateFromHTTP2(
starting: migration.starting,
backingOff: migration.backingOff
)

let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: requests.requestCountGroupedByRequiredEventLoop(),
generalPurposeRequestCountGroupedByPreferredEventLoop: requests.generalPurposeRequestCountGroupedByPreferredEventLoop()
)

if !http2Connections.isEmpty {
self.http2Connections = http2Connections
}

// TODO: Close all idle connections from context.close
// TODO: Start new http1 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

self.requests = requests

return .init(closeConnections: [], createConnections: [])
return .init(
closeConnections: migration.close,
createConnections: createConnections
)
}

// MARK: - Events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ extension HTTPConnectionPool {

// MARK: Migration

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
Expand All @@ -368,6 +368,31 @@ extension HTTPConnectionPool {
}
}

/// We will create new connections for `requiredEventLoopsOfPendingRequests`
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
/// - Parameters:
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
/// - Returns: new connections that need to be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: [EventLoop]
) -> [(Connection.ID, EventLoop)] {
// create new connections for requests with a required event loop
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
self.connections.lazy
.filter {
$0.canOrWillBeAbleToExecuteRequests
}.map {
$0.eventLoop.id
}
)
return requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in
guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id)
else { return nil }
let connectionID = self.createNewConnection(on: eventLoop)
return (connectionID, eventLoop)
}
}

struct HTTP2ToHTTP1MigrationContext {
var backingOff: [(Connection.ID, EventLoop)] = []
var starting: [(Connection.ID, EventLoop)] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,13 @@ extension HTTPConnectionPool {
http2Connections: HTTP2Connections?,
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.http1Connections == nil)
precondition(self.connections.isEmpty)
precondition(self.requests.isEmpty)
precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty")
precondition(self.http1Connections == nil, "expected an empty state machine but http1Connections are not nil")
precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty")

self.requests = requests

// we may have remaining open http2 connections from a pervious migration to http1
if let http2Connections = http2Connections {
self.connections = http2Connections
}
Expand All @@ -107,17 +110,19 @@ extension HTTPConnectionPool {
backingOff: context.backingOff
)

let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests()
)

if !http1Connections.isEmpty {
self.http1Connections = http1Connections
}

self.requests = requests

// TODO: Close all idle connections from context.close
// TODO: Start new http2 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

return .init(closeConnections: [], createConnections: [])
return .init(
closeConnections: context.close,
createConnections: createConnections
)
}

mutating func executeRequest(_ request: Request) -> Action {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@

import NIOCore

private struct HashableEventLoop: Hashable {
static func == (lhs: HashableEventLoop, rhs: HashableEventLoop) -> Bool {
lhs.eventLoop === rhs.eventLoop
}

init(_ eventLoop: EventLoop) {
self.eventLoop = eventLoop
}

let eventLoop: EventLoop
func hash(into hasher: inout Hasher) {
self.eventLoop.id.hash(into: &hasher)
}
}

extension HTTPConnectionPool {
/// A struct to store all queued requests.
struct RequestQueue {
Expand Down Expand Up @@ -131,6 +146,42 @@ extension HTTPConnectionPool {
}
return nil
}

/// - Returns: event loops with at least one request with a required event loop
func eventLoopsWithPendingRequests() -> [EventLoop] {
self.eventLoopQueues.compactMap {
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`
/// however, a queue can be empty
$0.value.first?.requiredEventLoop!
}
}

/// - Returns: request count for requests with required event loop, grouped by required event loop without any particular order
func requestCountGroupedByRequiredEventLoop() -> [(EventLoop, Int)] {
self.eventLoopQueues.values.compactMap { requests -> (EventLoop, Int)? in
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`,
/// however, a queue can be empty
guard let requiredEventLoop = requests.first?.requiredEventLoop! else {
return nil
}
return (requiredEventLoop, requests.count)
}
}

/// - Returns: request count with **no** required event loop, grouped by preferred event loop and ordered descending by number of requests
func generalPurposeRequestCountGroupedByPreferredEventLoop() -> [(EventLoop, Int)] {
let requestCountPerEventLoop = Dictionary(
self.generalPurposeQueue.lazy.map { request in
(HashableEventLoop(request.preferredEventLoop), 1)
},
uniquingKeysWith: +
)
return requestCountPerEventLoop.lazy
.map { ($0.key.eventLoop, $0.value) }
.sorted { lhs, rhs in
lhs.1 > rhs.1
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ extension HTTPConnectionPool_HTTP1ConnectionsTests {
("testCloseConnectionIfIdleButLeasedRaceCondition", testCloseConnectionIfIdleButLeasedRaceCondition),
("testCloseConnectionIfIdleButClosedRaceCondition", testCloseConnectionIfIdleButClosedRaceCondition),
("testShutdown", testShutdown),
("testMigrationFromHTTP2", testMigrationFromHTTP2),
("testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop),
("testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop),
("testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection", testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection),
("testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections", testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections),
("testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests", testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests),
]
}
}
Loading

0 comments on commit 4147fd6

Please sign in to comment.