From 226d08bec7653eeebe6d6e713ae1c893d908dea8 Mon Sep 17 00:00:00 2001 From: tfenne Date: Fri, 8 Dec 2023 09:35:59 -0700 Subject: [PATCH] A couple more small optimizations and extended the tests. --- .../fulcrumgenomics/umi/GroupReadsByUmi.scala | 93 ++++++++++--------- .../umi/GroupReadsByUmiTest.scala | 26 +++--- 2 files changed, 63 insertions(+), 56 deletions(-) diff --git a/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala b/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala index a6b6fa04f..5b0ee69cd 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala @@ -244,8 +244,6 @@ object GroupReadsByUmi { /** Returns whether or not a pair of UMIs match closely enough to be considered adjacent in the graph. */ protected def matches(lhs: Umi, rhs: Umi): Boolean = { val len = lhs.length - require(rhs.length == len, s"UMIs of different length detected: $lhs vs. $rhs") - var idx = 0 var mismatches = 0 val tooManyMismatches = this.maxMismatches + 1 @@ -272,52 +270,61 @@ object GroupReadsByUmi { override def assign(rawUmis: Seq[Umi]): Map[Umi, MoleculeId] = { // Make a list of counts of all UMIs in order from most to least abundant; we'll consume from this buffer val orderedNodes = count(rawUmis).map{ case(umi,count) => new Node(umi, count.toInt) }.toIndexedSeq.sortBy((n:Node) => -n.count) - val lookup = countIndexLookup(orderedNodes) // Seq of (count, firstIdx) pairs - - // A list of all the root UMIs/Nodes that we find - val roots = Seq.newBuilder[Node] - - // Now build one or more graphs starting with the most abundant remaining umi - val working = mutable.Queue[Node]() - forloop (from=0, until=orderedNodes.length) { rootIdx => - val nextRoot = orderedNodes(rootIdx) - - if (!nextRoot.assigned) { - roots += nextRoot - working.enqueue(nextRoot) - - while (working.nonEmpty) { - val root = working.dequeue() - root.assigned = true - val maxChildCountPlusOne = (root.count / 2 + 1) + 1 - val searchFromIdx = lookup - .find { case (count, _) => count < maxChildCountPlusOne } - .map { case (_, idx) => idx } - .getOrElse(-1) - - if (searchFromIdx >= 0) { - val hits = taskSupport match { - case None => - orderedNodes - .drop(searchFromIdx) - .filter(other => !other.assigned && matches(root.umi, other.umi)) - case Some(ts) => - orderedNodes - .drop(searchFromIdx) - .parWith(ts) - .filter(other => !other.assigned && matches(root.umi, other.umi)) - .seq - } - root.children ++= hits - working.enqueueAll(hits) - hits.foreach(_.assigned = true) + if (orderedNodes.length == 1) { + orderedNodes.head.assigned = true + assignIdsToNodes(orderedNodes) + } + else { + val umiLength = orderedNodes.head.umi.length + require(orderedNodes.forall(_.umi.length == umiLength), f"Multiple UMI lengths: ${orderedNodes.map(_.umi).mkString(", ")}") + val lookup = countIndexLookup(orderedNodes) // Seq of (count, firstIdx) pairs + + // A list of all the root UMIs/Nodes that we find + val roots = Seq.newBuilder[Node] + + // Now build one or more graphs starting with the most abundant remaining umi + val working = mutable.Queue[Node]() + forloop (from=0, until=orderedNodes.length) { rootIdx => + val nextRoot = orderedNodes(rootIdx) + + if (!nextRoot.assigned) { + roots += nextRoot + working.enqueue(nextRoot) + + while (working.nonEmpty) { + val root = working.dequeue() + root.assigned = true + val maxChildCountPlusOne = (root.count / 2 + 1) + 1 + val searchFromIdx = lookup + .find { case (count, _) => count < maxChildCountPlusOne } + .map { case (_, idx) => idx } + .getOrElse(-1) + + if (searchFromIdx >= 0) { + val hits = taskSupport match { + case None => + orderedNodes + .drop(searchFromIdx) + .filter(other => !other.assigned && matches(root.umi, other.umi)) + case Some(ts) => + orderedNodes + .drop(searchFromIdx) + .parWith(ts) + .filter(other => !other.assigned && matches(root.umi, other.umi)) + .seq + } + + root.children ++= hits + working.enqueueAll(hits) + hits.foreach(_.assigned = true) + } } } } - } - assignIdsToNodes(roots.result()) + assignIdsToNodes(roots.result()) + } } /** diff --git a/src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala b/src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala index baec12e8e..77a5dc8f6 100644 --- a/src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala +++ b/src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala @@ -84,31 +84,31 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT group(assigner.assign(umis)) should contain theSameElementsAs Set(Set("AAAAAA", "AAAATA", "AAAATT", "GGCGGC", "TGCACC", "TGCACG")) } - { - "AdjacencyUmiAssigner" should "assign each UMI to separate groups" in { + Seq(1, 4).foreach { threads => + "AdjacencyUmiAssigner" should s"assign each UMI to separate groups with $threads thread(s)" in { val umis = Seq("AAAAAA", "CCCCCC", "GGGGGG", "TTTTTT", "AAATTT", "TTTAAA", "AGAGAG") - val groups = group(new AdjacencyUmiAssigner(maxMismatches=2).assign(umis)) + val groups = group(new AdjacencyUmiAssigner(maxMismatches=2, threads=threads).assign(umis)) groups shouldBe umis.map(Set(_)).toSet } - it should "assign everything into one group when all counts=1 and within mismatch threshold" in { + it should f"assign everything into one group when all counts=1 and within mismatch threshold with $threads thread(s)" in { val umis = Seq("AAAAAA", "AAAAAc", "AAAAAg").map(_.toUpperCase) - val groups = group(new AdjacencyUmiAssigner(maxMismatches=1).assign(umis)) + val groups = group(new AdjacencyUmiAssigner(maxMismatches=1, threads=threads).assign(umis)) groups shouldBe Set(umis.toSet) } - it should "assign everything into one group" in { + it should f"assign everything into one group with $threads thread(s)" in { val umis = Seq("AAAAAA", "AAAAAA", "AAAAAA", "AAAAAc", "AAAAAc", "AAAAAg", "AAAtAA").map(_.toUpperCase) - val groups = group(new AdjacencyUmiAssigner(maxMismatches=1).assign(umis)) + val groups = group(new AdjacencyUmiAssigner(maxMismatches=1, threads=threads).assign(umis)) groups shouldBe Set(umis.toSet) } - it should "make three groups" in { + it should f"make three groups with $threads thread(s)" in { val umis: Seq[String] = n("AAAAAA", 4) ++ n("AAAAAT", 2) ++ n("AATAAT", 1) ++ n("AATAAA", 2) ++ n("GACGAC", 9) ++ n("GACGAT", 1) ++ n("GACGCC", 4) ++ n("TACGAC", 7) - val groups = group(new AdjacencyUmiAssigner(maxMismatches=2).assign(umis)) + val groups = group(new AdjacencyUmiAssigner(maxMismatches=2, threads=threads).assign(umis)) groups shouldBe Set( Set("AAAAAA", "AAAAAT", "AATAAT", "AATAAA"), Set("GACGAC", "GACGAT", "GACGCC"), @@ -117,15 +117,15 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT } // Unit test for something that failed when running on real data - it should "correctly assign the following UMIs" in { + it should f"correctly assign the following UMIs with $threads thread(s)" in { val umis = Seq("CGGGGG", "GTGGGG", "GGGGGG", "CTCACA", "TGCAGT", "CTCACA", "CGGGGG") - val groups = group(new AdjacencyUmiAssigner(maxMismatches=1).assign(umis)) + val groups = group(new AdjacencyUmiAssigner(maxMismatches=1, threads=threads).assign(umis)) groups shouldBe Set(Set("CGGGGG", "GGGGGG", "GTGGGG"), Set("CTCACA"), Set("TGCAGT")) } - it should "handle a deep tree of UMIs" in { + it should f"handle a deep tree of UMIs with $threads thread(s)" in { val umis = n("AAAAAA", 256) ++ n("TAAAAA", 128) ++ n("TTAAAA", 64) ++ n("TTTAAA", 32) ++ n("TTTTAA", 16) - val groups = group(new AdjacencyUmiAssigner(maxMismatches=1).assign(umis)) + val groups = group(new AdjacencyUmiAssigner(maxMismatches=1, threads=threads).assign(umis)) groups shouldBe Set(Set("AAAAAA", "TAAAAA", "TTAAAA", "TTTAAA", "TTTTAA")) } }