From 3d305475f31d72048a55eb2df80d38070ba7c932 Mon Sep 17 00:00:00 2001 From: "Moritz E. Beber" Date: Wed, 27 Mar 2024 17:40:06 +0100 Subject: [PATCH] refactor: extraction function to group profiles --- .../local/standardisation_profiles.nf | 88 +++++++++---------- 1 file changed, 41 insertions(+), 47 deletions(-) diff --git a/subworkflows/local/standardisation_profiles.nf b/subworkflows/local/standardisation_profiles.nf index 1f121b98..c005ecd3 100644 --- a/subworkflows/local/standardisation_profiles.nf +++ b/subworkflows/local/standardisation_profiles.nf @@ -14,6 +14,21 @@ include { GANON_TABLE // Custom Functions +/** +* Group all profiles per reference database. +* +* @param ch_profiles A channel of pairs of a meta map and a profiler report; only `meta.db_name` is read. +* @param groupTupleOptions Map passed as-is to `groupTuple` (e.g. `sort`); defaults to `[:]`. +* @return A channel with one element per `db_name`. Each element is a pair of +* a meta map holding only `id: db_name` and all corresponding profiles. +*/ +def groupProfiles(ch_profiles, groupTupleOptions = [:]) { + return ch_profiles + .map { meta, profile -> [meta.db_name, profile] } + .groupTuple(groupTupleOptions) + .map { db_name, profiles -> [[id: db_name], profiles] } +} + /** * Combine profiles with their corresponding reference database, then separate into two channels.
* @@ -120,12 +135,7 @@ workflow STANDARDISATION_PROFILES { // Bracken - ch_profiles_for_bracken = ch_input_profiles.bracken - .map { [it[0]['db_name'], it[1]] } - .groupTuple() - .map { - [[id:it[0]], it[1]] - } + ch_profiles_for_bracken = groupProfiles(ch_input_profiles.bracken) BRACKEN_COMBINEBRACKENOUTPUTS ( ch_profiles_for_bracken ) @@ -134,13 +144,10 @@ workflow STANDARDISATION_PROFILES { // Collect and replace id for db_name for prefix // Have to sort by size to ensure first file actually has hits otherwise // the script fails - ch_profiles_for_centrifuge = ch_input_profiles.centrifuge - .map { [it[0]['db_name'], it[1]] } - .groupTuple(sort: {-it.size()} ) - .map { - [[id:it[0]], it[1]] - } - + ch_profiles_for_centrifuge = groupProfiles( + ch_input_profiles.centrifuge, + [sort: { -it.size() }] + ) KRAKENTOOLS_COMBINEKREPORTS_CENTRIFUGE ( ch_profiles_for_centrifuge ) ch_multiqc_files = ch_multiqc_files.mix( KRAKENTOOLS_COMBINEKREPORTS_CENTRIFUGE.out.txt ) @@ -149,12 +156,7 @@ workflow STANDARDISATION_PROFILES { // Kaiju // Collect and replace id for db_name for prefix - ch_profiles_for_kaiju = ch_input_classifications.kaiju - .map { [it[0]['db_name'], it[1]] } - .groupTuple() - .map { - [[id:it[0]], it[1]] - } + ch_profiles_for_kaiju = groupProfiles(ch_input_classifications.kaiju) ch_input_for_kaiju2tablecombine = combineProfilesWithDatabase(ch_profiles_for_kaiju, ch_input_databases.kaiju) @@ -167,16 +169,23 @@ workflow STANDARDISATION_PROFILES { // Collect and replace id for db_name for prefix // Have to sort by size to ensure first file actually has hits otherwise // the script fails - ch_profiles_for_kraken2 = ch_input_profiles.kraken2 - .map { - meta, profiles -> - def new_meta = [:] - new_meta.tool = meta.tool == 'kraken2-bracken' ? 
'kraken2' : meta.tool // replace to get the right output-format description - new_meta.id = meta.tool // append so to disambiguate when we have same databases for kraken2 step of bracken, with normal bracken - new_meta.db_name = meta.tool == 'kraken2-bracken' ? "${meta.db_name}-bracken" : "${meta.db_name}" // append so to disambiguate when we have same databases for kraken2 step of bracken, with normal bracken - [ new_meta, profiles ] - } - .groupTuple(sort: {-it.size()}) + ch_profiles_for_kraken2 = groupProfiles( + ch_input_profiles.kraken2.map { meta, profile -> + def new_meta = [ + // Replace the tool name to get the right output-format description. + tool: meta.tool == 'kraken2-bracken' ? 'kraken2' : meta.tool, + // NOTE(review): groupProfiles() reads only `db_name` from meta, so `tool` and + // `id` set here are not carried into the grouped meta map; confirm intent. + id: meta.tool, + // Append so as to disambiguate when we have same databases + // for kraken2 step of bracken, with normal bracken. + db_name: meta.tool == 'kraken2-bracken' ? 
"${meta.db_name}-bracken" : "${meta.db_name}" + + ] + return [meta + new_meta, profile] + }, + [sort: { -it.size() }] + ) KRAKENTOOLS_COMBINEKREPORTS_KRAKEN ( ch_profiles_for_kraken2 ) ch_multiqc_files = ch_multiqc_files.mix( KRAKENTOOLS_COMBINEKREPORTS_KRAKEN.out.txt ) @@ -184,12 +193,7 @@ workflow STANDARDISATION_PROFILES { // MetaPhlAn - ch_profiles_for_metaphlan = ch_input_profiles.metaphlan - .map { [it[0]['db_name'], it[1]] } - .groupTuple() - .map { - [[id:it[0]], it[1]] - } + ch_profiles_for_metaphlan = groupProfiles(ch_input_profiles.metaphlan) METAPHLAN_MERGEMETAPHLANTABLES ( ch_profiles_for_metaphlan ) ch_multiqc_files = ch_multiqc_files.mix( METAPHLAN_MERGEMETAPHLANTABLES.out.txt ) @@ -201,12 +205,7 @@ workflow STANDARDISATION_PROFILES { // Therefore removing db info here, and publish merged at root mOTUs results // directory - ch_profiles_for_motus = ch_input_profiles.motus - .map { [it[0]['db_name'], it[1]] } - .groupTuple() - .map { - [[id:it[0]], it[1]] - } + ch_profiles_for_motus = groupProfiles(ch_input_profiles.motus) ch_input_for_motusmerge = combineProfilesWithDatabase(ch_profiles_for_motus, ch_input_databases.motus) @@ -215,12 +214,7 @@ workflow STANDARDISATION_PROFILES { // Ganon - ch_profiles_for_ganon = ch_input_profiles.ganon - .map { [it[0]['db_name'], it[1]] } - .groupTuple() - .map { - [[id:it[0]], it[1]] - } + ch_profiles_for_ganon = groupProfiles(ch_input_profiles.ganon) GANON_TABLE ( ch_profiles_for_ganon ) ch_multiqc_files = ch_multiqc_files.mix( GANON_TABLE.out.txt )