Skip to content

Commit

Permalink
Skip module execution when cohort generation fails
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonysena committed Jan 17, 2025
1 parent 9a600c0 commit 87ca609
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 22 deletions.
83 changes: 61 additions & 22 deletions R/Execution.R
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ execute <- function(analysisSpecifications,
executionStatus <- list()

# Execute the cohort generator module first if it exists
# If cohort generation fails for any reason, update the
# cohortGenerationSuccessful flag to FALSE so that subsequent
# modules are skipped
cohortGenerationSuccessful <- TRUE
for (i in 1:length(analysisSpecifications$moduleSpecifications)) {
moduleName <- analysisSpecifications$moduleSpecifications[[i]]$module
if (tolower(moduleName) == "cohortgeneratormodule") {
Expand All @@ -166,6 +170,9 @@ execute <- function(analysisSpecifications,
analysisSpecifications = analysisSpecifications,
executionSettings = executionSettings
)
# The absence of an error in moduleExecutionStatus$error
# represents a success
cohortGenerationSuccessful <- ifelse(moduleExecutionStatus$status == "SUCCESS", TRUE, FALSE)
executionStatus <- append(
executionStatus,
moduleExecutionStatus
Expand All @@ -182,8 +189,14 @@ execute <- function(analysisSpecifications,
moduleName = moduleName,
connectionDetails = connectionDetails,
analysisSpecifications = analysisSpecifications,
executionSettings = executionSettings
executionSettings = executionSettings,
skipExecution = !cohortGenerationSuccessful
)
executionStatus <- append(
executionStatus,
moduleExecutionStatus
)
} else {
executionStatus <- append(
executionStatus,
moduleExecutionStatus
Expand All @@ -194,39 +207,65 @@ execute <- function(analysisSpecifications,
# Print a summary
cli::cli_h1("EXECUTION SUMMARY")
for (i in 1:length(executionStatus)) {
status <- executionStatus[[i]]
errorMessage <- ifelse(!is.null(status$result$error), status$result$error, "")
statusMessage <- sprintf("%s %s (Execution Time: %s)", status$moduleName, errorMessage, status$executionTime)
if (!is.null(status$result$error)) {
cli::cli_alert_danger(statusMessage)
moduleStatus <- executionStatus[[i]]
errorMessage <- ifelse(moduleStatus$status == "SUCCESS", "", moduleStatus$errorMessage)
moduleStatusMessage <- sprintf("%s %s (Execution Time: %s)", moduleStatus$moduleName, errorMessage, moduleStatus$executionTime)
if (moduleStatus$status == "FAILED") {
cli::cli_alert_danger(moduleStatusMessage)
} else if (moduleStatus$status == "SKIPPED") {
cli::cli_alert_warning(moduleStatusMessage)
} else {
cli::cli_alert_success(statusMessage)
cli::cli_alert_success(moduleStatusMessage)
}
}

invisible(executionStatus)
}

.executeModule <- function(moduleName, connectionDetails, analysisSpecifications, executionSettings) {
moduleObject <- get(moduleName)$new()
safeExec <- purrr::safely(moduleObject$execute)
startTime <- Sys.time()
executionResult <- safeExec(
connectionDetails = connectionDetails,
analysisSpecifications = analysisSpecifications,
executionSettings = executionSettings
)
timeToExecute <- Sys.time() - startTime
# Emit any errors
if (!is.null(executionResult$error)) {
.printErrorMessage(executionResult$error$message)
.executeModule <- function(moduleName, connectionDetails, analysisSpecifications, executionSettings, skipExecution = FALSE) {
if (isFALSE(skipExecution)) {
moduleObject <- get(moduleName)$new()
safeExec <- purrr::safely(moduleObject$execute)
startTime <- Sys.time()
executionResult <- safeExec(
connectionDetails = connectionDetails,
analysisSpecifications = analysisSpecifications,
executionSettings = executionSettings
)
timeToExecute <- Sys.time() - startTime
# Emit any errors
status <- ifelse(is.null(executionResult$error), "SUCCESS", "FAILED")
if (status == "FAILED") {
.printErrorMessage(executionResult$error$message)
}
return(
.createModuleExecutionStatus(
moduleName = moduleName,
status = status,
errorMessage = executionResult$error$message,
executionTime = paste0(signif(timeToExecute, 3), " ", attr(timeToExecute, "units"))
)
)
} else {
return(
.createModuleExecutionStatus(
moduleName = moduleName,
status = "SKIPPED",
errorMessage = "Cohort generation failed",
executionTime = "SKIPPED"
)
)
}
}

.createModuleExecutionStatus <- function(moduleName, status, errorMessage, executionTime) {
return(
list(
list(
moduleName = moduleName,
result = executionResult,
executionTime = paste0(signif(timeToExecute, 3), " ", attr(timeToExecute, "units"))
status = status,
errorMessage = errorMessage,
executionTime = executionTime
)
)
)
Expand Down
38 changes: 38 additions & 0 deletions tests/testthat/test-Execution.R
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,41 @@ test_that("Specify subset of modules to run", {

expect_true(all(modulesExecuted %in% modulesToExecute))
})

test_that("Stop if error occurs during cohort generation", {
analysisSpecifications <- ParallelLogger::loadSettingsFromJson(
fileName = system.file("testdata/cdmModulesAnalysisSpecifications.json",
package = "Strategus"
)
)
# Add an ill-formed Circe expression to break the cohort generation process
analysisSpecifications$sharedResources[[1]]$cohortDefinitions[[6]] <- list(
cohortId = 6,
cohortName = "Failure",
cohortDefinition = "{}"
)

executionSettings <- createCdmExecutionSettings(
workDatabaseSchema = workDatabaseSchema,
cdmDatabaseSchema = cdmDatabaseSchema,
cohortTableNames = CohortGenerator::getCohortTableNames(cohortTable = "unit_test"),
workFolder = file.path(tempDir, "work_folder"),
resultsFolder = file.path(tempDir, "results_folder")
)

output <- Strategus::execute(
connectionDetails = connectionDetails,
analysisSpecifications = analysisSpecifications,
executionSettings = executionSettings
)

# Verify cohort generator failed
cohortGeneratorStatus <- sapply(output, function(x) if (x$moduleName == "CohortGeneratorModule") x$status)
cohortGeneratorStatus <- unlist(cohortGeneratorStatus[-which(sapply(cohortGeneratorStatus, is.null))])
expect_true(cohortGeneratorStatus == "FAILED")

# Verify all other modules were skipped
allOtherModuleStatuses <- sapply(output, function(x) if (x$moduleName != "CohortGeneratorModule") x$status)
allOtherModuleStatuses <- unlist(allOtherModuleStatuses)
expect_true(all(allOtherModuleStatuses == "SKIPPED"))
})

0 comments on commit 87ca609

Please sign in to comment.