Skip to content

Commit

Permalink
Extract WorkerRegisterer from SynchronousQueueClient
Browse files Browse the repository at this point in the history
  • Loading branch information
artyom-razinov committed Sep 11, 2019
1 parent f43bda0 commit e0d4b04
Show file tree
Hide file tree
Showing 28 changed files with 302 additions and 119 deletions.
12 changes: 11 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,10 @@ let package = Package(
"RESTServerTestHelpers",
"ResourceLocationResolver",
"ResultsCollector",
"RequestSenderTestHelpers",
"ScheduleStrategy",
"TemporaryStuff",
"TestHelpers",
"UniqueIdentifierGeneratorTestHelpers",
"VersionTestHelpers",
"WorkerAlivenessTracker",
Expand Down Expand Up @@ -840,7 +842,8 @@ let package = Package(
dependencies: [
"Models",
"RequestSender",
"Swifter"
"RequestSenderTestHelpers",
"Swifter",
]
),
.target(
Expand Down Expand Up @@ -1164,6 +1167,13 @@ let package = Package(
"Plugin"
]
),
.target(
// MARK: TestHelpers
name: "TestHelpers",
dependencies: [
],
path: "Tests/TestHelpers"
),
.target(
// MARK: TestsWorkingDirectorySupport
name: "TestsWorkingDirectorySupport",
Expand Down
73 changes: 46 additions & 27 deletions Sources/DistWorker/DistWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import TemporaryStuff
import RESTMethods
import Timer


public final class DistWorker: SchedulerDelegate {
private let onDemandSimulatorPool: OnDemandSimulatorPool
private let queueClient: SynchronousQueueClient
private let queueServerAddress: SocketAddress
private let syncQueue = DispatchQueue(label: "ru.avito.DistWorker")
private var requestIdForBucketId = [BucketId: RequestId]()
private let resourceLocationResolver: ResourceLocationResolver
Expand All @@ -30,6 +30,9 @@ public final class DistWorker: SchedulerDelegate {
private var requestSignature = Either<RequestSignature, DistWorkerError>.error(DistWorkerError.missingRequestSignature)
private let temporaryFolder: TemporaryFolder
private let testRunnerProvider: TestRunnerProvider
private let workerRegisterer: WorkerRegisterer
private let reportAliveSender: ReportAliveSender
private let bucketResultSender: BucketResultSender

private enum BucketFetchResult: Equatable {
case result(SchedulerBucket?)
Expand All @@ -38,38 +41,61 @@ public final class DistWorker: SchedulerDelegate {

public init(
onDemandSimulatorPool: OnDemandSimulatorPool,
queueServerAddress: SocketAddress,
queueClient: SynchronousQueueClient,
workerId: WorkerId,
resourceLocationResolver: ResourceLocationResolver,
temporaryFolder: TemporaryFolder,
testRunnerProvider: TestRunnerProvider
testRunnerProvider: TestRunnerProvider,
reportAliveSender: ReportAliveSender,
workerRegisterer: WorkerRegisterer,
bucketResultSender: BucketResultSender
) {
self.onDemandSimulatorPool = onDemandSimulatorPool
self.resourceLocationResolver = resourceLocationResolver
self.queueClient = SynchronousQueueClient(queueServerAddress: queueServerAddress)
self.queueServerAddress = queueServerAddress
self.queueClient = queueClient
self.workerId = workerId
self.resourceLocationResolver = resourceLocationResolver
self.temporaryFolder = temporaryFolder
self.testRunnerProvider = testRunnerProvider
self.reportAliveSender = reportAliveSender
self.workerRegisterer = workerRegisterer
self.bucketResultSender = bucketResultSender
}

public func start(
didFetchAnalyticsConfiguration: (AnalyticsConfiguration) throws -> ()
didFetchAnalyticsConfiguration: @escaping (AnalyticsConfiguration) throws -> (),
completion: @escaping () -> ()
) throws {
let workerConfiguration = try queueClient.registerWithServer(workerId: workerId)
requestSignature = .success(workerConfiguration.requestSignature)
Logger.debug("Registered with server. Worker configuration: \(workerConfiguration)")

try didFetchAnalyticsConfiguration(workerConfiguration.analyticsConfiguration)

startReportingWorkerIsAlive(interval: workerConfiguration.reportAliveInterval)
try workerRegisterer.registerWithServer(workerId: workerId) { [weak self] result in
do {
guard let strongSelf = self else {
Logger.error("self is nil in start() in DistWorker")
completion()
return
}

let workerConfiguration = try result.dematerialize()

strongSelf.requestSignature = .success(workerConfiguration.requestSignature)
Logger.debug("Registered with server. Worker configuration: \(workerConfiguration)")

try didFetchAnalyticsConfiguration(workerConfiguration.analyticsConfiguration)

strongSelf.startReportingWorkerIsAlive(interval: workerConfiguration.reportAliveInterval)

_ = try strongSelf.runTests(
workerConfiguration: workerConfiguration,
onDemandSimulatorPool: strongSelf.onDemandSimulatorPool
)
Logger.verboseDebug("Dist worker has finished")
strongSelf.cleanUpAndStop()

completion()
} catch {
Logger.error("Caught unexpected error: \(error)")
completion()
}
}

_ = try runTests(
workerConfiguration: workerConfiguration,
onDemandSimulatorPool: onDemandSimulatorPool
)
Logger.verboseDebug("Dist worker has finished")
cleanUpAndStop()
}

private func startReportingWorkerIsAlive(interval: TimeInterval) {
Expand All @@ -86,9 +112,6 @@ public final class DistWorker: SchedulerDelegate {
}

private func reportAliveness() throws {
let reportAliveSender = ReportAliveSenderImpl(
requestSender: DefaultRequestSenderProvider().requestSender(socketAddress: queueServerAddress)
)
try reportAliveSender.reportAlive(
bucketIdsBeingProcessedProvider: currentlyBeingProcessedBucketsTracker.bucketIdsBeingProcessed,
workerId: workerId,
Expand Down Expand Up @@ -221,10 +244,6 @@ public final class DistWorker: SchedulerDelegate {
return requestId
}

let bucketResultSender = BucketResultSenderImpl(
requestSender: DefaultRequestSenderProvider().requestSender(socketAddress: queueServerAddress)
)

try bucketResultSender.send(
testingResult: testingResult,
requestId: requestId,
Expand Down
62 changes: 54 additions & 8 deletions Sources/EmceeLib/Commands/DistWorkCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import LoggingSetup
import Models
import PathLib
import ResourceLocationResolver
import RequestSender
import SimulatorPool
import TemporaryStuff
import QueueClient

public final class DistWorkCommand: Command {
public let name = "distWork"
Expand All @@ -25,7 +28,6 @@ public final class DistWorkCommand: Command {
public func run(payload: CommandPayload) throws {
let queueServerAddress: SocketAddress = try payload.expectedSingleTypedValue(argumentName: ArgumentDescriptions.queueServer.name)
let workerId: WorkerId = try payload.expectedSingleTypedValue(argumentName: ArgumentDescriptions.workerId.name)

let temporaryFolder = try createScopedTemporaryFolder()

let onDemandSimulatorPool = OnDemandSimulatorPoolFactory.create(
Expand All @@ -34,21 +36,65 @@ public final class DistWorkCommand: Command {
)
defer { onDemandSimulatorPool.deleteSimulators() }

let distWorker = DistWorker(
onDemandSimulatorPool: onDemandSimulatorPool,
let distWorker = self.distWorker(
queueServerAddress: queueServerAddress,
workerId: workerId,
temporaryFolder: temporaryFolder,
onDemandSimulatorPool: onDemandSimulatorPool
)

startWorker(distWorker: distWorker)
}

private func distWorker(
queueServerAddress: SocketAddress,
workerId: WorkerId,
temporaryFolder: TemporaryFolder,
onDemandSimulatorPool: OnDemandSimulatorPool
) -> DistWorker {
let requestSender = DefaultRequestSenderProvider().requestSender(socketAddress: queueServerAddress)

let reportAliveSender = ReportAliveSenderImpl(requestSender: requestSender)
let workerRegisterer = WorkerRegistererImpl(requestSender: requestSender)
let bucketResultSender = BucketResultSenderImpl(requestSender: requestSender)

return DistWorker(
onDemandSimulatorPool: onDemandSimulatorPool,
queueClient: SynchronousQueueClient(queueServerAddress: queueServerAddress),
workerId: workerId,
resourceLocationResolver: resourceLocationResolver,
temporaryFolder: temporaryFolder,
testRunnerProvider: DefaultTestRunnerProvider(
resourceLocationResolver: resourceLocationResolver
)
),
reportAliveSender: reportAliveSender,
workerRegisterer: workerRegisterer,
bucketResultSender: bucketResultSender
)
try distWorker.start(
didFetchAnalyticsConfiguration: { analyticsConfiguration in
try LoggingSetup.setupAnalytics(analyticsConfiguration: analyticsConfiguration)
}

private func startWorker(distWorker: DistWorker) {
let dispatchQueue = DispatchQueue(label: "DistWorker.queue")
let dispatchGroup = DispatchGroup()

dispatchGroup.enter()
dispatchQueue.async {
do {
try distWorker.start(
didFetchAnalyticsConfiguration: { analyticsConfiguration in
try LoggingSetup.setupAnalytics(analyticsConfiguration: analyticsConfiguration)
},
completion: {
dispatchGroup.leave()
}
)
} catch {
Logger.error("\(error)")
dispatchGroup.leave()
}
)

dispatchGroup.wait()
}
}

private func createScopedTemporaryFolder() throws -> TemporaryFolder {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import Models
import RequestSender

public protocol WorkerRegisterer {
func registerWithServer(
workerId: WorkerId,
completion: @escaping (Either<WorkerConfiguration, RequestSenderError>) -> Void
) throws
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import RequestSender
import Models
import RESTMethods

public final class WorkerRegistererImpl: WorkerRegisterer {
private let requestSender: RequestSender

public init(requestSender: RequestSender) {
self.requestSender = requestSender
}

public func registerWithServer(
workerId: WorkerId,
completion: @escaping (Either<WorkerConfiguration, RequestSenderError>) -> Void
) throws {
try requestSender.sendRequestWithCallback(
pathWithSlash: RESTMethod.registerWorker.withPrependingSlash,
payload: RegisterWorkerRequest(workerId: workerId),
callback: { (result: Either<RegisterWorkerResponse, RequestSenderError>) in
switch result {
case .left(let response):
switch response {
case .workerRegisterSuccess(let workerConfiguration):
completion(Either.success(workerConfiguration))
}
case .right(let requestSenderError):
completion(Either.error(requestSenderError))
}
}
)
}
}
15 changes: 0 additions & 15 deletions Sources/QueueClient/QueueClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ public final class QueueClient {
close()
}

public func registerWithServer(workerId: WorkerId) throws {
try sendRequest(
.registerWorker,
payload: RegisterWorkerRequest(workerId: workerId),
completionHandler: handleRegisterWorkerResponse
)
}

public func close() {
requestSender.close()

Expand Down Expand Up @@ -155,13 +147,6 @@ public final class QueueClient {

// MARK: - Response Handlers

private func handleRegisterWorkerResponse(response: RegisterWorkerResponse) {
switch response {
case .workerRegisterSuccess(let workerConfiguration):
delegate?.queueClient(self, didReceiveWorkerConfiguration: workerConfiguration)
}
}

private func handleFetchBucketResponse(response: DequeueBucketResponse) {
switch response {
case .bucketDequeued(let bucket):
Expand Down
1 change: 0 additions & 1 deletion Sources/QueueClient/QueueClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ public protocol QueueClientDelegate: class {
func queueClientWorkerConsideredNotAlive(_ sender: QueueClient)
func queueClientWorkerHasBeenBlocked(_ sender: QueueClient)
func queueClient(_ sender: QueueClient, fetchBucketLaterAfter after: TimeInterval)
func queueClient(_ sender: QueueClient, didReceiveWorkerConfiguration workerConfiguration: WorkerConfiguration)
func queueClient(_ sender: QueueClient, didFetchBucket bucket: Bucket)
func queueClientWorkerHasBeenIndicatedAsAlive(_ sender: QueueClient)
func queueClient(_ sender: QueueClient, didFetchQueueServerVersion version: Version)
Expand Down
17 changes: 0 additions & 17 deletions Sources/QueueClient/SynchronousQueueClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public final class SynchronousQueueClient: QueueClientDelegate {
}

private let queueClient: QueueClient
private var registrationResult: Either<WorkerConfiguration, QueueClientError>?
private var bucketFetchResult: Either<BucketFetchResult, QueueClientError>?
private var alivenessReportResult: Either<Bool, QueueClientError>?
private var queueServerVersionResult: Either<Version, QueueClientError>?
Expand Down Expand Up @@ -49,17 +48,6 @@ public final class SynchronousQueueClient: QueueClientDelegate {

// MARK: Public API

public func registerWithServer(workerId: WorkerId) throws -> WorkerConfiguration {
return try synchronize {
registrationResult = nil
try queueClient.registerWithServer(workerId: workerId)
try SynchronousWaiter.waitWhile(timeout: requestTimeout, description: "Wait for registration with server") {
self.registrationResult == nil
}
return try registrationResult!.dematerialize()
}
}

public func fetchBucket(requestId: RequestId, workerId: WorkerId, requestSignature: RequestSignature) throws -> BucketFetchResult {
return try synchronize {
bucketFetchResult = nil
Expand Down Expand Up @@ -158,7 +146,6 @@ public final class SynchronousQueueClient: QueueClientDelegate {
// MARK: - Queue Delegate

public func queueClient(_ sender: QueueClient, didFailWithError error: QueueClientError) {
registrationResult = Either.error(error)
bucketFetchResult = Either.error(error)
alivenessReportResult = Either.error(error)
queueServerVersionResult = Either.error(error)
Expand All @@ -168,10 +155,6 @@ public final class SynchronousQueueClient: QueueClientDelegate {
jobDeleteResult = Either.error(error)
}

public func queueClient(_ sender: QueueClient, didReceiveWorkerConfiguration workerConfiguration: WorkerConfiguration) {
registrationResult = Either.success(workerConfiguration)
}

public func queueClientQueueIsEmpty(_ sender: QueueClient) {
bucketFetchResult = Either.success(.queueIsEmpty)
}
Expand Down
5 changes: 0 additions & 5 deletions Tests/QueueClientTests/FakeQueueClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ class FakeQueueClientDelegate: QueueClientDelegate {
case error(QueueClientError)
case queueIsEmpty
case checkAfter(TimeInterval)
case workerConfiguration(WorkerConfiguration)
case bucket(Bucket)
case workerHasBeenBlocked
case workerConsideredNotAlive
Expand Down Expand Up @@ -43,10 +42,6 @@ class FakeQueueClientDelegate: QueueClientDelegate {
responses.append(ServerResponse.checkAfter(after))
}

func queueClient(_ sender: QueueClient, didReceiveWorkerConfiguration workerConfiguration: WorkerConfiguration) {
responses.append(ServerResponse.workerConfiguration(workerConfiguration))
}

func queueClient(_ sender: QueueClient, didFetchBucket bucket: Bucket) {
responses.append(ServerResponse.bucket(bucket))
}
Expand Down
Loading

0 comments on commit e0d4b04

Please sign in to comment.