From 386fdc0cf38db103ca3ae9128f1683c9edc70aff Mon Sep 17 00:00:00 2001 From: Vladislav Alekseev Date: Wed, 17 Mar 2021 12:50:48 +0300 Subject: [PATCH] Add metadata to analytics configuration --- Package.swift | 1 + .../XcodebuildBasedTestRunner.swift | 8 +--- Sources/EmceeLib/Commands/DumpCommand.swift | 7 +++- .../RunTestsOnRemoteQueueCommand.swift | 40 +++++++++++------- ...ityLoggableProcessControllerProvider.swift | 6 --- Sources/EmceeLogging/ContextualLogger.swift | 6 +++ .../EmceeLogging/SubprocessInfo+PidInfo.swift | 8 ++++ .../AnalyticsConfiguration.swift | 6 ++- Sources/Scheduler/Scheduler.swift | 42 ++++++++++++------- 9 files changed, 80 insertions(+), 44 deletions(-) create mode 100644 Sources/EmceeLogging/SubprocessInfo+PidInfo.swift diff --git a/Package.swift b/Package.swift index 92819f11..311664be 100644 --- a/Package.swift +++ b/Package.swift @@ -568,6 +568,7 @@ let package = Package( "Kibana", .product(name: "Logging", package: "swift-log"), "MetricsExtensions", + .product(name: "ProcessController", package: "CommandLineToolkit"), "QueueModels", ], path: "Sources/EmceeLogging" diff --git a/Sources/AppleTools/XcodebuildBasedTestRunner.swift b/Sources/AppleTools/XcodebuildBasedTestRunner.swift index 4e62fc90..e8d3ea00 100644 --- a/Sources/AppleTools/XcodebuildBasedTestRunner.swift +++ b/Sources/AppleTools/XcodebuildBasedTestRunner.swift @@ -91,20 +91,16 @@ public final class XcodebuildBasedTestRunner: TestRunner { var observableFileReaderHandler: ObservableFileReaderHandler? processController.onStart { [logger] sender, _ in - let logger = logger - .withMetadata(key: .subprocessId, value: "\(sender.subprocessInfo.subprocessId)") - .withMetadata(key: .subprocessName, value: sender.subprocessInfo.subprocessName) - testRunnerStream.openStream() do { observableFileReaderHandler = try observableFileReader.read(handler: resultStream.write(data:)) } catch { - logger.error("Failed to read stream file: \(error)") + logger.error("Failed to read stream file: \(error)", subprocessPidInfo: sender.subprocessInfo.pidInfo) return sender.terminateAndForceKillIfNeeded() } resultStream.streamContents { error in if let error = error { - logger.error("Result stream error: \(error)") + logger.error("Result stream error: \(error)", subprocessPidInfo: sender.subprocessInfo.pidInfo) } testRunnerStream.closeStream() } diff --git a/Sources/EmceeLib/Commands/DumpCommand.swift b/Sources/EmceeLib/Commands/DumpCommand.swift index 8b6bcb37..65af2576 100644 --- a/Sources/EmceeLib/Commands/DumpCommand.swift +++ b/Sources/EmceeLib/Commands/DumpCommand.swift @@ -41,11 +41,11 @@ public final class DumpCommand: Command { private let di: DI private let encoder = JSONEncoder.pretty() - private let logger: ContextualLogger + private let rootLogger: ContextualLogger public init(di: DI) throws { self.di = di - self.logger = try di.get(ContextualLogger.self) + self.rootLogger = try di.get(ContextualLogger.self) .forType(Self.self) } @@ -66,6 +66,9 @@ public final class DumpCommand: Command { if let kibanaConfiguration = testArgFile.prioritizedJob.analyticsConfiguration.kibanaConfiguration { try di.get(LoggingSetup.self).set(kibanaConfiguration: kibanaConfiguration) } + let logger = rootLogger.withMetadata( + testArgFile.prioritizedJob.analyticsConfiguration.metadata ?? [:] + ) let onDemandSimulatorPool = try OnDemandSimulatorPoolFactory.create( di: di, diff --git a/Sources/EmceeLib/Commands/RunTestsOnRemoteQueueCommand.swift b/Sources/EmceeLib/Commands/RunTestsOnRemoteQueueCommand.swift index e6c94711..4d744318 100644 --- a/Sources/EmceeLib/Commands/RunTestsOnRemoteQueueCommand.swift +++ b/Sources/EmceeLib/Commands/RunTestsOnRemoteQueueCommand.swift @@ -48,12 +48,12 @@ public final class RunTestsOnRemoteQueueCommand: Command { private let callbackQueue = DispatchQueue(label: "RunTestsOnRemoteQueueCommand.callbackQueue") private let di: DI - private let logger: ContextualLogger + private let rootLogger: ContextualLogger private let testArgFileValidator = TestArgFileValidator() public init(di: DI) throws { self.di = di - self.logger = try di.get(ContextualLogger.self).forType(Self.self) + self.rootLogger = try di.get(ContextualLogger.self).forType(Self.self) } public func run(payload: CommandPayload) throws { @@ -76,6 +76,12 @@ public final class RunTestsOnRemoteQueueCommand: Command { if let kibanaConfiguration = testArgFile.prioritizedJob.analyticsConfiguration.kibanaConfiguration { try di.get(LoggingSetup.self).set(kibanaConfiguration: kibanaConfiguration) } + try di.get(GlobalMetricRecorder.self).set( + analyticsConfiguration: testArgFile.prioritizedJob.analyticsConfiguration + ) + let logger = rootLogger.withMetadata( + testArgFile.prioritizedJob.analyticsConfiguration.metadata ?? [:] + ) let remoteCacheConfig = try ArgumentsReader.remoteCacheConfig( try payload.optionalSingleTypedValue(argumentName: ArgumentDescriptions.remoteCacheConfig.name) @@ -87,13 +93,15 @@ public final class RunTestsOnRemoteQueueCommand: Command { emceeVersion: emceeVersion, queueServerDeploymentDestination: queueServerConfiguration.queueServerDeploymentDestination, queueServerConfigurationLocation: queueServerConfigurationLocation, - jobId: testArgFile.prioritizedJob.jobId + jobId: testArgFile.prioritizedJob.jobId, + logger: logger ) let jobResults = try runTestsOnRemotelyRunningQueue( queueServerAddress: runningQueueServerAddress, remoteCacheConfig: remoteCacheConfig, testArgFile: testArgFile, - version: emceeVersion + version: emceeVersion, + logger: logger ) let resultOutputGenerator = ResultingOutputGenerator( testingResults: jobResults.testingResults, @@ -107,7 +115,8 @@ public final class RunTestsOnRemoteQueueCommand: Command { emceeVersion: Version, queueServerDeploymentDestination: DeploymentDestination, queueServerConfigurationLocation: QueueServerConfigurationLocation, - jobId: JobId + jobId: JobId, + logger: ContextualLogger ) throws -> SocketAddress { logger.info("Searching for queue server on '\(queueServerDeploymentDestination.host)' with queue version \(emceeVersion)") let remoteQueueDetector = DefaultRemoteQueueDetector( @@ -134,7 +143,8 @@ public final class RunTestsOnRemoteQueueCommand: Command { jobId: jobId, queueServerDeploymentDestination: queueServerDeploymentDestination, emceeVersion: emceeVersion, - queueServerConfigurationLocation: queueServerConfigurationLocation + queueServerConfigurationLocation: queueServerConfigurationLocation, + logger: logger ) try di.get(Waiter.self).waitWhile(pollPeriod: 1.0, timeout: 30.0, description: "Wait for remote queue to start") { @@ -155,7 +165,8 @@ public final class RunTestsOnRemoteQueueCommand: Command { jobId: JobId, queueServerDeploymentDestination: DeploymentDestination, emceeVersion: Version, - queueServerConfigurationLocation: QueueServerConfigurationLocation + queueServerConfigurationLocation: QueueServerConfigurationLocation, + logger: ContextualLogger ) throws { logger.info("No running queue server has been found. Will deploy and start remote queue.") let remoteQueueStarter = RemoteQueueStarter( @@ -175,10 +186,9 @@ public final class RunTestsOnRemoteQueueCommand: Command { queueServerAddress: SocketAddress, remoteCacheConfig: RuntimeDumpRemoteCacheConfig?, testArgFile: TestArgFile, - version: Version + version: Version, + logger: ContextualLogger ) throws -> JobResults { - try di.get(GlobalMetricRecorder.self).set(analyticsConfiguration: testArgFile.prioritizedJob.analyticsConfiguration) - let onDemandSimulatorPool = try OnDemandSimulatorPoolFactory.create( di: di, logger: logger, @@ -227,7 +237,7 @@ public final class RunTestsOnRemoteQueueCommand: Command { ) defer { - deleteJob(jobId: testArgFile.prioritizedJob.jobId) + deleteJob(jobId: testArgFile.prioritizedJob.jobId, logger: logger) } try JobPreparer(di: di).formJob( @@ -237,11 +247,11 @@ public final class RunTestsOnRemoteQueueCommand: Command { testArgFile: testArgFile ) - try waitForJobQueueToDeplete(jobId: testArgFile.prioritizedJob.jobId) + try waitForJobQueueToDeplete(jobId: testArgFile.prioritizedJob.jobId, logger: logger) return try fetchJobResults(jobId: testArgFile.prioritizedJob.jobId) } - private func waitForJobQueueToDeplete(jobId: JobId) throws { + private func waitForJobQueueToDeplete(jobId: JobId, logger: ContextualLogger) throws { var caughtSignal = false SignalHandling.addSignalHandler(signals: [.int, .term]) { [logger] signal in logger.info("Caught \(signal) signal") @@ -279,7 +289,7 @@ public final class RunTestsOnRemoteQueueCommand: Command { callbackQueue: callbackQueue, completion: callbackWaiter.set ) - return try callbackWaiter.wait(timeout: .infinity, description: "").dematerialize() + return try callbackWaiter.wait(timeout: .infinity, description: "Fetch job state").dematerialize() } private func selectPort(ports: Set) throws -> SocketModels.Port { @@ -291,7 +301,7 @@ public final class RunTestsOnRemoteQueueCommand: Command { return port } - private func deleteJob(jobId: JobId) { + private func deleteJob(jobId: JobId, logger: ContextualLogger) { do { let callbackWaiter: CallbackWaiter> = try di.get(Waiter.self).createCallbackWaiter() try di.get(JobDeleter.self).delete( diff --git a/Sources/EmceeLib/Utils/DetailedActivityLoggableProcessControllerProvider.swift b/Sources/EmceeLib/Utils/DetailedActivityLoggableProcessControllerProvider.swift index a1e5e6cf..f7904f9b 100644 --- a/Sources/EmceeLib/Utils/DetailedActivityLoggableProcessControllerProvider.swift +++ b/Sources/EmceeLib/Utils/DetailedActivityLoggableProcessControllerProvider.swift @@ -48,9 +48,3 @@ public final class DetailedActivityLoggableProcessControllerProvider: ProcessCon return processController } } - -extension SubprocessInfo { - var pidInfo: PidInfo { - PidInfo(pid: subprocessId, name: subprocessName) - } -} diff --git a/Sources/EmceeLogging/ContextualLogger.swift b/Sources/EmceeLogging/ContextualLogger.swift index ff6e223e..2ae64490 100644 --- a/Sources/EmceeLogging/ContextualLogger.swift +++ b/Sources/EmceeLogging/ContextualLogger.swift @@ -135,4 +135,10 @@ public extension ContextualLogger { ) { log(.warning, message, subprocessPidInfo: subprocessPidInfo, workerId: workerId, persistentMetricsJobId: persistentMetricsJobId, source: source, file: file, function: function, line: line) } + + func withMetadata(_ keyValues: [String: String]) -> ContextualLogger { + var addedMetadata = self.addedMetadata + addedMetadata.merge(keyValues) { _, new -> String in new } + return ContextualLogger(logger: logger, addedMetadata: addedMetadata) + } } diff --git a/Sources/EmceeLogging/SubprocessInfo+PidInfo.swift b/Sources/EmceeLogging/SubprocessInfo+PidInfo.swift new file mode 100644 index 00000000..34662c99 --- /dev/null +++ b/Sources/EmceeLogging/SubprocessInfo+PidInfo.swift @@ -0,0 +1,8 @@ +import Foundation +import ProcessController + +extension SubprocessInfo { + public var pidInfo: PidInfo { + PidInfo(pid: subprocessId, name: subprocessName) + } +} diff --git a/Sources/MetricsExtensions/AnalyticsConfiguration.swift b/Sources/MetricsExtensions/AnalyticsConfiguration.swift index 6d19cc8d..d9409d0a 100644 --- a/Sources/MetricsExtensions/AnalyticsConfiguration.swift +++ b/Sources/MetricsExtensions/AnalyticsConfiguration.swift @@ -4,17 +4,21 @@ public struct AnalyticsConfiguration: Codable, Hashable { public let graphiteConfiguration: MetricConfiguration? public let statsdConfiguration: MetricConfiguration? public let kibanaConfiguration: KibanaConfiguration? + public let persistentMetricsJobId: String? + public let metadata: [String: String]? public init( graphiteConfiguration: MetricConfiguration? = nil, statsdConfiguration: MetricConfiguration? = nil, kibanaConfiguration: KibanaConfiguration? = nil, - persistentMetricsJobId: String? = nil + persistentMetricsJobId: String? = nil, + metadata: [String: String]? = nil ) { self.graphiteConfiguration = graphiteConfiguration self.statsdConfiguration = statsdConfiguration self.kibanaConfiguration = kibanaConfiguration self.persistentMetricsJobId = persistentMetricsJobId + self.metadata = metadata } } diff --git a/Sources/Scheduler/Scheduler.swift b/Sources/Scheduler/Scheduler.swift index 084dd854..16a78672 100755 --- a/Sources/Scheduler/Scheduler.swift +++ b/Sources/Scheduler/Scheduler.swift @@ -24,7 +24,7 @@ import UniqueIdentifierGenerator public final class Scheduler { private let di: DI - private let logger: ContextualLogger + private let rootLogger: ContextualLogger private let queue = OperationQueue() private let resourceSemaphore: ListeningSemaphore private let version: Version @@ -40,7 +40,7 @@ public final class Scheduler { version: Version ) { self.di = di - self.logger = logger.forType(Self.self) + self.rootLogger = logger.forType(Self.self) self.resourceSemaphore = ListeningSemaphore( maximumValues: .of( runningTests: Int(numberOfSimulators) @@ -72,25 +72,29 @@ public final class Scheduler { return } guard let bucket = self.schedulerDataSource?.nextBucket() else { - self.logger.debug("Data Source returned no bucket") + self.rootLogger.debug("Data Source returned no bucket") return } - self.logger.debug("Data Source returned bucket: \(bucket)") - self.runTestsFromFetchedBucket(bucket) + let logger = self.rootLogger.withMetadata(bucket.analyticsConfiguration.metadata ?? [:]) + logger.debug("Data Source returned bucket: \(bucket)") + self.runTestsFromFetchedBucket(bucket: bucket, logger: logger) } } - private func runTestsFromFetchedBucket(_ bucket: SchedulerBucket) { + private func runTestsFromFetchedBucket( + bucket: SchedulerBucket, + logger: ContextualLogger + ) { do { let acquireResources = try resourceSemaphore.acquire(.of(runningTests: 1)) let runTestsInBucketAfterAcquiringResources = BlockOperation { do { - let testingResult = self.execute(bucket: bucket) + let testingResult = self.execute(bucket: bucket, logger: logger) try self.resourceSemaphore.release(.of(runningTests: 1)) self.schedulerDelegate?.scheduler(self, obtainedTestingResult: testingResult, forBucket: bucket) self.fetchAndRunBucket() } catch { - self.logger.error("Error running tests from fetched bucket with error: \(error). Bucket: \(bucket)") + logger.error("Error running tests from fetched bucket with error: \(error). Bucket: \(bucket)") } } acquireResources.addCascadeCancellableDependency(runTestsInBucketAfterAcquiringResources) @@ -102,10 +106,13 @@ public final class Scheduler { // MARK: - Running the Tests - private func execute(bucket: SchedulerBucket) -> TestingResult { + private func execute( + bucket: SchedulerBucket, + logger: ContextualLogger + ) -> TestingResult { let startedAt = Date() do { - return try self.runRetrying(bucket: bucket) + return try runRetrying(bucket: bucket, logger: logger) } catch { logger.error("Failed to execute bucket \(bucket.bucketId): \(error)") return TestingResult( @@ -136,8 +143,11 @@ public final class Scheduler { /** Runs tests in a given Bucket, retrying failed tests multiple times if necessary. */ - private func runRetrying(bucket: SchedulerBucket) throws -> TestingResult { - let firstRun = try runBucketOnce(bucket: bucket, testsToRun: bucket.testEntries) + private func runRetrying( + bucket: SchedulerBucket, + logger: ContextualLogger + ) throws -> TestingResult { + let firstRun = try runBucketOnce(bucket: bucket, testsToRun: bucket.testEntries, logger: logger) guard bucket.testExecutionBehavior.numberOfRetries > 0 else { return firstRun @@ -153,13 +163,17 @@ public final class Scheduler { } logger.debug("After last run \(failedTestEntriesAfterLastRun.count) tests have failed: \(failedTestEntriesAfterLastRun).") logger.debug("Retrying them, attempt #\(retryNumber + 1) of maximum \(bucket.testExecutionBehavior.numberOfRetries) attempts") - lastRunResults = try runBucketOnce(bucket: bucket, testsToRun: failedTestEntriesAfterLastRun) + lastRunResults = try runBucketOnce(bucket: bucket, testsToRun: failedTestEntriesAfterLastRun, logger: logger) results.append(lastRunResults) } return try combine(runResults: results) } - private func runBucketOnce(bucket: SchedulerBucket, testsToRun: [TestEntry]) throws -> TestingResult { + private func runBucketOnce( + bucket: SchedulerBucket, + testsToRun: [TestEntry], + logger: ContextualLogger + ) throws -> TestingResult { let simulatorPool = try di.get(OnDemandSimulatorPool.self).pool( key: OnDemandSimulatorPoolKey( developerDir: bucket.developerDir,