Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dmp 4646 implement batching arm rpo polling automated task #2511

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class ArmRpoPollServiceIntTest extends PostgresIntegrationBase {
private ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity;
private String uniqueProductionName;
private final Duration pollDuration = Duration.ofHours(4);
private int batchSize = 10;


@Autowired
private ArmRpoPollServiceImpl armRpoPollService;
Expand Down Expand Up @@ -120,6 +122,8 @@ void pollArmRpo_shouldPollSuccessfully_WithSaveBackgroundCompleted() throws IOEx
armRpoExecutionDetailEntity.setProductionId(PRODUCTION_ID);
armRpoExecutionDetailEntity = dartsPersistence.save(armRpoExecutionDetailEntity);

batchSize = 5;

when(armApiService.getArmBearerToken()).thenReturn(BEARER_TOKEN);
when(armRpoClient.getExtendedSearchesByMatter(any(), any()))
.thenReturn(getExtendedSearchesByMatterResponse());
Expand All @@ -139,7 +143,7 @@ void pollArmRpo_shouldPollSuccessfully_WithSaveBackgroundCompleted() throws IOEx
.thenReturn(getRemoveProductionResponse());

// when
armRpoPollService.pollArmRpo(false, pollDuration);
armRpoPollService.pollArmRpo(false, pollDuration, batchSize);

// then
var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId());
Expand Down Expand Up @@ -188,7 +192,7 @@ void pollArmRpo_shouldPollSuccessfully_WithGetExtendedSearchesByMatterInProgress
.thenReturn(getRemoveProductionResponse());

// when
armRpoPollService.pollArmRpo(false, pollDuration);
armRpoPollService.pollArmRpo(false, pollDuration, batchSize);

// then
var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId());
Expand Down Expand Up @@ -228,7 +232,7 @@ void pollArmRpo_shouldPollSuccessfully_WithSaveBackgroundCompletedAndFinishWithC
.thenReturn(getCreateExportBasedOnSearchResultsTableResponseInProgress());

// when
armRpoPollService.pollArmRpo(false, pollDuration);
armRpoPollService.pollArmRpo(false, pollDuration, batchSize);

// then
var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId());
Expand Down Expand Up @@ -276,7 +280,7 @@ void pollArmRpo_shouldPollSuccessfully_WithGetExtendedProductionsByMatterInProgr
.thenReturn(getRemoveProductionResponse());

// when
armRpoPollService.pollArmRpo(false, pollDuration);
armRpoPollService.pollArmRpo(false, pollDuration, batchSize);

// then
var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId());
Expand Down Expand Up @@ -329,7 +333,7 @@ void pollArmRpo_shouldPollSuccessfully_WithGetProductionOutputFilesInProgress()
.thenReturn(getRemoveProductionResponse());

// when
armRpoPollService.pollArmRpo(false, pollDuration);
armRpoPollService.pollArmRpo(false, pollDuration, batchSize);

// then
var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId());
Expand Down Expand Up @@ -379,7 +383,7 @@ void pollArmRpo_shouldPollSuccessfully_WithFailedDownloadProductionIsManual() th
.thenReturn(getRemoveProductionResponse());

// when
armRpoPollService.pollArmRpo(true, pollDuration);
armRpoPollService.pollArmRpo(true, pollDuration, batchSize);

// then
var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ArmRpoServiceIntTest extends PostgresIntegrationBase {

private static final LocalDateTime HEARING_DATE = LocalDateTime.of(2023, 9, 26, 10, 0, 0);
private static final String PRODUCTION_ID = "ProductionId";
private static final int BATCH_SIZE = 10;

@MockitoBean
private UserIdentity userIdentity;
Expand Down Expand Up @@ -116,7 +117,7 @@ void reconcileArmRpoCsvData_Success() {
File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvData.csv");

// when
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file));
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE);

// then
List<ExternalObjectDirectoryEntity> foundMediaList1 = dartsDatabase.getExternalObjectDirectoryRepository()
Expand Down Expand Up @@ -161,7 +162,7 @@ void reconcileArmRpoCsvData_DoesNothing_WhenIngestionDateOutsideOfRange() {
File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvData.csv");

// when
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file));
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE);

// then
List<ExternalObjectDirectoryEntity> foundMediaList1 = dartsDatabase.getExternalObjectDirectoryRepository()
Expand Down Expand Up @@ -197,7 +198,7 @@ void reconcileArmRpoCsvData_UpdatesStatus_WhenCsvDataMatches() {
File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvData.csv");

// when
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file));
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE);

// then
List<ExternalObjectDirectoryEntity> foundMediaList = dartsDatabase.getExternalObjectDirectoryRepository()
Expand Down Expand Up @@ -225,7 +226,7 @@ void reconcileArmRpoCsvData_UpdatesEodStatusToReplay_WhenCsvDataContainsEodsThat
File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvDataNotFoundEods.csv");

// when
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file));
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE);

// then
List<ExternalObjectDirectoryEntity> foundMediaList = dartsDatabase.getExternalObjectDirectoryRepository()
Expand Down Expand Up @@ -253,7 +254,7 @@ void reconcileArmRpoCsvData_DoesNothing_WhenNoEodsInRange() {
File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvDataNotFoundEods.csv");

// when
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file));
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE);

// then
List<ExternalObjectDirectoryEntity> foundMediaList = dartsDatabase.getExternalObjectDirectoryRepository()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

public interface ArmRpoPollService {

void pollArmRpo(boolean isManualRun, Duration pollDuration);
void pollArmRpo(boolean isManualRun, Duration pollDuration, int batchSize);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ void updateArmRpoStatus(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity,

ArmRpoExecutionDetailEntity saveArmRpoExecutionDetailEntity(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity);

void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List<File> csvFiles);
void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List<File> csvFiles, int batchSize);

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class ArmRpoPollServiceImpl implements ArmRpoPollService {
private List<Integer> allowableInProgressStates;

@Override
public void pollArmRpo(boolean isManualRun, Duration pollDuration) {
log.info("Polling ARM RPO service - isManualRun: {}", isManualRun);
public void pollArmRpo(boolean isManualRun, Duration pollDuration, int batchSize) {
log.info("Polling ARM RPO - isManualRun: {} poll duration: {}, batchSize: {}", isManualRun, pollDuration, batchSize);
setupFailedStatuses();
setupAllowableInProgressStates();
Integer executionId = null;
Expand Down Expand Up @@ -96,7 +96,8 @@ public void pollArmRpo(boolean isManualRun, Duration pollDuration) {
uniqueProductionName = armRpoExecutionDetailEntity.getProductionName();
}
if (createExportBasedOnSearchResultsTable) {
processProductions(bearerToken, executionId, uniqueProductionName, userAccount, armRpoExecutionDetailEntity);
processProductions(bearerToken, executionId, uniqueProductionName, userAccount, armRpoExecutionDetailEntity,
batchSize);
} else {
log.warn("ARM RPO Polling is still in-progress for createExportBasedOnSearchResultsTable");
}
Expand All @@ -120,7 +121,7 @@ boolean skipSteps(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity) {
}

private void processProductions(String bearerToken, Integer executionId, String productionName, UserAccountEntity userAccount,
ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity) throws IOException {
ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, int batchSize) throws IOException {
// step to call ARM RPO API to get the extended productions by matter
boolean getExtendedProductionsByMatter = armRpoApi.getExtendedProductionsByMatter(bearerToken, executionId, productionName, userAccount);
if (getExtendedProductionsByMatter) {
Expand All @@ -134,7 +135,7 @@ private void processProductions(String bearerToken, Integer executionId, String
// step to call ARM RPO API to remove the production
armRpoApi.removeProduction(bearerToken, executionId, userAccount);
log.debug("About to reconcile production files");
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, tempProductionFiles);
armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, tempProductionFiles, batchSize);
} else {
log.warn("No production export files found");
}
Expand Down Expand Up @@ -208,7 +209,7 @@ private String generateTempProductionExportFilename(String productionExportFileI

private ArmRpoExecutionDetailEntity getArmRpoExecutionDetailEntity(boolean isManualRun) {
var armRpoExecutionDetailEntity = armRpoService.getLatestArmRpoExecutionDetailEntity();
if (isNull(armRpoExecutionDetailEntity)) {
if (isNull(armRpoExecutionDetailEntity) || isNull(armRpoExecutionDetailEntity.getArmRpoState())) {
return null;
}

Expand All @@ -229,20 +230,17 @@ private ArmRpoExecutionDetailEntity getArmRpoExecutionDetailEntity(boolean isMan
}

private boolean pollServiceFailed(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity) {
return nonNull(armRpoExecutionDetailEntity.getArmRpoState())
&& ArmRpoHelper.failedRpoStatus().getId().equals(armRpoExecutionDetailEntity.getArmRpoStatus().getId())
return ArmRpoHelper.failedRpoStatus().getId().equals(armRpoExecutionDetailEntity.getArmRpoStatus().getId())
&& allowableFailedStates.contains(armRpoExecutionDetailEntity.getArmRpoState().getId());
}

private boolean pollServiceInProgress(ArmRpoExecutionDetailEntity armRpoExecutionDetail) {
return nonNull(armRpoExecutionDetail.getArmRpoState())
&& ArmRpoHelper.inProgressRpoStatus().getId().equals(armRpoExecutionDetail.getArmRpoStatus().getId())
return ArmRpoHelper.inProgressRpoStatus().getId().equals(armRpoExecutionDetail.getArmRpoStatus().getId())
&& allowableInProgressStates.contains(armRpoExecutionDetail.getArmRpoState().getId());
}

private boolean saveBackgroundSearchCompleted(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity) {
return nonNull(armRpoExecutionDetailEntity.getArmRpoState())
&& ArmRpoHelper.saveBackgroundSearchRpoState().getId().equals(armRpoExecutionDetailEntity.getArmRpoState().getId())
return ArmRpoHelper.saveBackgroundSearchRpoState().getId().equals(armRpoExecutionDetailEntity.getArmRpoState().getId())
&& ArmRpoHelper.completedRpoStatus().getId().equals(armRpoExecutionDetailEntity.getArmRpoStatus().getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import uk.gov.hmcts.darts.arm.exception.ArmRpoException;
Expand Down Expand Up @@ -104,18 +107,53 @@ public ArmRpoExecutionDetailEntity saveArmRpoExecutionDetailEntity(ArmRpoExecuti
}

@Override
public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List<File> csvFiles) {
public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List<File> csvFiles, int batchSize) {
ObjectRecordStatusEntity armRpoPending = EodHelper.armRpoPendingStatus();
StringBuilder errorMessage = new StringBuilder("Failure during ARM RPO CSV Reconciliation: ");

ArmAutomatedTaskEntity armAutomatedTaskEntity = armAutomatedTaskRepository.findByAutomatedTask_taskName(ADD_ASYNC_SEARCH_RELATED_TASK_NAME)
.orElseThrow(() -> new ArmRpoException(errorMessage.append("Automated task ProcessE2EArmRpoPending not found.").toString()));

List<ExternalObjectDirectoryEntity> externalObjectDirectoryEntities = externalObjectDirectoryRepository.findByStatusAndIngestionDate(
armRpoPending,
armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvEndHour()),
armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvStartHour()));
List<Integer> csvEodList = getEodsListFromCsvFiles(csvFiles, errorMessage);

List<ExternalObjectDirectoryEntity> externalObjectDirectoryEntities = new ArrayList<>();
Pageable pageRequest = PageRequest.of(0, batchSize);
Page<ExternalObjectDirectoryEntity> pages;

do {
pages
= externalObjectDirectoryRepository.findByStatusAndIngestionDateTsWithPaging(
armRpoPending,
armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvEndHour()),
armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvStartHour()),
pageRequest
);
log.info("Found number of elements {}, total elements {}, total pages {} for batch size {}",
pages.getNumberOfElements(), pages.getTotalElements(), pages.getTotalPages(), batchSize);
externalObjectDirectoryEntities.addAll(pages.getContent());
pageRequest = pageRequest.next();
} while (pages.hasNext());

externalObjectDirectoryEntities.forEach(
externalObjectDirectoryEntity -> {
if (csvEodList.contains(externalObjectDirectoryEntity.getId())) {
externalObjectDirectoryEntity.setStatus(EodHelper.storedStatus());
} else {
externalObjectDirectoryEntity.setStatus(EodHelper.armReplayStatus());
}
}
);

List<Integer> missingEods = csvEodList.stream()
.filter(csvEod -> externalObjectDirectoryEntities.stream().noneMatch(entity -> entity.getId().equals(csvEod)))
.collect(Collectors.toList());

log.warn("Unable to process the following EODs {} found in the CSV but not in filtered DB list", missingEods);

externalObjectDirectoryRepository.saveAllAndFlush(externalObjectDirectoryEntities);
}

private static List<Integer> getEodsListFromCsvFiles(List<File> csvFiles, StringBuilder errorMessage) {
List<Integer> csvEodList = new ArrayList<>();
Integer counter = 0;
for (File csvFile : csvFiles) {
Expand All @@ -142,24 +180,7 @@ public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDe
throw new ArmRpoException(errorMessage.toString());
}
}

externalObjectDirectoryEntities.forEach(
externalObjectDirectoryEntity -> {
if (csvEodList.contains(externalObjectDirectoryEntity.getId())) {
externalObjectDirectoryEntity.setStatus(EodHelper.storedStatus());
} else {
externalObjectDirectoryEntity.setStatus(EodHelper.armReplayStatus());
}
}
);

List<Integer> missingEods = csvEodList.stream()
.filter(csvEod -> externalObjectDirectoryEntities.stream().noneMatch(entity -> entity.getId().equals(csvEod)))
.collect(Collectors.toList());

log.warn("Unable to process the following EODs {} found in the CSV but not in filtered DB list", missingEods);

externalObjectDirectoryRepository.saveAllAndFlush(externalObjectDirectoryEntities);
return csvEodList;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import jakarta.transaction.Transactional;
import org.springframework.data.domain.Limit;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
Expand Down Expand Up @@ -670,9 +671,10 @@ void updateEodStatusAndTransferAttemptsWhereIdIn(ObjectRecordStatusEntity newSta
AND eod.dataIngestionTs between :rpoCsvStartTime AND :rpoCsvEndTime
"""
)
List<ExternalObjectDirectoryEntity> findByStatusAndIngestionDate(ObjectRecordStatusEntity status,
OffsetDateTime rpoCsvStartTime,
OffsetDateTime rpoCsvEndTime);
Page<ExternalObjectDirectoryEntity> findByStatusAndIngestionDateTsWithPaging(ObjectRecordStatusEntity status,
OffsetDateTime rpoCsvStartTime,
OffsetDateTime rpoCsvEndTime,
Pageable pageable);

@Query("""
SELECT eod FROM ExternalObjectDirectoryEntity eod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
public class ArmRpoPollAutomatedTaskConfig extends AbstractAutomatedTaskConfig {

private Duration pollDuration;

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ public AutomatedTaskName getAutomatedTaskName() {

@Override
protected void runTask() {
armRpoPollService.pollArmRpo(isManualRun(), getConfig().getPollDuration());
armRpoPollService.pollArmRpo(isManualRun(), getConfig().getPollDuration(), getAutomatedTaskBatchSize());
}
}
Loading