Skip to content

Commit

Permalink
Replace queues with locks
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladislav Alekseev committed Mar 21, 2021
1 parent 1a6fa19 commit e28b5ff
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 121 deletions.
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ let package = Package(
"BucketQueueModels",
.product(name: "DateProvider", package: "CommandLineToolkit"),
"EmceeLogging",
"Extensions",
"QueueModels",
"RunnerModels",
"TestHistoryModels",
Expand Down Expand Up @@ -411,6 +412,7 @@ let package = Package(
.target(
name: "DistWorkerModels",
dependencies: [
"Extensions",
"LoggingSetup",
"MetricsExtensions",
"QueueModels",
Expand Down Expand Up @@ -603,6 +605,7 @@ let package = Package(
.target(
name: "EventBus",
dependencies: [
"Extensions",
"RunnerModels",
],
path: "Sources/EventBus"
Expand Down Expand Up @@ -732,6 +735,7 @@ let package = Package(
.target(
name: "ListeningSemaphore",
dependencies: [
"Extensions",
],
path: "Sources/ListeningSemaphore"
),
Expand Down
63 changes: 30 additions & 33 deletions Sources/BalancingBucketQueue/New/MultipleQueuesContainer.swift
Original file line number Diff line number Diff line change
@@ -1,99 +1,96 @@
import CountedSet
import Extensions
import Foundation
import QueueModels

public final class MultipleQueuesContainer {
private let syncQueue = DispatchQueue(label: "MultipleQueuesContainer.syncQueue")
private let exclusiveAccessLock = NSLock()
private let discreteAccessLock = NSLock()
private let continousAccessLock = NSLock()

public init() {}

public func performWithExclusiveAccess<T>(
work: () throws -> T
) rethrows -> T {
exclusiveAccessLock.lock()
defer {
exclusiveAccessLock.unlock()
}
return try work()
try continousAccessLock.whileLocked(work)
}

public func runningAndDeletedJobQueues() -> [JobQueue] {
syncQueue.sync {
runningJobQueues_onSyncQueue + deletedJobQueues_onSyncQueue
discreteAccessLock.whileLocked {
unsafe_runningJobQueues + unsafe_deletedJobQueues
}
}

// MARK: - JobGroups

private var runningJobGroups_onSyncQueue = CountedSet<JobGroup>()
private var unsafe_runningJobGroups = CountedSet<JobGroup>()

public func track(jobGroup: JobGroup) {
syncQueue.sync {
_ = runningJobGroups_onSyncQueue.update(with: jobGroup)
discreteAccessLock.whileLocked {
_ = unsafe_runningJobGroups.update(with: jobGroup)
}
}

public func untrack(jobGroup: JobGroup) {
syncQueue.sync {
_ = runningJobGroups_onSyncQueue.remove(jobGroup)
discreteAccessLock.whileLocked {
_ = unsafe_runningJobGroups.remove(jobGroup)
}
}

public func trackedJobGroups() -> [JobGroup] {
syncQueue.sync {
Array(runningJobGroups_onSyncQueue)
discreteAccessLock.whileLocked {
Array(unsafe_runningJobGroups)
}
}

// MARK: - Running Job Queues

private var runningJobQueues_onSyncQueue = [JobQueue]()
private var unsafe_runningJobQueues = [JobQueue]()

public func runningJobQueues(jobId: JobId) -> [JobQueue] {
syncQueue.sync {
runningJobQueues_onSyncQueue.filter { $0.job.jobId == jobId }
discreteAccessLock.whileLocked {
unsafe_runningJobQueues.filter { $0.job.jobId == jobId }
}
}

public func add(runningJobQueue: JobQueue) {
syncQueue.sync {
runningJobQueues_onSyncQueue.append(runningJobQueue)
runningJobQueues_onSyncQueue.sort { $0.executionOrder(relativeTo: $1) == .before }
discreteAccessLock.whileLocked {
unsafe_runningJobQueues.append(runningJobQueue)
unsafe_runningJobQueues.sort { $0.executionOrder(relativeTo: $1) == .before }
}
}

public func removeRunningJobQueues(jobId: JobId) {
syncQueue.sync {
runningJobQueues_onSyncQueue.removeAll(where: { $0.job.jobId == jobId })
discreteAccessLock.whileLocked {
unsafe_runningJobQueues.removeAll(where: { $0.job.jobId == jobId })
}
}

public func allRunningJobQueues() -> [JobQueue] {
syncQueue.sync {
runningJobQueues_onSyncQueue
discreteAccessLock.whileLocked {
unsafe_runningJobQueues
}
}

// MARK: - Deleted Job Queues

private var deletedJobQueues_onSyncQueue = [JobQueue]()
private var unsafe_deletedJobQueues = [JobQueue]()

public func add(deletedJobQueues: [JobQueue]) {
syncQueue.sync {
deletedJobQueues_onSyncQueue.append(contentsOf: deletedJobQueues)
discreteAccessLock.whileLocked {
unsafe_deletedJobQueues.append(contentsOf: deletedJobQueues)
}
}

public func allDeletedJobQueues() -> [JobQueue] {
syncQueue.sync {
deletedJobQueues_onSyncQueue
discreteAccessLock.whileLocked {
unsafe_deletedJobQueues
}
}

public func removeFromDeleted(jobId: JobId) {
syncQueue.sync {
deletedJobQueues_onSyncQueue.removeAll(where: { $0.job.jobId == jobId })
discreteAccessLock.whileLocked {
unsafe_deletedJobQueues.removeAll(where: { $0.job.jobId == jobId })
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@ public final class MultipleQueuesJobResultsProvider: JobResultsProvider {
}

public func results(jobId: JobId) throws -> JobResults {
try multipleQueuesContainer.performWithExclusiveAccess {

if let jobQueue = multipleQueuesContainer.runningAndDeletedJobQueues().first(where: { jobQueue in jobQueue.job.jobId == jobId }) {
return JobResults(jobId: jobId, testingResults: jobQueue.resultsCollector.collectedResults)
}

throw NoQueueForJobIdFoundError.noQueue(jobId: jobId)
if let jobQueue = multipleQueuesContainer.runningAndDeletedJobQueues().first(where: { jobQueue in jobQueue.job.jobId == jobId }) {
return JobResults(jobId: jobId, testingResults: jobQueue.resultsCollector.collectedResults)
}

throw NoQueueForJobIdFoundError.noQueue(jobId: jobId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@ public final class MultipleQueuesJobStateProvider: JobStateProvider {
}

public func state(jobId: JobId) throws -> JobState {
try multipleQueuesContainer.performWithExclusiveAccess {
if let jobQueue = multipleQueuesContainer.allRunningJobQueues().first(where: { $0.job.jobId == jobId }) {
return JobState(
jobId: jobId,
queueState: QueueState.running(jobQueue.bucketQueue.runningQueueState)
)
}

if multipleQueuesContainer.allDeletedJobQueues().first(where: { $0.job.jobId == jobId }) != nil {
return JobState(
jobId: jobId,
queueState: QueueState.deleted
)
}

throw NoQueueForJobIdFoundError.noQueue(jobId: jobId)
if let jobQueue = multipleQueuesContainer.allRunningJobQueues().first(where: { $0.job.jobId == jobId }) {
return JobState(
jobId: jobId,
queueState: QueueState.running(jobQueue.bucketQueue.runningQueueState)
)
}

if multipleQueuesContainer.allDeletedJobQueues().first(where: { $0.job.jobId == jobId }) != nil {
return JobState(
jobId: jobId,
queueState: QueueState.deleted
)
}

throw NoQueueForJobIdFoundError.noQueue(jobId: jobId)
}
}
21 changes: 9 additions & 12 deletions Sources/BucketQueue/BucketQueueHolder.swift
Original file line number Diff line number Diff line change
@@ -1,53 +1,50 @@
import BucketQueueModels
import Extensions
import Foundation

public final class BucketQueueHolder {
private var enqueuedBuckets = [EnqueuedBucket]()
private var dequeuedBuckets = Set<DequeuedBucket>()

private let syncQueue = DispatchQueue(label: "BucketQueueHolder.syncQueue")
private let accessLock = NSLock()
private let exclusiveAccessLock = NSRecursiveLock()

public init() {}

public func performWithExclusiveAccess<T>(
work: () throws -> T
) rethrows -> T {
exclusiveAccessLock.lock()
defer {
exclusiveAccessLock.unlock()
}
return try work()
try exclusiveAccessLock.whileLocked(work)
}

public func removeAllEnqueuedBuckets() {
syncQueue.sync {
accessLock.whileLocked {
enqueuedBuckets.removeAll()
}
}

public var allEnqueuedBuckets: [EnqueuedBucket] {
syncQueue.sync { enqueuedBuckets }
accessLock.whileLocked { enqueuedBuckets }
}

public var allDequeuedBuckets: Set<DequeuedBucket> {
syncQueue.sync { dequeuedBuckets }
accessLock.whileLocked { dequeuedBuckets }
}

public func remove(dequeuedBucket: DequeuedBucket) {
syncQueue.sync {
accessLock.whileLocked {
_ = dequeuedBuckets.remove(dequeuedBucket)
}
}

public func insert(enqueuedBuckets: [EnqueuedBucket], position: Int) {
syncQueue.sync {
accessLock.whileLocked {
self.enqueuedBuckets.insert(contentsOf: enqueuedBuckets, at: position)
}
}

public func replacePreviouslyEnqueuedBucket(withDequeuedBucket dequeuedBucket: DequeuedBucket) {
syncQueue.sync {
accessLock.whileLocked {
enqueuedBuckets.removeAll(where: { $0 == dequeuedBucket.enqueuedBucket })
_ = dequeuedBuckets.insert(dequeuedBucket)
}
Expand Down
7 changes: 4 additions & 3 deletions Sources/Deployer/Deployer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,18 @@ open class Deployer {
* from a URL with a package of the DeployableItem to a corresponding DeployableItem
*/
private func prepareDeployables() throws -> [AbsolutePath: DeployableItem] {
let syncQueue = DispatchQueue(label: "ru.avito.Deployer.syncQueue")
let syncQueue = DispatchQueue(label: "Deployer.syncQueue")
var deployablesFailedToPrepare = [DeployableItem]()
var pathToDeployable = [AbsolutePath: DeployableItem]()
let packager = Packager(processControllerProvider: processControllerProvider)

let queue = DispatchQueue(
label: "ru.avito.Deployer",
label: "Deployer.queue",
qos: .default,
attributes: .concurrent,
autoreleaseFrequency: .workItem,
target: nil)
target: DispatchQueue.global()
)
let group = DispatchGroup()
for deployable in deployables {
group.enter()
Expand Down
7 changes: 6 additions & 1 deletion Sources/DistWorker/DistWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ import WorkerCapabilities

public final class DistWorker: SchedulerDataSource, SchedulerDelegate {
private let di: DI
private let callbackQueue = DispatchQueue(label: "DistWorker.callbackQueue", qos: .default, attributes: .concurrent)
private let callbackQueue = DispatchQueue(
label: "DistWorker.callbackQueue",
qos: .default,
attributes: .concurrent,
target: .global()
)
private let currentlyBeingProcessedBucketsTracker = DefaultCurrentlyBeingProcessedBucketsTracker()
private let httpRestServer: HTTPRESTServer
private let version: Version
Expand Down
9 changes: 5 additions & 4 deletions Sources/DistWorkerModels/WorkerConfigurations.swift
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import Dispatch
import Extensions
import Foundation
import QueueModels

public final class WorkerConfigurations {
private let queue = DispatchQueue(label: "WorkerConfigurations.queue")
private let lock = NSLock()
private var workerIdToRunConfiguration = [WorkerId: WorkerConfiguration]()

public init() {}

public func add(workerId: WorkerId, configuration: WorkerConfiguration) {
queue.sync { workerIdToRunConfiguration[workerId] = configuration }
lock.whileLocked { workerIdToRunConfiguration[workerId] = configuration }
}

public func workerConfiguration(workerId: WorkerId) -> WorkerConfiguration? {
return queue.sync { workerIdToRunConfiguration[workerId] }
lock.whileLocked { workerIdToRunConfiguration[workerId] }
}

public var workerIds: Set<WorkerId> {
return Set(workerIdToRunConfiguration.keys)
return Set(lock.whileLocked{ workerIdToRunConfiguration.keys })
}
}
9 changes: 5 additions & 4 deletions Sources/EventBus/EventBus.swift
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import Dispatch
import Extensions
import Foundation

public final class EventBus {
private var streams = [EventStream]()
private let workQueue = DispatchQueue(label: "ru.avito.EventBus.workQueue")
private let lock = NSLock()
private let eventDeliveryQueue = DispatchQueue(label: "ru.avito.EventBus.eventDeliveryQueue")

public init() {}

public func add(stream: EventStream) {
workQueue.sync {
lock.whileLocked {
streams.append(stream)
}
}

public func post(event: BusEvent) {
workQueue.sync {
lock.whileLocked {
streams.forEach { stream in
eventDeliveryQueue.async {
stream.process(event: event)
Expand All @@ -32,6 +33,6 @@ public final class EventBus {

public func tearDown() {
post(event: .tearDown)
eventDeliveryQueue.sync {}
eventDeliveryQueue.sync(flags: .barrier) {}
}
}
5 changes: 3 additions & 2 deletions Sources/ListeningSemaphore/SettableOperation.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import Dispatch
import Extensions
import Foundation

final class SettableOperation: Operation, CascadeCancellable {
private let syncQueue = DispatchQueue(label: "ru.avito.SettableOperation.syncQueue")
private let lock = NSLock()
private var cascaseCancellableDependencies = [Operation]()
private var isAbleToRun: Bool {
willSet {
Expand Down Expand Up @@ -30,7 +31,7 @@ final class SettableOperation: Operation, CascadeCancellable {
}

public func unblock() {
syncQueue.sync { isAbleToRun = true }
lock.whileLocked { isAbleToRun = true }
}

func addCascadeCancellableDependency(_ operation: Operation) {
Expand Down
Loading

0 comments on commit e28b5ff

Please sign in to comment.