Skip to content

Commit

Permalink
Add metadata to analytics configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladislav Alekseev committed Mar 17, 2021
1 parent 7bd41d6 commit 386fdc0
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 44 deletions.
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ let package = Package(
"Kibana",
.product(name: "Logging", package: "swift-log"),
"MetricsExtensions",
.product(name: "ProcessController", package: "CommandLineToolkit"),
"QueueModels",
],
path: "Sources/EmceeLogging"
Expand Down
8 changes: 2 additions & 6 deletions Sources/AppleTools/XcodebuildBasedTestRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,16 @@ public final class XcodebuildBasedTestRunner: TestRunner {
var observableFileReaderHandler: ObservableFileReaderHandler?

processController.onStart { [logger] sender, _ in
let logger = logger
.withMetadata(key: .subprocessId, value: "\(sender.subprocessInfo.subprocessId)")
.withMetadata(key: .subprocessName, value: sender.subprocessInfo.subprocessName)

testRunnerStream.openStream()
do {
observableFileReaderHandler = try observableFileReader.read(handler: resultStream.write(data:))
} catch {
logger.error("Failed to read stream file: \(error)")
logger.error("Failed to read stream file: \(error)", subprocessPidInfo: sender.subprocessInfo.pidInfo)
return sender.terminateAndForceKillIfNeeded()
}
resultStream.streamContents { error in
if let error = error {
logger.error("Result stream error: \(error)")
logger.error("Result stream error: \(error)", subprocessPidInfo: sender.subprocessInfo.pidInfo)
}
testRunnerStream.closeStream()
}
Expand Down
7 changes: 5 additions & 2 deletions Sources/EmceeLib/Commands/DumpCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public final class DumpCommand: Command {

private let di: DI
private let encoder = JSONEncoder.pretty()
private let logger: ContextualLogger
private let rootLogger: ContextualLogger

public init(di: DI) throws {
self.di = di
self.logger = try di.get(ContextualLogger.self)
self.rootLogger = try di.get(ContextualLogger.self)
.forType(Self.self)
}

Expand All @@ -66,6 +66,9 @@ public final class DumpCommand: Command {
if let kibanaConfiguration = testArgFile.prioritizedJob.analyticsConfiguration.kibanaConfiguration {
try di.get(LoggingSetup.self).set(kibanaConfiguration: kibanaConfiguration)
}
let logger = rootLogger.withMetadata(
testArgFile.prioritizedJob.analyticsConfiguration.metadata ?? [:]
)

let onDemandSimulatorPool = try OnDemandSimulatorPoolFactory.create(
di: di,
Expand Down
40 changes: 25 additions & 15 deletions Sources/EmceeLib/Commands/RunTestsOnRemoteQueueCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public final class RunTestsOnRemoteQueueCommand: Command {

private let callbackQueue = DispatchQueue(label: "RunTestsOnRemoteQueueCommand.callbackQueue")
private let di: DI
private let logger: ContextualLogger
private let rootLogger: ContextualLogger
private let testArgFileValidator = TestArgFileValidator()

public init(di: DI) throws {
self.di = di
self.logger = try di.get(ContextualLogger.self).forType(Self.self)
self.rootLogger = try di.get(ContextualLogger.self).forType(Self.self)
}

public func run(payload: CommandPayload) throws {
Expand All @@ -76,6 +76,12 @@ public final class RunTestsOnRemoteQueueCommand: Command {
if let kibanaConfiguration = testArgFile.prioritizedJob.analyticsConfiguration.kibanaConfiguration {
try di.get(LoggingSetup.self).set(kibanaConfiguration: kibanaConfiguration)
}
try di.get(GlobalMetricRecorder.self).set(
analyticsConfiguration: testArgFile.prioritizedJob.analyticsConfiguration
)
let logger = rootLogger.withMetadata(
testArgFile.prioritizedJob.analyticsConfiguration.metadata ?? [:]
)

let remoteCacheConfig = try ArgumentsReader.remoteCacheConfig(
try payload.optionalSingleTypedValue(argumentName: ArgumentDescriptions.remoteCacheConfig.name)
Expand All @@ -87,13 +93,15 @@ public final class RunTestsOnRemoteQueueCommand: Command {
emceeVersion: emceeVersion,
queueServerDeploymentDestination: queueServerConfiguration.queueServerDeploymentDestination,
queueServerConfigurationLocation: queueServerConfigurationLocation,
jobId: testArgFile.prioritizedJob.jobId
jobId: testArgFile.prioritizedJob.jobId,
logger: logger
)
let jobResults = try runTestsOnRemotelyRunningQueue(
queueServerAddress: runningQueueServerAddress,
remoteCacheConfig: remoteCacheConfig,
testArgFile: testArgFile,
version: emceeVersion
version: emceeVersion,
logger: logger
)
let resultOutputGenerator = ResultingOutputGenerator(
testingResults: jobResults.testingResults,
Expand All @@ -107,7 +115,8 @@ public final class RunTestsOnRemoteQueueCommand: Command {
emceeVersion: Version,
queueServerDeploymentDestination: DeploymentDestination,
queueServerConfigurationLocation: QueueServerConfigurationLocation,
jobId: JobId
jobId: JobId,
logger: ContextualLogger
) throws -> SocketAddress {
logger.info("Searching for queue server on '\(queueServerDeploymentDestination.host)' with queue version \(emceeVersion)")
let remoteQueueDetector = DefaultRemoteQueueDetector(
Expand All @@ -134,7 +143,8 @@ public final class RunTestsOnRemoteQueueCommand: Command {
jobId: jobId,
queueServerDeploymentDestination: queueServerDeploymentDestination,
emceeVersion: emceeVersion,
queueServerConfigurationLocation: queueServerConfigurationLocation
queueServerConfigurationLocation: queueServerConfigurationLocation,
logger: logger
)

try di.get(Waiter.self).waitWhile(pollPeriod: 1.0, timeout: 30.0, description: "Wait for remote queue to start") {
Expand All @@ -155,7 +165,8 @@ public final class RunTestsOnRemoteQueueCommand: Command {
jobId: JobId,
queueServerDeploymentDestination: DeploymentDestination,
emceeVersion: Version,
queueServerConfigurationLocation: QueueServerConfigurationLocation
queueServerConfigurationLocation: QueueServerConfigurationLocation,
logger: ContextualLogger
) throws {
logger.info("No running queue server has been found. Will deploy and start remote queue.")
let remoteQueueStarter = RemoteQueueStarter(
Expand All @@ -175,10 +186,9 @@ public final class RunTestsOnRemoteQueueCommand: Command {
queueServerAddress: SocketAddress,
remoteCacheConfig: RuntimeDumpRemoteCacheConfig?,
testArgFile: TestArgFile,
version: Version
version: Version,
logger: ContextualLogger
) throws -> JobResults {
try di.get(GlobalMetricRecorder.self).set(analyticsConfiguration: testArgFile.prioritizedJob.analyticsConfiguration)

let onDemandSimulatorPool = try OnDemandSimulatorPoolFactory.create(
di: di,
logger: logger,
Expand Down Expand Up @@ -227,7 +237,7 @@ public final class RunTestsOnRemoteQueueCommand: Command {
)

defer {
deleteJob(jobId: testArgFile.prioritizedJob.jobId)
deleteJob(jobId: testArgFile.prioritizedJob.jobId, logger: logger)
}

try JobPreparer(di: di).formJob(
Expand All @@ -237,11 +247,11 @@ public final class RunTestsOnRemoteQueueCommand: Command {
testArgFile: testArgFile
)

try waitForJobQueueToDeplete(jobId: testArgFile.prioritizedJob.jobId)
try waitForJobQueueToDeplete(jobId: testArgFile.prioritizedJob.jobId, logger: logger)
return try fetchJobResults(jobId: testArgFile.prioritizedJob.jobId)
}

private func waitForJobQueueToDeplete(jobId: JobId) throws {
private func waitForJobQueueToDeplete(jobId: JobId, logger: ContextualLogger) throws {
var caughtSignal = false
SignalHandling.addSignalHandler(signals: [.int, .term]) { [logger] signal in
logger.info("Caught \(signal) signal")
Expand Down Expand Up @@ -279,7 +289,7 @@ public final class RunTestsOnRemoteQueueCommand: Command {
callbackQueue: callbackQueue,
completion: callbackWaiter.set
)
return try callbackWaiter.wait(timeout: .infinity, description: "").dematerialize()
return try callbackWaiter.wait(timeout: .infinity, description: "Fetch job state").dematerialize()
}

private func selectPort(ports: Set<SocketModels.Port>) throws -> SocketModels.Port {
Expand All @@ -291,7 +301,7 @@ public final class RunTestsOnRemoteQueueCommand: Command {
return port
}

private func deleteJob(jobId: JobId) {
private func deleteJob(jobId: JobId, logger: ContextualLogger) {
do {
let callbackWaiter: CallbackWaiter<Either<(), Error>> = try di.get(Waiter.self).createCallbackWaiter()
try di.get(JobDeleter.self).delete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,3 @@ public final class DetailedActivityLoggableProcessControllerProvider: ProcessCon
return processController
}
}

extension SubprocessInfo {
var pidInfo: PidInfo {
PidInfo(pid: subprocessId, name: subprocessName)
}
}
6 changes: 6 additions & 0 deletions Sources/EmceeLogging/ContextualLogger.swift
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,10 @@ public extension ContextualLogger {
) {
log(.warning, message, subprocessPidInfo: subprocessPidInfo, workerId: workerId, persistentMetricsJobId: persistentMetricsJobId, source: source, file: file, function: function, line: line)
}

func withMetadata(_ keyValues: [String: String]) -> ContextualLogger {
var addedMetadata = self.addedMetadata
addedMetadata.merge(keyValues) { _, new -> String in new }
return ContextualLogger(logger: logger, addedMetadata: addedMetadata)
}
}
8 changes: 8 additions & 0 deletions Sources/EmceeLogging/SubprocessInfo+PidInfo.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import Foundation
import ProcessController

extension SubprocessInfo {
public var pidInfo: PidInfo {
PidInfo(pid: subprocessId, name: subprocessName)
}
}
6 changes: 5 additions & 1 deletion Sources/MetricsExtensions/AnalyticsConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ public struct AnalyticsConfiguration: Codable, Hashable {
public let graphiteConfiguration: MetricConfiguration?
public let statsdConfiguration: MetricConfiguration?
public let kibanaConfiguration: KibanaConfiguration?

public let persistentMetricsJobId: String?
public let metadata: [String: String]?

public init(
graphiteConfiguration: MetricConfiguration? = nil,
statsdConfiguration: MetricConfiguration? = nil,
kibanaConfiguration: KibanaConfiguration? = nil,
persistentMetricsJobId: String? = nil
persistentMetricsJobId: String? = nil,
metadata: [String: String]? = nil
) {
self.graphiteConfiguration = graphiteConfiguration
self.statsdConfiguration = statsdConfiguration
self.kibanaConfiguration = kibanaConfiguration
self.persistentMetricsJobId = persistentMetricsJobId
self.metadata = metadata
}
}
42 changes: 28 additions & 14 deletions Sources/Scheduler/Scheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import UniqueIdentifierGenerator

public final class Scheduler {
private let di: DI
private let logger: ContextualLogger
private let rootLogger: ContextualLogger
private let queue = OperationQueue()
private let resourceSemaphore: ListeningSemaphore<ResourceAmounts>
private let version: Version
Expand All @@ -40,7 +40,7 @@ public final class Scheduler {
version: Version
) {
self.di = di
self.logger = logger.forType(Self.self)
self.rootLogger = logger.forType(Self.self)
self.resourceSemaphore = ListeningSemaphore(
maximumValues: .of(
runningTests: Int(numberOfSimulators)
Expand Down Expand Up @@ -72,25 +72,29 @@ public final class Scheduler {
return
}
guard let bucket = self.schedulerDataSource?.nextBucket() else {
self.logger.debug("Data Source returned no bucket")
self.rootLogger.debug("Data Source returned no bucket")
return
}
self.logger.debug("Data Source returned bucket: \(bucket)")
self.runTestsFromFetchedBucket(bucket)
let logger = self.rootLogger.withMetadata(bucket.analyticsConfiguration.metadata ?? [:])
logger.debug("Data Source returned bucket: \(bucket)")
self.runTestsFromFetchedBucket(bucket: bucket, logger: logger)
}
}

private func runTestsFromFetchedBucket(_ bucket: SchedulerBucket) {
private func runTestsFromFetchedBucket(
bucket: SchedulerBucket,
logger: ContextualLogger
) {
do {
let acquireResources = try resourceSemaphore.acquire(.of(runningTests: 1))
let runTestsInBucketAfterAcquiringResources = BlockOperation {
do {
let testingResult = self.execute(bucket: bucket)
let testingResult = self.execute(bucket: bucket, logger: logger)
try self.resourceSemaphore.release(.of(runningTests: 1))
self.schedulerDelegate?.scheduler(self, obtainedTestingResult: testingResult, forBucket: bucket)
self.fetchAndRunBucket()
} catch {
self.logger.error("Error running tests from fetched bucket with error: \(error). Bucket: \(bucket)")
logger.error("Error running tests from fetched bucket with error: \(error). Bucket: \(bucket)")
}
}
acquireResources.addCascadeCancellableDependency(runTestsInBucketAfterAcquiringResources)
Expand All @@ -102,10 +106,13 @@ public final class Scheduler {

// MARK: - Running the Tests

private func execute(bucket: SchedulerBucket) -> TestingResult {
private func execute(
bucket: SchedulerBucket,
logger: ContextualLogger
) -> TestingResult {
let startedAt = Date()
do {
return try self.runRetrying(bucket: bucket)
return try runRetrying(bucket: bucket, logger: logger)
} catch {
logger.error("Failed to execute bucket \(bucket.bucketId): \(error)")
return TestingResult(
Expand Down Expand Up @@ -136,8 +143,11 @@ public final class Scheduler {
/**
Runs tests in a given Bucket, retrying failed tests multiple times if necessary.
*/
private func runRetrying(bucket: SchedulerBucket) throws -> TestingResult {
let firstRun = try runBucketOnce(bucket: bucket, testsToRun: bucket.testEntries)
private func runRetrying(
bucket: SchedulerBucket,
logger: ContextualLogger
) throws -> TestingResult {
let firstRun = try runBucketOnce(bucket: bucket, testsToRun: bucket.testEntries, logger: logger)

guard bucket.testExecutionBehavior.numberOfRetries > 0 else {
return firstRun
Expand All @@ -153,13 +163,17 @@ public final class Scheduler {
}
logger.debug("After last run \(failedTestEntriesAfterLastRun.count) tests have failed: \(failedTestEntriesAfterLastRun).")
logger.debug("Retrying them, attempt #\(retryNumber + 1) of maximum \(bucket.testExecutionBehavior.numberOfRetries) attempts")
lastRunResults = try runBucketOnce(bucket: bucket, testsToRun: failedTestEntriesAfterLastRun)
lastRunResults = try runBucketOnce(bucket: bucket, testsToRun: failedTestEntriesAfterLastRun, logger: logger)
results.append(lastRunResults)
}
return try combine(runResults: results)
}

private func runBucketOnce(bucket: SchedulerBucket, testsToRun: [TestEntry]) throws -> TestingResult {
private func runBucketOnce(
bucket: SchedulerBucket,
testsToRun: [TestEntry],
logger: ContextualLogger
) throws -> TestingResult {
let simulatorPool = try di.get(OnDemandSimulatorPool.self).pool(
key: OnDemandSimulatorPoolKey(
developerDir: bucket.developerDir,
Expand Down

0 comments on commit 386fdc0

Please sign in to comment.