From fd438df6c1f0f2a775283111935908948807029c Mon Sep 17 00:00:00 2001 From: karen-hedges <133129444+karen-hedges@users.noreply.github.com> Date: Sun, 2 Feb 2025 11:50:06 +0000 Subject: [PATCH 1/7] DMP-4646 Implement batching in ArmRpoPolling automated task Started to add batchsize to ArmRpoPolling automated task --- .../arm/service/ArmRpoPollServiceIntTest.java | 16 ++++--- .../arm/service/ArmRpoServiceIntTest.java | 11 ++--- .../darts/arm/service/ArmRpoPollService.java | 2 +- .../darts/arm/service/ArmRpoService.java | 2 +- .../service/impl/ArmRpoPollServiceImpl.java | 20 ++++----- .../arm/service/impl/ArmRpoServiceImpl.java | 44 +++++++++++-------- .../impl/ArmRpoPollingAutomatedTask.java | 2 +- .../impl/ArmRpoPollServiceImplTest.java | 31 ++++++------- .../service/impl/ArmRpoServiceImplTest.java | 5 ++- .../impl/ArmRpoPollAutomatedTaskTest.java | 2 +- 10 files changed, 73 insertions(+), 62 deletions(-) diff --git a/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoPollServiceIntTest.java b/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoPollServiceIntTest.java index 33e32a65ae..f379a32e51 100644 --- a/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoPollServiceIntTest.java +++ b/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoPollServiceIntTest.java @@ -90,6 +90,8 @@ class ArmRpoPollServiceIntTest extends PostgresIntegrationBase { private ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity; private String uniqueProductionName; private final Duration pollDuration = Duration.ofHours(4); + private int batchSize = 10; + @Autowired private ArmRpoPollServiceImpl armRpoPollService; @@ -120,6 +122,8 @@ void pollArmRpo_shouldPollSuccessfully_WithSaveBackgroundCompleted() throws IOEx armRpoExecutionDetailEntity.setProductionId(PRODUCTION_ID); armRpoExecutionDetailEntity = dartsPersistence.save(armRpoExecutionDetailEntity); + batchSize = 5; + when(armApiService.getArmBearerToken()).thenReturn(BEARER_TOKEN); when(armRpoClient.getExtendedSearchesByMatter(any(), any())) .thenReturn(getExtendedSearchesByMatterResponse()); @@ -139,7 +143,7 @@ void pollArmRpo_shouldPollSuccessfully_WithSaveBackgroundCompleted() throws IOEx .thenReturn(getRemoveProductionResponse()); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId()); @@ -188,7 +192,7 @@ void pollArmRpo_shouldPollSuccessfully_WithGetExtendedSearchesByMatterInProgress .thenReturn(getRemoveProductionResponse()); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId()); @@ -228,7 +232,7 @@ void pollArmRpo_shouldPollSuccessfully_WithSaveBackgroundCompletedAndFinishWithC .thenReturn(getCreateExportBasedOnSearchResultsTableResponseInProgress()); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId()); @@ -276,7 +280,7 @@ void pollArmRpo_shouldPollSuccessfully_WithGetExtendedProductionsByMatterInProgr .thenReturn(getRemoveProductionResponse()); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId()); @@ -329,7 +333,7 @@ void pollArmRpo_shouldPollSuccessfully_WithGetProductionOutputFilesInProgress() .thenReturn(getRemoveProductionResponse()); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId()); @@ -379,7 +383,7 @@ void pollArmRpo_shouldPollSuccessfully_WithFailedDownloadProductionIsManual() th .thenReturn(getRemoveProductionResponse()); // when - armRpoPollService.pollArmRpo(true, pollDuration); + armRpoPollService.pollArmRpo(true, pollDuration, batchSize); // then var updatedArmRpoExecutionDetailEntity = dartsPersistence.getArmRpoExecutionDetailRepository().findById(armRpoExecutionDetailEntity.getId()); diff --git a/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoServiceIntTest.java b/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoServiceIntTest.java index 3db53f4cdc..408bfa9782 100644 --- a/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoServiceIntTest.java +++ b/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoServiceIntTest.java @@ -46,6 +46,7 @@ class ArmRpoServiceIntTest extends PostgresIntegrationBase { private ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity; private MediaEntity media1; private MediaEntity media2; + private int batchSize = 10; @BeforeEach void setUp() { @@ -116,7 +117,7 @@ void reconcileArmRpoCsvData_Success() { File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvData.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file)); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); // then List foundMediaList1 = dartsDatabase.getExternalObjectDirectoryRepository() @@ -161,7 +162,7 @@ void reconcileArmRpoCsvData_DoesNothing_WhenIngestionDateOutsideOfRange() { File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvData.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file)); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); // then List foundMediaList1 = dartsDatabase.getExternalObjectDirectoryRepository() @@ -197,7 +198,7 @@ void reconcileArmRpoCsvData_UpdatesStatus_WhenCsvDataMatches() { File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvData.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file)); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); // then List foundMediaList = dartsDatabase.getExternalObjectDirectoryRepository() @@ -225,7 +226,7 @@ void reconcileArmRpoCsvData_UpdatesEodStatusToReplay_WhenCsvDataContainsEodsThat File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvDataNotFoundEods.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file)); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); // then List foundMediaList = dartsDatabase.getExternalObjectDirectoryRepository() @@ -253,7 +254,7 @@ void reconcileArmRpoCsvData_DoesNothing_WhenNoEodsInRange() { File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvDataNotFoundEods.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file)); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); // then List foundMediaList = dartsDatabase.getExternalObjectDirectoryRepository() diff --git a/src/main/java/uk/gov/hmcts/darts/arm/service/ArmRpoPollService.java b/src/main/java/uk/gov/hmcts/darts/arm/service/ArmRpoPollService.java index c8e086f0fa..55c544bf08 100644 --- a/src/main/java/uk/gov/hmcts/darts/arm/service/ArmRpoPollService.java +++ b/src/main/java/uk/gov/hmcts/darts/arm/service/ArmRpoPollService.java @@ -4,6 +4,6 @@ public interface ArmRpoPollService { - void pollArmRpo(boolean isManualRun, Duration pollDuration); + void pollArmRpo(boolean isManualRun, Duration pollDuration, int batchSize); } diff --git a/src/main/java/uk/gov/hmcts/darts/arm/service/ArmRpoService.java b/src/main/java/uk/gov/hmcts/darts/arm/service/ArmRpoService.java index 897b81bcbe..cde95e2177 100644 --- a/src/main/java/uk/gov/hmcts/darts/arm/service/ArmRpoService.java +++ b/src/main/java/uk/gov/hmcts/darts/arm/service/ArmRpoService.java @@ -26,6 +26,6 @@ void updateArmRpoStatus(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, ArmRpoExecutionDetailEntity saveArmRpoExecutionDetailEntity(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity); - void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List csvFiles); + void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List csvFiles, int batchSize); } diff --git a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImpl.java b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImpl.java index 8b2a2395a2..bb1508d4da 100644 --- a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImpl.java +++ b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImpl.java @@ -50,7 +50,7 @@ public class ArmRpoPollServiceImpl implements ArmRpoPollService { private List allowableInProgressStates; @Override - public void pollArmRpo(boolean isManualRun, Duration pollDuration) { + public void pollArmRpo(boolean isManualRun, Duration pollDuration, int batchSize) { log.info("Polling ARM RPO service - isManualRun: {}", isManualRun); setupFailedStatuses(); setupAllowableInProgressStates(); @@ -96,7 +96,8 @@ public void pollArmRpo(boolean isManualRun, Duration pollDuration) { uniqueProductionName = armRpoExecutionDetailEntity.getProductionName(); } if (createExportBasedOnSearchResultsTable) { - processProductions(bearerToken, executionId, uniqueProductionName, userAccount, armRpoExecutionDetailEntity); + processProductions(bearerToken, executionId, uniqueProductionName, userAccount, armRpoExecutionDetailEntity, + batchSize); } else { log.warn("ARM RPO Polling is still in-progress for createExportBasedOnSearchResultsTable"); } @@ -120,7 +121,7 @@ boolean skipSteps(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity) { } private void processProductions(String bearerToken, Integer executionId, String productionName, UserAccountEntity userAccount, - ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity) throws IOException { + ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, int batchSize) throws IOException { // step to call ARM RPO API to get the extended productions by matter boolean getExtendedProductionsByMatter = armRpoApi.getExtendedProductionsByMatter(bearerToken, executionId, productionName, userAccount); if (getExtendedProductionsByMatter) { @@ -134,7 +135,7 @@ private void processProductions(String bearerToken, Integer executionId, String // step to call ARM RPO API to remove the production armRpoApi.removeProduction(bearerToken, executionId, userAccount); log.debug("About to reconcile production files"); - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, tempProductionFiles); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, tempProductionFiles, batchSize); } else { log.warn("No production export files found"); } @@ -208,7 +209,7 @@ private String generateTempProductionExportFilename(String productionExportFileI private ArmRpoExecutionDetailEntity getArmRpoExecutionDetailEntity(boolean isManualRun) { var armRpoExecutionDetailEntity = armRpoService.getLatestArmRpoExecutionDetailEntity(); - if (isNull(armRpoExecutionDetailEntity)) { + if (isNull(armRpoExecutionDetailEntity) || isNull(armRpoExecutionDetailEntity.getArmRpoState())) { return null; } @@ -229,20 +230,17 @@ private ArmRpoExecutionDetailEntity getArmRpoExecutionDetailEntity(boolean isMan } private boolean pollServiceFailed(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity) { - return nonNull(armRpoExecutionDetailEntity.getArmRpoState()) - && ArmRpoHelper.failedRpoStatus().getId().equals(armRpoExecutionDetailEntity.getArmRpoStatus().getId()) + return ArmRpoHelper.failedRpoStatus().getId().equals(armRpoExecutionDetailEntity.getArmRpoStatus().getId()) && allowableFailedStates.contains(armRpoExecutionDetailEntity.getArmRpoState().getId()); } private boolean pollServiceInProgress(ArmRpoExecutionDetailEntity armRpoExecutionDetail) { - return nonNull(armRpoExecutionDetail.getArmRpoState()) - && ArmRpoHelper.inProgressRpoStatus().getId().equals(armRpoExecutionDetail.getArmRpoStatus().getId()) + return ArmRpoHelper.inProgressRpoStatus().getId().equals(armRpoExecutionDetail.getArmRpoStatus().getId()) && allowableInProgressStates.contains(armRpoExecutionDetail.getArmRpoState().getId()); } private boolean saveBackgroundSearchCompleted(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity) { - return nonNull(armRpoExecutionDetailEntity.getArmRpoState()) - && ArmRpoHelper.saveBackgroundSearchRpoState().getId().equals(armRpoExecutionDetailEntity.getArmRpoState().getId()) + return ArmRpoHelper.saveBackgroundSearchRpoState().getId().equals(armRpoExecutionDetailEntity.getArmRpoState().getId()) && ArmRpoHelper.completedRpoStatus().getId().equals(armRpoExecutionDetailEntity.getArmRpoStatus().getId()); } } diff --git a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java index ece39880a8..da8863d5fc 100644 --- a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java +++ b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java @@ -104,7 +104,8 @@ public ArmRpoExecutionDetailEntity saveArmRpoExecutionDetailEntity(ArmRpoExecuti } @Override - public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List csvFiles) { + public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List csvFiles, + int batchSize) { ObjectRecordStatusEntity armRpoPending = EodHelper.armRpoPendingStatus(); StringBuilder errorMessage = new StringBuilder("Failure during ARM RPO CSV Reconciliation: "); @@ -116,6 +117,28 @@ public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDe armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvEndHour()), armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvStartHour())); + List csvEodList = getEodsListFromCsvFiles(csvFiles, errorMessage); + + externalObjectDirectoryEntities.forEach( + externalObjectDirectoryEntity -> { + if (csvEodList.contains(externalObjectDirectoryEntity.getId())) { + externalObjectDirectoryEntity.setStatus(EodHelper.storedStatus()); + } else { + externalObjectDirectoryEntity.setStatus(EodHelper.armReplayStatus()); + } + } + ); + + List missingEods = csvEodList.stream() + .filter(csvEod -> externalObjectDirectoryEntities.stream().noneMatch(entity -> entity.getId().equals(csvEod))) + .collect(Collectors.toList()); + + log.warn("Unable to process the following EODs {} found in the CSV but not in filtered DB list", missingEods); + + externalObjectDirectoryRepository.saveAllAndFlush(externalObjectDirectoryEntities); + } + + private static List getEodsListFromCsvFiles(List csvFiles, StringBuilder errorMessage) { List csvEodList = new ArrayList<>(); Integer counter = 0; for (File csvFile : csvFiles) { @@ -142,24 +165,7 @@ public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDe throw new ArmRpoException(errorMessage.toString()); } } - - externalObjectDirectoryEntities.forEach( - externalObjectDirectoryEntity -> { - if (csvEodList.contains(externalObjectDirectoryEntity.getId())) { - externalObjectDirectoryEntity.setStatus(EodHelper.storedStatus()); - } else { - externalObjectDirectoryEntity.setStatus(EodHelper.armReplayStatus()); - } - } - ); - - List missingEods = csvEodList.stream() - .filter(csvEod -> externalObjectDirectoryEntities.stream().noneMatch(entity -> entity.getId().equals(csvEod))) - .collect(Collectors.toList()); - - log.warn("Unable to process the following EODs {} found in the CSV but not in filtered DB list", missingEods); - - externalObjectDirectoryRepository.saveAllAndFlush(externalObjectDirectoryEntities); + return csvEodList; } } diff --git a/src/main/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollingAutomatedTask.java b/src/main/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollingAutomatedTask.java index bb5a7149e3..bb6df8d7c8 100644 --- a/src/main/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollingAutomatedTask.java +++ b/src/main/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollingAutomatedTask.java @@ -37,6 +37,6 @@ public AutomatedTaskName getAutomatedTaskName() { @Override protected void runTask() { - armRpoPollService.pollArmRpo(isManualRun(), getConfig().getPollDuration()); + armRpoPollService.pollArmRpo(isManualRun(), getConfig().getPollDuration(), getAutomatedTaskBatchSize()); } } diff --git a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java index 7d066dbc11..1cb3d77607 100644 --- a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java +++ b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java @@ -83,6 +83,7 @@ class ArmRpoPollServiceImplTest { private ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity; private static final ArmRpoHelperMocks ARM_RPO_HELPER_MOCKS = new ArmRpoHelperMocks(); private final Duration pollDuration = Duration.ofHours(4); + private int batchSize = 10; private ArmRpoPollServiceImpl armRpoPollService; @@ -125,7 +126,7 @@ void pollArmRpo_shouldPollSuccessfully_whenSaveBackgroundCompleted() throws IOEx when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -170,7 +171,7 @@ void pollArmRpo_shouldPollSuccessfully_whenSaveBackgroundCompletedForManualRun() when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(true, pollDuration); + armRpoPollService.pollArmRpo(true, pollDuration, batchSize); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -215,7 +216,7 @@ void pollArmRpo_shouldPollSuccessfully_whenGetExtendedSearchesByMatterInProgress when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -260,7 +261,7 @@ void pollArmRpo_shouldPollSuccessfully_whenCreateExportBasedOnSearchResultsTable when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -302,7 +303,7 @@ void pollArmRpo_shouldPollSuccessfully_whenGetExtendedProductionsByMatterInProgr when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoApi).getExtendedProductionsByMatter(eq("bearerToken"), eq(1), contains(PRODUCTION_NAME), eq(userAccountEntity)); @@ -342,7 +343,7 @@ void pollArmRpo_shouldPollSuccessfully_whenCreateExportBasedOnSearchResultsTable when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(true, pollDuration); + armRpoPollService.pollArmRpo(true, pollDuration, batchSize); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -387,7 +388,7 @@ void pollArmRpo_shouldPollSuccessfully_whenDownloadProductionFailedOnPreviousAtt when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(true, pollDuration); + armRpoPollService.pollArmRpo(true, pollDuration, batchSize); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -416,7 +417,7 @@ void pollArmRpo_shouldPollNotFindLatestExecutionDetail_OnStepDownloadProductionF armRpoExecutionDetailEntity.setArmRpoState(ARM_RPO_HELPER_MOCKS.getDownloadProductionRpoState()); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -431,7 +432,7 @@ void pollArmRpo_shouldPollNotFindLatestExecutionDetailForManualRun_OnStepDownloa armRpoExecutionDetailEntity.setArmRpoState(ARM_RPO_HELPER_MOCKS.getDownloadProductionRpoState()); // when - armRpoPollService.pollArmRpo(true, pollDuration); + armRpoPollService.pollArmRpo(true, pollDuration, batchSize); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -445,7 +446,7 @@ void pollArmRpo_shouldHandleNoExecutionDetailEntity() { when(armRpoService.getLatestArmRpoExecutionDetailEntity()).thenReturn(null); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -462,7 +463,7 @@ void pollArmRpo_shouldHandleNoBearerToken() { when(armApiService.getArmBearerToken()).thenReturn(null); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -484,7 +485,7 @@ void pollArmRpo_shouldHandleCreateExportInProgress() { when(armRpoApi.createExportBasedOnSearchResultsTable(anyString(), anyInt(), any(), anyString(), any(), any(UserAccountEntity.class))).thenReturn(false); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -509,7 +510,7 @@ void pollArmRpo_shouldHandleGetExtendedSearchesByMatterGetInProgress() { ArmRpoInProgressException.class); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -535,7 +536,7 @@ void pollArmRpo_shouldHandleNoProductionFiles() { when(armRpoApi.getProductionOutputFiles(anyString(), anyInt(), any(UserAccountEntity.class))).thenReturn(List.of()); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -567,7 +568,7 @@ void pollArmRpo_shouldHandleExceptionDuringPolling_whenCreateExportBasedOnSearch new ArmRpoException("Test exception")); // when - armRpoPollService.pollArmRpo(false, pollDuration); + armRpoPollService.pollArmRpo(false, pollDuration, batchSize); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); diff --git a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java index 66993c19c5..39d187e58c 100644 --- a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java +++ b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java @@ -70,6 +70,7 @@ class ArmRpoServiceImplTest { private ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity; private UserAccountEntity userAccountEntity; + private final int batchSize = 10; @BeforeEach void setUp() { @@ -186,7 +187,7 @@ void reconcileArmRpoCsvData_Success() { File file = TestUtils.getFile("Tests/arm/rpo/armRpoCsvData.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file)); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); // then assertEquals(EodHelper.storedStatus(), externalObjectDirectoryEntity1.getStatus()); @@ -211,7 +212,7 @@ void reconcileArmRpoCsvData_NoCsvFoundError() { File file = new File("Tests/arm/rpo/noFile.csv"); // when ArmRpoException armRpoException = assertThrows(ArmRpoException.class, () -> - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file))); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize)); // then assertThat(armRpoException.getMessage(), containsString( diff --git a/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java b/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java index 99b3e34394..e59fb69243 100644 --- a/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java +++ b/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java @@ -42,6 +42,6 @@ void runTask() { armRpoPollAutomatedTask.runTask(); // then - Mockito.verify(armRpoPollService, Mockito.times(1)).pollArmRpo(false, Duration.ofSeconds(5)); + Mockito.verify(armRpoPollService, Mockito.times(1)).pollArmRpo(false, Duration.ofSeconds(5), 1); } } \ No newline at end of file From c405d69ef69a5e8516622b9feed802fbd69c63bd Mon Sep 17 00:00:00 2001 From: karen-hedges <133129444+karen-hedges@users.noreply.github.com> Date: Sun, 2 Feb 2025 11:53:49 +0000 Subject: [PATCH 2/7] DMP-4627 UpdateMetadata is failing for ARM Merged master --- .../arm/service/ArmRpoServiceIntTest.java | 12 ++++----- .../arm/service/impl/ArmRpoServiceImpl.java | 26 ++++++++++++++----- .../ExternalObjectDirectoryRepository.java | 7 ++--- .../service/impl/ArmRpoServiceImplTest.java | 16 +++++++----- .../impl/ArmRpoPollAutomatedTaskTest.java | 9 ++++--- 5 files changed, 44 insertions(+), 26 deletions(-) diff --git a/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoServiceIntTest.java b/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoServiceIntTest.java index 408bfa9782..98eb2ddf7a 100644 --- a/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoServiceIntTest.java +++ b/src/integrationTest/java/uk/gov/hmcts/darts/arm/service/ArmRpoServiceIntTest.java @@ -36,6 +36,7 @@ class ArmRpoServiceIntTest extends PostgresIntegrationBase { private static final LocalDateTime HEARING_DATE = LocalDateTime.of(2023, 9, 26, 10, 0, 0); private static final String PRODUCTION_ID = "ProductionId"; + private static final int BATCH_SIZE = 10; @MockitoBean private UserIdentity userIdentity; @@ -46,7 +47,6 @@ class ArmRpoServiceIntTest extends PostgresIntegrationBase { private ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity; private MediaEntity media1; private MediaEntity media2; - private int batchSize = 10; @BeforeEach void setUp() { @@ -117,7 +117,7 @@ void reconcileArmRpoCsvData_Success() { File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvData.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE); // then List foundMediaList1 = dartsDatabase.getExternalObjectDirectoryRepository() @@ -162,7 +162,7 @@ void reconcileArmRpoCsvData_DoesNothing_WhenIngestionDateOutsideOfRange() { File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvData.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE); // then List foundMediaList1 = dartsDatabase.getExternalObjectDirectoryRepository() @@ -198,7 +198,7 @@ void reconcileArmRpoCsvData_UpdatesStatus_WhenCsvDataMatches() { File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvData.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE); // then List foundMediaList = dartsDatabase.getExternalObjectDirectoryRepository() @@ -226,7 +226,7 @@ void reconcileArmRpoCsvData_UpdatesEodStatusToReplay_WhenCsvDataContainsEodsThat File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvDataNotFoundEods.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE); // then List foundMediaList = dartsDatabase.getExternalObjectDirectoryRepository() @@ -254,7 +254,7 @@ void reconcileArmRpoCsvData_DoesNothing_WhenNoEodsInRange() { File file = TestUtils.getFile("tests/arm/rpo/armRpoCsvDataNotFoundEods.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE); // then List foundMediaList = dartsDatabase.getExternalObjectDirectoryRepository() diff --git a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java index da8863d5fc..4ab207e93e 100644 --- a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java +++ b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java @@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.csv.CSVRecord; import org.apache.commons.lang3.StringUtils; +import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import uk.gov.hmcts.darts.arm.exception.ArmRpoException; @@ -104,21 +105,32 @@ public ArmRpoExecutionDetailEntity saveArmRpoExecutionDetailEntity(ArmRpoExecuti } @Override - public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List csvFiles, - int batchSize) { + public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity, List csvFiles, int batchSize) { ObjectRecordStatusEntity armRpoPending = EodHelper.armRpoPendingStatus(); StringBuilder errorMessage = new StringBuilder("Failure during ARM RPO CSV Reconciliation: "); ArmAutomatedTaskEntity armAutomatedTaskEntity = armAutomatedTaskRepository.findByAutomatedTask_taskName(ADD_ASYNC_SEARCH_RELATED_TASK_NAME) .orElseThrow(() -> new ArmRpoException(errorMessage.append("Automated task ProcessE2EArmRpoPending not found.").toString())); - List externalObjectDirectoryEntities = externalObjectDirectoryRepository.findByStatusAndIngestionDate( - armRpoPending, - armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvEndHour()), - armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvStartHour())); - List csvEodList = getEodsListFromCsvFiles(csvFiles, errorMessage); + // get all EODs that are in the pending state and within the time window in batches until no more are found + + List externalObjectDirectoryEntities = new ArrayList<>(); + int offset = 0; + List batch; + + do { + batch = externalObjectDirectoryRepository.findByStatusAndIngestionDateTsWithPaging( + armRpoPending, + armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvEndHour()), + armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvStartHour()), + Pageable.ofSize(batchSize).withPage(offset) + ); + externalObjectDirectoryEntities.addAll(batch); + offset += batchSize; + } while (!batch.isEmpty()); + externalObjectDirectoryEntities.forEach( externalObjectDirectoryEntity -> { if (csvEodList.contains(externalObjectDirectoryEntity.getId())) { diff --git a/src/main/java/uk/gov/hmcts/darts/common/repository/ExternalObjectDirectoryRepository.java b/src/main/java/uk/gov/hmcts/darts/common/repository/ExternalObjectDirectoryRepository.java index ca35a43305..4e5ae72b39 100644 --- a/src/main/java/uk/gov/hmcts/darts/common/repository/ExternalObjectDirectoryRepository.java +++ b/src/main/java/uk/gov/hmcts/darts/common/repository/ExternalObjectDirectoryRepository.java @@ -670,9 +670,10 @@ void updateEodStatusAndTransferAttemptsWhereIdIn(ObjectRecordStatusEntity newSta AND eod.dataIngestionTs between :rpoCsvStartTime AND :rpoCsvEndTime """ ) - List findByStatusAndIngestionDate(ObjectRecordStatusEntity status, - OffsetDateTime rpoCsvStartTime, - OffsetDateTime rpoCsvEndTime); + List findByStatusAndIngestionDateTsWithPaging(ObjectRecordStatusEntity status, + OffsetDateTime rpoCsvStartTime, + OffsetDateTime rpoCsvEndTime, + Pageable pageable); @Query(""" SELECT eod FROM ExternalObjectDirectoryEntity eod diff --git a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java index 39d187e58c..e7be3c2cdc 100644 --- a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java +++ b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java @@ -40,6 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -181,8 +182,8 @@ void reconcileArmRpoCsvData_Success() { armRpoExecutionDetailEntity.setCreatedDateTime(OffsetDateTime.now()); when(armAutomatedTaskRepository.findByAutomatedTask_taskName(any())) .thenReturn(Optional.of(createArmAutomatedTaskEntity())); - when(externalObjectDirectoryRepository.findByStatusAndIngestionDate(any(), any(), any())) - .thenReturn(externalObjectDirectoryEntities); + when(externalObjectDirectoryRepository.findByStatusAndIngestionDateTsWithPaging(any(), any(), any(), any())) + .thenReturn(externalObjectDirectoryEntities).thenReturn(Collections.emptyList()); File file = TestUtils.getFile("Tests/arm/rpo/armRpoCsvData.csv"); @@ -193,9 +194,11 @@ void reconcileArmRpoCsvData_Success() { assertEquals(EodHelper.storedStatus(), externalObjectDirectoryEntity1.getStatus()); assertEquals(EodHelper.armReplayStatus(), externalObjectDirectoryEntity2.getStatus()); - verify(externalObjectDirectoryRepository).findByStatusAndIngestionDate(EodHelper.armRpoPendingStatus(), - armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(RPO_CSV_END_HOUR), - armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(RPO_CSV_START_HOUR)); + verify(externalObjectDirectoryRepository, times(2)).findByStatusAndIngestionDateTsWithPaging( + eq(EodHelper.armRpoPendingStatus()), + eq(armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(RPO_CSV_END_HOUR)), + eq(armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(RPO_CSV_START_HOUR)), + any()); verify(externalObjectDirectoryRepository, times(1)).saveAllAndFlush(externalObjectDirectoryEntities); } @@ -207,9 +210,8 @@ void reconcileArmRpoCsvData_NoCsvFoundError() { armRpoExecutionDetailEntity.setCreatedDateTime(OffsetDateTime.now()); when(armAutomatedTaskRepository.findByAutomatedTask_taskName(any())) .thenReturn(Optional.of(createArmAutomatedTaskEntity())); - when(externalObjectDirectoryRepository.findByStatusAndIngestionDate(any(), any(), any())) - .thenReturn(Collections.singletonList(externalObjectDirectoryEntity)); File file = new File("Tests/arm/rpo/noFile.csv"); + // when ArmRpoException armRpoException = assertThrows(ArmRpoException.class, () -> armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize)); diff --git a/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java b/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java index e59fb69243..5adb6d351e 100644 --- a/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java +++ b/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java @@ -13,6 +13,8 @@ import java.time.Duration; +import static org.mockito.Mockito.verify; + @ExtendWith(MockitoExtension.class) class ArmRpoPollAutomatedTaskTest { @@ -24,14 +26,15 @@ class ArmRpoPollAutomatedTaskTest { private LogApi logApi; @Mock private LockService lockService; + @Mock + private ArmRpoPollAutomatedTaskConfig armRpoPollAutomatedTaskConfig; @Test void runTask() { - ArmRpoPollAutomatedTaskConfig armRpoPollAutomatedTaskConfig = new ArmRpoPollAutomatedTaskConfig(); armRpoPollAutomatedTaskConfig.setPollDuration(Duration.ofSeconds(5)); // given ArmRpoPollingAutomatedTask armRpoPollAutomatedTask = new ArmRpoPollingAutomatedTask( - null, + automatedTaskRepository, armRpoPollAutomatedTaskConfig, armRpoPollService, logApi, @@ -42,6 +45,6 @@ void runTask() { armRpoPollAutomatedTask.runTask(); // then - Mockito.verify(armRpoPollService, Mockito.times(1)).pollArmRpo(false, Duration.ofSeconds(5), 1); + verify(armRpoPollService, Mockito.times(1)).pollArmRpo(false, 0); } } \ No newline at end of file From 55bd71503eda83e53bcb3e025ccce9624adbd28e Mon Sep 17 00:00:00 2001 From: karen-hedges <133129444+karen-hedges@users.noreply.github.com> Date: Sun, 2 Feb 2025 11:54:06 +0000 Subject: [PATCH 3/7] DMP-4627 UpdateMetadata is failing for ARM Merged master --- .../darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java b/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java index 5adb6d351e..d76dc112e8 100644 --- a/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java +++ b/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java @@ -13,8 +13,6 @@ import java.time.Duration; -import static org.mockito.Mockito.verify; - @ExtendWith(MockitoExtension.class) class ArmRpoPollAutomatedTaskTest { @@ -45,6 +43,6 @@ void runTask() { armRpoPollAutomatedTask.runTask(); // then - verify(armRpoPollService, Mockito.times(1)).pollArmRpo(false, 0); + Mockito.verify(armRpoPollService, Mockito.times(1)).pollArmRpo(false, Duration.ofSeconds(5), 1); } } \ No newline at end of file From 86aec05ef508e73cb19c2dc66794fe5597aaedd0 Mon Sep 17 00:00:00 2001 From: karen-hedges <133129444+karen-hedges@users.noreply.github.com> Date: Sun, 2 Feb 2025 11:57:39 +0000 Subject: [PATCH 4/7] DMP-4646 Implement batching in ArmRpoPolling automated task Started to add batchsize to ArmRpoPolling automated task --- .../arm/service/impl/ArmRpoPollServiceImplTest.java | 1 + .../darts/arm/service/impl/ArmRpoServiceImplTest.java | 11 ++++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java index 1cb3d77607..7545163587 100644 --- a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java +++ b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java @@ -51,6 +51,7 @@ class ArmRpoPollServiceImplTest { private static final String PRODUCTION_NAME = "DARTS_RPO_2024-08-13"; + private static final int BATCH_SIZE = 10; @Mock private ArmRpoApi armRpoApi; diff --git a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java index e7be3c2cdc..7a7decae3a 100644 --- a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java +++ b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java @@ -49,6 +49,8 @@ @ExtendWith(MockitoExtension.class) class ArmRpoServiceImplTest { + private static final int BATCH_SIZE = 10; + @Mock private ArmRpoExecutionDetailRepository armRpoExecutionDetailRepository; @@ -71,7 +73,6 @@ class ArmRpoServiceImplTest { private ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity; private UserAccountEntity userAccountEntity; - private final int batchSize = 10; @BeforeEach void setUp() { @@ -188,7 +189,7 @@ void reconcileArmRpoCsvData_Success() { File file = TestUtils.getFile("Tests/arm/rpo/armRpoCsvData.csv"); // when - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE); // then assertEquals(EodHelper.storedStatus(), externalObjectDirectoryEntity1.getStatus()); @@ -205,16 +206,16 @@ void reconcileArmRpoCsvData_Success() { @Test void reconcileArmRpoCsvData_NoCsvFoundError() { // given - ExternalObjectDirectoryEntity externalObjectDirectoryEntity = createExternalObjectDirectoryEntity(1); + createExternalObjectDirectoryEntity(1); armRpoExecutionDetailEntity.setCreatedDateTime(OffsetDateTime.now()); when(armAutomatedTaskRepository.findByAutomatedTask_taskName(any())) .thenReturn(Optional.of(createArmAutomatedTaskEntity())); File file = new File("Tests/arm/rpo/noFile.csv"); - + // when ArmRpoException armRpoException = assertThrows(ArmRpoException.class, () -> - armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), batchSize)); + armRpoService.reconcileArmRpoCsvData(armRpoExecutionDetailEntity, Collections.singletonList(file), BATCH_SIZE)); // then assertThat(armRpoException.getMessage(), containsString( From 75fff428250c87424a6169aa09b4fc4ae534e4ef Mon Sep 17 00:00:00 2001 From: karen-hedges <133129444+karen-hedges@users.noreply.github.com> Date: Sun, 2 Feb 2025 12:00:56 +0000 Subject: [PATCH 5/7] DMP-4646 Implement batching in ArmRpoPolling automated task Started to add batchsize to ArmRpoPolling automated task --- .../gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImpl.java b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImpl.java index bb1508d4da..00dd1e9680 100644 --- a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImpl.java +++ b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImpl.java @@ -51,7 +51,7 @@ public class ArmRpoPollServiceImpl implements ArmRpoPollService { @Override public void pollArmRpo(boolean isManualRun, Duration pollDuration, int batchSize) { - log.info("Polling ARM RPO service - isManualRun: {}", isManualRun); + log.info("Polling ARM RPO - isManualRun: {} poll duration: {}, batchSize: {}", isManualRun, pollDuration, batchSize); setupFailedStatuses(); setupAllowableInProgressStates(); Integer executionId = null; From fdc7104ddb70794af293f37bec27b20020a0a8e5 Mon Sep 17 00:00:00 2001 From: karen-hedges <133129444+karen-hedges@users.noreply.github.com> Date: Fri, 31 Jan 2025 09:51:54 +0000 Subject: [PATCH 6/7] DMP-4646 Implement batching in ArmRpoPolling automated task Started to add batchsize to ArmRpoPolling automated task --- .../arm/service/impl/ArmRpoServiceImpl.java | 21 +++++++++++-------- .../ExternalObjectDirectoryRepository.java | 3 ++- .../service/impl/ArmRpoServiceImplTest.java | 11 ++++++---- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java index 4ab207e93e..f14a08552b 100644 --- a/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java +++ b/src/main/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImpl.java @@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.csv.CSVRecord; import org.apache.commons.lang3.StringUtils; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -114,22 +116,23 @@ public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDe List csvEodList = getEodsListFromCsvFiles(csvFiles, errorMessage); - // get all EODs that are in the pending state and within the time window in batches until no more are found - List externalObjectDirectoryEntities = new ArrayList<>(); - int offset = 0; - List batch; + Pageable pageRequest = PageRequest.of(0, batchSize); + Page pages; do { - batch = externalObjectDirectoryRepository.findByStatusAndIngestionDateTsWithPaging( + pages + = externalObjectDirectoryRepository.findByStatusAndIngestionDateTsWithPaging( armRpoPending, armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvEndHour()), armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvStartHour()), - Pageable.ofSize(batchSize).withPage(offset) + pageRequest ); - externalObjectDirectoryEntities.addAll(batch); - offset += batchSize; - } while (!batch.isEmpty()); + log.info("Found number of elements {}, total elements {}, total pages {} for batch size {}", + pages.getNumberOfElements(), pages.getTotalElements(), pages.getTotalPages(), batchSize); + externalObjectDirectoryEntities.addAll(pages.getContent()); + pageRequest = pageRequest.next(); + } while (pages.hasNext()); externalObjectDirectoryEntities.forEach( externalObjectDirectoryEntity -> { diff --git a/src/main/java/uk/gov/hmcts/darts/common/repository/ExternalObjectDirectoryRepository.java b/src/main/java/uk/gov/hmcts/darts/common/repository/ExternalObjectDirectoryRepository.java index 4e5ae72b39..f8cf762e73 100644 --- a/src/main/java/uk/gov/hmcts/darts/common/repository/ExternalObjectDirectoryRepository.java +++ b/src/main/java/uk/gov/hmcts/darts/common/repository/ExternalObjectDirectoryRepository.java @@ -2,6 +2,7 @@ import jakarta.transaction.Transactional; import org.springframework.data.domain.Limit; +import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; @@ -670,7 +671,7 @@ void updateEodStatusAndTransferAttemptsWhereIdIn(ObjectRecordStatusEntity newSta AND eod.dataIngestionTs between :rpoCsvStartTime AND :rpoCsvEndTime """ ) - List findByStatusAndIngestionDateTsWithPaging(ObjectRecordStatusEntity status, + Page findByStatusAndIngestionDateTsWithPaging(ObjectRecordStatusEntity status, OffsetDateTime rpoCsvStartTime, OffsetDateTime rpoCsvEndTime, Pageable pageable); diff --git a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java index 7a7decae3a..6a05d0ff23 100644 --- a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java +++ b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoServiceImplTest.java @@ -9,6 +9,8 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageImpl; import uk.gov.hmcts.darts.arm.exception.ArmRpoException; import uk.gov.hmcts.darts.arm.helper.ArmRpoHelper; import uk.gov.hmcts.darts.arm.helper.ArmRpoHelperMocks; @@ -170,21 +172,22 @@ void saveArmRpoExecutionDetailEntity_ShouldSaveEntity() { verify(armRpoExecutionDetailRepository, times(1)).save(armRpoExecutionDetailEntity); } + @SuppressWarnings("unchecked") @Test void reconcileArmRpoCsvData_Success() { // given List externalObjectDirectoryEntities = new ArrayList<>(); ExternalObjectDirectoryEntity externalObjectDirectoryEntity1 = createExternalObjectDirectoryEntity(1); ExternalObjectDirectoryEntity externalObjectDirectoryEntity2 = createExternalObjectDirectoryEntity(2); - externalObjectDirectoryEntities.add(externalObjectDirectoryEntity1); externalObjectDirectoryEntities.add(externalObjectDirectoryEntity2); + Page pagedEods = new PageImpl<>(externalObjectDirectoryEntities); armRpoExecutionDetailEntity.setCreatedDateTime(OffsetDateTime.now()); when(armAutomatedTaskRepository.findByAutomatedTask_taskName(any())) .thenReturn(Optional.of(createArmAutomatedTaskEntity())); when(externalObjectDirectoryRepository.findByStatusAndIngestionDateTsWithPaging(any(), any(), any(), any())) - .thenReturn(externalObjectDirectoryEntities).thenReturn(Collections.emptyList()); + .thenReturn(pagedEods); File file = TestUtils.getFile("Tests/arm/rpo/armRpoCsvData.csv"); @@ -195,12 +198,12 @@ void reconcileArmRpoCsvData_Success() { assertEquals(EodHelper.storedStatus(), externalObjectDirectoryEntity1.getStatus()); assertEquals(EodHelper.armReplayStatus(), externalObjectDirectoryEntity2.getStatus()); - verify(externalObjectDirectoryRepository, times(2)).findByStatusAndIngestionDateTsWithPaging( + verify(externalObjectDirectoryRepository).findByStatusAndIngestionDateTsWithPaging( eq(EodHelper.armRpoPendingStatus()), eq(armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(RPO_CSV_END_HOUR)), eq(armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(RPO_CSV_START_HOUR)), any()); - verify(externalObjectDirectoryRepository, times(1)).saveAllAndFlush(externalObjectDirectoryEntities); + verify(externalObjectDirectoryRepository).saveAllAndFlush(externalObjectDirectoryEntities); } @Test From 4113e854e42b9e0685f25b3d8e662314780f22c5 Mon Sep 17 00:00:00 2001 From: karen-hedges <133129444+karen-hedges@users.noreply.github.com> Date: Sun, 2 Feb 2025 12:33:29 +0000 Subject: [PATCH 7/7] DMP-4646 Implement batching in ArmRpoPolling automated task Started to add batchsize to ArmRpoPolling automated task --- .../config/ArmRpoPollAutomatedTaskConfig.java | 1 + .../impl/ArmRpoPollServiceImplTest.java | 31 +++++++++---------- .../impl/ArmRpoPollAutomatedTaskTest.java | 11 +++++-- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/main/java/uk/gov/hmcts/darts/task/config/ArmRpoPollAutomatedTaskConfig.java b/src/main/java/uk/gov/hmcts/darts/task/config/ArmRpoPollAutomatedTaskConfig.java index c9369ff399..7d2d885efe 100644 --- a/src/main/java/uk/gov/hmcts/darts/task/config/ArmRpoPollAutomatedTaskConfig.java +++ b/src/main/java/uk/gov/hmcts/darts/task/config/ArmRpoPollAutomatedTaskConfig.java @@ -14,4 +14,5 @@ public class ArmRpoPollAutomatedTaskConfig extends AbstractAutomatedTaskConfig { private Duration pollDuration; + } diff --git a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java index 7545163587..9ab4d2f754 100644 --- a/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java +++ b/src/test/java/uk/gov/hmcts/darts/arm/service/impl/ArmRpoPollServiceImplTest.java @@ -84,7 +84,6 @@ class ArmRpoPollServiceImplTest { private ArmRpoExecutionDetailEntity armRpoExecutionDetailEntity; private static final ArmRpoHelperMocks ARM_RPO_HELPER_MOCKS = new ArmRpoHelperMocks(); private final Duration pollDuration = Duration.ofHours(4); - private int batchSize = 10; private ArmRpoPollServiceImpl armRpoPollService; @@ -127,7 +126,7 @@ void pollArmRpo_shouldPollSuccessfully_whenSaveBackgroundCompleted() throws IOEx when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -172,7 +171,7 @@ void pollArmRpo_shouldPollSuccessfully_whenSaveBackgroundCompletedForManualRun() when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(true, pollDuration, batchSize); + armRpoPollService.pollArmRpo(true, pollDuration, BATCH_SIZE); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -217,7 +216,7 @@ void pollArmRpo_shouldPollSuccessfully_whenGetExtendedSearchesByMatterInProgress when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -262,7 +261,7 @@ void pollArmRpo_shouldPollSuccessfully_whenCreateExportBasedOnSearchResultsTable when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -304,7 +303,7 @@ void pollArmRpo_shouldPollSuccessfully_whenGetExtendedProductionsByMatterInProgr when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoApi).getExtendedProductionsByMatter(eq("bearerToken"), eq(1), contains(PRODUCTION_NAME), eq(userAccountEntity)); @@ -344,7 +343,7 @@ void pollArmRpo_shouldPollSuccessfully_whenCreateExportBasedOnSearchResultsTable when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(true, pollDuration, batchSize); + armRpoPollService.pollArmRpo(true, pollDuration, BATCH_SIZE); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -389,7 +388,7 @@ void pollArmRpo_shouldPollSuccessfully_whenDownloadProductionFailedOnPreviousAtt when(fileOperationService.saveFileToTempWorkspace(any(InputStream.class), anyString(), any(), anyBoolean())).thenReturn(filePath); // when - armRpoPollService.pollArmRpo(true, pollDuration, batchSize); + armRpoPollService.pollArmRpo(true, pollDuration, BATCH_SIZE); // then verify(armRpoApi).getExtendedSearchesByMatter("bearerToken", 1, userAccountEntity); @@ -418,7 +417,7 @@ void pollArmRpo_shouldPollNotFindLatestExecutionDetail_OnStepDownloadProductionF armRpoExecutionDetailEntity.setArmRpoState(ARM_RPO_HELPER_MOCKS.getDownloadProductionRpoState()); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -433,7 +432,7 @@ void pollArmRpo_shouldPollNotFindLatestExecutionDetailForManualRun_OnStepDownloa armRpoExecutionDetailEntity.setArmRpoState(ARM_RPO_HELPER_MOCKS.getDownloadProductionRpoState()); // when - armRpoPollService.pollArmRpo(true, pollDuration, batchSize); + armRpoPollService.pollArmRpo(true, pollDuration, BATCH_SIZE); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -447,7 +446,7 @@ void pollArmRpo_shouldHandleNoExecutionDetailEntity() { when(armRpoService.getLatestArmRpoExecutionDetailEntity()).thenReturn(null); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -464,7 +463,7 @@ void pollArmRpo_shouldHandleNoBearerToken() { when(armApiService.getArmBearerToken()).thenReturn(null); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -486,7 +485,7 @@ void pollArmRpo_shouldHandleCreateExportInProgress() { when(armRpoApi.createExportBasedOnSearchResultsTable(anyString(), anyInt(), any(), anyString(), any(), any(UserAccountEntity.class))).thenReturn(false); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -511,7 +510,7 @@ void pollArmRpo_shouldHandleGetExtendedSearchesByMatterGetInProgress() { ArmRpoInProgressException.class); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -537,7 +536,7 @@ void pollArmRpo_shouldHandleNoProductionFiles() { when(armRpoApi.getProductionOutputFiles(anyString(), anyInt(), any(UserAccountEntity.class))).thenReturn(List.of()); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); @@ -569,7 +568,7 @@ void pollArmRpo_shouldHandleExceptionDuringPolling_whenCreateExportBasedOnSearch new ArmRpoException("Test exception")); // when - armRpoPollService.pollArmRpo(false, pollDuration, batchSize); + armRpoPollService.pollArmRpo(false, pollDuration, BATCH_SIZE); // then verify(armRpoService).getLatestArmRpoExecutionDetailEntity(); diff --git a/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java b/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java index d76dc112e8..9a3b32d30c 100644 --- a/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java +++ b/src/test/java/uk/gov/hmcts/darts/task/runner/impl/ArmRpoPollAutomatedTaskTest.java @@ -3,7 +3,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import uk.gov.hmcts.darts.arm.service.ArmRpoPollService; import uk.gov.hmcts.darts.common.repository.AutomatedTaskRepository; @@ -13,6 +12,10 @@ import java.time.Duration; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) class ArmRpoPollAutomatedTaskTest { @@ -29,8 +32,10 @@ class ArmRpoPollAutomatedTaskTest { @Test void runTask() { - armRpoPollAutomatedTaskConfig.setPollDuration(Duration.ofSeconds(5)); // given + Duration pollDuration = Duration.ofSeconds(5); + when(armRpoPollAutomatedTaskConfig.getPollDuration()).thenReturn(pollDuration); + ArmRpoPollingAutomatedTask armRpoPollAutomatedTask = new ArmRpoPollingAutomatedTask( automatedTaskRepository, armRpoPollAutomatedTaskConfig, @@ -43,6 +48,6 @@ void runTask() { armRpoPollAutomatedTask.runTask(); // then - Mockito.verify(armRpoPollService, Mockito.times(1)).pollArmRpo(false, Duration.ofSeconds(5), 1); + verify(armRpoPollService, times(1)).pollArmRpo(false, pollDuration, 0); } } \ No newline at end of file