Skip to content

Commit

Permalink
Merge pull request #1033 from eclipse-tractusx/fix/pool-gr-task-resol…
Browse files Browse the repository at this point in the history
…ution

fix(Pool): Fix not resolving golden record tasks on exceptions
  • Loading branch information
nicoprow authored Aug 16, 2024
2 parents 1b50da5 + 85fc87b commit d109108
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 46 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ For changes to the BPDM Helm charts please consult the [changelog](charts/bpdm/C

### Added

-BPDM Pool: Post endpoint to create a site for LegalAndSiteMainAddress addressType.([#739](https://github.com/eclipse-tractusx/sig-release/issues/739))
- BPDM Pool: Post endpoint to create a site for LegalAndSiteMainAddress addressType.([#739](https://github.com/eclipse-tractusx/sig-release/issues/739))

### Changed

- BPDM Pool: Fix not resolving golden record tasks on exceptions

## [6.1.0] - [2024-07-15]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*******************************************************************************
* Copyright (c) 2021,2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/

package org.eclipse.tractusx.bpdm.pool.entity

import jakarta.persistence.Column
import jakarta.persistence.Entity
import jakarta.persistence.Table
import org.eclipse.tractusx.bpdm.common.model.BaseEntity
import java.time.Instant


@Entity
@Table(
name = "golden_record_tasks"
)
class GoldenRecordTaskDb(
@Column(name = "task_id", nullable = false, updatable = false)
val taskId: String,
@Column(name = "is_resolved", nullable = false)
var isResolved: Boolean,
@Column(name = "last_checked", nullable = false)
var lastChecked: Instant
): BaseEntity()
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*******************************************************************************
* Copyright (c) 2021,2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/

package org.eclipse.tractusx.bpdm.pool.repository

import org.eclipse.tractusx.bpdm.pool.entity.GoldenRecordTaskDb
import org.springframework.data.jpa.repository.JpaRepository
import java.time.Instant

interface GoldenRecordTaskRepository: JpaRepository<GoldenRecordTaskDb, Long> {

fun findFirstByLastCheckedBefore(checkedBefore: Instant): GoldenRecordTaskDb?

fun findFirstByIsResolved(isResolved: Boolean): GoldenRecordTaskDb?
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ package org.eclipse.tractusx.bpdm.pool.service

import org.eclipse.tractusx.bpdm.pool.entity.BpnRequestIdentifierMappingDb
import org.eclipse.tractusx.bpdm.pool.repository.BpnRequestIdentifierRepository
import org.eclipse.tractusx.orchestrator.api.model.BpnReference
import org.eclipse.tractusx.orchestrator.api.model.BpnReferenceType
import org.eclipse.tractusx.orchestrator.api.model.TaskStepReservationEntryDto
import org.eclipse.tractusx.orchestrator.api.model.BpnReference

class TaskEntryBpnMapping(taskEntries: List<TaskStepReservationEntryDto>, bpnRequestIdentifierRepository: BpnRequestIdentifierRepository) {

Expand All @@ -41,9 +41,10 @@ class TaskEntryBpnMapping(taskEntries: List<TaskStepReservationEntryDto>, bpnReq
taskEntries.mapNotNull { it.businessPartner.site?.bpnReference } +
taskEntries.mapNotNull { it.businessPartner.site?.siteMainAddress?.bpnReference } +
taskEntries.mapNotNull { it.businessPartner.additionalAddress?.bpnReference }
.filter { it.referenceValue == null || it.referenceType == null }

val usedRequestIdentifiers: Collection<String> = references
val filledReferences = references.filter { it.referenceValue != null && it.referenceType != null }

val usedRequestIdentifiers: Collection<String> = filledReferences
.filter { it.referenceType == BpnReferenceType.BpnRequestIdentifier }
.map { it.referenceValue!! }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,52 +19,126 @@

package org.eclipse.tractusx.bpdm.pool.service

import jakarta.persistence.EntityManager
import mu.KotlinLogging
import org.eclipse.tractusx.bpdm.pool.config.GoldenRecordTaskConfigProperties
import org.eclipse.tractusx.bpdm.pool.entity.GoldenRecordTaskDb
import org.eclipse.tractusx.bpdm.pool.exception.BpdmValidationException
import org.eclipse.tractusx.bpdm.pool.repository.BpnRequestIdentifierRepository
import org.eclipse.tractusx.bpdm.pool.repository.GoldenRecordTaskRepository
import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient
import org.eclipse.tractusx.orchestrator.api.model.*
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import org.springframework.web.reactive.function.client.WebClientException
import org.springframework.web.reactive.function.client.WebClientResponseException
import java.time.Instant

@Service
class TaskReservationBatchService(
private val taskStepFetchAndReserveService: TaskStepFetchAndReserveService
){
class TaskBatchResolutionService(
private val taskStepFetchAndReserveService: TaskResolutionService,
private val goldenRecordTaskConfigProperties: GoldenRecordTaskConfigProperties,
private val orchestrationClient: OrchestrationApiClient,
private val goldenRecordTaskRepository: GoldenRecordTaskRepository,
private val entityManager: EntityManager,

){
private val logger = KotlinLogging.logger { }

@Scheduled(cron = "#{${GoldenRecordTaskConfigProperties.GET_CRON}}", zone = "UTC")
fun process(){
logger.info { "Starting polling for cleaning tasks from Orchestrator..." }
fun processTasks(){
logger.info { "Start golden record task processing schedule..." }
reserveAndResolve()
resolveUnresolved()
deleteResolved()
}


fun reserveAndResolve(){
var totalTasksProcessed = 0
do{
val tasksProcessed = taskStepFetchAndReserveService.fetchAndReserve()
totalTasksProcessed += tasksProcessed
}while (tasksProcessed != 0)
val reservationRequest = TaskStepReservationRequest(step = TaskStep.PoolSync, amount = goldenRecordTaskConfigProperties.batchSize)
val taskStepReservation = orchestrationClient.goldenRecordTasks.reserveTasksForStep(reservationRequest = reservationRequest)

val successfullyResolved = try{
taskStepFetchAndReserveService.resolveTasks(taskStepReservation)
true
} catch (ex: Throwable) {
logger.error(ex) { "Error while processing cleaning task" }
false
}

totalTasksProcessed += taskStepReservation.reservedTasks.size

if(!successfullyResolved)
goldenRecordTaskRepository.saveAll(taskStepReservation.reservedTasks.map { GoldenRecordTaskDb(it.taskId, true, Instant.now()) })

entityManager.clear()
}while (taskStepReservation.reservedTasks.isNotEmpty())

logger.info { "Total of $totalTasksProcessed processed" }
}

fun resolveUnresolved(){
val scheduleTime = Instant.now()

val resultTemplate = TaskStepResultEntryDto("", BusinessPartner.empty, listOf(TaskErrorDto(TaskErrorType.Unspecified, "Unknown golden record process error during Pool communication")))

var processedTasks = 0
var resolvedTasks = 0
do{
val task = goldenRecordTaskRepository.findFirstByLastCheckedBefore(scheduleTime)?.let { task ->
if(!task.isResolved){
try{
orchestrationClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(TaskStep.PoolSync, listOf(resultTemplate.copy(taskId = task.taskId))))
task.isResolved = true
resolvedTasks++
}catch (e: WebClientResponseException.BadRequest){
logger.warn { "Tracked task ${task.taskId} seems to be missing in Orchestrator. Marking it as resolved..." }
task.isResolved = true
resolvedTasks++
}catch (e: WebClientException){
logger.error { "Could not resolve task '${task.taskId}: ${e.message}'" }
task.isResolved = false
}
}

task.lastChecked = scheduleTime
goldenRecordTaskRepository.save(task)
entityManager.clear()

processedTasks++
}
}while (task != null)

logger.info { "Resolving tasks: Checked $processedTasks tasks and resolved $resolvedTasks tasks as errors" }

}

fun deleteResolved(){
var tasksDeleted = 0
do{
val task = goldenRecordTaskRepository.findFirstByIsResolved(true)?.let { task ->
goldenRecordTaskRepository.delete(task)
entityManager.clear()
tasksDeleted++
}
}while (task != null)

logger.info { "Deleted $tasksDeleted resolved tasks" }
}
}

@Service
class TaskStepFetchAndReserveService(
class TaskResolutionService(
private val orchestrationClient: OrchestrationApiClient,
private val taskStepBuildService: TaskStepBuildService,
private val bpnRequestIdentifierRepository: BpnRequestIdentifierRepository,
private val goldenRecordTaskConfigProperties: GoldenRecordTaskConfigProperties
private val entityManager: EntityManager
) {
private val logger = KotlinLogging.logger { }

@Transactional
fun fetchAndReserve(): Int {
try {
logger.info { "Reserving next chunk of cleaning tasks from Orchestrator..." }
val reservationRequest = TaskStepReservationRequest(step = TaskStep.PoolSync, amount = goldenRecordTaskConfigProperties.batchSize)
val taskStepReservation = orchestrationClient.goldenRecordTasks.reserveTasksForStep(reservationRequest = reservationRequest)

fun resolveTasks(taskStepReservation: TaskStepReservationResponse) {
logger.info { "${taskStepReservation.reservedTasks.size} tasks found for cleaning. Proceeding with cleaning..." }

if (taskStepReservation.reservedTasks.isNotEmpty()) {
Expand All @@ -76,37 +150,22 @@ class TaskStepFetchAndReserveService(
error.copy(description = error.description.take(250))
})
}
try{
orchestrationClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(step = TaskStep.PoolSync, results = resultsWithSafeErrors))
}catch (e: Throwable){
logger.error { "Some unexpected problem on the orchestrator side. Try to resolve the current tasks with generic exceptions to at least get them out of the reserved state" }
val genericErrorResults = resultsWithSafeErrors.map { TaskStepResultEntryDto(it.taskId, BusinessPartner.empty, listOf(TaskErrorDto(TaskErrorType.Unspecified, "Unknown exception when communicating with the golden record process"))) }
orchestrationClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(step = TaskStep.PoolSync, results = genericErrorResults ))
}

orchestrationClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(step = TaskStep.PoolSync, results = resultsWithSafeErrors))
}
logger.info { "Cleaning tasks processing completed for this iteration." }

return taskStepReservation.reservedTasks.size
} catch (ex: Throwable) {
logger.error(ex) { "Error while processing cleaning task" }
return 0
}
}

fun upsertGoldenRecordIntoPool(taskEntries: List<TaskStepReservationEntryDto>): List<TaskStepResultEntryDto> {

val taskEntryBpnMapping = TaskEntryBpnMapping(taskEntries, bpnRequestIdentifierRepository)
val taskResults = taskEntries.map { businessPartnerTaskResult(it) }

val taskResults = taskEntries.map { businessPartnerTaskResult(it, taskEntryBpnMapping) }
taskEntryBpnMapping.writeCreatedMappingsToDb(bpnRequestIdentifierRepository)
return taskResults
}

private fun businessPartnerTaskResult(taskStep: TaskStepReservationEntryDto, taskEntryBpnMapping: TaskEntryBpnMapping): TaskStepResultEntryDto {
private fun businessPartnerTaskResult(taskStep: TaskStepReservationEntryDto): TaskStepResultEntryDto {

return try {
taskStepBuildService.upsertBusinessPartner(taskStep, taskEntryBpnMapping)
taskStepBuildService.upsertBusinessPartner(taskStep)
} catch (ex: BpdmValidationException) {
TaskStepResultEntryDto(
taskId = taskStep.taskId,
Expand All @@ -132,5 +191,4 @@ class TaskStepFetchAndReserveService(
)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.eclipse.tractusx.bpdm.common.dto.PaginationRequest
import org.eclipse.tractusx.bpdm.pool.api.model.*
import org.eclipse.tractusx.bpdm.pool.api.model.request.*
import org.eclipse.tractusx.bpdm.pool.exception.BpdmValidationException
import org.eclipse.tractusx.bpdm.pool.repository.BpnRequestIdentifierRepository
import org.eclipse.tractusx.orchestrator.api.model.*
import org.springframework.stereotype.Service
import java.time.Instant
Expand All @@ -43,7 +44,8 @@ class TaskStepBuildService(
private val businessPartnerBuildService: BusinessPartnerBuildService,
private val businessPartnerFetchService: BusinessPartnerFetchService,
private val siteService: SiteService,
private val addressService: AddressService
private val addressService: AddressService,
private val bpnRequestIdentifierRepository: BpnRequestIdentifierRepository
) {

enum class CleaningError(val message: String) {
Expand Down Expand Up @@ -74,7 +76,8 @@ class TaskStepBuildService(
}

@Transactional
fun upsertBusinessPartner(taskEntry: TaskStepReservationEntryDto, taskEntryBpnMapping: TaskEntryBpnMapping): TaskStepResultEntryDto {
fun upsertBusinessPartner(taskEntry: TaskStepReservationEntryDto): TaskStepResultEntryDto {
val taskEntryBpnMapping = TaskEntryBpnMapping(listOf(taskEntry), bpnRequestIdentifierRepository)
val businessPartnerDto = taskEntry.businessPartner

val legalEntityBpns = processLegalEntity(businessPartnerDto, taskEntryBpnMapping)
Expand All @@ -97,6 +100,7 @@ class TaskStepBuildService(
)
}

taskEntryBpnMapping.writeCreatedMappingsToDb(bpnRequestIdentifierRepository)
return TaskStepResultEntryDto(
taskId = taskEntry.taskId,
businessPartner = businessPartnerWithBpns,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
create table golden_record_tasks
(
id bigint not null,
uuid UUID not null,
created_at timestamp without time zone not null,
updated_at timestamp without time zone not null,
task_id varchar(255) not null,
is_resolved boolean not null,
last_checked timestamp without time zone not null,
primary key (id)
);
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ import java.util.*
)
@ActiveProfiles("test-no-auth")
@ContextConfiguration(initializers = [PostgreSQLContextInitializer::class])
class TaskStepFetchAndReserveServiceTest @Autowired constructor(
val cleaningStepService: TaskStepFetchAndReserveService,
class TaskResolutionServiceTest @Autowired constructor(
val cleaningStepService: TaskResolutionService,
val bpnRequestIdentifierRepository: BpnRequestIdentifierRepository,
val poolClient: PoolApiClient,
val dbTestHelpers: DbTestHelpers,
Expand Down

0 comments on commit d109108

Please sign in to comment.