From 85fc87b9a3a374cfa09f34737f23054913633b5e Mon Sep 17 00:00:00 2001 From: Nico Koprowski Date: Fri, 16 Aug 2024 13:47:51 +0800 Subject: [PATCH] fix(Pool): Fix not resolving golden record tasks on exceptions - Now save unresolved tasks in a queue which gets checked and resolved on schedule - Make sure that no unnecessary transaction exceptions are disrupt the resolution process - reducing the number of unresolved tasks --- CHANGELOG.md | 5 +- .../bpdm/pool/entity/GoldenRecordTaskDb.kt | 40 ++++++ .../repository/GoldenRecordTaskRepository.kt | 31 ++++ .../bpdm/pool/service/TaskEntryBpnMapping.kt | 7 +- ...rveService.kt => TaskResolutionService.kt} | 134 +++++++++++++----- .../bpdm/pool/service/TaskStepBuildService.kt | 8 +- .../V6_2_0_0__add_golden_record_tasks.sql | 11 ++ ...ceTest.kt => TaskResolutionServiceTest.kt} | 4 +- 8 files changed, 194 insertions(+), 46 deletions(-) create mode 100644 bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/entity/GoldenRecordTaskDb.kt create mode 100644 bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/repository/GoldenRecordTaskRepository.kt rename bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/{TaskStepFetchAndReserveService.kt => TaskResolutionService.kt} (52%) create mode 100644 bpdm-pool/src/main/resources/db/migration/V6_2_0_0__add_golden_record_tasks.sql rename bpdm-pool/src/test/kotlin/org/eclipse/tractusx/bpdm/pool/service/{TaskStepFetchAndReserveServiceTest.kt => TaskResolutionServiceTest.kt} (99%) diff --git a/CHANGELOG.md b/CHANGELOG.md index da9811cb8..6be157a70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,11 @@ For changes to the BPDM Helm charts please consult the [changelog](charts/bpdm/C ### Added --BPDM Pool: Post endpoint to create a site for LegalAndSiteMainAddress addressType.([#739](https://github.com/eclipse-tractusx/sig-release/issues/739)) +- BPDM Pool: Post endpoint to create a site for LegalAndSiteMainAddress addressType.([#739](https://github.com/eclipse-tractusx/sig-release/issues/739)) +### Changed + +- BPDM Pool: Fix not resolving golden record tasks on exceptions ## [6.1.0] - [2024-07-15] diff --git a/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/entity/GoldenRecordTaskDb.kt b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/entity/GoldenRecordTaskDb.kt new file mode 100644 index 000000000..1b97a714e --- /dev/null +++ b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/entity/GoldenRecordTaskDb.kt @@ -0,0 +1,40 @@ +/******************************************************************************* + * Copyright (c) 2021,2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ******************************************************************************/ + +package org.eclipse.tractusx.bpdm.pool.entity + +import jakarta.persistence.Column +import jakarta.persistence.Entity +import jakarta.persistence.Table +import org.eclipse.tractusx.bpdm.common.model.BaseEntity +import java.time.Instant + + +@Entity +@Table( + name = "golden_record_tasks" +) +class GoldenRecordTaskDb( + @Column(name = "task_id", nullable = false, updatable = false) + val taskId: String, + @Column(name = "is_resolved", nullable = false) + var isResolved: Boolean, + @Column(name = "last_checked", nullable = false) + var lastChecked: Instant +): BaseEntity() \ No newline at end of file diff --git a/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/repository/GoldenRecordTaskRepository.kt b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/repository/GoldenRecordTaskRepository.kt new file mode 100644 index 000000000..b3afcbac1 --- /dev/null +++ b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/repository/GoldenRecordTaskRepository.kt @@ -0,0 +1,31 @@ +/******************************************************************************* + * Copyright (c) 2021,2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ******************************************************************************/ + +package org.eclipse.tractusx.bpdm.pool.repository + +import org.eclipse.tractusx.bpdm.pool.entity.GoldenRecordTaskDb +import org.springframework.data.jpa.repository.JpaRepository +import java.time.Instant + +interface GoldenRecordTaskRepository: JpaRepository { + + fun findFirstByLastCheckedBefore(checkedBefore: Instant): GoldenRecordTaskDb? + + fun findFirstByIsResolved(isResolved: Boolean): GoldenRecordTaskDb? +} \ No newline at end of file diff --git a/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskEntryBpnMapping.kt b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskEntryBpnMapping.kt index e60ea1adc..4e09d9556 100644 --- a/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskEntryBpnMapping.kt +++ b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskEntryBpnMapping.kt @@ -21,9 +21,9 @@ package org.eclipse.tractusx.bpdm.pool.service import org.eclipse.tractusx.bpdm.pool.entity.BpnRequestIdentifierMappingDb import org.eclipse.tractusx.bpdm.pool.repository.BpnRequestIdentifierRepository +import org.eclipse.tractusx.orchestrator.api.model.BpnReference import org.eclipse.tractusx.orchestrator.api.model.BpnReferenceType import org.eclipse.tractusx.orchestrator.api.model.TaskStepReservationEntryDto -import org.eclipse.tractusx.orchestrator.api.model.BpnReference class TaskEntryBpnMapping(taskEntries: List, bpnRequestIdentifierRepository: BpnRequestIdentifierRepository) { @@ -41,9 +41,10 @@ class TaskEntryBpnMapping(taskEntries: List, bpnReq taskEntries.mapNotNull { it.businessPartner.site?.bpnReference } + taskEntries.mapNotNull { it.businessPartner.site?.siteMainAddress?.bpnReference } + taskEntries.mapNotNull { it.businessPartner.additionalAddress?.bpnReference } - .filter { it.referenceValue == null || it.referenceType == null } - val usedRequestIdentifiers: Collection = references + val filledReferences = references.filter { it.referenceValue != null && it.referenceType != null } + + val usedRequestIdentifiers: Collection = filledReferences .filter { it.referenceType == BpnReferenceType.BpnRequestIdentifier } .map { it.referenceValue!! } diff --git a/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepFetchAndReserveService.kt b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskResolutionService.kt similarity index 52% rename from bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepFetchAndReserveService.kt rename to bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskResolutionService.kt index f6cb1a3f7..6309d769c 100644 --- a/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepFetchAndReserveService.kt +++ b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskResolutionService.kt @@ -19,52 +19,126 @@ package org.eclipse.tractusx.bpdm.pool.service +import jakarta.persistence.EntityManager import mu.KotlinLogging import org.eclipse.tractusx.bpdm.pool.config.GoldenRecordTaskConfigProperties +import org.eclipse.tractusx.bpdm.pool.entity.GoldenRecordTaskDb import org.eclipse.tractusx.bpdm.pool.exception.BpdmValidationException import org.eclipse.tractusx.bpdm.pool.repository.BpnRequestIdentifierRepository +import org.eclipse.tractusx.bpdm.pool.repository.GoldenRecordTaskRepository import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient import org.eclipse.tractusx.orchestrator.api.model.* import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional +import org.springframework.web.reactive.function.client.WebClientException +import org.springframework.web.reactive.function.client.WebClientResponseException +import java.time.Instant @Service -class TaskReservationBatchService( - private val taskStepFetchAndReserveService: TaskStepFetchAndReserveService -){ +class TaskBatchResolutionService( + private val taskStepFetchAndReserveService: TaskResolutionService, + private val goldenRecordTaskConfigProperties: GoldenRecordTaskConfigProperties, + private val orchestrationClient: OrchestrationApiClient, + private val goldenRecordTaskRepository: GoldenRecordTaskRepository, + private val entityManager: EntityManager, + + ){ private val logger = KotlinLogging.logger { } @Scheduled(cron = "#{${GoldenRecordTaskConfigProperties.GET_CRON}}", zone = "UTC") - fun process(){ - logger.info { "Starting polling for cleaning tasks from Orchestrator..." } + fun processTasks(){ + logger.info { "Start golden record task processing schedule..." } + reserveAndResolve() + resolveUnresolved() + deleteResolved() + } + + fun reserveAndResolve(){ var totalTasksProcessed = 0 do{ - val tasksProcessed = taskStepFetchAndReserveService.fetchAndReserve() - totalTasksProcessed += tasksProcessed - }while (tasksProcessed != 0) + val reservationRequest = TaskStepReservationRequest(step = TaskStep.PoolSync, amount = goldenRecordTaskConfigProperties.batchSize) + val taskStepReservation = orchestrationClient.goldenRecordTasks.reserveTasksForStep(reservationRequest = reservationRequest) + + val successfullyResolved = try{ + taskStepFetchAndReserveService.resolveTasks(taskStepReservation) + true + } catch (ex: Throwable) { + logger.error(ex) { "Error while processing cleaning task" } + false + } + + totalTasksProcessed += taskStepReservation.reservedTasks.size + + if(!successfullyResolved) + goldenRecordTaskRepository.saveAll(taskStepReservation.reservedTasks.map { GoldenRecordTaskDb(it.taskId, true, Instant.now()) }) + + entityManager.clear() + }while (taskStepReservation.reservedTasks.isNotEmpty()) logger.info { "Total of $totalTasksProcessed processed" } } + + fun resolveUnresolved(){ + val scheduleTime = Instant.now() + + val resultTemplate = TaskStepResultEntryDto("", BusinessPartner.empty, listOf(TaskErrorDto(TaskErrorType.Unspecified, "Unknown golden record process error during Pool communication"))) + + var processedTasks = 0 + var resolvedTasks = 0 + do{ + val task = goldenRecordTaskRepository.findFirstByLastCheckedBefore(scheduleTime)?.let { task -> + if(!task.isResolved){ + try{ + orchestrationClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(TaskStep.PoolSync, listOf(resultTemplate.copy(taskId = task.taskId)))) + task.isResolved = true + resolvedTasks++ + }catch (e: WebClientResponseException.BadRequest){ + logger.warn { "Tracked task ${task.taskId} seems to be missing in Orchestrator. Marking it as resolved..." } + task.isResolved = true + resolvedTasks++ + }catch (e: WebClientException){ + logger.error { "Could not resolve task '${task.taskId}: ${e.message}'" } + task.isResolved = false + } + } + + task.lastChecked = scheduleTime + goldenRecordTaskRepository.save(task) + entityManager.clear() + + processedTasks++ + } + }while (task != null) + + logger.info { "Resolving tasks: Checked $processedTasks tasks and resolved $resolvedTasks tasks as errors" } + + } + + fun deleteResolved(){ + var tasksDeleted = 0 + do{ + val task = goldenRecordTaskRepository.findFirstByIsResolved(true)?.let { task -> + goldenRecordTaskRepository.delete(task) + entityManager.clear() + tasksDeleted++ + } + }while (task != null) + + logger.info { "Deleted $tasksDeleted resolved tasks" } + } } @Service -class TaskStepFetchAndReserveService( +class TaskResolutionService( private val orchestrationClient: OrchestrationApiClient, private val taskStepBuildService: TaskStepBuildService, private val bpnRequestIdentifierRepository: BpnRequestIdentifierRepository, - private val goldenRecordTaskConfigProperties: GoldenRecordTaskConfigProperties + private val entityManager: EntityManager ) { private val logger = KotlinLogging.logger { } - @Transactional - fun fetchAndReserve(): Int { - try { - logger.info { "Reserving next chunk of cleaning tasks from Orchestrator..." } - val reservationRequest = TaskStepReservationRequest(step = TaskStep.PoolSync, amount = goldenRecordTaskConfigProperties.batchSize) - val taskStepReservation = orchestrationClient.goldenRecordTasks.reserveTasksForStep(reservationRequest = reservationRequest) - + fun resolveTasks(taskStepReservation: TaskStepReservationResponse) { logger.info { "${taskStepReservation.reservedTasks.size} tasks found for cleaning. Proceeding with cleaning..." } if (taskStepReservation.reservedTasks.isNotEmpty()) { @@ -76,37 +150,22 @@ class TaskStepFetchAndReserveService( error.copy(description = error.description.take(250)) }) } - try{ - orchestrationClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(step = TaskStep.PoolSync, results = resultsWithSafeErrors)) - }catch (e: Throwable){ - logger.error { "Some unexpected problem on the orchestrator side. Try to resolve the current tasks with generic exceptions to at least get them out of the reserved state" } - val genericErrorResults = resultsWithSafeErrors.map { TaskStepResultEntryDto(it.taskId, BusinessPartner.empty, listOf(TaskErrorDto(TaskErrorType.Unspecified, "Unknown exception when communicating with the golden record process"))) } - orchestrationClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(step = TaskStep.PoolSync, results = genericErrorResults )) - } - + orchestrationClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(step = TaskStep.PoolSync, results = resultsWithSafeErrors)) } logger.info { "Cleaning tasks processing completed for this iteration." } - - return taskStepReservation.reservedTasks.size - } catch (ex: Throwable) { - logger.error(ex) { "Error while processing cleaning task" } - return 0 - } } fun upsertGoldenRecordIntoPool(taskEntries: List): List { - val taskEntryBpnMapping = TaskEntryBpnMapping(taskEntries, bpnRequestIdentifierRepository) + val taskResults = taskEntries.map { businessPartnerTaskResult(it) } - val taskResults = taskEntries.map { businessPartnerTaskResult(it, taskEntryBpnMapping) } - taskEntryBpnMapping.writeCreatedMappingsToDb(bpnRequestIdentifierRepository) return taskResults } - private fun businessPartnerTaskResult(taskStep: TaskStepReservationEntryDto, taskEntryBpnMapping: TaskEntryBpnMapping): TaskStepResultEntryDto { + private fun businessPartnerTaskResult(taskStep: TaskStepReservationEntryDto): TaskStepResultEntryDto { return try { - taskStepBuildService.upsertBusinessPartner(taskStep, taskEntryBpnMapping) + taskStepBuildService.upsertBusinessPartner(taskStep) } catch (ex: BpdmValidationException) { TaskStepResultEntryDto( taskId = taskStep.taskId, @@ -132,5 +191,4 @@ class TaskStepFetchAndReserveService( ) } } - } \ No newline at end of file diff --git a/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepBuildService.kt b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepBuildService.kt index b0f11a748..893373c65 100644 --- a/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepBuildService.kt +++ b/bpdm-pool/src/main/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepBuildService.kt @@ -26,6 +26,7 @@ import org.eclipse.tractusx.bpdm.common.dto.PaginationRequest import org.eclipse.tractusx.bpdm.pool.api.model.* import org.eclipse.tractusx.bpdm.pool.api.model.request.* import org.eclipse.tractusx.bpdm.pool.exception.BpdmValidationException +import org.eclipse.tractusx.bpdm.pool.repository.BpnRequestIdentifierRepository import org.eclipse.tractusx.orchestrator.api.model.* import org.springframework.stereotype.Service import java.time.Instant @@ -43,7 +44,8 @@ class TaskStepBuildService( private val businessPartnerBuildService: BusinessPartnerBuildService, private val businessPartnerFetchService: BusinessPartnerFetchService, private val siteService: SiteService, - private val addressService: AddressService + private val addressService: AddressService, + private val bpnRequestIdentifierRepository: BpnRequestIdentifierRepository ) { enum class CleaningError(val message: String) { @@ -74,7 +76,8 @@ class TaskStepBuildService( } @Transactional - fun upsertBusinessPartner(taskEntry: TaskStepReservationEntryDto, taskEntryBpnMapping: TaskEntryBpnMapping): TaskStepResultEntryDto { + fun upsertBusinessPartner(taskEntry: TaskStepReservationEntryDto): TaskStepResultEntryDto { + val taskEntryBpnMapping = TaskEntryBpnMapping(listOf(taskEntry), bpnRequestIdentifierRepository) val businessPartnerDto = taskEntry.businessPartner val legalEntityBpns = processLegalEntity(businessPartnerDto, taskEntryBpnMapping) @@ -97,6 +100,7 @@ class TaskStepBuildService( ) } + taskEntryBpnMapping.writeCreatedMappingsToDb(bpnRequestIdentifierRepository) return TaskStepResultEntryDto( taskId = taskEntry.taskId, businessPartner = businessPartnerWithBpns, diff --git a/bpdm-pool/src/main/resources/db/migration/V6_2_0_0__add_golden_record_tasks.sql b/bpdm-pool/src/main/resources/db/migration/V6_2_0_0__add_golden_record_tasks.sql new file mode 100644 index 000000000..89424575c --- /dev/null +++ b/bpdm-pool/src/main/resources/db/migration/V6_2_0_0__add_golden_record_tasks.sql @@ -0,0 +1,11 @@ +create table golden_record_tasks +( + id bigint not null, + uuid UUID not null, + created_at timestamp without time zone not null, + updated_at timestamp without time zone not null, + task_id varchar(255) not null, + is_resolved boolean not null, + last_checked timestamp without time zone not null, + primary key (id) +); \ No newline at end of file diff --git a/bpdm-pool/src/test/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepFetchAndReserveServiceTest.kt b/bpdm-pool/src/test/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskResolutionServiceTest.kt similarity index 99% rename from bpdm-pool/src/test/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepFetchAndReserveServiceTest.kt rename to bpdm-pool/src/test/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskResolutionServiceTest.kt index 74ed35924..4901a247e 100644 --- a/bpdm-pool/src/test/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskStepFetchAndReserveServiceTest.kt +++ b/bpdm-pool/src/test/kotlin/org/eclipse/tractusx/bpdm/pool/service/TaskResolutionServiceTest.kt @@ -53,8 +53,8 @@ import java.util.* ) @ActiveProfiles("test-no-auth") @ContextConfiguration(initializers = [PostgreSQLContextInitializer::class]) -class TaskStepFetchAndReserveServiceTest @Autowired constructor( - val cleaningStepService: TaskStepFetchAndReserveService, +class TaskResolutionServiceTest @Autowired constructor( + val cleaningStepService: TaskResolutionService, val bpnRequestIdentifierRepository: BpnRequestIdentifierRepository, val poolClient: PoolApiClient, val dbTestHelpers: DbTestHelpers,