Skip to content

Commit

Permalink
Merge pull request #85 from medizininformatik-initiative/refactor-req…
Browse files Browse the repository at this point in the history
…uest-process

Refactor request process to utilize process parallization of the camunda process engine
  • Loading branch information
EmteZogaf authored Dec 5, 2023
2 parents 18e2fe2 + 944761d commit 8c5dd03
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 350 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package de.medizininformatik_initiative.feasibility_dsf_process.client.listener;

import dev.dsf.bpe.v1.ProcessPluginApi;
import dev.dsf.bpe.v1.constants.BpmnExecutionVariables;
import dev.dsf.bpe.v1.variables.Target;
import dev.dsf.bpe.v1.variables.Variables;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.springframework.beans.factory.InitializingBean;

import java.util.Objects;

public class SetCorrelationKeyListener implements ExecutionListener, InitializingBean
{
private final ProcessPluginApi api;

public SetCorrelationKeyListener(ProcessPluginApi api) {
this.api = api;
}

@Override
public void afterPropertiesSet() throws Exception {
Objects.requireNonNull(api, "api");
}

@Override
public void notify(DelegateExecution execution) throws Exception {
Variables variables = api.getVariables(execution);
Target target = variables.getTarget();

execution.setVariableLocal(BpmnExecutionVariables.CORRELATION_KEY, target.getCorrelationKey());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package de.medizininformatik_initiative.feasibility_dsf_process.message;

import dev.dsf.bpe.v1.ProcessPluginApi;
import dev.dsf.bpe.v1.activity.AbstractTaskMessageSend;
import dev.dsf.bpe.v1.variables.Variables;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.hl7.fhir.r4.model.Reference;
import org.hl7.fhir.r4.model.Task;
import org.hl7.fhir.r4.model.Task.ParameterComponent;

import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkNotNull;
import static de.medizininformatik_initiative.feasibility_dsf_process.variables.ConstantsFeasibility.CODESYSTEM_FEASIBILITY;
import static de.medizininformatik_initiative.feasibility_dsf_process.variables.ConstantsFeasibility.CODESYSTEM_FEASIBILITY_VALUE_MEASURE_REFERENCE;

public class SendDicRequest extends AbstractTaskMessageSend {

public SendDicRequest(ProcessPluginApi api) {
super(api);
}

@Override
protected Stream<ParameterComponent> getAdditionalInputParameters(DelegateExecution execution,
Variables variables) {
return Stream.of(api.getTaskHelper().createInput(
new Reference(checkNotNull(variables.getString("measure-id"), "variable 'measure-id' not set")),
CODESYSTEM_FEASIBILITY, CODESYSTEM_FEASIBILITY_VALUE_MEASURE_REFERENCE));
}

@Override
protected void handleIntermediateThrowEventError(DelegateExecution execution, Variables variables,
Exception exception, String errorMessage) {
execution.setVariableLocal("sendError", true);
}

@Override
protected void addErrorMessage(Task task, String errorMessage) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected void doExecute(DelegateExecution execution, Variables variables) {
FhirWebserviceClient client = clientProvider
.getWebserviceClientByReference(measureReportId);
MeasureReport measureReport = downloadMeasureReport(client, measureReportId);
execution.setVariable(VARIABLE_MEASURE_REPORT, measureReport);
execution.setVariableLocal(VARIABLE_MEASURE_REPORT, measureReport);
}

private MeasureReport downloadMeasureReport(FhirWebserviceClient client, IdType measureReportId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package de.medizininformatik_initiative.feasibility_dsf_process.service;

import dev.dsf.bpe.v1.ProcessPluginApi;
import dev.dsf.bpe.v1.activity.AbstractServiceDelegate;
import dev.dsf.bpe.v1.variables.Target;
import dev.dsf.bpe.v1.variables.Variables;
import org.camunda.bpm.engine.delegate.BpmnError;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogReceiveTimeout extends AbstractServiceDelegate {

private static final Logger logger = LoggerFactory.getLogger(LogReceiveTimeout.class);

public LogReceiveTimeout(ProcessPluginApi api) {
super(api);
}

@Override
protected void doExecute(DelegateExecution execution, Variables variables) throws BpmnError, Exception {
Target target = variables.getTarget();
logger.warn("Timeout while waiting for result from {} (endpoint url: {}).",
target.getOrganizationIdentifierValue(),
target.getEndpointUrl());
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public StoreLiveResult(ProcessPluginApi api) {
protected void doExecute(DelegateExecution execution, Variables variables) {
Task task = variables.getLatestTask();

MeasureReport measureReport = variables.getResource(VARIABLE_MEASURE_REPORT);
MeasureReport measureReport = (MeasureReport) execution.getVariableLocal(VARIABLE_MEASURE_REPORT);
addReadAccessTag(measureReport);

MeasureReport storedMeasureReport = storeMeasureReport(measureReport);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.medizininformatik_initiative.feasibility_dsf_process.spring.config;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import de.medizininformatik_initiative.feasibility_dsf_process.EnhancedFhirWebserviceClientProvider;
import de.medizininformatik_initiative.feasibility_dsf_process.EnhancedFhirWebserviceClientProviderImpl;
Expand All @@ -10,18 +11,20 @@
import de.medizininformatik_initiative.feasibility_dsf_process.Obfuscator;
import de.medizininformatik_initiative.feasibility_dsf_process.RateLimit;
import de.medizininformatik_initiative.feasibility_dsf_process.client.flare.FlareWebserviceClient;
import de.medizininformatik_initiative.feasibility_dsf_process.client.listener.SetCorrelationKeyListener;
import de.medizininformatik_initiative.feasibility_dsf_process.message.SendDicRequest;
import de.medizininformatik_initiative.feasibility_dsf_process.message.SendDicResponse;
import de.medizininformatik_initiative.feasibility_dsf_process.service.DownloadFeasibilityResources;
import de.medizininformatik_initiative.feasibility_dsf_process.service.DownloadMeasureReport;
import de.medizininformatik_initiative.feasibility_dsf_process.service.EvaluateCqlMeasure;
import de.medizininformatik_initiative.feasibility_dsf_process.service.EvaluateRequestRate;
import de.medizininformatik_initiative.feasibility_dsf_process.service.EvaluateStructuredQueryMeasure;
import de.medizininformatik_initiative.feasibility_dsf_process.service.FeasibilityResourceCleaner;
import de.medizininformatik_initiative.feasibility_dsf_process.service.LogReceiveTimeout;
import de.medizininformatik_initiative.feasibility_dsf_process.service.ObfuscateEvaluationResult;
import de.medizininformatik_initiative.feasibility_dsf_process.service.RateLimitExceededTaskRejecter;
import de.medizininformatik_initiative.feasibility_dsf_process.service.SelectRequestTargets;
import de.medizininformatik_initiative.feasibility_dsf_process.service.SelectResponseTarget;
import de.medizininformatik_initiative.feasibility_dsf_process.service.SendDicRequests;
import de.medizininformatik_initiative.feasibility_dsf_process.service.SetupEvaluationSettings;
import de.medizininformatik_initiative.feasibility_dsf_process.service.StoreFeasibilityResources;
import de.medizininformatik_initiative.feasibility_dsf_process.service.StoreLiveResult;
Expand All @@ -35,22 +38,23 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.util.concurrent.ForkJoinPool;

@Configuration
public class FeasibilityConfig {

private final IGenericClient storeClient;

@Autowired private final FhirContext fhirContext;
@Autowired private ProcessPluginApi api;

private final EvaluationSettingsProvider evaluationSettingsProvider;
private final FlareWebserviceClient flareWebserviceClient;

public FeasibilityConfig(@Qualifier("store-client") IGenericClient storeClient,
FhirContext fhirContext,
EvaluationSettingsProvider evaluationSettingsProvider,
FlareWebserviceClient flareWebserviceClient) {
this.storeClient = storeClient;
this.fhirContext = fhirContext;
this.evaluationSettingsProvider = evaluationSettingsProvider;
this.flareWebserviceClient = flareWebserviceClient;
}
Expand Down Expand Up @@ -79,8 +83,8 @@ public SelectRequestTargets selectRequestTargets() {

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public SendDicRequests sendDicRequests(ForkJoinPool threadPool) {
return new SendDicRequests(api, threadPool);
public SendDicRequest sendDicRequests() {
return new SendDicRequest(api);
}

@Bean
Expand All @@ -95,6 +99,18 @@ public StoreLiveResult storeLiveResult() {
return new StoreLiveResult(api);
}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public LogReceiveTimeout logReceiveTimeout() {
return new LogReceiveTimeout(api);
}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public SetCorrelationKeyListener setCorrelationKeyListener() {
return new SetCorrelationKeyListener(api);
}

//
// process executeFeasibility implementations
//
Expand Down Expand Up @@ -167,14 +183,9 @@ public SendDicResponse sendDicResponse() {
return new SendDicResponse(api);
}

@Bean
public ForkJoinPool ioThreadPool() {
return new ForkJoinPool(8);
}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public FeasibilityProcessPluginDeploymentStateListener deploymentStateListener () {
public FeasibilityProcessPluginDeploymentStateListener deploymentStateListener() {
return new FeasibilityProcessPluginDeploymentStateListener(
EvaluationStrategy
.fromStrategyRepresentation(evaluationSettingsProvider.evaluationStrategyRepresentation()),
Expand Down
Loading

0 comments on commit 8c5dd03

Please sign in to comment.