From 9c0155ce1c3dd7aff79294bde23ad931c45465d4 Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Thu, 6 Mar 2025 10:52:01 -0700
Subject: [PATCH] wip streaming batch2 job processor

---
 .../jpa/batch2/JpaJobPersistenceImpl.java     |   9 ++
 ...Batch2WorkChunkMetadataViewRepository.java |   8 +
 .../test/IInstanceStateTransitions.java       |   4 +-
 .../uhn/fhir/batch2/api/IJobPersistence.java  |   8 +
 .../fhir/batch2/config/BaseBatch2Config.java  |  19 ++-
 .../maintenance/JobInstanceProcessor.java     | 152 +++++++-----
 .../JobMaintenanceServiceImpl.java            |   9 +-
 .../WorkChunkProcessingService.java           | 139 ++++++++++++++++
 .../JobMaintenanceServiceImplTest.java        |   4 +-
 9 files changed, 251 insertions(+), 101 deletions(-)
 create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/WorkChunkProcessingService.java

diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java
index b057f087412e..f8894fe2b285 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java
@@ -540,6 +540,15 @@ public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
 		return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
 	}
 
+	@Override
+	@Transactional(readOnly = true)
+	public Stream<WorkChunkMetadata> streamAllWorkChunkMetadataForJobInStates(
+			String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
+		Stream<Batch2WorkChunkMetadataView> workChunks =
+				myWorkChunkMetadataViewRepo.streamWorkChunkMetadataForJobInStates(theInstanceId, theStates);
+		return workChunks.map(Batch2WorkChunkMetadataView::toChunkMetadata);
+	}
+
 	@Override
 	public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
 		/*
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkMetadataViewRepository.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkMetadataViewRepository.java
index 4cfb8975807a..503ec2d15bca 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkMetadataViewRepository.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkMetadataViewRepository.java
@@ -26,8 +26,10 @@
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.query.Param;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.util.Collection;
+import java.util.stream.Stream;
 
 public interface IBatch2WorkChunkMetadataViewRepository extends JpaRepository<Batch2WorkChunkMetadataView, String> {
 
@@ -37,4 +39,10 @@ Page<Batch2WorkChunkMetadataView> fetchWorkChunkMetadataForJobInStates(
 			Pageable thePageRequest,
 			@Param("instanceId") String theInstanceId,
 			@Param("states") Collection<WorkChunkStatusEnum> theStates);
+
+	@Query("SELECT v FROM Batch2WorkChunkMetadataView v WHERE v.myInstanceId = :instanceId AND v.myStatus IN :states "
+			+ " ORDER BY v.myInstanceId, v.myTargetStepId, v.myStatus, v.mySequence, v.myId ASC")
+	@Transactional(readOnly = true) // streaming requires an open transaction
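+	// Spring Data produces this Stream lazily from an open cursor; callers must consume
+	// it inside an active transaction and close it (e.g. try-with-resources) when done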
+	Stream<Batch2WorkChunkMetadataView> streamWorkChunkMetadataForJobInStates(
+			@Param("instanceId") String theInstanceId, @Param("states") Collection<WorkChunkStatusEnum> theStates);
 }
diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java
index 8de64d5ccc81..74a9ecdd632c 100644
--- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java
+++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java
@@ -28,6 +28,7 @@
 import ca.uhn.fhir.batch2.model.StatusEnum;
 import ca.uhn.fhir.batch2.model.WorkChunk;
 import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
+import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -107,7 +108,8 @@ default void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) {
 				instanceId1,
 				new JobChunkProgressAccumulator(),
 				null,
-				jobDefinitionRegistry
+				jobDefinitionRegistry,
+				new HapiTransactionService()
 			).process();
 		});
 
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java
index de682bfdb8c7..82e09a3373db 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java
@@ -169,6 +169,14 @@ List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
 	Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
 			Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates);
 
+	/**
+	 * Fetches a stream of {@link WorkChunkMetadata} for a job instance from the database.
+	 * @param theInstanceId the instance ID of the job of interest
+	 * @param theStates the work chunk states of interest
+	 * @return a stream of the matching work chunks
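+	 * <p>
+	 * The returned stream is lazy: consume it within an active transaction and close it
+	 * when done. A sketch of the intended call pattern (hypothetical caller):
+	 * <pre>{@code
+	 * try (Stream<WorkChunkMetadata> chunks = persistence.streamAllWorkChunkMetadataForJobInStates(
+	 * 		instanceId, Set.of(WorkChunkStatusEnum.READY))) {
+	 * 	chunks.forEach(this::handleChunk);
+	 * }
+	 * }</pre>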
+	 */
+	Stream<WorkChunkMetadata> streamAllWorkChunkMetadataForJobInStates(
+			String theInstanceId, Set<WorkChunkStatusEnum> theStates);
 
 	/**
 	 * Callback to update a JobInstance within a locked transaction.
 	 * Return true from the callback if the record write should continue, or false if
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java
index 6e9a0e4d6faa..fde7fa78e564 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java
@@ -34,6 +34,7 @@
 import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
+import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
 import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
 import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
 import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
@@ -59,7 +60,10 @@ public abstract class BaseBatch2Config {
 	IChannelFactory myChannelFactory;
 
 	@Autowired
-	IHapiTransactionService myHapiTransactionService;
+	IHapiTransactionService myIHapiTransactionService;
+
+	@Autowired
+	HapiTransactionService myHapiTransactionService;
 
 	@Bean
 	public JobDefinitionRegistry batch2JobDefinitionRegistry() {
@@ -68,7 +72,7 @@ public JobDefinitionRegistry batch2JobDefinitionRegistry() {
 
 	@Bean
 	public WorkChunkProcessor jobStepExecutorService(BatchJobSender theBatchJobSender) {
-		return new WorkChunkProcessor(myPersistence, theBatchJobSender, myHapiTransactionService);
+		return new WorkChunkProcessor(myPersistence, theBatchJobSender, myIHapiTransactionService);
 	}
 
 	@Bean
@@ -81,8 +85,7 @@ public IJobCoordinator batch2JobCoordinator(
 			JobDefinitionRegistry theJobDefinitionRegistry,
 			BatchJobSender theBatchJobSender,
 			WorkChunkProcessor theExecutor,
-			IJobMaintenanceService theJobMaintenanceService,
-			IHapiTransactionService theTransactionService) {
+			IJobMaintenanceService theJobMaintenanceService) {
 		return new JobCoordinatorImpl(
 				theBatchJobSender,
 				batch2ProcessingChannelReceiver(myChannelFactory),
@@ -90,7 +93,7 @@ public IJobCoordinator batch2JobCoordinator(
 				theJobDefinitionRegistry,
 				theExecutor,
 				theJobMaintenanceService,
-				theTransactionService);
+				myIHapiTransactionService);
 	}
 
 	@Bean
@@ -108,7 +111,8 @@ public IJobMaintenanceService batch2JobMaintenanceService(
 			JpaStorageSettings theStorageSettings,
 			BatchJobSender theBatchJobSender,
 			WorkChunkProcessor theExecutor,
-			IReductionStepExecutorService theReductionStepExecutorService) {
+			IReductionStepExecutorService theReductionStepExecutorService,
+			HapiTransactionService theHapiTransactionService) {
 		return new JobMaintenanceServiceImpl(
 				theSchedulerService,
 				myPersistence,
@@ -116,7 +120,8 @@ public IJobMaintenanceService batch2JobMaintenanceService(
 				theJobDefinitionRegistry,
 				theBatchJobSender,
 				theExecutor,
-				theReductionStepExecutorService);
+				theReductionStepExecutorService,
+				myHapiTransactionService);
 	}
 
 	@Bean
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 199686301c82..2f3944a68e3d 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -32,34 +32,32 @@
 import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
 import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
 import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
+import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
 import ca.uhn.fhir.model.api.IModelJson;
-import ca.uhn.fhir.model.api.PagingIterator;
 import ca.uhn.fhir.util.Logs;
 import ca.uhn.fhir.util.StopWatch;
 import org.apache.commons.lang3.time.DateUtils;
 import org.slf4j.Logger;
-import org.springframework.data.domain.Page;
-import org.springframework.data.domain.Pageable;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 
 public class JobInstanceProcessor {
-	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
-	public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
-
-	// 10k; we want to get as many as we can
-	private static final int WORK_CHUNK_METADATA_BATCH_SIZE = 10000;
 	private final IJobPersistence myJobPersistence;
 	private final BatchJobSender myBatchJobSender;
+	private final String myInstanceId;
+	private final JobDefinitionRegistry myJobDefinitionRegistry;
+	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
+	public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
+	private final WorkChunkProcessingService myWorkChunkProcessingService;
 	private final JobChunkProgressAccumulator myProgressAccumulator;
 	private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
 	private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
 	private final IReductionStepExecutorService myReductionStepExecutorService;
-	private final String myInstanceId;
-	private final JobDefinitionRegistry myJobDefinitionegistry;
+	private final HapiTransactionService myTxService;
 
 	public JobInstanceProcessor(
 			IJobPersistence theJobPersistence,
@@ -67,16 +65,19 @@ public JobInstanceProcessor(
 			String theInstanceId,
 			JobChunkProgressAccumulator theProgressAccumulator,
 			IReductionStepExecutorService theReductionStepExecutorService,
-			JobDefinitionRegistry theJobDefinitionRegistry) {
+			JobDefinitionRegistry theJobDefinitionRegistry,
+			HapiTransactionService theTxService) {
 		myJobPersistence = theJobPersistence;
 		myBatchJobSender = theBatchJobSender;
 		myInstanceId = theInstanceId;
 		myProgressAccumulator = theProgressAccumulator;
 		myReductionStepExecutorService = theReductionStepExecutorService;
-		myJobDefinitionegistry = theJobDefinitionRegistry;
+		myJobDefinitionRegistry = theJobDefinitionRegistry;
 		myJobInstanceProgressCalculator =
 				new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry);
 		myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
+		myTxService = theTxService;
+		myWorkChunkProcessingService = new WorkChunkProcessingService(theTxService);
 	}
 
 	public void process() {
@@ -95,7 +96,7 @@ public void process() {
 		}
 
 		JobDefinition<? extends IModelJson> jobDefinition =
-				myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
+				myJobDefinitionRegistry.getJobDefinitionOrThrowException(theInstance);
 
 		// move POLL_WAITING -> READY
 		processPollingChunks(theInstance.getInstanceId());
@@ -124,7 +125,7 @@ public void process() {
 		}
 
 		// enqueue all READY chunks
-		enqueueReadyChunks(theInstance, jobDefinition);
+		processReadyChunks(theInstance, jobDefinition);
 
 		ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
 	}
@@ -144,6 +145,52 @@ private boolean handleCancellation(JobInstance theInstance) {
 		return false;
 	}
 
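+	/**
+	 * Moves all READY work chunks to QUEUED and sends them to the work channel,
+	 * streaming the chunk metadata instead of paging it into memory.
+	 *
+	 * Because reduction steps are done inline by the maintenance pass, those chunks
+	 * are not sent to the queue (though their status is still moved READY -> QUEUED).
+	 */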
+	public void processReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
+		AtomicInteger counter = new AtomicInteger(0);
+		try (final Stream<WorkChunkMetadata> chunks =
+				myWorkChunkProcessingService.getReadyChunks(theJobInstance.getInstanceId())) {
+			chunks.forEach(chunk -> {
+				updateChunkAndSendToQueue(chunk);
+				counter.incrementAndGet();
+			});
+		}
+		ourLog.debug(
+				"Encountered {} READY work chunks for job {} of type {}",
+				counter.get(),
+				theJobInstance.getInstanceId(),
+				theJobDefinition.getJobDefinitionId());
+	}
+
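+	/**
+	 * Updates the work chunk from READY to QUEUED and, if exactly one row was updated,
+	 * sends it to the queue. Otherwise the chunk is skipped; if it is still in the DB,
+	 * a later maintenance pass will pick it up.
+	 */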
+	private void updateChunkAndSendToQueue(WorkChunkMetadata theChunk) {
+		String chunkId = theChunk.getId();
+		myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
+			if (updated == 1) {
+				sendNotification(theChunk);
+			} else {
+				// means the work chunk is likely already gone...
+				// we'll log and skip it. If it's still in the DB, the next pass
+				// will pick it up. Otherwise, it's no longer important
+				ourLog.error(
+						"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; found {}, expected 1; skipping work chunk.",
+						theChunk.getInstanceId(),
+						theChunk.getId(),
+						updated);
+			}
+		});
+	}
+
+	private void sendNotification(WorkChunkMetadata theChunk) {
+		// send to the queue
+		// we use the current step id because the chunk has not been moved to the next step (yet)
+		JobWorkNotification workNotification = new JobWorkNotification(
+				theChunk.getJobDefinitionId(),
+				theChunk.getJobDefinitionVersion(),
+				theChunk.getInstanceId(),
+				theChunk.getTargetStepId(),
+				theChunk.getId());
+		myBatchJobSender.sendWorkChannelMessage(workNotification);
+	}
+
 	private String buildCancelledMessage(JobInstance theInstance) {
 		String msg = "Job instance cancelled";
 		if (theInstance.hasGatedStep()) {
@@ -266,17 +313,6 @@ private boolean canAdvanceGatedJob(JobInstance theInstance) {
 		return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
 	}
 
-	protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
-		return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> {
-			Pageable pageable = Pageable.ofSize(batchsize).withPage(index);
-			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
-					pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
-			for (WorkChunkMetadata metadata : results) {
-				consumer.accept(metadata);
-			}
-		});
-	}
-
 	/**
 	 * Trigger the reduction step for the given job instance. Reduction step chunks should never be queued.
 	 */
@@ -291,70 +327,6 @@ private void triggerReductionStep(JobInstance theInstance, JobWorkCursor<?, ?, ?> theWorkCursor) {
-	private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
-		Iterator<WorkChunkMetadata> iter = getReadyChunks();
-
-		int counter = 0;
-		while (iter.hasNext()) {
-			WorkChunkMetadata metadata = iter.next();
-
-			/*
-			 * For each chunk id
-			 * * Move to QUEUE'd
-			 * * Send to topic
-			 * * flush changes
-			 * * commit
-			 */
-			updateChunkAndSendToQueue(metadata);
-			counter++;
-		}
-		ourLog.debug(
-				"Encountered {} READY work chunks for job {} of type {}",
-				counter,
-				theJobInstance.getInstanceId(),
-				theJobDefinition.getJobDefinitionId());
-	}
-
-	/**
-	 * Updates the Work Chunk and sends it to the queue.
-	 *
-	 * Because ReductionSteps are done inline by the maintenance pass,
-	 * those will not be sent to the queue (but they will still have their
-	 * status updated from READY -> QUEUED).
-	 *
-	 * Returns true after processing.
-	 */
-	private void updateChunkAndSendToQueue(WorkChunkMetadata theChunk) {
-		String chunkId = theChunk.getId();
-		myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
-			ourLog.info("Updated {} workchunk with id {}", updated, chunkId);
-			if (updated == 1) {
-				sendNotification(theChunk);
-			} else {
-				// means the work chunk is likely already gone...
-				// we'll log and skip it. If it's still in the DB, the next pass
-				// will pick it up. Otherwise, it's no longer important
-				ourLog.error(
-						"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; found {}, expected 1; skipping work chunk.",
-						theChunk.getInstanceId(),
-						theChunk.getId(),
-						updated);
-			}
-		});
-	}
-
-	private void sendNotification(WorkChunkMetadata theChunk) {
-		// send to the queue
-		// we use current step id because it has not been moved to the next step (yet)
-		JobWorkNotification workNotification = new JobWorkNotification(
-				theChunk.getJobDefinitionId(),
-				theChunk.getJobDefinitionVersion(),
-				theChunk.getInstanceId(),
-				theChunk.getTargetStepId(),
-				theChunk.getId());
-		myBatchJobSender.sendWorkChannelMessage(workNotification);
-	}
-
 	private void processChunksForNextGatedSteps(
 			JobInstance theInstance, JobDefinition<?> theJobDefinition, String nextStepId) {
 		String instanceId = theInstance.getInstanceId();
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java
index 154d787b2b6c..7ff121e02e84 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java
@@ -28,6 +28,7 @@
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.i18n.Msg;
 import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
+import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
 import ca.uhn.fhir.jpa.model.sched.HapiJob;
 import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
 import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
@@ -96,6 +97,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasScheduledJobs {
 	private Runnable myMaintenanceJobStartedCallback = () -> {};
 	private Runnable myMaintenanceJobFinishedCallback = () -> {};
 	private final IReductionStepExecutorService myReductionStepExecutorService;
+	private final HapiTransactionService myTransactionService;
 
 	private boolean myEnabledBool = true;
 
@@ -109,7 +111,8 @@ public JobMaintenanceServiceImpl(
 			@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
 			@Nonnull BatchJobSender theBatchJobSender,
 			@Nonnull WorkChunkProcessor theExecutor,
-			@Nonnull IReductionStepExecutorService theReductionStepExecutorService) {
+			@Nonnull IReductionStepExecutorService theReductionStepExecutorService,
+			@Nonnull HapiTransactionService theTransactionService) {
 		myStorageSettings = theStorageSettings;
 		myReductionStepExecutorService = theReductionStepExecutorService;
 		Validate.notNull(theSchedulerService);
@@ -122,6 +125,7 @@ public JobMaintenanceServiceImpl(
 		myJobDefinitionRegistry = theJobDefinitionRegistry;
 		myBatchJobSender = theBatchJobSender;
 		myJobExecutorSvc = theExecutor;
+		myTransactionService = theTransactionService;
 	}
 
 	@Override
@@ -243,7 +247,8 @@ private void doMaintenancePass() {
 					instanceId,
 					progressAccumulator,
 					myReductionStepExecutorService,
-					myJobDefinitionRegistry);
+					myJobDefinitionRegistry,
+					myTransactionService);
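+			// the processor streams READY chunks via WorkChunkProcessingService, which
+			// opens its own read-only transaction on this transaction service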
"Triggering maintenance process for instance {} in status {}", instanceId, diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/WorkChunkProcessingService.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/WorkChunkProcessingService.java new file mode 100644 index 000000000000..b75a20e9d94e --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/WorkChunkProcessingService.java @@ -0,0 +1,139 @@ +package ca.uhn.fhir.batch2.maintenance; + +import ca.uhn.fhir.batch2.model.WorkChunkMetadata; +import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import jakarta.persistence.EntityManager; +import jakarta.persistence.Query; +import org.intellij.lang.annotations.Language; +import org.springframework.orm.jpa.JpaTransactionManager; +import org.springframework.stereotype.Service; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.stream.Stream; + +@Service +public class WorkChunkProcessingService { + + private final HapiTransactionService myTxService; + + public WorkChunkProcessingService(HapiTransactionService theTxService) { + myTxService = theTxService; + } + // @Language("SQL") + // static String WORK_CHUNK_VIEW = + // "SELECT e.id as id, " + // + " e.seq as seq, " + // + " e.stat as state, " + // + " e.instance_id as instance_id, " + // + " e.definition_id as job_definition_id, " + // + " e.definition_ver as job_definition_version, " + // + " e.tgt_step_id as target_step_id " + // + " FROM BT2_WORK_CHUNK e " + // + " WHERE e.instance_id = :instanceId AND e.stat = :status " + // + " ORDER BY e.instance_id, e.tgt_step_id, e.stat, e.seq, e.id ASC"; + @Language("SQL") + static String WORK_CHUNK_VIEW = + "SELECT e.id, e.seq, e.stat, e.instance_id, e.definition_id, e.definition_ver, e.tgt_step_id " + + "FROM BT2_WORK_CHUNK e WHERE e.instance_id = :instanceId AND e.stat = :status"; + + public Stream getReadyChunks(String theInstanceId) { + EntityManager entityManager = getEntityManager(); + + Query nativeQuery = entityManager.createNativeQuery(WORK_CHUNK_VIEW); + nativeQuery.setParameter("instanceId", theInstanceId); + nativeQuery.setParameter("status", WorkChunkStatusEnum.READY.name()); + + return runInTransaction(() -> { + Stream resultStream = nativeQuery.getResultStream(); + return resultStream + .map(row -> { + WorkChunkMetadata metadata = new WorkChunkMetadata(); + metadata.setId((String) row[0]); + metadata.setSequence((Integer) row[1]); + metadata.setStatus(WorkChunkStatusEnum.valueOf((String) row[2])); + metadata.setInstanceId((String) row[3]); + metadata.setJobDefinitionId((String) row[4]); + metadata.setJobDefinitionVersion((Integer) row[5]); + metadata.setTargetStepId((String) row[6]); + + return metadata; + }) + .onClose(() -> onClose(entityManager)); + }); + } + + public T runInTransaction(Callable theRunnable) { + return newTxTemplate().execute(t -> { + try { + return theRunnable.call(); + } catch (Exception theE) { + throw new InternalErrorException(theE); + } + }); + } + + public TransactionTemplate newTxTemplate() { + TransactionTemplate retVal = new TransactionTemplate(myTxService.getTransactionManager()); + + retVal.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + 
+		retVal.setReadOnly(true);
+		retVal.afterPropertiesSet();
+		return retVal;
+	}
+
+	private void onClose(EntityManager theEntityManager) {
+		theEntityManager.close();
+	}
+
+	private EntityManager getEntityManager() {
+		// create a dedicated EntityManager so its lifetime can be tied to the stream
+		JpaTransactionManager jpaTransactionManager = (JpaTransactionManager) myTxService.getTransactionManager();
+
+		return Objects.requireNonNull(jpaTransactionManager.getEntityManagerFactory())
+				.createEntityManager();
+	}
+}
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java
index 53c7ac212d80..d2f9bcd22b0c 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java
@@ -18,6 +18,7 @@
 import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
 import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
 import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
+import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
 import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
 import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
 import ca.uhn.test.util.LogbackTestExtension;
@@ -116,7 +117,8 @@ public void beforeEach() {
 			myJobDefinitionRegistry,
 			batchJobSender,
 			myJobExecutorSvc,
-			myReductionStepExecutorService
+			myReductionStepExecutorService,
+			new HapiTransactionService()
 		);
 		myStorageSettings.setJobFastTrackingEnabled(true);
 	}