From 28b7d8bc8112e6747b814e52dc7a73b4bd61619b Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Wed, 3 Apr 2024 00:54:22 +0530 Subject: [PATCH] logging for transform Signed-off-by: Sarthak Aggarwal --- .../transform/TransformRunner.kt | 17 +++++++++++++++++ .../transform/TransformSearchService.kt | 4 ++++ 2 files changed, 21 insertions(+) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index 8ac49dce0..a27a9c6a3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -117,6 +117,7 @@ object TransformRunner : val transformLockManager = transformContext.transformLockManager transformLockManager.acquireLockForScheduledJob() try { + logger.debug("Transform Job ${transform.id} Initiating.") do { when { transformLockManager.lock == null -> { @@ -141,6 +142,7 @@ object TransformRunner : // If we have not populated the list of shards to search, do so now if (bucketsToTransform.shardsToSearch == null) { // Note the timestamp when we got the shard global checkpoints to the user may know what data is included + logger.debug("Transform job ${transform.id} is populating shards to search for index ${transform.sourceIndex}.") newGlobalCheckpointTime = Instant.now() newGlobalCheckpoints = transformSearchService.getShardsGlobalCheckpoint(transform.sourceIndex) bucketsToTransform = @@ -148,6 +150,11 @@ object TransformRunner : metadata.shardIDToGlobalCheckpoint, newGlobalCheckpoints, ) + logger.debug( + "Transform job {} fetched global checkpoints {}.", + transform.id, + newGlobalCheckpoints + ) } // If there are shards to search do it here if (bucketsToTransform.currentShard != null) { @@ -161,6 +168,13 @@ object TransformRunner : bucketsToTransform.modifiedBuckets.filter { transformProcessedBucketLog.isNotProcessed(it) }.toMutableSet() + + logger.debug( + "Transform job {} recompute to start with modified buckets {}. Processing shard {}.", + transform.id, + modifiedBuckets.size, + bucketsToTransform.currentShard + ) // Recompute modified buckets and update them in targetIndex currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext) // Add processed buckets to 'processed set' so that we don't try to reprocess them again @@ -229,6 +243,7 @@ object TransformRunner : ) } currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets) + logger.debug("Transform job {} has current buckets {} to transform. Processing Shard {} with checkpoints from {} to {}.", transform.id,currentBucketsToTransform.modifiedBuckets.size, currentShard.shardId ,currentShard.from, currentShard.to) val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis + shardLevelModifiedBuckets.searchTimeInMillis @@ -325,7 +340,9 @@ object TransformRunner : } val indexTimeInMillis = withTransformSecurityContext(transform) { + logger.debug("Transform job {} starting to index for target index: {} with documents {}.", transform.id,transform.targetIndex, transformSearchResult.docsToIndex.size) transformIndexer.index(transform.targetIndex, transformSearchResult.docsToIndex, transformContext) + logger.debug("Transform job {} completed to index for target index: {}.", transform.id, transform.targetIndex) } val stats = transformSearchResult.stats val updatedStats = diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index b9944e425..453785913 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -151,12 +151,16 @@ class TransformSearchService( return@suspendUntil } val request = getShardLevelBucketsSearchRequest(transform, afterKey, pageSize, currentShard, searchRequestTimeoutInSeconds) + logger.debug("Transform job {} is starting its search request {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, request.source(), currentShard.from, currentShard.to) search(request, listener) + logger.debug("Transform job {} has completed search request for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to) } } // If the request was successful, update page size transformContext.lastSuccessfulPageSize = pageSize + logger.debug("Transform job {} updated page size {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, pageSize, currentShard.shardId, currentShard.from, currentShard.to) transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart) + logger.debug("Transform job {} is renewing lock for long search for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to, (Instant.now().epochSecond - searchStart)) return convertBucketSearchResponse(transform, searchResponse) } catch (e: TransformSearchServiceException) { throw e