Skip to content

Commit

Permalink
DMP-4646 Implement batching in ArmRpoPolling automated task
Browse files Browse the repository at this point in the history
Started to add batchsize to  ArmRpoPolling automated task
  • Loading branch information
karen-hedges committed Jan 31, 2025
1 parent 7ad2628 commit 721163e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -114,22 +116,23 @@ public void reconcileArmRpoCsvData(ArmRpoExecutionDetailEntity armRpoExecutionDe

List<Integer> csvEodList = getEodsListFromCsvFiles(csvFiles, errorMessage);

// get all EODs that are in the pending state and within the time window in batches until no more are found

List<ExternalObjectDirectoryEntity> externalObjectDirectoryEntities = new ArrayList<>();
int offset = 0;
List<ExternalObjectDirectoryEntity> batch;
Pageable pageRequest = PageRequest.of(0, batchSize);
Page<ExternalObjectDirectoryEntity> pages;

do {
batch = externalObjectDirectoryRepository.findByStatusAndIngestionDateTsWithPaging(
pages
= externalObjectDirectoryRepository.findByStatusAndIngestionDateTsWithPaging(
armRpoPending,
armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvEndHour()),
armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(armAutomatedTaskEntity.getRpoCsvStartHour()),
Pageable.ofSize(batchSize).withPage(offset)
pageRequest
);
externalObjectDirectoryEntities.addAll(batch);
offset += batchSize;
} while (!batch.isEmpty());
log.info("Found number of elements {}, total elements {}, total pages {} for batch size {}",
pages.getNumberOfElements(), pages.getTotalElements(), pages.getTotalPages(), batchSize);
externalObjectDirectoryEntities.addAll(pages.getContent());
pageRequest = pageRequest.next();
} while (pages.hasNext());

externalObjectDirectoryEntities.forEach(
externalObjectDirectoryEntity -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import jakarta.transaction.Transactional;
import org.springframework.data.domain.Limit;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
Expand Down Expand Up @@ -656,7 +657,7 @@ void updateEodStatusAndTransferAttemptsWhereLastModifiedIsBetweenTwoDateTimesAnd
AND eod.dataIngestionTs between :rpoCsvStartTime AND :rpoCsvEndTime
"""
)
List<ExternalObjectDirectoryEntity> findByStatusAndIngestionDateTsWithPaging(ObjectRecordStatusEntity status,
Page<ExternalObjectDirectoryEntity> findByStatusAndIngestionDateTsWithPaging(ObjectRecordStatusEntity status,
OffsetDateTime rpoCsvStartTime,
OffsetDateTime rpoCsvEndTime,
Pageable pageable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import uk.gov.hmcts.darts.arm.exception.ArmRpoException;
import uk.gov.hmcts.darts.arm.helper.ArmRpoHelper;
import uk.gov.hmcts.darts.arm.helper.ArmRpoHelperMocks;
Expand Down Expand Up @@ -171,21 +173,22 @@ void saveArmRpoExecutionDetailEntity_ShouldSaveEntity() {
verify(armRpoExecutionDetailRepository, times(1)).save(armRpoExecutionDetailEntity);
}

@SuppressWarnings("unchecked")
@Test
void reconcileArmRpoCsvData_Success() {
// given
List<ExternalObjectDirectoryEntity> externalObjectDirectoryEntities = new ArrayList<>();
ExternalObjectDirectoryEntity externalObjectDirectoryEntity1 = createExternalObjectDirectoryEntity(1);
ExternalObjectDirectoryEntity externalObjectDirectoryEntity2 = createExternalObjectDirectoryEntity(2);

externalObjectDirectoryEntities.add(externalObjectDirectoryEntity1);
externalObjectDirectoryEntities.add(externalObjectDirectoryEntity2);
Page<ExternalObjectDirectoryEntity> pagedEods = new PageImpl<>(externalObjectDirectoryEntities);

armRpoExecutionDetailEntity.setCreatedDateTime(OffsetDateTime.now());
when(armAutomatedTaskRepository.findByAutomatedTask_taskName(any()))
.thenReturn(Optional.of(createArmAutomatedTaskEntity()));
when(externalObjectDirectoryRepository.findByStatusAndIngestionDateTsWithPaging(any(), any(), any(), any()))
.thenReturn(externalObjectDirectoryEntities).thenReturn(Collections.emptyList());
.thenReturn(pagedEods);

File file = TestUtils.getFile("Tests/arm/rpo/armRpoCsvData.csv");

Expand All @@ -196,12 +199,12 @@ void reconcileArmRpoCsvData_Success() {
assertEquals(EodHelper.storedStatus(), externalObjectDirectoryEntity1.getStatus());
assertEquals(EodHelper.armReplayStatus(), externalObjectDirectoryEntity2.getStatus());

verify(externalObjectDirectoryRepository, times(2)).findByStatusAndIngestionDateTsWithPaging(
verify(externalObjectDirectoryRepository).findByStatusAndIngestionDateTsWithPaging(
eq(EodHelper.armRpoPendingStatus()),
eq(armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(RPO_CSV_END_HOUR)),
eq(armRpoExecutionDetailEntity.getCreatedDateTime().minusHours(RPO_CSV_START_HOUR)),
any());
verify(externalObjectDirectoryRepository, times(1)).saveAllAndFlush(externalObjectDirectoryEntities);
verify(externalObjectDirectoryRepository).saveAllAndFlush(externalObjectDirectoryEntities);
}

@Test
Expand Down

0 comments on commit 721163e

Please sign in to comment.