From f29d2effacdca645e0523f7ee5b0e3331a30a11d Mon Sep 17 00:00:00 2001 From: Sven F <9976560+sven1103@users.noreply.github.com> Date: Wed, 15 May 2024 09:56:31 +0200 Subject: [PATCH 1/2] Enable thread amount config and pattern validation (#8) The number of threads per process is now configurable via environment variables, and the QBiC measurement ID is now format validated in the registration request to fail early. --- README.md | 19 ++++++++------- .../life/qbic/data/processing/AppConfig.java | 16 ++++++------- .../qbic/data/processing/GlobalConfig.java | 13 ++++++++++- .../config/EvaluationWorkersConfig.java | 11 +-------- .../evaluation/EvaluationConfiguration.java | 10 -------- .../evaluation/EvaluationRequest.java | 9 +------- .../processing/registration/ErrorCode.java | 2 +- .../ProcessRegistrationRequest.java | 23 +++++++++++++++++++ src/main/resources/application.properties | 8 +++---- 9 files changed, 60 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index 8d8ef2c..9c6d3a5 100644 --- a/README.md +++ b/README.md @@ -157,12 +157,14 @@ perform checksum validation. Feel free to use it as template for subsequent proc ### Evaluation -Last but not least, this step looks for any present QBiC measurement ID in the dataset name. If none -is given, the registration cannot be executed. +Last but not least, this step validates the QBiC measurement ID via a [configurable](#evaluation-step-config) regex pattern. -In this case the process moves the task directory into the user's home error folder. After the user -has -provided a valid QBiC measurement id, they can move the dataset into registration again. +In case of invalid measurement ID formats, the process moves the task directory into the user's home error folder. +After the user has provided a valid QBiC measurement id, they can move the dataset into registration again. + +In case of a successful ID validation, the dataset will be moved to the configured destination folder. 
+If multiple destination folders are provided in the [configuration](#evaluation-step-config), the assignment of the next target directory is based +on a round-robin approach, to balance any downstream task load (e.g. openBIS dropbox registration). ## Configuration @@ -218,7 +220,8 @@ finished tasks are moved to after successful operation. #---------------- # Settings for the registration worker threads #---------------- -registration.threads=2 +registration.threads=${REGISTRATION_THREADS:2} +registration.metadata.filename=metadata.txt registration.working.dir=${WORKING_DIR:} registration.target.dir=${PROCESSING_DIR:} ``` @@ -233,7 +236,7 @@ finished tasks are moved to after successful operation. # Settings for the 1. processing step # Proper packaging and provenance data, some simple checks #------------------------------------ -processing.threads=2 +processing.threads=${PROCESSING_THREADS:2} processing.working.dir=${PROCESSING_DIR} processing.target.dir=${EVALUATION_DIR} ``` @@ -248,7 +251,7 @@ finished tasks are moved to after successful operation. # Setting for the 2. 
processing step: # Measurement ID evaluation # --------------------------------- -evaluations.threads=2 +evaluation.threads=${EVALUATION_THREADS:2} evaluation.working.dir=${EVALUATION_DIR} # Define one or more target directories here # Example single target dir: diff --git a/src/main/java/life/qbic/data/processing/AppConfig.java b/src/main/java/life/qbic/data/processing/AppConfig.java index c9f6b32..d5f357e 100644 --- a/src/main/java/life/qbic/data/processing/AppConfig.java +++ b/src/main/java/life/qbic/data/processing/AppConfig.java @@ -52,20 +52,17 @@ RegistrationConfiguration registrationConfiguration( @Bean EvaluationWorkersConfig evaluationWorkersConfig( - @Value("${evaluations.threads}") int amountOfWorkers, + @Value("${evaluation.threads}") int amountOfWorkers, @Value("${evaluation.working.dir}") String workingDirectory, - @Value("${evaluation.target.dirs}") String[] targetDirectory, - @Value("${evaluation.measurement-id.pattern}") String measurementIdPattern) { - return new EvaluationWorkersConfig(amountOfWorkers, workingDirectory, - measurementIdPattern, Arrays.stream(targetDirectory).toList()); + @Value("${evaluation.target.dirs}") String[] targetDirectory) { + return new EvaluationWorkersConfig(amountOfWorkers, workingDirectory, Arrays.stream(targetDirectory).toList()); } @Bean EvaluationConfiguration evaluationConfiguration(EvaluationWorkersConfig evaluationWorkersConfig, GlobalConfig globalConfig) { return new EvaluationConfiguration(evaluationWorkersConfig.workingDirectory().toString(), - evaluationWorkersConfig.targetDirectories(), - evaluationWorkersConfig.measurementIdPattern().toString(), globalConfig); + evaluationWorkersConfig.targetDirectories(), globalConfig); } @Bean @@ -86,8 +83,9 @@ ProcessingConfiguration processingConfiguration(ProcessingWorkersConfig processi @Bean GlobalConfig globalConfig( @Value("${users.error.directory.name}") String usersErrorDirectoryName, - @Value("${users.registration.directory.name}") String 
usersRegistrationDirectoryName) { - return new GlobalConfig(usersErrorDirectoryName, usersRegistrationDirectoryName); + @Value("${users.registration.directory.name}") String usersRegistrationDirectoryName, + @Value("${qbic.measurement-id.pattern}") String measurementIdPattern) { + return new GlobalConfig(usersErrorDirectoryName, usersRegistrationDirectoryName, measurementIdPattern); } } diff --git a/src/main/java/life/qbic/data/processing/GlobalConfig.java b/src/main/java/life/qbic/data/processing/GlobalConfig.java index 9bf90ea..44c2441 100644 --- a/src/main/java/life/qbic/data/processing/GlobalConfig.java +++ b/src/main/java/life/qbic/data/processing/GlobalConfig.java @@ -2,6 +2,7 @@ import java.nio.file.Path; import java.nio.file.Paths; +import java.util.regex.Pattern; public class GlobalConfig { @@ -9,15 +10,21 @@ public class GlobalConfig { private final Path usersDirectoryRegistrationName; - public GlobalConfig(String usersErrorDirectoryName, String usersRegistrationDirectoryName) { + private final Pattern qbicMeasurementIdPattern; + + public GlobalConfig(String usersErrorDirectoryName, String usersRegistrationDirectoryName, String qbicMeasurementIdPattern) { if (usersErrorDirectoryName == null || usersErrorDirectoryName.isBlank()) { throw new IllegalArgumentException("usersErrorDirectoryName cannot be null or empty"); } if (usersRegistrationDirectoryName == null || usersRegistrationDirectoryName.isBlank()) { throw new IllegalArgumentException("usersRegistrationDirectoryName cannot be null or empty"); } + if (qbicMeasurementIdPattern == null || qbicMeasurementIdPattern.isBlank()) { + throw new IllegalArgumentException("qbicMeasurementIdPattern cannot be null or empty"); + } this.usersErrorDirectoryName = Paths.get(usersErrorDirectoryName); this.usersDirectoryRegistrationName = Paths.get(usersRegistrationDirectoryName); + this.qbicMeasurementIdPattern = Pattern.compile(qbicMeasurementIdPattern); } public Path usersErrorDirectory() { @@ -28,4 +35,8 @@ public 
Path usersDirectoryRegistration() { return this.usersDirectoryRegistrationName; } + public Pattern qbicMeasurementIdPattern() { + return this.qbicMeasurementIdPattern; + } + } diff --git a/src/main/java/life/qbic/data/processing/config/EvaluationWorkersConfig.java b/src/main/java/life/qbic/data/processing/config/EvaluationWorkersConfig.java index a675921..ae2b77f 100644 --- a/src/main/java/life/qbic/data/processing/config/EvaluationWorkersConfig.java +++ b/src/main/java/life/qbic/data/processing/config/EvaluationWorkersConfig.java @@ -10,9 +10,8 @@ public class EvaluationWorkersConfig { private final int threads; private final Path workingDirectory; private final Collection targetDirectories; - private final Pattern measurementIdPattern; - public EvaluationWorkersConfig(int threads, String workingDirectory, String measurementIdPattern, + public EvaluationWorkersConfig(int threads, String workingDirectory, Collection targetDirectories) { if (threads < 1) { throw new IllegalArgumentException( @@ -32,10 +31,6 @@ public EvaluationWorkersConfig(int threads, String workingDirectory, String meas throw new IllegalArgumentException( "Evaluation target directory '%s' does not exist".formatted(path)); }); - if (measurementIdPattern.isBlank()) { - throw new IllegalArgumentException("Measurement id pattern cannot be blank"); - } - this.measurementIdPattern = Pattern.compile(measurementIdPattern); } public int threads() { @@ -49,8 +44,4 @@ public Path workingDirectory() { public Collection targetDirectories() { return targetDirectories; } - - public Pattern measurementIdPattern() { - return measurementIdPattern; - } } diff --git a/src/main/java/life/qbic/data/processing/evaluation/EvaluationConfiguration.java b/src/main/java/life/qbic/data/processing/evaluation/EvaluationConfiguration.java index 1ab8c58..6ccfdbb 100644 --- a/src/main/java/life/qbic/data/processing/evaluation/EvaluationConfiguration.java +++ 
b/src/main/java/life/qbic/data/processing/evaluation/EvaluationConfiguration.java @@ -18,12 +18,10 @@ public class EvaluationConfiguration { private final Path workingDirectory; private final Collection targetDirectories; - private final Pattern measurementIdPattern; private final Path usersErrorDirectory; private final RoundRobinDraw targetDirectoriesRoundRobinDraw; public EvaluationConfiguration(String workingDirectory, Collection targetDirectories, - String measurementIdPattern, GlobalConfig globalConfig) { this.workingDirectory = Paths.get(workingDirectory); if (!this.workingDirectory.toFile().exists()) { @@ -35,11 +33,7 @@ public EvaluationConfiguration(String workingDirectory, Collection targetD "Evaluation target directory '%s' does not exist".formatted(path)); }); this.targetDirectoriesRoundRobinDraw = RoundRobinDraw.create(targetDirectories); - if (measurementIdPattern.isBlank()) { - throw new IllegalArgumentException("Measurement id pattern cannot be blank"); - } this.usersErrorDirectory = globalConfig.usersErrorDirectory(); - this.measurementIdPattern = Pattern.compile(measurementIdPattern); } public Path workingDirectory() { @@ -50,10 +44,6 @@ public RoundRobinDraw targetDirectories() { return targetDirectoriesRoundRobinDraw; } - public Pattern measurementIdPattern() { - return measurementIdPattern; - } - public Path usersErrorDirectory() { return usersErrorDirectory; } diff --git a/src/main/java/life/qbic/data/processing/evaluation/EvaluationRequest.java b/src/main/java/life/qbic/data/processing/evaluation/EvaluationRequest.java index 2753d8d..f75254d 100644 --- a/src/main/java/life/qbic/data/processing/evaluation/EvaluationRequest.java +++ b/src/main/java/life/qbic/data/processing/evaluation/EvaluationRequest.java @@ -50,17 +50,15 @@ public class EvaluationRequest extends Thread { private final AtomicBoolean active = new AtomicBoolean(false); private final AtomicBoolean terminated = new AtomicBoolean(false); private final Path workingDirectory; - 
private final Pattern measurementIdPattern; private final Path usersErrorDirectory; private final RoundRobinDraw targetDirectories; private Path assignedTargetDirectory; public EvaluationRequest(Path workingDirectory, RoundRobinDraw targetDirectories, - Pattern measurementIdPattern, Path usersErrorDirectory) { + Path usersErrorDirectory) { this.setName(THREAD_NAME.formatted(nextThreadNumber())); this.workingDirectory = workingDirectory; this.targetDirectories = targetDirectories; - this.measurementIdPattern = measurementIdPattern; if (!workingDirectory.resolve(INTERVENTION_DIRECTORY).toFile().mkdir() && !workingDirectory.resolve( INTERVENTION_DIRECTORY).toFile().exists()) { @@ -73,7 +71,6 @@ public EvaluationRequest(Path workingDirectory, RoundRobinDraw targetDirec public EvaluationRequest(EvaluationConfiguration evaluationConfiguration) { this(evaluationConfiguration.workingDirectory(), evaluationConfiguration.targetDirectories(), - evaluationConfiguration.measurementIdPattern(), evaluationConfiguration.usersErrorDirectory()); } @@ -185,10 +182,6 @@ private boolean createMarkerFile(Path targetDirectory, String name) throws IOExc return targetDirectory.resolve(markerFileName).toFile().createNewFile(); } - private Optional findDataset(File taskDir) { - return Arrays.stream(taskDir.listFiles()).filter(File::isDirectory).findFirst(); - } - private void moveToSystemIntervention(File taskDir, String reason) { try { var errorFile = taskDir.toPath().resolve("error.txt").toFile(); diff --git a/src/main/java/life/qbic/data/processing/registration/ErrorCode.java b/src/main/java/life/qbic/data/processing/registration/ErrorCode.java index cbf7726..5821ca2 100644 --- a/src/main/java/life/qbic/data/processing/registration/ErrorCode.java +++ b/src/main/java/life/qbic/data/processing/registration/ErrorCode.java @@ -9,5 +9,5 @@ */ public enum ErrorCode { METADATA_FILE_NOT_FOUND, - INCOMPLETE_METADATA, FILE_NOT_FOUND, MISSING_FILE_ENTRY, IO_EXCEPTION + INCOMPLETE_METADATA, 
FILE_NOT_FOUND, MISSING_FILE_ENTRY, INVALID_MEASUREMENT_ID_FORMAT, IO_EXCEPTION } diff --git a/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java b/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java index 8a064b6..5d5c0c4 100644 --- a/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java +++ b/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java @@ -17,6 +17,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; import java.util.stream.Collectors; import life.qbic.data.processing.ConcurrentRegistrationQueue; import life.qbic.data.processing.GlobalConfig; @@ -53,6 +54,7 @@ public class ProcessRegistrationRequest extends Thread { private final Path targetDirectory; private final String metadataFileName; private final Path userErrorDirectory; + private final Pattern measurementIdPattern; private AtomicBoolean active = new AtomicBoolean(false); public ProcessRegistrationRequest(@NonNull ConcurrentRegistrationQueue registrationQueue, @@ -63,6 +65,7 @@ public ProcessRegistrationRequest(@NonNull ConcurrentRegistrationQueue registrat this.targetDirectory = configuration.targetDirectory(); this.metadataFileName = configuration.metadataFileName(); this.userErrorDirectory = globalConfig.usersErrorDirectory(); + this.measurementIdPattern = globalConfig.qbicMeasurementIdPattern(); } private static int nextThreadNumber() { @@ -221,6 +224,8 @@ public void run() { var registrationMetadata = findAndParseMetadata(workingTargetDir); validateFileEntries(registrationMetadata, workingTargetDir); + validateMeasurementIds(registrationMetadata); + var aggregatedFilesByMeasurementId = registrationMetadata.stream().collect( Collectors.groupingBy(RegistrationMetadata::measurementId)); @@ -244,6 +249,24 @@ public void run() { } } + private void validateMeasurementIds(List 
registrationMetadata) + throws ValidationException { + registrationMetadata.stream().map(RegistrationMetadata::measurementId) + .filter(this::isMeasurementIdInvalid).findAny().ifPresent(invalidEntry -> { + throw new ValidationException( + "Invalid measurement ID format found: %s".formatted(invalidEntry), + ErrorCode.INVALID_MEASUREMENT_ID_FORMAT); + }); + } + + private boolean isMeasurementIdInvalid(String measurementId) { + return !isMeasurementIdValid(measurementId); + } + + private boolean isMeasurementIdValid(String measurementId) { + return measurementIdPattern.matcher(measurementId).matches(); + } + private void processAll(Map> aggregatedFilesByMeasurementId, Path workingTargetDir, RegistrationRequest request) throws IOException { for (String measurementId : aggregatedFilesByMeasurementId.keySet()) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 29d5e52..cb4926d 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -13,6 +13,7 @@ users.error.directory.name=error # Needs to be present in the users' home folders # e.g. /home//registration users.registration.directory.name=registration +qbic.measurement-id.pattern=^(MS|NGS)Q[A-Z0-9]{4}[0-9]{3}[A-Z0-9]{2}-[0-9]* #-------------------------------------- # Settings for the data scanning thread @@ -30,7 +31,7 @@ scanner.interval=1000 #---------------- # Settings for the registration worker threads #---------------- -registration.threads=2 +registration.threads=${REGISTRATION_THREADS:2} registration.metadata.filename=metadata.txt registration.working.dir=${WORKING_DIR:} registration.target.dir=${PROCESSING_DIR:} @@ -39,7 +40,7 @@ registration.target.dir=${PROCESSING_DIR:} # Settings for the 1. 
processing step # Proper packaging and provenance data, some simple checks #------------------------------------ -processing.threads=2 +processing.threads=${PROCESSING_THREADS:2} processing.working.dir=${PROCESSING_DIR} processing.target.dir=${EVALUATION_DIR} @@ -47,7 +48,7 @@ processing.target.dir=${EVALUATION_DIR} # Setting for the 2. processing step: # Measurement ID evaluation # --------------------------------- -evaluations.threads=2 +evaluation.threads=${EVALUATION_THREADS:2} evaluation.working.dir=${EVALUATION_DIR} # Define one or more target directories here # Example single target dir: @@ -55,7 +56,6 @@ evaluation.working.dir=${EVALUATION_DIR} # Example multiple target dir: # evaluation.target.dirs=/my/example/target/dir1,/my/example/target/dir2,/my/example/target/dir3 evaluation.target.dirs=${OPENBIS_ETL_DIRS} -evaluation.measurement-id.pattern=^(MS|NGS)Q[A-Z0-9]{4}[0-9]{3}[A-Z0-9]{2}-[0-9]* # ---------------- # Logging settings From 998704cba7ed58ee242364a66a11d75fa29b0a09 Mon Sep 17 00:00:00 2001 From: Sven F <9976560+sven1103@users.noreply.github.com> Date: Wed, 15 May 2024 09:58:14 +0200 Subject: [PATCH 2/2] Provide task ID in provenance file (#9) To facilitate a simplified access on the assigned task id of a dataset registration, the ID is now written into the provenance file. 
--- README.md | 8 +++++++- src/main/java/life/qbic/data/processing/Provenance.java | 3 +++ .../registration/ProcessRegistrationRequest.java | 1 + 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9c6d3a5..11703f0 100644 --- a/README.md +++ b/README.md @@ -130,7 +130,9 @@ Here is an example of the provenance file: { "origin": "/Users/myuser/registration", "user": "/Users/myuser", - "measurementId": "QTEST001AE-1234512312", + "measurementId": "NGSQTEST001AE-1234512312", + "datasetFiles" : [ "file1_1.fastq.gz", "file1_2.fastq.gz" ], + "taskId": "74c5d26f-b756-42c3-b6f4-2b4825670a2d", "history": [ "/opt/scanner-app/scanner-processing-dir/74c5d26f-b756-42c3-b6f4-2b4825670a2d" ] @@ -146,6 +148,10 @@ Here is an example of the provenance file: > > `measurementId`: any valid QBiC measurement ID that has been found in the dataset (this might > be `null`) in case the evaluation has not been done yet. +> +> `datasetFiles`: a list of relative file paths belonging to the dataset +> +> `taskId`: a unique task ID assigned by the application to track the dataset registration progress > > `history`: a list of history items, which steps have been performed. The list is ordered by first > processing steps being at the start and the latest at the end. diff --git a/src/main/java/life/qbic/data/processing/Provenance.java b/src/main/java/life/qbic/data/processing/Provenance.java index 10e6a6c..11d4916 100644 --- a/src/main/java/life/qbic/data/processing/Provenance.java +++ b/src/main/java/life/qbic/data/processing/Provenance.java @@ -47,6 +47,9 @@ public class Provenance { @JsonProperty("datasetFiles") public List datasetFiles; + @JsonProperty("taskId") + public String taskId; + /** * A list of ordered processing folder stops the dataset has traversed and passed successfully. *

diff --git a/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java b/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java index 5d5c0c4..b214e19 100644 --- a/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java +++ b/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java @@ -182,6 +182,7 @@ private void writeProvenanceInformation(Path taskDir, Path newLocation, provenance.history.add(newLocation.toString()); provenance.userWorkDirectoryPath = String.valueOf(request.userPath()); provenance.qbicMeasurementID = measurementId; + provenance.taskId = taskDir.toFile().getName(); provenance.addDatasetFiles(datasetFiles); ObjectMapper mapper = new ObjectMapper(); mapper.writerWithDefaultPrettyPrinter()