Skip to content

Commit

Permalink
refactor: extract function to group profiles
Browse files Browse the repository at this point in the history
  • Loading branch information
Midnighter committed Apr 6, 2024
1 parent f7caedc commit 3d30547
Showing 1 changed file with 41 additions and 47 deletions.
88 changes: 41 additions & 47 deletions subworkflows/local/standardisation_profiles.nf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ include { GANON_TABLE

// Custom Functions

/**
 * Group all profiles per reference database.
 *
 * @param ch_profiles A channel containing pairs of a meta map and the report of
 *      a given profiler, where meta must contain a key `db_name`.
 * @param groupTupleOptions An optional map of options passed through to the
 *      `groupTuple` operator, for example, `[sort: { -it.size() }]` to order
 *      grouped profiles by decreasing file size. Defaults to no options.
 * @return A channel with one element per reference database. Each element is a
 *      pair of a meta map with an `id` key and all corresponding profiles.
 */
def groupProfiles(ch_profiles, groupTupleOptions = [:]) {
    return ch_profiles
        // Key each profile by its database name for grouping.
        .map { meta, profile -> [meta.db_name, profile] }
        .groupTuple(groupTupleOptions)
        // Rebuild a minimal meta map so downstream prefixes use the database name.
        .map { db_name, profiles -> [[id: db_name], profiles] }
}

/**
* Combine profiles with their corresponding reference database, then separate into two channels.
*
Expand Down Expand Up @@ -120,12 +135,7 @@ workflow STANDARDISATION_PROFILES {

// Bracken

ch_profiles_for_bracken = ch_input_profiles.bracken
.map { [it[0]['db_name'], it[1]] }
.groupTuple()
.map {
[[id:it[0]], it[1]]
}
ch_profiles_for_bracken = groupProfiles(ch_input_profiles.bracken)

BRACKEN_COMBINEBRACKENOUTPUTS ( ch_profiles_for_bracken )

Expand All @@ -134,13 +144,10 @@ workflow STANDARDISATION_PROFILES {
// Collect and replace id for db_name for prefix
// Have to sort by size to ensure first file actually has hits otherwise
// the script fails
ch_profiles_for_centrifuge = ch_input_profiles.centrifuge
.map { [it[0]['db_name'], it[1]] }
.groupTuple(sort: {-it.size()} )
.map {
[[id:it[0]], it[1]]
}

ch_profiles_for_centrifuge = groupProfiles(
ch_input_profiles.centrifuge,
[sort: { -it.size() }]
)

KRAKENTOOLS_COMBINEKREPORTS_CENTRIFUGE ( ch_profiles_for_centrifuge )
ch_multiqc_files = ch_multiqc_files.mix( KRAKENTOOLS_COMBINEKREPORTS_CENTRIFUGE.out.txt )
Expand All @@ -149,12 +156,7 @@ workflow STANDARDISATION_PROFILES {
// Kaiju

// Collect and replace id for db_name for prefix
ch_profiles_for_kaiju = ch_input_classifications.kaiju
.map { [it[0]['db_name'], it[1]] }
.groupTuple()
.map {
[[id:it[0]], it[1]]
}
ch_profiles_for_kaiju = groupProfiles(ch_input_classifications.kaiju)

ch_input_for_kaiju2tablecombine = combineProfilesWithDatabase(ch_profiles_for_kaiju, ch_input_databases.kaiju)

Expand All @@ -167,29 +169,31 @@ workflow STANDARDISATION_PROFILES {
// Collect and replace id for db_name for prefix
// Have to sort by size to ensure first file actually has hits otherwise
// the script fails
ch_profiles_for_kraken2 = ch_input_profiles.kraken2
.map {
meta, profiles ->
def new_meta = [:]
new_meta.tool = meta.tool == 'kraken2-bracken' ? 'kraken2' : meta.tool // replace to get the right output-format description
new_meta.id = meta.tool // append so to disambiguate when we have same databases for kraken2 step of bracken, with normal bracken
new_meta.db_name = meta.tool == 'kraken2-bracken' ? "${meta.db_name}-bracken" : "${meta.db_name}" // append so to disambiguate when we have same databases for kraken2 step of bracken, with normal bracken
[ new_meta, profiles ]
}
.groupTuple(sort: {-it.size()})
ch_profiles_for_kraken2 = groupProfiles(
ch_input_profiles.kraken2.map { meta, profile ->
def new_meta = [
// Replace the tool name to get the right output-format description.
tool: meta.tool == 'kraken2-bracken' ? 'kraken2' : meta.tool,
// Append so as to disambiguate when we have same databases
// for kraken2 step of bracken, with normal bracken.
id: meta.tool,
// Append so as to disambiguate when we have same databases
// for kraken2 step of bracken, with normal bracken.
db_name: meta.tool == 'kraken2-bracken' ? "${meta.db_name}-bracken" : "${meta.db_name}"

]
return [meta + new_meta, profile]
},
[sort: { -it.size() }]
)

KRAKENTOOLS_COMBINEKREPORTS_KRAKEN ( ch_profiles_for_kraken2 )
ch_multiqc_files = ch_multiqc_files.mix( KRAKENTOOLS_COMBINEKREPORTS_KRAKEN.out.txt )
ch_versions = ch_versions.mix( KRAKENTOOLS_COMBINEKREPORTS_KRAKEN.out.versions )

// MetaPhlAn

ch_profiles_for_metaphlan = ch_input_profiles.metaphlan
.map { [it[0]['db_name'], it[1]] }
.groupTuple()
.map {
[[id:it[0]], it[1]]
}
ch_profiles_for_metaphlan = groupProfiles(ch_input_profiles.metaphlan)

METAPHLAN_MERGEMETAPHLANTABLES ( ch_profiles_for_metaphlan )
ch_multiqc_files = ch_multiqc_files.mix( METAPHLAN_MERGEMETAPHLANTABLES.out.txt )
Expand All @@ -201,12 +205,7 @@ workflow STANDARDISATION_PROFILES {
// Therefore removing db info here, and publish merged at root mOTUs results
// directory

ch_profiles_for_motus = ch_input_profiles.motus
.map { [it[0]['db_name'], it[1]] }
.groupTuple()
.map {
[[id:it[0]], it[1]]
}
ch_profiles_for_motus = groupProfiles(ch_input_profiles.motus)

ch_input_for_motusmerge = combineProfilesWithDatabase(ch_profiles_for_motus, ch_input_databases.motus)

Expand All @@ -215,12 +214,7 @@ workflow STANDARDISATION_PROFILES {

// Ganon

ch_profiles_for_ganon = ch_input_profiles.ganon
.map { [it[0]['db_name'], it[1]] }
.groupTuple()
.map {
[[id:it[0]], it[1]]
}
ch_profiles_for_ganon = groupProfiles(ch_input_profiles.ganon)

GANON_TABLE ( ch_profiles_for_ganon )
ch_multiqc_files = ch_multiqc_files.mix( GANON_TABLE.out.txt )
Expand Down

0 comments on commit 3d30547

Please sign in to comment.