Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 修复升级框架后使用consumer方式消费消息报错#2446 #2447

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ package com.tencent.bkrepo.analyst.configuration
import com.tencent.bkrepo.analysis.executor.api.ExecutorClient
import com.tencent.bkrepo.analyst.dispatcher.SubtaskDispatcherFactory
import com.tencent.bkrepo.analyst.dispatcher.SubtaskPoller
import com.tencent.bkrepo.analyst.event.ScanEventConsumer
import com.tencent.bkrepo.analyst.service.ExecutionClusterService
import com.tencent.bkrepo.analyst.service.ScannerService
import com.tencent.bkrepo.analyst.service.impl.OperateLogServiceImpl
import com.tencent.bkrepo.analyst.service.impl.ProjectUsageStatisticsServiceImpl
import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.common.operate.api.OperateLogService
import com.tencent.bkrepo.common.operate.api.ProjectUsageStatisticsService
import com.tencent.bkrepo.common.service.condition.ConditionalOnNotAssembly
Expand All @@ -45,6 +47,7 @@ import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.function.Consumer

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(
Expand Down Expand Up @@ -84,4 +87,14 @@ class ScannerConfiguration {
): ProjectUsageStatisticsService {
return ProjectUsageStatisticsServiceImpl(client)
}


@Bean("scanEventConsumer")
fun scanEventConsumer(
scanEventConsumer: ScanEventConsumer
): Consumer<ArtifactEvent> {
return Consumer {
scanEventConsumer.accept(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import com.tencent.bkrepo.repository.pojo.packages.PackageSummary
import org.slf4j.LoggerFactory
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.stereotype.Component
import java.util.function.Consumer

/**
* 构件事件消费者,用于触发制品更新扫描
Expand All @@ -71,7 +70,7 @@ class ScanEventConsumer(
private val scanPlanDao: ScanPlanDao,
private val projectScanConfigurationService: ProjectScanConfigurationService,
private val executor: ThreadPoolTaskExecutor
) : Consumer<ArtifactEvent> {
) {

/**
* 允许接收的事件类型
Expand All @@ -82,7 +81,7 @@ class ScanEventConsumer(
EventType.VERSION_UPDATED
)

override fun accept(event: ArtifactEvent) {
fun accept(event: ArtifactEvent) {
if (!acceptTypes.contains(event.type)) {
return
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.tencent.bkrepo.fs.server.config

import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.fs.server.listener.NodeModifyListener
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import java.util.function.Consumer

@Configuration
class FsConsumerConfig {

@Bean("artifactEventFs")
fun nodeModifyListener(
nodeModifyListener: NodeModifyListener
): Consumer<Message<ArtifactEvent>> {
return Consumer {
nodeModifyListener.accept(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import org.slf4j.LoggerFactory
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.concurrent.Executors
import java.util.function.Consumer

@Component("artifactEventFs")
@Component
class NodeModifyListener(
private val rRepositoryClient: RRepositoryClient,
private val fileNodeService: FileNodeService
) : Consumer<Message<ArtifactEvent>> {
) {

override fun accept(message: Message<ArtifactEvent>) {
fun accept(message: Message<ArtifactEvent>) {
val event = message.payload
val type = event.type
// 覆盖创建也会先删除,再创建。所以这里只需关注删除事件即可。
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.tencent.bkrepo.helm.config

import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.helm.listener.consumer.PackageReplicationEventConsumer
import com.tencent.bkrepo.helm.listener.consumer.RemoteRepoEventConsumer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import java.util.function.Consumer

@Configuration
class HelmConsumerConfig {

@Bean("packageReplication")
fun packageReplicationEventConsumer(
packageReplicationEventConsumer: PackageReplicationEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
packageReplicationEventConsumer.accept(it)
}
}

@Bean("remoteRepo")
fun remoteRepoEventConsumer(
remoteRepoEventConsumer: RemoteRepoEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
remoteRepoEventConsumer.accept(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,15 @@ import com.tencent.bkrepo.helm.listener.base.RemoteEventJobExecutor
import org.slf4j.LoggerFactory
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

/**
* 消费基于MQ传递的事件
* 消费分发同步的Package, 用于更新index文件
*/
@Component("packageReplication")
@Component
class PackageReplicationEventConsumer(
private val remoteEventJobExecutor: RemoteEventJobExecutor
) : Consumer<Message<ArtifactEvent>> {
) {

/**
* 允许接收的事件类型
Expand All @@ -52,7 +51,7 @@ class PackageReplicationEventConsumer(
EventType.VERSION_UPDATED,
)

override fun accept(message: Message<ArtifactEvent>) {
fun accept(message: Message<ArtifactEvent>) {
if (!acceptTypes.contains(message.payload.type)) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ package com.tencent.bkrepo.helm.listener.consumer
import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.common.artifact.event.base.EventType
import com.tencent.bkrepo.helm.listener.base.RemoteEventJobExecutor
import java.util.function.Consumer
import org.slf4j.LoggerFactory
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
Expand All @@ -39,10 +38,10 @@ import org.springframework.stereotype.Component
* 构件事件消费者,用于实时同步
* 对应destination为对应ArtifactEvent.topic
*/
@Component("remoteRepo")
@Component
class RemoteRepoEventConsumer(
private val remoteEventJobExecutor: RemoteEventJobExecutor
) : Consumer<Message<ArtifactEvent>> {
) {

/**
* 允许接收的事件类型
Expand All @@ -53,7 +52,7 @@ class RemoteRepoEventConsumer(
EventType.REPO_REFRESHED
)

override fun accept(message: Message<ArtifactEvent>) {
fun accept(message: Message<ArtifactEvent>) {
if (!acceptTypes.contains(message.payload.type)) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@

package com.tencent.bkrepo.job.config

import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.job.executor.BlockThreadPoolTaskExecutorDecorator
import com.tencent.bkrepo.job.migrate.config.MigrateRepoStorageProperties
import com.tencent.bkrepo.job.separation.config.DataSeparationConfig
import com.tencent.bkrepo.job.separation.listener.SeparationRecoveryEventConsumer
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.function.Consumer

/**
* Job配置
Expand All @@ -57,4 +61,13 @@ class JobConfig {
Runtime.getRuntime().availableProcessors()
)
}

@Bean("separationRecovery")
fun separationRecoveryEventConsumer(
separationRecoveryEventConsumer: SeparationRecoveryEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
separationRecoveryEventConsumer.accept(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,18 @@ import org.slf4j.LoggerFactory
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.time.format.DateTimeFormatter
import java.util.function.Consumer

/**
* 消费降冷自动恢复事件
*/
@Component("separationRecovery")
@Component
class SeparationRecoveryEventConsumer(
private val separationTaskService: SeparationTaskService,
private val dataSeparationConfig: DataSeparationConfig,
private val separationPackageDao: SeparationPackageDao,
private val separationPackageVersionDao: SeparationPackageVersionDao,
private val separationNodeDao: SeparationNodeDao
) : Consumer<Message<ArtifactEvent>> {
) {

/**
* 允许接收的事件类型
Expand All @@ -70,7 +69,7 @@ class SeparationRecoveryEventConsumer(
EventType.NODE_SEPARATION_RECOVERY,
)

override fun accept(message: Message<ArtifactEvent>) {
fun accept(message: Message<ArtifactEvent>) {
if (!dataSeparationConfig.enableAutoRecovery) return
if (!acceptTypes.contains(message.payload.type)) {
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.tencent.bkrepo.oci.config

import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.oci.listener.consumer.RemoteImageRepoEventConsumer
import com.tencent.bkrepo.oci.listener.consumer.ThirdPartyReplicationEventConsumer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import java.util.function.Consumer

@Configuration
class OciConsumerConfig {

// 之前继承Consumer方式框架升级后会报错,https://github.com/spring-cloud/spring-cloud-stream/issues/2704
@Bean("remoteOciRepo")
fun remoteImageRepoEventConsumer(
remoteImageRepoEventConsumer: RemoteImageRepoEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
remoteImageRepoEventConsumer.accept(it)
}
}

@Bean("thirdPartyReplication")
fun thirdPartyReplicationEventConsumer(
thirdPartyReplicationEventConsumer: ThirdPartyReplicationEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
thirdPartyReplicationEventConsumer.accept(it)
}
}
}
Loading
Loading