diff --git a/Package.swift b/Package.swift index 311664bea..ec15df574 100644 --- a/Package.swift +++ b/Package.swift @@ -168,6 +168,7 @@ let package = Package( "BucketQueueModels", .product(name: "DateProvider", package: "CommandLineToolkit"), "EmceeLogging", + "Extensions", "QueueModels", "RunnerModels", "TestHistoryModels", @@ -411,6 +412,7 @@ let package = Package( .target( name: "DistWorkerModels", dependencies: [ + "Extensions", "LoggingSetup", "MetricsExtensions", "QueueModels", @@ -603,6 +605,7 @@ let package = Package( .target( name: "EventBus", dependencies: [ + "Extensions", "RunnerModels", ], path: "Sources/EventBus" @@ -732,6 +735,7 @@ let package = Package( .target( name: "ListeningSemaphore", dependencies: [ + "Extensions", ], path: "Sources/ListeningSemaphore" ), diff --git a/Sources/BalancingBucketQueue/New/MultipleQueuesContainer.swift b/Sources/BalancingBucketQueue/New/MultipleQueuesContainer.swift index f02d77db5..068e3349d 100644 --- a/Sources/BalancingBucketQueue/New/MultipleQueuesContainer.swift +++ b/Sources/BalancingBucketQueue/New/MultipleQueuesContainer.swift @@ -1,99 +1,96 @@ import CountedSet +import Extensions import Foundation import QueueModels public final class MultipleQueuesContainer { - private let syncQueue = DispatchQueue(label: "MultipleQueuesContainer.syncQueue") - private let exclusiveAccessLock = NSLock() + private let discreteAccessLock = NSLock() + private let continousAccessLock = NSLock() public init() {} public func performWithExclusiveAccess( work: () throws -> T ) rethrows -> T { - exclusiveAccessLock.lock() - defer { - exclusiveAccessLock.unlock() - } - return try work() + try continousAccessLock.whileLocked(work) } public func runningAndDeletedJobQueues() -> [JobQueue] { - syncQueue.sync { - runningJobQueues_onSyncQueue + deletedJobQueues_onSyncQueue + discreteAccessLock.whileLocked { + unsafe_runningJobQueues + unsafe_deletedJobQueues } } // MARK: - JobGroups - private var runningJobGroups_onSyncQueue = CountedSet() + private var unsafe_runningJobGroups = CountedSet() public func track(jobGroup: JobGroup) { - syncQueue.sync { - _ = runningJobGroups_onSyncQueue.update(with: jobGroup) + discreteAccessLock.whileLocked { + _ = unsafe_runningJobGroups.update(with: jobGroup) } } public func untrack(jobGroup: JobGroup) { - syncQueue.sync { - _ = runningJobGroups_onSyncQueue.remove(jobGroup) + discreteAccessLock.whileLocked { + _ = unsafe_runningJobGroups.remove(jobGroup) } } public func trackedJobGroups() -> [JobGroup] { - syncQueue.sync { - Array(runningJobGroups_onSyncQueue) + discreteAccessLock.whileLocked { + Array(unsafe_runningJobGroups) } } // MARK: - Running Job Queues - private var runningJobQueues_onSyncQueue = [JobQueue]() + private var unsafe_runningJobQueues = [JobQueue]() public func runningJobQueues(jobId: JobId) -> [JobQueue] { - syncQueue.sync { - runningJobQueues_onSyncQueue.filter { $0.job.jobId == jobId } + discreteAccessLock.whileLocked { + unsafe_runningJobQueues.filter { $0.job.jobId == jobId } } } public func add(runningJobQueue: JobQueue) { - syncQueue.sync { - runningJobQueues_onSyncQueue.append(runningJobQueue) - runningJobQueues_onSyncQueue.sort { $0.executionOrder(relativeTo: $1) == .before } + discreteAccessLock.whileLocked { + unsafe_runningJobQueues.append(runningJobQueue) + unsafe_runningJobQueues.sort { $0.executionOrder(relativeTo: $1) == .before } } } public func removeRunningJobQueues(jobId: JobId) { - syncQueue.sync { - runningJobQueues_onSyncQueue.removeAll(where: { $0.job.jobId == jobId }) + discreteAccessLock.whileLocked { + unsafe_runningJobQueues.removeAll(where: { $0.job.jobId == jobId }) } } public func allRunningJobQueues() -> [JobQueue] { - syncQueue.sync { - runningJobQueues_onSyncQueue + discreteAccessLock.whileLocked { + unsafe_runningJobQueues } } // MARK: - Deleted Job Queues - private var deletedJobQueues_onSyncQueue = [JobQueue]() + private var unsafe_deletedJobQueues = [JobQueue]() public func add(deletedJobQueues: [JobQueue]) { - syncQueue.sync { - deletedJobQueues_onSyncQueue.append(contentsOf: deletedJobQueues) + discreteAccessLock.whileLocked { + unsafe_deletedJobQueues.append(contentsOf: deletedJobQueues) } } public func allDeletedJobQueues() -> [JobQueue] { - syncQueue.sync { - deletedJobQueues_onSyncQueue + discreteAccessLock.whileLocked { + unsafe_deletedJobQueues } } public func removeFromDeleted(jobId: JobId) { - syncQueue.sync { - deletedJobQueues_onSyncQueue.removeAll(where: { $0.job.jobId == jobId }) + discreteAccessLock.whileLocked { + unsafe_deletedJobQueues.removeAll(where: { $0.job.jobId == jobId }) } } } diff --git a/Sources/BalancingBucketQueue/New/MultipleQueuesJobResultsProvider.swift b/Sources/BalancingBucketQueue/New/MultipleQueuesJobResultsProvider.swift index de6a7c16e..778ace737 100644 --- a/Sources/BalancingBucketQueue/New/MultipleQueuesJobResultsProvider.swift +++ b/Sources/BalancingBucketQueue/New/MultipleQueuesJobResultsProvider.swift @@ -9,13 +9,10 @@ public final class MultipleQueuesJobResultsProvider: JobResultsProvider { } public func results(jobId: JobId) throws -> JobResults { - try multipleQueuesContainer.performWithExclusiveAccess { - - if let jobQueue = multipleQueuesContainer.runningAndDeletedJobQueues().first(where: { jobQueue in jobQueue.job.jobId == jobId }) { - return JobResults(jobId: jobId, testingResults: jobQueue.resultsCollector.collectedResults) - } - - throw NoQueueForJobIdFoundError.noQueue(jobId: jobId) + if let jobQueue = multipleQueuesContainer.runningAndDeletedJobQueues().first(where: { jobQueue in jobQueue.job.jobId == jobId }) { + return JobResults(jobId: jobId, testingResults: jobQueue.resultsCollector.collectedResults) } + + throw NoQueueForJobIdFoundError.noQueue(jobId: jobId) } } diff --git a/Sources/BalancingBucketQueue/New/MultipleQueuesJobStateProvider.swift b/Sources/BalancingBucketQueue/New/MultipleQueuesJobStateProvider.swift index 276c3d3ad..4350e5ef1 100644 --- a/Sources/BalancingBucketQueue/New/MultipleQueuesJobStateProvider.swift +++ b/Sources/BalancingBucketQueue/New/MultipleQueuesJobStateProvider.swift @@ -17,22 +17,20 @@ public final class MultipleQueuesJobStateProvider: JobStateProvider { } public func state(jobId: JobId) throws -> JobState { - try multipleQueuesContainer.performWithExclusiveAccess { - if let jobQueue = multipleQueuesContainer.allRunningJobQueues().first(where: { $0.job.jobId == jobId }) { - return JobState( - jobId: jobId, - queueState: QueueState.running(jobQueue.bucketQueue.runningQueueState) - ) - } - - if multipleQueuesContainer.allDeletedJobQueues().first(where: { $0.job.jobId == jobId }) != nil { - return JobState( - jobId: jobId, - queueState: QueueState.deleted - ) - } - - throw NoQueueForJobIdFoundError.noQueue(jobId: jobId) + if let jobQueue = multipleQueuesContainer.allRunningJobQueues().first(where: { $0.job.jobId == jobId }) { + return JobState( + jobId: jobId, + queueState: QueueState.running(jobQueue.bucketQueue.runningQueueState) + ) } + + if multipleQueuesContainer.allDeletedJobQueues().first(where: { $0.job.jobId == jobId }) != nil { + return JobState( + jobId: jobId, + queueState: QueueState.deleted + ) + } + + throw NoQueueForJobIdFoundError.noQueue(jobId: jobId) } } diff --git a/Sources/BucketQueue/BucketQueueHolder.swift b/Sources/BucketQueue/BucketQueueHolder.swift index c61c4ed64..39376250a 100644 --- a/Sources/BucketQueue/BucketQueueHolder.swift +++ b/Sources/BucketQueue/BucketQueueHolder.swift @@ -1,11 +1,12 @@ import BucketQueueModels +import Extensions import Foundation public final class BucketQueueHolder { private var enqueuedBuckets = [EnqueuedBucket]() private var dequeuedBuckets = Set() - private let syncQueue = DispatchQueue(label: "BucketQueueHolder.syncQueue") + private let accessLock = NSLock() private let exclusiveAccessLock = NSRecursiveLock() public init() {} @@ -13,41 +14,37 @@ public final class BucketQueueHolder { public func performWithExclusiveAccess( work: () throws -> T ) rethrows -> T { - exclusiveAccessLock.lock() - defer { - exclusiveAccessLock.unlock() - } - return try work() + try exclusiveAccessLock.whileLocked(work) } public func removeAllEnqueuedBuckets() { - syncQueue.sync { + accessLock.whileLocked { enqueuedBuckets.removeAll() } } public var allEnqueuedBuckets: [EnqueuedBucket] { - syncQueue.sync { enqueuedBuckets } + accessLock.whileLocked { enqueuedBuckets } } public var allDequeuedBuckets: Set { - syncQueue.sync { dequeuedBuckets } + accessLock.whileLocked { dequeuedBuckets } } public func remove(dequeuedBucket: DequeuedBucket) { - syncQueue.sync { + accessLock.whileLocked { _ = dequeuedBuckets.remove(dequeuedBucket) } } public func insert(enqueuedBuckets: [EnqueuedBucket], position: Int) { - syncQueue.sync { + accessLock.whileLocked { self.enqueuedBuckets.insert(contentsOf: enqueuedBuckets, at: position) } } public func replacePreviouslyEnqueuedBucket(withDequeuedBucket dequeuedBucket: DequeuedBucket) { - syncQueue.sync { + accessLock.whileLocked { enqueuedBuckets.removeAll(where: { $0 == dequeuedBucket.enqueuedBucket }) _ = dequeuedBuckets.insert(dequeuedBucket) } diff --git a/Sources/Deployer/Deployer.swift b/Sources/Deployer/Deployer.swift index 40210e2b2..693e84cfa 100644 --- a/Sources/Deployer/Deployer.swift +++ b/Sources/Deployer/Deployer.swift @@ -48,17 +48,18 @@ open class Deployer { * from a URL with a package of the DeployableItem to a corresponding DeployableItem */ private func prepareDeployables() throws -> [AbsolutePath: DeployableItem] { - let syncQueue = DispatchQueue(label: "ru.avito.Deployer.syncQueue") + let syncQueue = DispatchQueue(label: "Deployer.syncQueue") var deployablesFailedToPrepare = [DeployableItem]() var pathToDeployable = [AbsolutePath: DeployableItem]() let packager = Packager(processControllerProvider: processControllerProvider) let queue = DispatchQueue( - label: "ru.avito.Deployer", + label: "Deployer.queue", qos: .default, attributes: .concurrent, autoreleaseFrequency: .workItem, - target: nil) + target: DispatchQueue.global() + ) let group = DispatchGroup() for deployable in deployables { group.enter() diff --git a/Sources/DistWorker/DistWorker.swift b/Sources/DistWorker/DistWorker.swift index 10398b849..21bc2732c 100644 --- a/Sources/DistWorker/DistWorker.swift +++ b/Sources/DistWorker/DistWorker.swift @@ -35,7 +35,12 @@ import WorkerCapabilities public final class DistWorker: SchedulerDataSource, SchedulerDelegate { private let di: DI - private let callbackQueue = DispatchQueue(label: "DistWorker.callbackQueue", qos: .default, attributes: .concurrent) + private let callbackQueue = DispatchQueue( + label: "DistWorker.callbackQueue", + qos: .default, + attributes: .concurrent, + target: .global() + ) private let currentlyBeingProcessedBucketsTracker = DefaultCurrentlyBeingProcessedBucketsTracker() private let httpRestServer: HTTPRESTServer private let version: Version diff --git a/Sources/DistWorkerModels/WorkerConfigurations.swift b/Sources/DistWorkerModels/WorkerConfigurations.swift index b2eb2bfa4..89c699025 100644 --- a/Sources/DistWorkerModels/WorkerConfigurations.swift +++ b/Sources/DistWorkerModels/WorkerConfigurations.swift @@ -1,22 +1,23 @@ import Dispatch +import Extensions import Foundation import QueueModels public final class WorkerConfigurations { - private let queue = DispatchQueue(label: "WorkerConfigurations.queue") + private let lock = NSLock() private var workerIdToRunConfiguration = [WorkerId: WorkerConfiguration]() public init() {} public func add(workerId: WorkerId, configuration: WorkerConfiguration) { - queue.sync { workerIdToRunConfiguration[workerId] = configuration } + lock.whileLocked { workerIdToRunConfiguration[workerId] = configuration } } public func workerConfiguration(workerId: WorkerId) -> WorkerConfiguration? { - return queue.sync { workerIdToRunConfiguration[workerId] } + lock.whileLocked { workerIdToRunConfiguration[workerId] } } public var workerIds: Set { - return Set(workerIdToRunConfiguration.keys) + return Set(lock.whileLocked{ workerIdToRunConfiguration.keys }) } } diff --git a/Sources/EventBus/EventBus.swift b/Sources/EventBus/EventBus.swift index 7f52aa713..198a822e8 100644 --- a/Sources/EventBus/EventBus.swift +++ b/Sources/EventBus/EventBus.swift @@ -1,21 +1,22 @@ import Dispatch +import Extensions import Foundation public final class EventBus { private var streams = [EventStream]() - private let workQueue = DispatchQueue(label: "ru.avito.EventBus.workQueue") + private let lock = NSLock() private let eventDeliveryQueue = DispatchQueue(label: "ru.avito.EventBus.eventDeliveryQueue") public init() {} public func add(stream: EventStream) { - workQueue.sync { + lock.whileLocked { streams.append(stream) } } public func post(event: BusEvent) { - workQueue.sync { + lock.whileLocked { streams.forEach { stream in eventDeliveryQueue.async { stream.process(event: event) @@ -32,6 +33,6 @@ public final class EventBus { public func tearDown() { post(event: .tearDown) - eventDeliveryQueue.sync {} + eventDeliveryQueue.sync(flags: .barrier) {} } } diff --git a/Sources/ListeningSemaphore/SettableOperation.swift b/Sources/ListeningSemaphore/SettableOperation.swift index 838b9642e..cb14c645b 100644 --- a/Sources/ListeningSemaphore/SettableOperation.swift +++ b/Sources/ListeningSemaphore/SettableOperation.swift @@ -1,8 +1,9 @@ import Dispatch +import Extensions import Foundation final class SettableOperation: Operation, CascadeCancellable { - private let syncQueue = DispatchQueue(label: "ru.avito.SettableOperation.syncQueue") + private let lock = NSLock() private var cascaseCancellableDependencies = [Operation]() private var isAbleToRun: Bool { willSet { @@ -30,7 +31,7 @@ final class SettableOperation: Operation, CascadeCancellable { } public func unblock() { - syncQueue.sync { isAbleToRun = true } + lock.whileLocked { isAbleToRun = true } } func addCascadeCancellableDependency(_ operation: Operation) { diff --git a/Sources/QueueCommunication/DefaultWorkerUtilizationStatusPoller.swift b/Sources/QueueCommunication/DefaultWorkerUtilizationStatusPoller.swift index 839cbd95a..11b56fde6 100644 --- a/Sources/QueueCommunication/DefaultWorkerUtilizationStatusPoller.swift +++ b/Sources/QueueCommunication/DefaultWorkerUtilizationStatusPoller.swift @@ -75,16 +75,16 @@ public class DefaultWorkerUtilizationStatusPoller: WorkerUtilizationStatusPoller } public func utilizationPermissionForWorker(workerId: WorkerId) -> WorkerUtilizationPermission { - return workerIdsToUtilize.withExclusiveAccess { workerIds in - return workerIds.contains(workerId) ? .allowedToUtilize : .notAllowedToUtilize - } + workerIdsToUtilize.currentValue().contains(workerId) ? .allowedToUtilize : .notAllowedToUtilize } private func reportMetric() { - workerIdsToUtilize.withExclusiveAccess { - globalMetricRecorder.capture( - NumberOfWorkersToUtilizeMetric(emceeVersion: emceeVersion, queueHost: queueHost, workersCount: $0.count) + globalMetricRecorder.capture( + NumberOfWorkersToUtilizeMetric( + emceeVersion: emceeVersion, + queueHost: queueHost, + workersCount: workerIdsToUtilize.currentValue().count ) - } + ) } } diff --git a/Sources/Runner/Metrics/MetricReportingTestRunnerStream.swift b/Sources/Runner/Metrics/MetricReportingTestRunnerStream.swift index 3f8a041da..68a134dd0 100644 --- a/Sources/Runner/Metrics/MetricReportingTestRunnerStream.swift +++ b/Sources/Runner/Metrics/MetricReportingTestRunnerStream.swift @@ -124,17 +124,15 @@ public final class MetricReportingTestRunnerStream: TestRunnerStream { value = nil } - willRunEventTimestamp.withExclusiveAccess { value in - if let streamOpenEventTimestamp = value { - specificMetricRecorder.capture( - UselessTestRunnerInvocationMetric( - host: host, - version: version, - duration: dateProvider.currentDate().timeIntervalSince(streamOpenEventTimestamp), - timestamp: dateProvider.currentDate() - ) + if let streamOpenEventTimestamp = willRunEventTimestamp.currentValue() { + specificMetricRecorder.capture( + UselessTestRunnerInvocationMetric( + host: host, + version: version, + duration: dateProvider.currentDate().timeIntervalSince(streamOpenEventTimestamp), + timestamp: dateProvider.currentDate() ) - } + ) } } } diff --git a/Sources/WorkerAlivenessProvider/WorkerAlivenessProviderImpl.swift b/Sources/WorkerAlivenessProvider/WorkerAlivenessProviderImpl.swift index 5848b4e10..f927b6189 100644 --- a/Sources/WorkerAlivenessProvider/WorkerAlivenessProviderImpl.swift +++ b/Sources/WorkerAlivenessProvider/WorkerAlivenessProviderImpl.swift @@ -11,9 +11,9 @@ public final class WorkerAlivenessProviderImpl: WorkerAlivenessProvider { private let lock = NSLock() private let knownWorkerIds: Set private let logger: ContextualLogger - private var registeredWorkerIds = Set() - private var disabledWorkerIds = Set() - private var silentWorkerIds = Set() + private var unsafe_registeredWorkerIds = Set() + private var unsafe_disabledWorkerIds = Set() + private var unsafe_silentWorkerIds = Set() private let workerPermissionProvider: WorkerPermissionProvider private let workerBucketIdsBeingProcessed: WorkerCurrentlyProcessingBucketsTracker @@ -56,13 +56,13 @@ public final class WorkerAlivenessProviderImpl: WorkerAlivenessProvider { public func isWorkerRegistered(workerId: WorkerId) -> Bool { lock.whileLocked { - registeredWorkerIds.contains(workerId) + unsafe_registeredWorkerIds.contains(workerId) } } public func didRegisterWorker(workerId: WorkerId) { lock.whileLocked { - registeredWorkerIds.insert(workerId) + unsafe_registeredWorkerIds.insert(workerId) unsafe_markWorkerAsAlive(workerId: workerId) } } @@ -75,31 +75,31 @@ public final class WorkerAlivenessProviderImpl: WorkerAlivenessProvider { public func alivenessForWorker(workerId: WorkerId) -> WorkerAliveness { lock.whileLocked { - onSyncQueue_alivenessForWorker(workerId: workerId) + unsafe_alivenessForWorker(workerId: workerId) } } public func enableWorker(workerId: WorkerId) { lock.whileLocked { - if disabledWorkerIds.contains(workerId) { + if unsafe_disabledWorkerIds.contains(workerId) { logger.debug("Enabling \(workerId)") - _ = disabledWorkerIds.remove(workerId) + _ = unsafe_disabledWorkerIds.remove(workerId) } } } public func disableWorker(workerId: WorkerId) { lock.whileLocked { - if !disabledWorkerIds.contains(workerId) { + if !unsafe_disabledWorkerIds.contains(workerId) { logger.debug("Disabling \(workerId)") - _ = disabledWorkerIds.insert(workerId) + _ = unsafe_disabledWorkerIds.insert(workerId) } } } public func isWorkerEnabled(workerId: WorkerId) -> Bool { lock.whileLocked { - !disabledWorkerIds.contains(workerId) + !unsafe_disabledWorkerIds.contains(workerId) } } @@ -111,40 +111,40 @@ public final class WorkerAlivenessProviderImpl: WorkerAlivenessProvider { public func isWorkerSilent(workerId: WorkerId) -> Bool { lock.whileLocked { - silentWorkerIds.contains(workerId) + unsafe_silentWorkerIds.contains(workerId) } } private func unsafe_markWorkerAsAlive(workerId: WorkerId) { - if silentWorkerIds.contains(workerId) { + if unsafe_silentWorkerIds.contains(workerId) { logger.debug("Marking \(workerId) as alive") - _ = silentWorkerIds.remove(workerId) + _ = unsafe_silentWorkerIds.remove(workerId) } } private func unsafe_markWorkerAsSilent(workerId: WorkerId) { - if !silentWorkerIds.contains(workerId) { + if !unsafe_silentWorkerIds.contains(workerId) { logger.debug("Marking \(workerId) as silent") - _ = silentWorkerIds.insert(workerId) + _ = unsafe_silentWorkerIds.insert(workerId) } } private func unsafe_workerAliveness() -> [WorkerId: WorkerAliveness] { - var workerAliveness = [WorkerId: WorkerAliveness]() + var workerAliveness = [WorkerId: WorkerAliveness](minimumCapacity: knownWorkerIds.count) for id in knownWorkerIds { - workerAliveness[id] = onSyncQueue_alivenessForWorker(workerId: id) + workerAliveness[id] = unsafe_alivenessForWorker(workerId: id) } return workerAliveness } - private func onSyncQueue_alivenessForWorker(workerId: WorkerId) -> WorkerAliveness { + private func unsafe_alivenessForWorker(workerId: WorkerId) -> WorkerAliveness { WorkerAliveness( - registered: registeredWorkerIds.contains(workerId), + registered: unsafe_registeredWorkerIds.contains(workerId), bucketIdsBeingProcessed: workerBucketIdsBeingProcessed.bucketIdsBeingProcessedBy( workerId: workerId ), - disabled: disabledWorkerIds.contains(workerId), - silent: silentWorkerIds.contains(workerId), + disabled: unsafe_disabledWorkerIds.contains(workerId), + silent: unsafe_silentWorkerIds.contains(workerId), workerUtilizationPermission: workerPermissionProvider.utilizationPermissionForWorker( workerId: workerId )