Skip to content

Commit

Permalink
Enforce Sendability
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcairo committed Aug 11, 2023
1 parent fb70a0f commit 518911d
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 261 deletions.
9 changes: 6 additions & 3 deletions Sources/HTTPServerWithQuiescingDemo/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ private final class HTTPHandler: ChannelInboundHandler {
case .head(let head):
guard head.version == HTTPVersion(major: 1, minor: 1) else {
context.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: head.version, status: .badRequest))), promise: nil)
let boundedContext = NIOLoopBound(context, eventLoop: context.eventLoop)
context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete { (_: Result<(), Error>) in
context.close(promise: nil)
boundedContext.value.close(promise: nil)
}
return
}
Expand All @@ -46,9 +47,11 @@ private final class HTTPHandler: ChannelInboundHandler {
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
buffer.clear()
buffer.writeStaticString("done with the request now\n")
let boundedContext = NIOLoopBound(context, eventLoop: context.eventLoop)
let boundedSelf = NIOLoopBound(self, eventLoop: context.eventLoop)
_ = context.eventLoop.scheduleTask(in: .seconds(30)) { [buffer] in
context.write(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
boundedContext.value.write(boundedSelf.value.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
boundedContext.value.writeAndFlush(boundedSelf.value.wrapOutboundOut(.end(nil)), promise: nil)

}
}
Expand Down
35 changes: 18 additions & 17 deletions Sources/NIOExtras/PCAPRingBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
//
//===----------------------------------------------------------------------===//

import NIOConcurrencyHelpers
import NIOCore

// MARK: NIOPCAPRingBuffer
/// Storage for the most recent set of packets captured subject to constraints.
/// Use ``addFragment(_:)`` as the sink to a ``NIOWritePCAPHandler`` and call ``emitPCAP()``
/// when you wish to get the recorded data.
/// - Warning: This class is not thread safe so should only be called from one thread.
public class NIOPCAPRingBuffer {
private var pcapFragments: CircularBuffer<ByteBuffer>
private var pcapCurrentBytes: Int
public final class NIOPCAPRingBuffer: Sendable {
private let pcapFragments: NIOLockedValueBox<CircularBuffer<ByteBuffer>>
private let pcapCurrentBytes: NIOLockedValueBox<Int>
private let maximumFragments: Int
private let maximumBytes: Int

Expand All @@ -34,8 +35,8 @@ public class NIOPCAPRingBuffer {
precondition(maximumBytes > 0)
self.maximumFragments = maximumFragments
self.maximumBytes = maximumBytes
self.pcapCurrentBytes = 0
self.pcapFragments = CircularBuffer(initialCapacity: maximumFragments)
self.pcapCurrentBytes = .init(0)
self.pcapFragments = .init(CircularBuffer(initialCapacity: maximumFragments))
}

/// Initialise the buffer, setting constraints
Expand All @@ -52,17 +53,17 @@ public class NIOPCAPRingBuffer {

@discardableResult
private func popFirst() -> ByteBuffer? {
let popped = self.pcapFragments.popFirst()
let popped = self.pcapFragments.withLockedValue { $0.popFirst() }
if let popped = popped {
self.pcapCurrentBytes -= popped.readableBytes
self.pcapCurrentBytes.withLockedValue { $0 -= popped.readableBytes }
}
return popped
}

private func append(_ buffer: ByteBuffer) {
self.pcapFragments.append(buffer)
self.pcapCurrentBytes += buffer.readableBytes
assert(self.pcapFragments.count <= self.maximumFragments)
self.pcapFragments.withLockedValue { $0.append(buffer) }
self.pcapCurrentBytes.withLockedValue { $0 += buffer.readableBytes }
assert(self.pcapFragments.withLockedValue { $0.count } <= self.maximumFragments)
// It's expected that the caller will have made room if required
// for the fragment but we may well go over on bytes - they're
// expected to fix that afterwards.
Expand All @@ -72,27 +73,27 @@ public class NIOPCAPRingBuffer {
/// - Parameter buffer: ByteBuffer containing a pcap fragment to store
public func addFragment(_ buffer: ByteBuffer) {
// Make sure we don't go over on the number of fragments.
if self.pcapFragments.count >= self.maximumFragments {
if self.pcapFragments.withLockedValue({ $0.count }) >= self.maximumFragments {
self.popFirst()
}
precondition(self.pcapFragments.count < self.maximumFragments)
precondition(self.pcapFragments.withLockedValue { $0.count } < self.maximumFragments)

// Add the new fragment
self.append(buffer)

// Trim if we've exceeded byte limit - this could remove multiple, and indeed all fragments.
while self.pcapCurrentBytes > self.maximumBytes {
while self.pcapCurrentBytes.withLockedValue({ $0 }) > self.maximumBytes {
self.popFirst()
}
precondition(self.pcapCurrentBytes <= self.maximumBytes)
precondition(self.pcapCurrentBytes.withLockedValue { $0 } <= self.maximumBytes)
}

/// Emit the captured data to a consuming function; then clear the captured data.
/// - Returns: A ciruclar buffer of captured fragments.
public func emitPCAP() -> CircularBuffer<ByteBuffer> {
let toReturn = self.pcapFragments // Copy before clearing.
self.pcapFragments.removeAll(keepingCapacity: true)
self.pcapCurrentBytes = 0
let toReturn = self.pcapFragments.withLockedValue { $0 } // Copy before clearing.
self.pcapFragments.withLockedValue { $0.removeAll(keepingCapacity: true) }
self.pcapCurrentBytes.withLockedValue { $0 = 0 }
return toReturn
}
}
Expand Down
33 changes: 20 additions & 13 deletions Sources/NIOExtrasPerformanceTester/HTTP1PCAPPerformanceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,47 @@
//
//===----------------------------------------------------------------------===//

import NIOConcurrencyHelpers
import NIOCore
import NIOExtras
import Foundation

class HTTP1ThreadedPCapPerformanceTest: HTTP1ThreadedPerformanceTest {
private class SinkHolder {
var fileSink: NIOWritePCAPHandler.SynchronizedFileSink!
private final class SinkHolder: Sendable {
let fileSink: NIOLockedValueBox<NIOWritePCAPHandler.SynchronizedFileSink?> = .init(nil)

func setUp() throws {
let outputFile = NSTemporaryDirectory() + "/" + UUID().uuidString
self.fileSink = try NIOWritePCAPHandler.SynchronizedFileSink.fileSinkWritingToFile(path: outputFile) { error in
print("ERROR: \(error)")
exit(1)
try self.fileSink.withLockedValue {
$0 = try NIOWritePCAPHandler.SynchronizedFileSink.fileSinkWritingToFile(path: outputFile) { error in
print("ERROR: \(error)")
exit(1)
}
}
}

func tearDown() {
try! self.fileSink.syncClose()
try! self.fileSink.withLockedValue { try $0!.syncClose() }
}
}

init() {
let sinkHolder = SinkHolder()
func addPCap(channel: Channel) -> EventLoopFuture<Void> {
let pcapHandler = NIOWritePCAPHandler(mode: .client,
fileSink: sinkHolder.fileSink.write)
let addPCap: @Sendable (Channel) -> EventLoopFuture<Void> = { channel in
let pcapHandler = NIOWritePCAPHandler(
mode: .client,
fileSink: sinkHolder.fileSink.withLockedValue { $0!.write }
)
return channel.pipeline.addHandler(pcapHandler, position: .first)
}

self.sinkHolder = sinkHolder
super.init(numberOfRepeats: 50,
numberOfClients: System.coreCount,
requestsPerClient: 500,
extraInitialiser: { channel in return addPCap(channel: channel) })
super.init(
numberOfRepeats: 50,
numberOfClients: System.coreCount,
requestsPerClient: 500,
extraInitialiser: { channel in addPCap(channel) }
)
}

private let sinkHolder: SinkHolder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import NIOConcurrencyHelpers
import NIOCore
import NIOPosix
import NIOHTTP1
Expand Down Expand Up @@ -107,7 +108,7 @@ final class RepeatedRequests: ChannelInboundHandler {
let reqPart = self.unwrapInboundIn(data)
if case .end(nil) = reqPart {
if self.remainingNumberOfRequests <= 0 {
context.channel.close().map { self.doneRequests }.cascade(to: self.isDonePromise)
context.channel.close().map { [doneRequests] in doneRequests }.cascade(to: self.isDonePromise)
} else {
self.doneRequests += 1
self.remainingNumberOfRequests -= 1
Expand All @@ -124,7 +125,7 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
let numberOfRepeats: Int
let numberOfClients: Int
let requestsPerClient: Int
let extraInitialiser: (Channel) -> EventLoopFuture<Void>
let extraInitialiser: @Sendable (Channel) -> EventLoopFuture<Void>

let head: HTTPRequestHead

Expand All @@ -134,7 +135,7 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
init(numberOfRepeats: Int,
numberOfClients: Int,
requestsPerClient: Int,
extraInitialiser: @escaping (Channel) -> EventLoopFuture<Void>) {
extraInitialiser: @Sendable @escaping (Channel) -> EventLoopFuture<Void>) {
self.numberOfRepeats = numberOfRepeats
self.numberOfClients = numberOfClients
self.requestsPerClient = requestsPerClient
Expand Down Expand Up @@ -165,21 +166,23 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
var reqs: [Int] = []
reqs.reserveCapacity(self.numberOfRepeats)
for _ in 0..<self.numberOfRepeats {
var requestHandlers: [RepeatedRequests] = []
requestHandlers.reserveCapacity(self.numberOfClients)
let requestHandlerCompletedFutures: NIOLockedValueBox<[EventLoopFuture<Int>]> = .init([])
requestHandlerCompletedFutures.withLockedValue { $0.reserveCapacity(self.numberOfClients) }
var clientChannels: [Channel] = []
clientChannels.reserveCapacity(self.numberOfClients)
for _ in 0 ..< self.numberOfClients {
let clientChannel = try! ClientBootstrap(group: self.group)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().flatMap {
let repeatedRequestsHandler = RepeatedRequests(numberOfRequests: self.requestsPerClient,
eventLoop: channel.eventLoop,
head: self.head)
requestHandlers.append(repeatedRequestsHandler)
.channelInitializer { [requestsPerClient, head, extraInitialiser] channel in
return channel.pipeline.addHTTPClientHandlers().flatMap {
let repeatedRequestsHandler = RepeatedRequests(
numberOfRequests: requestsPerClient,
eventLoop: channel.eventLoop,
head: head
)
requestHandlerCompletedFutures.withLockedValue { $0.append(repeatedRequestsHandler.completedFuture) }
return channel.pipeline.addHandler(repeatedRequestsHandler)
}.flatMap {
self.extraInitialiser(channel)
extraInitialiser(channel)
}
}
.connect(to: self.serverChannel.localAddress!)
Expand All @@ -195,8 +198,12 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
let allWrites = EventLoopFuture<Void>.andAllComplete(writeFutures, on: writeFutures.first!.eventLoop)
try! allWrites.wait()

let streamCompletedFutures = requestHandlers.map { rh in rh.completedFuture }
let requestsServed = EventLoopFuture<Int>.reduce(0, streamCompletedFutures, on: streamCompletedFutures.first!.eventLoop, +)
let requestsServed = EventLoopFuture<Int>.reduce(
0,
requestHandlerCompletedFutures.withLockedValue { $0 },
on: requestHandlerCompletedFutures.withLockedValue { $0.first!.eventLoop },
{ $0 + $1 }
)
reqs.append(try! requestsServed.wait())
}
return reqs.reduce(0, +) / self.numberOfRepeats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@ import NIOExtras

class HTTP1ThreadedRollingPCapPerformanceTest: HTTP1ThreadedPerformanceTest {
init() {
func addRollingPCap(channel: Channel) -> EventLoopFuture<Void> {
let pcapRingBuffer = NIOPCAPRingBuffer(maximumFragments: 25,
maximumBytes: 1_000_000)
let pcapHandler = NIOWritePCAPHandler(mode: .client,
fileSink: pcapRingBuffer.addFragment)
let addRollingPCap: @Sendable (Channel) -> EventLoopFuture<Void> = { channel in
let pcapRingBuffer = NIOPCAPRingBuffer(
maximumFragments: 25,
maximumBytes: 1_000_000
)
let pcapHandler = NIOWritePCAPHandler(
mode: .client,
fileSink: pcapRingBuffer.addFragment
)
return channel.pipeline.addHandler(pcapHandler, position: .first)
}

super.init(numberOfRepeats: 50,
numberOfClients: System.coreCount,
requestsPerClient: 500,
extraInitialiser: { channel in return addRollingPCap(channel: channel) })
extraInitialiser: { channel in addRollingPCap(channel) })
}
}
3 changes: 2 additions & 1 deletion Sources/NIONFS3/NFSFileSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

import NIOCore

public protocol NFS3FileSystemNoAuth {
@preconcurrency
public protocol NFS3FileSystemNoAuth: Sendable {
func mount(_ call: MountCallMount, promise: EventLoopPromise<MountReplyMount>)
func unmount(_ call: MountCallUnmount, promise: EventLoopPromise<MountReplyUnmount>)
func getattr(_ call: NFS3CallGetAttr, promise: EventLoopPromise<NFS3ReplyGetAttr>)
Expand Down
Loading

0 comments on commit 518911d

Please sign in to comment.