From a9c45d21599c88715fd312704d4fbaaa084cf19a Mon Sep 17 00:00:00 2001 From: felixncheng Date: Thu, 14 Mar 2024 14:35:26 +0800 Subject: [PATCH] =?UTF-8?q?impr:=20=E4=BC=98=E5=8C=96=E5=BC=95=E7=94=A8?= =?UTF-8?q?=E6=B8=85=E7=90=86=E4=BB=BB=E5=8A=A1=20#1877?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/batch/FileReferenceCleanupJob.kt | 10 ++- .../bkrepo/job/batch/utils/NodeCommonUtils.kt | 90 +++++++++++++++---- 2 files changed, 77 insertions(+), 23 deletions(-) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt index 91bf431ff0..fadaf06bdc 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt @@ -47,7 +47,6 @@ import com.tencent.bkrepo.job.batch.context.FileJobContext import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils import com.tencent.bkrepo.job.config.properties.FileReferenceCleanupJobProperties import com.tencent.bkrepo.job.exception.JobExecuteException -import com.tencent.bkrepo.repository.api.FileReferenceClient import com.tencent.bkrepo.repository.api.StorageCredentialsClient import com.tencent.bkrepo.repository.constant.SYSTEM_USER import org.springframework.boot.context.properties.EnableConfigurationProperties @@ -73,7 +72,6 @@ class FileReferenceCleanupJob( private val storageCredentialsClient: StorageCredentialsClient, private val properties: FileReferenceCleanupJobProperties, private val archiveClient: ArchiveClient, - private val fileReferenceClient: FileReferenceClient, ) : MongoDbBatchJob(properties) { /** @@ -166,7 +164,11 @@ class FileReferenceCleanupJob( * 2. 真实判断存储实例的节点是否存在。(引用不正确的情况或者布隆过滤器的误报) * */ val query = Query(where(Node::sha256).isEqualTo(sha256)) - return bf.mightContain(sha256) && NodeCommonUtils.findNodes(query, key).isNotEmpty() + val mightContain = bf.mightContain(sha256) + if (mightContain) { + logger.info("Bloom filter might contain $sha256.") + } + return mightContain && NodeCommonUtils.exist(query, key) } private fun buildBloomFilter(): BloomFilter { @@ -178,7 +180,7 @@ class FileReferenceCleanupJob( ) val query = Query(Criteria.where(FOLDER).isEqualTo(false)) query.fields().include(SHA256) - NodeCommonUtils.forEachNode(query) { + NodeCommonUtils.forEachNodeByCollectionParallel(query) { val sha256 = it[SHA256]?.toString() if (sha256 != null) { bf.put(sha256) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt index ba19ba10f8..1d571069e9 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt @@ -1,5 +1,6 @@ package com.tencent.bkrepo.job.batch.utils +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.tencent.bkrepo.common.mongo.constant.ID import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID import com.tencent.bkrepo.common.mongo.dao.util.sharding.HashShardingUtils @@ -13,6 +14,10 @@ import org.springframework.data.mongodb.core.find import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.stereotype.Component +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.CountDownLatch +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit import java.util.function.Consumer @Component @@ -37,6 +42,15 @@ class NodeCommonUtils( companion object { lateinit var mongoTemplate: MongoTemplate private const val COLLECTION_NAME_PREFIX = "node_" + private val workPool = ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors(), + 1L, + TimeUnit.MINUTES, + ArrayBlockingQueue(DEFAULT_BUFFER_SIZE), + ThreadFactoryBuilder().setNameFormat("node-utils-%d").build(), + ) + fun findNodes(query: Query, storageCredentialsKey: String?): List { val nodes = mutableListOf() (0 until SHARDING_COUNT).map { "$COLLECTION_NAME_PREFIX$it" }.forEach { collection -> @@ -49,27 +63,65 @@ class NodeCommonUtils( return nodes } - fun forEachNode(query: Query, batchSize: Int = BATCH_SIZE, consumer: Consumer>) { - var querySize: Int - (0 until SHARDING_COUNT).map { "$COLLECTION_NAME_PREFIX$it" }.forEach { collection -> - var lastId = ObjectId(MIN_OBJECT_ID) - do { - val newQuery = Query.of(query) - .addCriteria(Criteria.where(ID).gt(lastId)) - .limit(batchSize) - .with(Sort.by(ID).ascending()) - val data = mongoTemplate.find>( - newQuery, - collection, - ) - if (data.isEmpty()) { - break + fun exist(query: Query, storageCredentialsKey: String?): Boolean { + for (i in 0 until SHARDING_COUNT) { + val collection = COLLECTION_NAME_PREFIX.plus(i) + val find = mongoTemplate.find(query, Node::class.java, collection) + .distinctBy { it.projectId + it.repoName } + .any { + val repo = RepositoryCommonUtils.getRepositoryDetail(it.projectId, it.repoName) + repo.storageCredentials?.key == storageCredentialsKey } - data.forEach { consumer.accept(it) } - querySize = data.size - lastId = data.last()[ID] as ObjectId - } while (querySize == batchSize) + if (find) { + return true + } } + return false + } + + fun forEachNodeByCollectionParallel( + query: Query, + batchSize: Int = BATCH_SIZE, + consumer: Consumer>, + ) { + val countDownLatch = CountDownLatch(SHARDING_COUNT) + for (i in 0 until SHARDING_COUNT) { + val collection = COLLECTION_NAME_PREFIX.plus(i) + workPool.execute { + try { + findByCollection(query, batchSize, collection, consumer) + } finally { + countDownLatch.countDown() + } + } + } + countDownLatch.await() + } + + private fun findByCollection( + query: Query, + batchSize: Int, + collection: String, + consumer: Consumer>, + ) { + var querySize: Int + var lastId = ObjectId(MIN_OBJECT_ID) + do { + val newQuery = Query.of(query) + .addCriteria(Criteria.where(ID).gt(lastId)) + .limit(batchSize) + .with(Sort.by(ID).ascending()) + val data = mongoTemplate.find>( + newQuery, + collection, + ) + if (data.isEmpty()) { + break + } + data.forEach { consumer.accept(it) } + querySize = data.size + lastId = data.last()[ID] as ObjectId + } while (querySize == batchSize) } fun collectionNames(projectIds: List): List {