diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index f0e5260..23a5957 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -49,7 +49,6 @@ runSccAnalyses <- function(connectionDetails, computeThreads = 1) { # Get negative controls - checkmate::assertChoice(controlType, c("outcome", "exposure")) negatives <- list() for (exposureOutcome in exposureOutcomeList) { @@ -123,13 +122,18 @@ runSccAnalyses <- function(connectionDetails, resultExportPath = file.path(outputFolder, paste0("A_", refRow$analysisId)), computeThreads = computeThreads) args <- append(args, getrunSelfControlledCohortArgs) - executionArgList[[length(executionArgList) + 1]] <- args + executionArgList[[length(executionArgList) + 1]] <- list(args = args) + } + + exececuteScc <- function(params) { + sccResults <- do.call("runSelfControlledCohort", params$args) } if (length(executionArgList) != 0) { cluster <- ParallelLogger::makeCluster(analysisThreads) ParallelLogger::clusterRequire(cluster, "SelfControlledCohort") - dummy <- ParallelLogger::clusterApply(cluster, executionArgList, runSelfControlledCohort) + + dummy <- ParallelLogger::clusterApply(cluster, executionArgList, exececuteScc) ParallelLogger::stopCluster(cluster) } diff --git a/R/SelfControlledCohort.R b/R/SelfControlledCohort.R index fc2e8bc..63ab160 100644 --- a/R/SelfControlledCohort.R +++ b/R/SelfControlledCohort.R @@ -56,7 +56,6 @@ computeIrrs <- function(estimates) { } - batchComputeEstimates <- function(connection, computeThreads, resultsTable, @@ -79,12 +78,12 @@ batchComputeEstimates <- function(connection, rows <- split(rows, rep_len(1:batches, nrow(rows))) rows <- ParallelLogger::clusterApply(cluster, rows, computeIrrs, progressBar = FALSE) rows <- do.call(rbind, rows) + } - if (position == 1) { - andromeda$estimates <- rows - } else { - Andromeda::appendToTable(andromeda$estimates, rows) - } + if (position == 1) { + andromeda$estimates <- rows + } else { + Andromeda::appendToTable(andromeda$estimates, rows) } return(rows) @@ -100,6 +99,11 @@ batchComputeEstimates <- function(connection, transformFunctionArgs = args, append = FALSE) + if (is.null(andromeda$estimates)) { + ParallelLogger::logInfo("No effect estimates produced") + return(NULL) + } + if (length(negativeControlPairs) > 0) { ncPairsDf <- do.call(rbind, lapply(negativeControlPairs, function(eo) { data.frame(targetCohortId = eo[[1]], outcomeCohortId = eo[[2]]) @@ -131,7 +135,7 @@ batchComputeEstimates <- function(connection, ncPairsDf |> dplyr::group_by(.data$outcomeCohortId) |> dplyr::group_map(function(data, outcomeCohortId) { - + browser() estimates <- andromeda$estimates |> dplyr::filter(.data$outcomeCohortId == outcomeCohortId) @@ -472,7 +476,7 @@ runSelfControlledCohort <- function(connectionDetails = NULL, DatabaseConnector::executeSql(connection, sql) resultExportManager$writeManifest(packageName = utils::packageName(), - packageVersion = packageVersion(utils::packageName())) + packageVersion = packageVersion(utils::packageName())) delta <- Sys.time() - start ParallelLogger::logInfo(paste("Performing SCC analysis took", signif(delta, 3), attr(delta, "units")))