Skip to content

Commit

Permalink
Merge pull request #1021 from eclipse-tractusx/perf/schedule-throughput
Browse files Browse the repository at this point in the history
Perf: Increase throughput for Golden Record Task Processing
  • Loading branch information
nicoprow authored Aug 6, 2024
2 parents ccea3cb + 35397ef commit 1568055
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import mu.KotlinLogging
import org.eclipse.tractusx.bpdm.cleaning.util.toUUID
import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient
import org.eclipse.tractusx.orchestrator.api.model.*
import org.eclipse.tractusx.orchestrator.api.model.BpnReferenceType
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import java.time.Instant
Expand Down Expand Up @@ -57,22 +56,23 @@ class CleaningServiceDummy(
try {
logger.info { "Starting polling for cleaning tasks from Orchestrator... TaskStep ${step.name}" }

val cleaningRequest = orchestrationApiClient.goldenRecordTasks
.reserveTasksForStep(TaskStepReservationRequest(amount = 10, step))
do{
val cleaningRequest = orchestrationApiClient.goldenRecordTasks
.reserveTasksForStep(TaskStepReservationRequest(amount = 10, step))

val cleaningTasks = cleaningRequest.reservedTasks
val cleaningTasks = cleaningRequest.reservedTasks

logger.info { "${cleaningTasks.size} tasks found for cleaning. Proceeding with cleaning..." }
logger.info { "${cleaningTasks.size} tasks found for cleaning. Proceeding with cleaning..." }

if (cleaningTasks.isNotEmpty()) {
val cleaningResults = cleaningTasks.map { reservedTask ->
processCleaningTask(reservedTask)
}

orchestrationApiClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(step, cleaningResults))
logger.info { "Cleaning tasks processing completed for this iteration." }
}
if (cleaningTasks.isNotEmpty()) {
val cleaningResults = cleaningTasks.map { reservedTask ->
processCleaningTask(reservedTask)
}

orchestrationApiClient.goldenRecordTasks.resolveStepResults(TaskStepResultRequest(step, cleaningResults))
logger.info { "Cleaning tasks processing completed for this iteration." }
}
}while (cleaningRequest.reservedTasks.isNotEmpty())
} catch (e: Exception) {
logger.error(e) { "Error while processing cleaning task" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.github.tomakehurst.wiremock.admin.model.ServeEventQuery
import com.github.tomakehurst.wiremock.client.WireMock.*
import com.github.tomakehurst.wiremock.core.WireMockConfiguration
import com.github.tomakehurst.wiremock.junit5.WireMockExtension
import com.github.tomakehurst.wiremock.stubbing.Scenario
import com.github.tomakehurst.wiremock.stubbing.StubMapping
import org.assertj.core.api.Assertions.assertThat
import org.eclipse.tractusx.bpdm.cleaning.config.OrchestratorConfigProperties
Expand Down Expand Up @@ -58,6 +59,9 @@ class CleaningServiceApiCallsTest @Autowired constructor(
const val ORCHESTRATOR_RESERVE_TASKS_URL = "${GoldenRecordTaskApi.TASKS_PATH}/step-reservations"
const val ORCHESTRATOR_RESOLVE_TASKS_URL = "${GoldenRecordTaskApi.TASKS_PATH}/step-results"

const val RESERVATION_SCENARIO = "Reservation"
const val RESERVED_STATE = "Reserved"

@JvmField
@RegisterExtension
val orchestratorMockApi: WireMockExtension = WireMockExtension.newInstance()
Expand All @@ -80,6 +84,13 @@ class CleaningServiceApiCallsTest @Autowired constructor(
@BeforeEach
fun beforeEach() {
orchestratorMockApi.resetAll()

orchestratorMockApi.stubFor(
post(urlPathEqualTo(ORCHESTRATOR_RESERVE_TASKS_URL))
.inScenario(RESERVATION_SCENARIO)
.whenScenarioStateIs(RESERVED_STATE)
.willReturn(okJson(jacksonObjectMapper.writeValueAsString(TaskStepReservationResponse(emptyList(), Instant.now()))))
)
}

@Test
Expand Down Expand Up @@ -358,6 +369,9 @@ class CleaningServiceApiCallsTest @Autowired constructor(
// Orchestrator reserve
return orchestratorMockApi.stubFor(
post(urlPathEqualTo(ORCHESTRATOR_RESERVE_TASKS_URL))
.inScenario(RESERVATION_SCENARIO)
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo(RESERVED_STATE)
.willReturn(
okJson(jacksonObjectMapper.writeValueAsString(createSampleTaskStepReservationResponse(businessPartner)))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.eclipse.tractusx.bpdm.gate.config

import jakarta.annotation.PostConstruct
import org.eclipse.tractusx.bpdm.gate.service.GoldenRecordUpdateService
import org.eclipse.tractusx.bpdm.gate.service.TaskCreationService
import org.eclipse.tractusx.bpdm.gate.service.TaskResolutionService
import org.eclipse.tractusx.bpdm.gate.service.GoldenRecordUpdateBatchService
import org.eclipse.tractusx.bpdm.gate.service.TaskCreationBatchService
import org.eclipse.tractusx.bpdm.gate.service.TaskResolutionBatchService
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.TaskScheduler
import org.springframework.scheduling.support.CronTrigger
Expand All @@ -31,9 +31,9 @@ import org.springframework.scheduling.support.CronTrigger
class GoldenRecordTaskConfiguration(
private val configProperties: GoldenRecordTaskConfigProperties,
private val taskScheduler: TaskScheduler,
private val taskCreationService: TaskCreationService,
private val taskResolutionService: TaskResolutionService,
private val updateService: GoldenRecordUpdateService
private val taskCreationService: TaskCreationBatchService,
private val taskResolutionService: TaskResolutionBatchService,
private val updateService: GoldenRecordUpdateBatchService
) {

@PostConstruct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,31 @@ import org.springframework.transaction.annotation.Transactional
import kotlin.reflect.KProperty

@Service
class GoldenRecordUpdateService(
class GoldenRecordUpdateBatchService(
private val goldenRecordUpdateService: GoldenRecordUpdateChunkService
){
private val logger = KotlinLogging.logger { }

fun updateOutputOnGoldenRecordChange(){
logger.info { "Update Business Partner Output based on Golden Record Updates from the Pool..." }

var totalLegalEntitiesUpdated = 0
var totalSitesUpdated = 0
var totalAddressesUpdated = 0
do {
val stats = goldenRecordUpdateService.updateFromNextChunk()

totalLegalEntitiesUpdated += stats.updatedLegalEntities
totalSitesUpdated += stats.updatedSites
totalAddressesUpdated += stats.updatedAddresses
}while (stats.foundChangelogEntries != 0)

logger.debug { "In total updated '$totalLegalEntitiesUpdated' legal entities, '$totalSitesUpdated' sites and '$totalAddressesUpdated' addresses." }
}
}

@Service
class GoldenRecordUpdateChunkService(
private val poolClient: PoolApiClient,
private val syncRecordService: SyncRecordService,
private val taskConfigProperties: GoldenRecordTaskConfigProperties,
Expand All @@ -54,8 +78,8 @@ class GoldenRecordUpdateService(
private val logger = KotlinLogging.logger { }

@Transactional
fun updateOutputOnGoldenRecordChange(){
logger.info { "Update Business Partner Output based on Golden Record Updates from the Pool..." }
fun updateFromNextChunk(): UpdateStats{
logger.info { "Update next chunk of Business Partner Output based on Golden Record Updates from the Pool..." }

val syncRecord = syncRecordService.setSynchronizationStart(SyncTypeDb.POOL_TO_GATE_OUTPUT)

Expand All @@ -77,8 +101,12 @@ class GoldenRecordUpdateService(
syncRecordService.setSynchronizationSuccess(SyncTypeDb.POOL_TO_GATE_OUTPUT)

logger.debug { "Updated '$updatedLegalEntities' legal entities, '$updatedSites' sites and '$updatedAddresses' addresses." }

return UpdateStats(poolChangelogEntries.content.size, updatedLegalEntities, updatedSites, updatedAddresses)
}



private fun updateLegalEntities(changedBpnLs: Collection<String>): List<BusinessPartnerService.UpsertResult> {
val businessPartnersToUpdate = businessPartnerRepository.findByStageAndBpnLIn(StageType.Output, changedBpnLs)

Expand Down Expand Up @@ -379,4 +407,10 @@ class GoldenRecordUpdateService(
return BpdmNullMappingException(BusinessPartnerDb::class, OutputUpsertData::class, property, entityId.toString())
}

data class UpdateStats(
val foundChangelogEntries: Int,
val updatedLegalEntities: Int,
val updatedSites: Int,
val updatedAddresses: Int
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,28 @@ import org.springframework.transaction.annotation.Transactional
import java.util.*

@Service
class TaskCreationService(
class TaskCreationBatchService(
private val taskCreationService: TaskCreationChunkService
){
private val logger = KotlinLogging.logger { }


fun createTasksForReadyBusinessPartners(){
logger.info { "Started scheduled task to create golden record tasks from ready business partners" }

var totalCreatedTasks = 0
do {
val createdTasks = taskCreationService.createTasksForReadyBusinessPartners()
totalCreatedTasks += createdTasks
}while (createdTasks != 0)

logger.debug { "Total created $totalCreatedTasks new golden record tasks from ready business partners" }

}
}

@Service
class TaskCreationChunkService(
private val sharingStateRepository: SharingStateRepository,
private val sharingStateService: SharingStateService,
private val businessPartnerRepository: BusinessPartnerRepository,
Expand All @@ -47,8 +68,8 @@ class TaskCreationService(
private val logger = KotlinLogging.logger { }

@Transactional
fun createTasksForReadyBusinessPartners() {
logger.info { "Started scheduled task to create golden record tasks from ready business partners" }
fun createTasksForReadyBusinessPartners(): Int {
logger.info { "Create next chunk of golden record tasks from ready business partners" }

val pageRequest = Pageable.ofSize(properties.creation.fromSharingMember.batchSize)
val foundStates = sharingStateRepository.findBySharingStateType(SharingStateType.Ready, pageRequest).content
Expand All @@ -64,7 +85,7 @@ class TaskCreationService(
sharingStateService.setPending(partner.sharingState, task.taskId)
}

logger.info { "Created ${createdTasks.size} new golden record tasks from ready business partners" }
return createdTasks.size
}

private fun createGoldenRecordTasks(mode: TaskMode, orchestratorBusinessPartnersDto: List<TaskCreateRequestEntry>): List<TaskClientStateDto> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,44 @@ import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient
import org.eclipse.tractusx.orchestrator.api.model.ResultState
import org.eclipse.tractusx.orchestrator.api.model.TaskClientStateDto
import org.eclipse.tractusx.orchestrator.api.model.TaskStateRequest
import org.springframework.data.domain.Pageable
import org.springframework.data.domain.PageRequest
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

@Service
class TaskResolutionService(
class TaskResolutionBatchService(
private val taskResolutionService: TaskResolutionChunkService
){
private val logger = KotlinLogging.logger { }

fun resolveTasks(){
logger.info { "Start batch process for resolving pending tasks..." }

var pageToQuery = 0
var totalSuccesses = 0
var totalErrors = 0
var totalUnresolved = 0
do{

val stats = taskResolutionService.resolveTasks(pageToQuery)

if(stats.unresolved == stats.foundTasks)
pageToQuery++
else
pageToQuery = 0

totalSuccesses += stats.resolvedAsSuccess
totalErrors += stats.resolvedAsError
totalUnresolved += stats.unresolved

}while (stats.foundTasks != 0)

logger.debug { "Total Resolved $totalSuccesses tasks as successful, $totalErrors as errors and $totalUnresolved still unresolved" }
}
}

@Service
class TaskResolutionChunkService(
private val sharingStateRepository: SharingStateRepository,
private val sharingStateService: SharingStateService,
private val businessPartnerRepository: BusinessPartnerRepository,
Expand All @@ -49,8 +82,11 @@ class TaskResolutionService(

private val logger = KotlinLogging.logger { }

fun resolveTasks() {
val pageRequest = Pageable.ofSize(taskProperties.check.batchSize)
@Transactional
fun resolveTasks(pageToQuery: Int): ResolutionStats {
logger.info { "Start next chunk for resolving pending tasks..." }

val pageRequest = PageRequest.of(pageToQuery, taskProperties.check.batchSize)
val sharingStates = sharingStateRepository.findBySharingStateTypeAndTaskIdNotNull(SharingStateType.Pending, pageRequest).content

val tasks = orchestrationApiClient.goldenRecordTasks.searchTaskStates(TaskStateRequest(sharingStates.map { it.taskId!! })).tasks
Expand All @@ -70,7 +106,9 @@ class TaskResolutionService(
resolveAsUpserts(successes)
resolveAsErrors(errors)

logger.info { "Resolved ${successes.size} tasks as successful, ${errors.size} as errors and ${unresolved.size} still unresolved" }
logger.debug { "Resolved ${successes.size} tasks as successful, ${errors.size} as errors and ${unresolved.size} still unresolved" }

return ResolutionStats(successes.size, errors.size, unresolved.size)
}

private fun tryCreateUpsertRequest(sharingState: SharingStateDb, task: TaskClientStateDto?, input: BusinessPartnerDb?): RequestCreationResult {
Expand Down Expand Up @@ -138,4 +176,12 @@ class TaskResolutionService(
}
}

data class ResolutionStats(
val resolvedAsSuccess: Int,
val resolvedAsError: Int,
val unresolved: Int
){
val foundTasks = resolvedAsSuccess + resolvedAsError + unresolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import org.eclipse.tractusx.bpdm.gate.api.model.SharingStateType
import org.eclipse.tractusx.bpdm.gate.api.model.request.BusinessPartnerInputRequest
import org.eclipse.tractusx.bpdm.gate.api.model.request.PostSharingStateReadyRequest
import org.eclipse.tractusx.bpdm.gate.api.model.response.SharingStateDto
import org.eclipse.tractusx.bpdm.gate.service.TaskCreationService
import org.eclipse.tractusx.bpdm.gate.service.TaskResolutionService
import org.eclipse.tractusx.bpdm.gate.service.TaskCreationChunkService
import org.eclipse.tractusx.bpdm.gate.service.TaskResolutionChunkService
import org.eclipse.tractusx.bpdm.gate.util.MockAndAssertUtils
import org.eclipse.tractusx.bpdm.test.containers.PostgreSQLContextInitializer
import org.eclipse.tractusx.bpdm.test.testdata.gate.BusinessPartnerNonVerboseValues
Expand Down Expand Up @@ -57,8 +57,8 @@ class BusinessPartnerControllerAndSharingControllerIT @Autowired constructor(
val testHelpers: DbTestHelpers,
val assertHelpers: AssertHelpers,
val gateClient: GateClient,
val taskCreationService: TaskCreationService,
val taskResolutionService: TaskResolutionService,
val taskCreationService: TaskCreationChunkService,
val taskResolutionService: TaskResolutionChunkService,
val mockAndAssertUtils: MockAndAssertUtils
) {

Expand All @@ -85,7 +85,7 @@ class BusinessPartnerControllerAndSharingControllerIT @Autowired constructor(
@BeforeEach
fun beforeEach() {
testHelpers.truncateDbTables()
gateWireMockServer.resetAll();
gateWireMockServer.resetAll()
poolWireMockServer.resetAll()
this.mockAndAssertUtils.mockOrchestratorApi(gateWireMockServer)
}
Expand Down Expand Up @@ -183,7 +183,7 @@ class BusinessPartnerControllerAndSharingControllerIT @Autowired constructor(
.isEqualTo(createdSharingState)

// Call Finish Cleaning Method
taskResolutionService.resolveTasks()
taskResolutionService.resolveTasks(0)

val cleanedSharingState = listOf(
SharingStateDto(
Expand Down Expand Up @@ -248,7 +248,7 @@ class BusinessPartnerControllerAndSharingControllerIT @Autowired constructor(
.isEqualTo(createdSharingState)

// Call Finish Cleaning Method
taskResolutionService.resolveTasks()
taskResolutionService.resolveTasks(0)

val cleanedSharingState = listOf(
SharingStateDto(
Expand Down
Loading

0 comments on commit 1568055

Please sign in to comment.