Skip to content

Commit

Permalink
Merge pull request #1192 from eclipse-tractusx/fix/gate-outdated-task
Browse files Browse the repository at this point in the history
fix(bpdm-gate): update outdated task result to latest stage
  • Loading branch information
SujitMBRDI authored Jan 29, 2025
2 parents c18042c + 96f2e5f commit e2f4ec3
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 176 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ For changes to the BPDM Helm charts please consult the [changelog](charts/bpdm/C
- BPDM Gate: Fetched and updated legal name of legal entity from pool while performing partner upload process via CSV([#1141](https://github.com/eclipse-tractusx/bpdm/issues/1141))
- BPDM Orchestrator: When trying to resolve tasks for a step that have has been resolved before, the request is ignored. A HTTP OK instead of a BadRequest will be returned ([#1092](https://github.com/eclipse-tractusx/bpdm/issues/1092))
- BPDM Pool: Remove duplicate BPN request identifier mapping entries and prevent the creation of new duplicates ([#1159](https://github.com/eclipse-tractusx/bpdm/issues/1159))
- BPDM Gate: Fix the Gate not updating business partner output data when it receives already outdated output ([#1185](https://github.com/eclipse-tractusx/bpdm/issues/1185))

## [6.2.0] - 2024-11-28

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.eclipse.tractusx.bpdm.common.dto.IBaseStateDto
import org.eclipse.tractusx.bpdm.common.dto.PaginationRequest
import org.eclipse.tractusx.bpdm.common.exception.BpdmNullMappingException
import org.eclipse.tractusx.bpdm.common.model.StageType
import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType
import org.eclipse.tractusx.bpdm.gate.config.GoldenRecordTaskConfigProperties
import org.eclipse.tractusx.bpdm.gate.entity.*
import org.eclipse.tractusx.bpdm.gate.entity.generic.*
Expand Down Expand Up @@ -106,14 +107,22 @@ class GoldenRecordUpdateChunkService(
return UpdateStats(poolChangelogEntries.content.size, updatedLegalEntities, updatedSites, updatedAddresses)
}


fun updateAgainstPool(businessPartners: List<BusinessPartnerDb>){
updateLegalEntitiesByReference(businessPartners).size
updateSitesByReference(businessPartners.filter { it.bpnS != null }).size
updateAddressesByReference(businessPartners).size
}

private fun updateLegalEntities(changedBpnLs: Collection<String>): List<BusinessPartnerService.UpsertResult> {
val businessPartnersToUpdate = businessPartnerRepository.findByStageAndBpnLIn(StageType.Output, changedBpnLs)
val referencingBusinessPartners = businessPartnerRepository.findByStageAndBpnLIn(StageType.Output, changedBpnLs)
val businessPartnersToUpdate = referencingBusinessPartners.filter { it.sharingState.sharingStateType == SharingStateType.Success }
return updateLegalEntitiesByReference(businessPartnersToUpdate)
}

val bpnLsToQuery = businessPartnersToUpdate.mapNotNull { it.bpnL }
private fun updateLegalEntitiesByReference(businessPartnersToUpdate: Collection<BusinessPartnerDb>): List<BusinessPartnerService.UpsertResult> {
val bpnLsToQuery = businessPartnersToUpdate.mapNotNull { it.bpnL }.toSet()

val searchRequest = LegalEntitySearchRequest(bpnLs = bpnLsToQuery)
val searchRequest = LegalEntitySearchRequest(bpnLs = bpnLsToQuery.toList())
val legalEntities = if(searchRequest.bpnLs.isNotEmpty())
poolClient.legalEntities.getLegalEntities(searchRequest, PaginationRequest(size = searchRequest.bpnLs.size)).content.map { it.legalEntity }
else
Expand All @@ -129,12 +138,16 @@ class GoldenRecordUpdateChunkService(
}

private fun updateSites(changedSiteBpns: Collection<String>): List<BusinessPartnerService.UpsertResult> {
val businessPartnersToUpdate = businessPartnerRepository.findByStageAndBpnSIn(StageType.Output, changedSiteBpns)
val referencingBusinessPartners = businessPartnerRepository.findByStageAndBpnSIn(StageType.Output, changedSiteBpns)
val businessPartnersToUpdate = referencingBusinessPartners.filter { it.sharingState.sharingStateType == SharingStateType.Success }
return updateSitesByReference(businessPartnersToUpdate)
}

private fun updateSitesByReference(businessPartnersToUpdate: Collection<BusinessPartnerDb>): List<BusinessPartnerService.UpsertResult> {
logger.debug { "Found ${businessPartnersToUpdate.size} business partners with matching BPNS to update." }

val siteBpnsToQuery = businessPartnersToUpdate.mapNotNull { it.bpnS }
val searchRequest = SiteSearchRequest(siteBpns = siteBpnsToQuery)
val siteBpnsToQuery = businessPartnersToUpdate.mapNotNull { it.bpnS }.toSet()
val searchRequest = SiteSearchRequest(siteBpns = siteBpnsToQuery.toList())
val sites = if(searchRequest.siteBpns.isNotEmpty())
poolClient.sites.getSites(searchRequest, PaginationRequest(size = searchRequest.siteBpns.size)).content.map { it.site }
else
Expand All @@ -151,13 +164,17 @@ class GoldenRecordUpdateChunkService(
}

private fun updateAddresses(changedAddressBpns: Collection<String>): List<BusinessPartnerService.UpsertResult> {
val businessPartnersToUpdate = businessPartnerRepository.findByStageAndBpnAIn(StageType.Output, changedAddressBpns)
val referencingBusinessPartners = businessPartnerRepository.findByStageAndBpnAIn(StageType.Output, changedAddressBpns)
val businessPartnersToUpdate = referencingBusinessPartners.filter { it.sharingState.sharingStateType == SharingStateType.Success }
return updateAddressesByReference(businessPartnersToUpdate)
}

private fun updateAddressesByReference(businessPartnersToUpdate: Collection<BusinessPartnerDb>): List<BusinessPartnerService.UpsertResult> {
logger.debug { "Found ${businessPartnersToUpdate.size} business partners with matching BPNA to update." }

val addressBpnsToQuery = businessPartnersToUpdate.mapNotNull { it.bpnA }
val addressBpnsToQuery = businessPartnersToUpdate.mapNotNull { it.bpnA }.toSet()

val searchRequest = AddressSearchRequest(addressBpns = addressBpnsToQuery)
val searchRequest = AddressSearchRequest(addressBpns = addressBpnsToQuery.toList())
val addresses = if(searchRequest.addressBpns.isNotEmpty())
poolClient.addresses.getAddresses(searchRequest, PaginationRequest(size = searchRequest.addressBpns.size)).content
else
Expand Down Expand Up @@ -218,7 +235,7 @@ class GoldenRecordUpdateChunkService(
alternativePostalAddress = postalAddress.alternativePostalAddress?.toUpsertData(),
legalEntityConfidence = legalEntityConfidence?.toUpsertData() ?: throw createMappingException(BusinessPartnerDb::legalEntityConfidence, id),
siteConfidence = siteConfidence?.toUpsertData(),
addressConfidence = addressConfidence?.toUpsertData() ?: throw createMappingException(BusinessPartnerDb::addressConfidence, id),
addressConfidence = addressConfidence?.toUpsertData() ?: throw createMappingException(BusinessPartnerDb::addressConfidence, id)
)
}

Expand Down Expand Up @@ -297,6 +314,7 @@ class GoldenRecordUpdateChunkService(
updateStates(businessPartner.states, legalEntity.states, BusinessPartnerType.LEGAL_ENTITY)
businessPartner.legalName = legalEntity.legalName
businessPartner.legalForm = legalEntity.legalForm
businessPartner.shortName = legalEntity.legalShortName
businessPartner.legalEntityConfidence?.let { update(it, legalEntity.confidenceCriteria) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class TaskResolutionChunkService(
private val businessPartnerService: BusinessPartnerService,
private val orchestratorMappings: OrchestratorMappings,
private val synchRecordService: SyncRecordService,
private val syncRecordRepository: SyncRecordRepository
private val syncRecordRepository: SyncRecordRepository,
private val goldenRecordUpdateService: GoldenRecordUpdateChunkService
) {

private val logger = KotlinLogging.logger { }
Expand Down Expand Up @@ -210,13 +211,15 @@ class TaskResolutionChunkService(
}

private fun resolveAsUpserts(requests: List<RequestCreationResult>) {
requests.forEach { sharingStateService.setSuccess(it.sharingState) }
businessPartnerService.upsertBusinessPartnersOutput(requests.map {
val upsertResults = businessPartnerService.upsertBusinessPartnersOutput(requests.map {
BusinessPartnerService.OutputUpsertRequest(
it.sharingState,
it.businessPartnerResult!!
)
})
// check against the Pool again in case the business partner data is already outdated
goldenRecordUpdateService.updateAgainstPool(upsertResults.map { it.businessPartner })
requests.forEach { sharingStateService.setSuccess(it.sharingState) }
}

private fun resolveAsErrors(errors: List<RequestCreationResult>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,85 +139,6 @@ class BusinessPartnerControllerAndSharingControllerIT @Autowired constructor(

}

@Test
fun `insert one business partners and finalize cleaning task without error`() {
this.mockAndAssertUtils.mockOrchestratorApiCleaned(gateWireMockServer)

// Expect outputBusinessPartner without identifiers as there are not Address identifier provided.
val outputBusinessPartners = listOf(
BusinessPartnerVerboseValues.bpOutputDtoCleaned.copy(identifiers = emptyList(), states = emptyList()),
)

val upsertRequests = listOf(
BusinessPartnerNonVerboseValues.bpInputRequestCleaned,
BusinessPartnerNonVerboseValues.bpInputRequestError
)
upsertBusinessPartnersAndShare(upsertRequests)

val externalId4 = BusinessPartnerNonVerboseValues.bpInputRequestCleaned.externalId
val externalId5 = BusinessPartnerNonVerboseValues.bpInputRequestError.externalId

val createdSharingState = listOf(
SharingStateDto(
externalId = externalId4,
sharingStateType = SharingStateType.Pending,
sharingErrorCode = null,
sharingErrorMessage = null,
sharingProcessStarted = null,
taskId = "0"
),
SharingStateDto(
externalId = externalId5,
sharingStateType = SharingStateType.Pending,
sharingErrorCode = null,
sharingErrorMessage = null,
sharingProcessStarted = null,
taskId = "1"
)
)

//Firstly verifies if the Sharing States was created for new Business Partners
val externalIds = listOf(externalId4, externalId5)
val upsertSharingStateResponses = this.mockAndAssertUtils.readSharingStates(externalIds)
assertHelpers
.assertRecursively(upsertSharingStateResponses)
.ignoringFieldsMatchingRegexes(".*${SharingStateDto::sharingProcessStarted.name}")
.isEqualTo(createdSharingState)

// Call Finish Cleaning Method
taskResolutionService.resolveTasks()

val cleanedSharingState = listOf(
SharingStateDto(
externalId = externalId4,
sharingStateType = SharingStateType.Success,
sharingErrorCode = null,
sharingErrorMessage = null,
sharingProcessStarted = null,
taskId = "0"
),
SharingStateDto(
externalId = externalId5,
sharingStateType = SharingStateType.Error,
sharingErrorCode = BusinessPartnerSharingError.SharingTimeout,
sharingErrorMessage = "Major Error // Minor Error",
sharingProcessStarted = null,
taskId = "1"
)
)

//Check for both Sharing State changes (Error and Success)
val readCleanedSharingState = this.mockAndAssertUtils.readSharingStates(externalIds)
assertHelpers.assertRecursively(readCleanedSharingState)
.ignoringFieldsMatchingRegexes(".*${SharingStateDto::sharingProcessStarted.name}")
.isEqualTo(cleanedSharingState)

//Assert that Cleaned Golden Record is persisted in the Output correctly
val searchResponsePage = gateClient.businessParters.getBusinessPartnersOutput(listOf(externalId4))
this.mockAndAssertUtils.assertUpsertOutputResponsesMatchRequests(searchResponsePage.content, outputBusinessPartners)

}

@Test
fun `insert one business partners but task is missing in orchestrator`() {
this.mockAndAssertUtils.mockOrchestratorApiCleaned(gateWireMockServer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,89 +301,6 @@ class BusinessPartnerControllerIT @Autowired constructor(
this.mockAndAssertUtils.assertUpsertResponsesMatchRequests(searchResponsePage.content, laterUpsertRequest)
}





@Test
fun `insert one business partners and finalize cleaning task without error`() {
this.mockAndAssertUtils.mockOrchestratorApiCleaned(gateWireMockServer)

// Expect outputBusinessPartner without identifiers and states as there are no Address identifier and states provided.
val outputBusinessPartners = listOf(
BusinessPartnerVerboseValues.bpOutputDtoCleaned.copy(identifiers = emptyList(), states = emptyList())
)

val upsertRequests = listOf(
BusinessPartnerNonVerboseValues.bpInputRequestCleaned,
BusinessPartnerNonVerboseValues.bpInputRequestError
)
upsertBusinessPartnersAndShare(upsertRequests)

val externalId4 = BusinessPartnerNonVerboseValues.bpInputRequestCleaned.externalId
val externalId5 = BusinessPartnerNonVerboseValues.bpInputRequestError.externalId

val createdSharingState = listOf(
SharingStateDto(
externalId = externalId4,
sharingStateType = SharingStateType.Pending,
sharingErrorCode = null,
sharingErrorMessage = null,
sharingProcessStarted = null,
taskId = "0"
),
SharingStateDto(
externalId = externalId5,
sharingStateType = SharingStateType.Pending,
sharingErrorCode = null,
sharingErrorMessage = null,
sharingProcessStarted = null,
taskId = "1"
)
)

//Firstly verifies if the Sharing States was created for new Business Partners
val externalIds = listOf(externalId4, externalId5)
val upsertSharingStateResponses = this.mockAndAssertUtils.readSharingStates(externalIds)
assertHelpers
.assertRecursively(upsertSharingStateResponses)
.ignoringFieldsMatchingRegexes(".*${SharingStateDto::sharingProcessStarted.name}")
.isEqualTo(createdSharingState)

// Call Finish Cleaning Method
taskResolutionService.resolveTasks()

val cleanedSharingState = listOf(
SharingStateDto(
externalId = externalId4,
sharingStateType = SharingStateType.Success,
sharingErrorCode = null,
sharingErrorMessage = null,
sharingProcessStarted = null,
taskId = "0"
),
SharingStateDto(
externalId = externalId5,
sharingStateType = SharingStateType.Error,
sharingErrorCode = BusinessPartnerSharingError.SharingTimeout,
sharingErrorMessage = "Major Error // Minor Error",
sharingProcessStarted = null,
taskId = "1"
)
)

//Check for both Sharing State changes (Error and Success)
val readCleanedSharingState = this.mockAndAssertUtils.readSharingStates(externalIds)
assertHelpers.assertRecursively(readCleanedSharingState)
.ignoringFieldsMatchingRegexes(".*${SharingStateDto::sharingProcessStarted.name}")
.isEqualTo(cleanedSharingState)

//Assert that Cleaned Golden Record is persisted in the Output correctly
val searchResponsePage = gateClient.businessParters.getBusinessPartnersOutput(listOf(externalId4))
this.mockAndAssertUtils.assertUpsertOutputResponsesMatchRequests(searchResponsePage.content, outputBusinessPartners)

}

@Test
fun `insert one business partners but task is missing in orchestrator`() {
this.mockAndAssertUtils.mockOrchestratorApiCleaned(gateWireMockServer)
Expand Down

0 comments on commit e2f4ec3

Please sign in to comment.