Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip streaming batch2 job processor #6778

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,15 @@ public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
}

@Override
@Transactional(readOnly = true)
public Stream<WorkChunkMetadata> streamAllWorkChunkMetadataForJobInStates(
		String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
	// Map each repository view row to its metadata form. The read-only
	// transaction must remain open while the caller consumes the stream.
	return myWorkChunkMetadataViewRepo
			.streamWorkChunkMetadataForJobInStates(theInstanceId, theStates)
			.map(Batch2WorkChunkMetadataView::toChunkMetadata);
}

@Override
public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;

import java.util.Collection;
import java.util.stream.Stream;

public interface IBatch2WorkChunkMetadataViewRepository extends JpaRepository<Batch2WorkChunkMetadataView, String> {

Expand All @@ -37,4 +39,10 @@ Page<Batch2WorkChunkMetadataView> fetchWorkChunkMetadataForJobInStates(
Pageable thePageRequest,
@Param("instanceId") String theInstanceId,
@Param("states") Collection<WorkChunkStatusEnum> theStates);

/**
 * Streams work-chunk metadata view rows for the given instance that are in any of
 * the given states, ordered by instance, target step, status, sequence and id.
 * <p>
 * NOTE(review): a Stream-returning Spring Data query keeps a database cursor open;
 * the caller must consume it inside a transaction and close the stream when done.
 * The trailing {@code ASC} applies only to {@code v.myId}; the other ORDER BY keys
 * use the (identical) default ascending order.
 */
@Query("SELECT v FROM Batch2WorkChunkMetadataView v WHERE v.myInstanceId = :instanceId AND v.myStatus IN :states "
		+ " ORDER BY v.myInstanceId, v.myTargetStepId, v.myStatus, v.mySequence, v.myId ASC")
@Transactional(readOnly = true) // Ensures the transaction remains open for streaming
Stream<Batch2WorkChunkMetadataView> streamWorkChunkMetadataForJobInStates(
		@Param("instanceId") String theInstanceId, @Param("states") Collection<WorkChunkStatusEnum> theStates);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand Down Expand Up @@ -107,7 +108,8 @@ default void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) {
instanceId1,
new JobChunkProgressAccumulator(),
null,
jobDefinitionRegistry
jobDefinitionRegistry,
new HapiTransactionService()
).process();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates);

/**
 * Fetches a stream that retrieves WorkChunkMetadata from the db.
 * <p>
 * NOTE(review): the JPA implementation marks this {@code @Transactional(readOnly = true)};
 * the stream is backed by an open database cursor, so callers should consume it within
 * a transaction and close it (e.g. try-with-resources) when done — confirm against the
 * implementation in use.
 *
 * @param theInstanceId instance id of job of interest
 * @param theStates states of interest
 * @return a stream for the workchunks; should be closed by the caller
 */
Stream<WorkChunkMetadata> streamAllWorkChunkMetadataForJobInStates(
		String theInstanceId, Set<WorkChunkStatusEnum> theStates);
/**
* Callback to update a JobInstance within a locked transaction.
* Return true from the callback if the record write should continue, or false if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
Expand All @@ -59,7 +60,10 @@ public abstract class BaseBatch2Config {
IChannelFactory myChannelFactory;

// Interface-typed transaction service used by most collaborators.
@Autowired
IHapiTransactionService myIHapiTransactionService;

// NOTE(review): injecting the concrete HapiTransactionService in addition to the
// interface above is a design smell — prefer widening IHapiTransactionService so
// this config depends on abstractions only. (The scraped diff also interleaved the
// old-side declaration, yielding two fields named myHapiTransactionService, which
// would not compile; only the new-side pair is kept here.)
@Autowired
HapiTransactionService myHapiTransactionService;

@Bean
public JobDefinitionRegistry batch2JobDefinitionRegistry() {
Expand All @@ -68,7 +72,7 @@ public JobDefinitionRegistry batch2JobDefinitionRegistry() {

/**
 * Processor bean that executes individual work chunks. Uses the interface-typed
 * transaction service. (The diff interleaved old and new return statements — two
 * consecutive returns would be an unreachable-statement compile error; only the
 * new-side return is kept.)
 */
@Bean
public WorkChunkProcessor jobStepExecutorService(BatchJobSender theBatchJobSender) {
	return new WorkChunkProcessor(myPersistence, theBatchJobSender, myIHapiTransactionService);
}

@Bean
Expand All @@ -81,16 +85,15 @@ public IJobCoordinator batch2JobCoordinator(
JobDefinitionRegistry theJobDefinitionRegistry,
BatchJobSender theBatchJobSender,
WorkChunkProcessor theExecutor,
IJobMaintenanceService theJobMaintenanceService,
IHapiTransactionService theTransactionService) {
IJobMaintenanceService theJobMaintenanceService) {
return new JobCoordinatorImpl(
theBatchJobSender,
batch2ProcessingChannelReceiver(myChannelFactory),
myPersistence,
theJobDefinitionRegistry,
theExecutor,
theJobMaintenanceService,
theTransactionService);
myIHapiTransactionService);
}

@Bean
Expand All @@ -108,15 +111,17 @@ public IJobMaintenanceService batch2JobMaintenanceService(
JpaStorageSettings theStorageSettings,
BatchJobSender theBatchJobSender,
WorkChunkProcessor theExecutor,
IReductionStepExecutorService theReductionStepExecutorService) {
IReductionStepExecutorService theReductionStepExecutorService,
HapiTransactionService theHapiTransactionService) {
return new JobMaintenanceServiceImpl(
theSchedulerService,
myPersistence,
theStorageSettings,
theJobDefinitionRegistry,
theBatchJobSender,
theExecutor,
theReductionStepExecutorService);
theReductionStepExecutorService,
myHapiTransactionService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,51 +32,52 @@
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class JobInstanceProcessor {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;

// 10k; we want to get as many as we can
private static final int WORK_CHUNK_METADATA_BATCH_SIZE = 10000;

// NOTE(review): the scraped diff interleaved old- and new-side field declarations
// (duplicate ourLog, PURGE_THRESHOLD, myInstanceId, plus the misspelled
// myJobDefinitionegistry), which would not compile; this is the de-duplicated set.
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
private final String myInstanceId;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final WorkChunkProcessingService myWorkChunkProcessingService;
private final JobChunkProgressAccumulator myProgressAccumulator;
private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
private final IReductionStepExecutorService myReductionStepExecutorService;
private final HapiTransactionService myTxService;

/**
 * Builds a processor for a single job instance.
 * (The diff interleaved old and new parameter lists — the parameter list closed
 * twice and both the old misspelled field and the new field were assigned; this
 * keeps the new-side seven-argument constructor only.)
 *
 * @param theJobPersistence persistence layer for instances and work chunks
 * @param theBatchJobSender publishes work notifications to the work channel
 * @param theInstanceId id of the job instance this processor operates on
 * @param theProgressAccumulator accumulates chunk progress across the pass
 * @param theReductionStepExecutorService runs reduction steps inline
 * @param theJobDefinitionRegistry registry of known job definitions
 * @param theTxService transaction service used for streaming chunk reads
 */
public JobInstanceProcessor(
		IJobPersistence theJobPersistence,
		BatchJobSender theBatchJobSender,
		String theInstanceId,
		JobChunkProgressAccumulator theProgressAccumulator,
		IReductionStepExecutorService theReductionStepExecutorService,
		JobDefinitionRegistry theJobDefinitionRegistry,
		HapiTransactionService theTxService) {
	myJobPersistence = theJobPersistence;
	myBatchJobSender = theBatchJobSender;
	myInstanceId = theInstanceId;
	myProgressAccumulator = theProgressAccumulator;
	myReductionStepExecutorService = theReductionStepExecutorService;
	myJobDefinitionRegistry = theJobDefinitionRegistry;
	myJobInstanceProgressCalculator =
			new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry);
	myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
	myTxService = theTxService;
	myWorkChunkProcessingService = new WorkChunkProcessingService(theTxService);
}

public void process() {
Expand All @@ -95,7 +96,7 @@ public void process() {
}

JobDefinition<? extends IModelJson> jobDefinition =
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
myJobDefinitionRegistry.getJobDefinitionOrThrowException(theInstance);

// move POLL_WAITING -> READY
processPollingChunks(theInstance.getInstanceId());
Expand Down Expand Up @@ -124,7 +125,7 @@ public void process() {
}

// enqueue all READY chunks
enqueueReadyChunks(theInstance, jobDefinition);
processReadyChunks(theInstance, jobDefinition);

ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
}
Expand All @@ -144,6 +145,52 @@ private boolean handleCancellation(JobInstance theInstance) {
return false;
}

public void processReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
	// Effectively-final holder so the forEach lambda can count chunks.
	AtomicInteger enqueuedCount = new AtomicInteger(0);
	// Close the stream promptly: it is backed by a database cursor.
	try (final Stream<WorkChunkMetadata> readyChunks =
			myWorkChunkProcessingService.getReadyChunks(theJobInstance.getInstanceId())) {
		readyChunks.forEach(readyChunk -> {
			updateChunkAndSendToQueue(readyChunk);
			enqueuedCount.incrementAndGet();
		});
	}
	ourLog.debug(
			"Encountered {} READY work chunks for job {} of type {}",
			enqueuedCount,
			theJobInstance.getInstanceId(),
			theJobDefinition.getJobDefinitionId());
}

/**
 * Transitions the given chunk from READY to QUEUED in persistence and, when
 * exactly one row was updated, publishes a work notification for it. Any other
 * update count is logged and the chunk is skipped.
 */
private void updateChunkAndSendToQueue(WorkChunkMetadata theChunk) {
	String chunkId = theChunk.getId();
	myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
		// `updated` is the number of rows moved READY -> QUEUED; 1 means success
		if (updated == 1) {
			sendNotification(theChunk);
		} else {
			// means the work chunk is likely already gone...
			// we'll log and skip it. If it's still in the DB, the next pass
			// will pick it up. Otherwise, it's no longer important
			ourLog.error(
					"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; found {}, expected 1; skipping work chunk.",
					theChunk.getInstanceId(),
					theChunk.getId(),
					updated);
		}
	});
}

/** Publishes a work notification for the given chunk on the batch2 work channel. */
private void sendNotification(WorkChunkMetadata theChunk) {
	// The current (not-yet-advanced) step id is used because the chunk has not
	// been moved to the next step yet.
	JobWorkNotification notification = new JobWorkNotification(
			theChunk.getJobDefinitionId(),
			theChunk.getJobDefinitionVersion(),
			theChunk.getInstanceId(),
			theChunk.getTargetStepId(),
			theChunk.getId());
	myBatchJobSender.sendWorkChannelMessage(notification);
}

private String buildCancelledMessage(JobInstance theInstance) {
String msg = "Job instance cancelled";
if (theInstance.hasGatedStep()) {
Expand Down Expand Up @@ -266,17 +313,6 @@ private boolean canAdvanceGatedJob(JobInstance theInstance) {
return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
}

/**
 * Pages READY work chunk metadata for this instance out of persistence,
 * WORK_CHUNK_METADATA_BATCH_SIZE rows at a time, exposed as a lazy iterator.
 */
protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
	return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> {
		// Fetch page `index` of READY chunks and hand each row to the consumer.
		Pageable pageable = Pageable.ofSize(batchsize).withPage(index);
		Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
				pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
		for (WorkChunkMetadata metadata : results) {
			consumer.accept(metadata);
		}
	});
}

/**
* Trigger the reduction step for the given job instance. Reduction step chunks should never be queued.
*/
Expand All @@ -291,70 +327,6 @@ private void triggerReductionStep(JobInstance theInstance, JobWorkCursor<?, ?, ?
* We will move READY chunks to QUEUE'd and send them to the queue/topic (kafka)
* for processing.
*/
private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
	// Lazily pages READY chunks out of persistence.
	Iterator<WorkChunkMetadata> iter = getReadyChunks();

	// Number of READY chunks processed this pass (for the debug log below).
	int counter = 0;
	while (iter.hasNext()) {
		WorkChunkMetadata metadata = iter.next();

		/*
		 * For each chunk id
		 * * Move to QUEUE'd
		 * * Send to topic
		 * * flush changes
		 * * commit
		 */
		updateChunkAndSendToQueue(metadata);
		counter++;
	}
	ourLog.debug(
			"Encountered {} READY work chunks for job {} of type {}",
			counter,
			theJobInstance.getInstanceId(),
			theJobDefinition.getJobDefinitionId());
}

/**
* Updates the Work Chunk and sends it to the queue.
*
* Because ReductionSteps are done inline by the maintenance pass,
* those will not be sent to the queue (but they will still have their
* status updated from READY -> QUEUED).
*
* Returns true after processing.
*/
private void updateChunkAndSendToQueue(WorkChunkMetadata theChunk) {
	String chunkId = theChunk.getId();
	myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
		// `updated` is the number of rows moved READY -> QUEUED; 1 means success
		ourLog.info("Updated {} workchunk with id {}", updated, chunkId);
		if (updated == 1) {
			sendNotification(theChunk);
		} else {
			// means the work chunk is likely already gone...
			// we'll log and skip it. If it's still in the DB, the next pass
			// will pick it up. Otherwise, it's no longer important
			ourLog.error(
					"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; found {}, expected 1; skipping work chunk.",
					theChunk.getInstanceId(),
					theChunk.getId(),
					updated);
		}
	});
}

/** Publishes a work notification for the given chunk on the batch2 work channel. */
private void sendNotification(WorkChunkMetadata theChunk) {
	// send to the queue
	// we use current step id because it has not been moved to the next step (yet)
	JobWorkNotification workNotification = new JobWorkNotification(
			theChunk.getJobDefinitionId(),
			theChunk.getJobDefinitionVersion(),
			theChunk.getInstanceId(),
			theChunk.getTargetStepId(),
			theChunk.getId());
	myBatchJobSender.sendWorkChannelMessage(workNotification);
}

private void processChunksForNextGatedSteps(
JobInstance theInstance, JobDefinition<?> theJobDefinition, String nextStepId) {
String instanceId = theInstance.getInstanceId();
Expand Down
Loading
Loading