Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: speed improvements for primer pair hit building from single primer hits #99

Merged
merged 13 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 121 additions & 26 deletions prymer/offtarget/offtarget_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
from prymer.api.oligo import Oligo
from prymer.api.primer_pair import PrimerPair
from prymer.api.span import Span
from prymer.api.span import Strand
from prymer.offtarget.bwa import BWA_EXECUTABLE_NAME
from prymer.offtarget.bwa import BwaAlnInteractive
from prymer.offtarget.bwa import BwaHit
Expand Down Expand Up @@ -126,6 +127,24 @@ class OffTargetResult:
right_primer_spans: list[Span] = field(default_factory=list)


@dataclass
class PrimerPairBwaHitsBySideAndStrand:
    """A helper class for storing BWA hits for the left and right primers of a primer pair,
    organized by primer side (left/right) and hit strand (positive/negative).

    Grouping hits this way up front lets amplicon construction consider only the strand
    combinations that can yield a valid F/R amplicon orientation.

    Attributes:
        left_positive: A list of BwaHit objects on the positive strand for the left primer.
        left_negative: A list of BwaHit objects on the negative strand for the left primer.
        right_positive: A list of BwaHit objects on the positive strand for the right primer.
        right_negative: A list of BwaHit objects on the negative strand for the right primer.
    """

    # `field(default_factory=list)` gives each instance its own empty lists; a plain
    # `= []` default would be shared across all instances.
    left_positive: list[BwaHit] = field(default_factory=list)
    left_negative: list[BwaHit] = field(default_factory=list)
    right_positive: list[BwaHit] = field(default_factory=list)
    right_negative: list[BwaHit] = field(default_factory=list)


class OffTargetDetector(AbstractContextManager):
"""A class for detecting off-target mappings of primers and primer pairs that uses a custom
version of "bwa aln" named "bwa-aln-interactive".
Expand Down Expand Up @@ -334,26 +353,71 @@ def _build_off_target_result(
result: OffTargetResult

# Get the mappings for the left primer and right primer respectively
p1: BwaResult = hits_by_primer[primer_pair.left_primer.bases]
p2: BwaResult = hits_by_primer[primer_pair.right_primer.bases]

# Get all possible amplicons from the left_primer_mappings and right_primer_mappings
# primer hits, filtering if there are too many for either
if p1.hit_count > self._max_primer_hits or p2.hit_count > self._max_primer_hits:
left_bwa_result: BwaResult = hits_by_primer[primer_pair.left_primer.bases]
right_bwa_result: BwaResult = hits_by_primer[primer_pair.right_primer.bases]

# If there are too many hits, this primer pair will not pass. Exit early.
if (
left_bwa_result.hit_count > self._max_primer_hits
or right_bwa_result.hit_count > self._max_primer_hits
):
result = OffTargetResult(primer_pair=primer_pair, passes=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just return here? Is there any value to caching the result in this case, where it's so little work to compute the result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caching is required by the unit test at

# Test that using the cache (or not) does not affect the results

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok. Perhaps I'll "fix" this in a separate PR.

else:
amplicons = self._to_amplicons(p1.hits, p2.hits, self._max_amplicon_size)
result = OffTargetResult(
primer_pair=primer_pair,
passes=self._min_primer_pair_hits <= len(amplicons) <= self._max_primer_pair_hits,
spans=amplicons if self._keep_spans else [],
left_primer_spans=(
[self._hit_to_span(h) for h in p1.hits] if self._keep_primer_spans else []
),
right_primer_spans=(
[self._hit_to_span(h) for h in p2.hits] if self._keep_primer_spans else []
),
return result

# Get the set of reference names with hits
hits_by_refname: dict[str, PrimerPairBwaHitsBySideAndStrand] = {
hit.refname: PrimerPairBwaHitsBySideAndStrand()
for hit in left_bwa_result.hits + right_bwa_result.hits
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels pretty inefficient — it will concatenate two fairly large lists and (I think?) generate a PrimerPairBwaHitsBySideAndStrand per hit, then unique them, rather than generate one per reference contig.

I think something like this would work:

for refname in set(hit.refname for hit in itertools.chain(left_bwa_result.hits, right_bwa_result.hits)):

}

# Split the hits for left and right by reference name and strand
for hit in left_bwa_result.hits:
if hit.negative:
hits_by_refname[hit.refname].left_negative.append(hit)
else:
hits_by_refname[hit.refname].left_positive.append(hit)

for hit in right_bwa_result.hits:
if hit.negative:
hits_by_refname[hit.refname].right_negative.append(hit)
else:
hits_by_refname[hit.refname].right_positive.append(hit)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I see this ... I think actually maybe you don't care any more about left vs. right here and beyond this point? Sorry, as I think I may have sent you down this path.

Specifically if a single left primer happens to bind to the genome in two places, close together, in the right relative orientations, that could still cause problems. So I think it's really just one collection of + strand mappings and one collection of - strand mappings.


# Build amplicons from hits on the same reference with valid relative orientation
amplicons: list[Span] = []
for hits in hits_by_refname.values():
amplicons.extend(
self._to_amplicons(
positive_hits=hits.left_positive,
negative_hits=hits.right_negative,
max_len=self._max_amplicon_size,
strand=Strand.POSITIVE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to think about strand here. I think it's probably mostly irrelevant and could just always be set to positive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PrimerPair code, it's not explicitly enforced by a __post_init__ but is strongly assumed by e.g. calculate_amplicon_span that the left primer is left of the right primer. This implies that the left primer is on the positive strand and the right primer is on the negative strand. All amplicons are coded as on the positive strand (default value for Span, again, not explicit).

When a hit to a primer pair is in the same orientation as the primer pair definition, e.g. left positive and right negative, then it makes sense for the amplicon to also be on the positive strand.

If the hit is in the opposite orientation to the primer pair definition, the amplicon should be on the negative strand.

I think it makes sense to assign strandedness like this:

  • left primer + / right primer - = amplicon +
  • left primer + / left primer - = amplicon +
  • right primer + / left primer - = amplicon -
  • right primer + / right primer - = amplicon -

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed elsewhere, will leave questions of strand and L/L R/R hits to a separate PR.

)
)
amplicons.extend(
self._to_amplicons(
positive_hits=hits.right_positive,
negative_hits=hits.left_negative,
max_len=self._max_amplicon_size,
strand=Strand.NEGATIVE,
)
)

result = OffTargetResult(
primer_pair=primer_pair,
passes=self._min_primer_pair_hits <= len(amplicons) <= self._max_primer_pair_hits,
spans=amplicons if self._keep_spans else [],
left_primer_spans=(
[self._hit_to_span(h) for h in left_bwa_result.hits]
if self._keep_primer_spans
else []
),
right_primer_spans=(
[self._hit_to_span(h) for h in right_bwa_result.hits]
if self._keep_primer_spans
else []
),
)

if self._cache_results:
self._primer_pair_cache[primer_pair] = replace(result, cached=True)
Expand Down Expand Up @@ -410,19 +474,50 @@ def mappings_of(self, primers: list[PrimerType]) -> dict[str, BwaResult]:
return hits_by_primer

@staticmethod
def _to_amplicons(lefts: list[BwaHit], rights: list[BwaHit], max_len: int) -> list[Span]:
def _to_amplicons(
positive_hits: list[BwaHit], negative_hits: list[BwaHit], max_len: int, strand: Strand
) -> list[Span]:
"""Takes a set of hits for one or more left primers and right primers and constructs
amplicon mappings anywhere a left primer hit and a right primer hit align in F/R
orientation up to `maxLen` apart on the same reference. Primers may not overlap.

Args:
positive_hits: List of hits on the positive strand for one of the primers in the pair.
negative_hits: List of hits on the negative strand for the other primer in the pair.
max_len: Maximum length of amplicons to consider.
strand: The strand of the amplicon to generate. Set to Strand.POSITIVE if
`positive_hits` are for the left primer and `negative_hits` are for the right
primer. Set to Strand.NEGATIVE if `positive_hits` are for the right primer and
`negative_hits` are for the left primer.

Raises:
ValueError: If any of the positive hits are not on the positive strand, or any of the
negative hits are not on the negative strand. If hits are present on more than one
reference.
"""
amplicons: list[Span] = []
for h1, h2 in itertools.product(lefts, rights):
if h1.negative == h2.negative or h1.refname != h2.refname: # not F/R orientation
continue
if any(h.negative for h in positive_hits):
raise ValueError("Positive hits must be on the positive strand.")
if any(not h.negative for h in negative_hits):
raise ValueError("Negative hits must be on the negative strand.")

plus, minus = (h2, h1) if h1.negative else (h1, h2)
if minus.start > plus.end and (minus.end - plus.start + 1) <= max_len:
amplicons.append(Span(refname=plus.refname, start=plus.start, end=minus.end))
refnames: set[str] = {h.refname for h in positive_hits + negative_hits}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
refnames: set[str] = {h.refname for h in positive_hits + negative_hits}
refnames: set[str] = {h.refname for h in itertools.chain(positive_hits,
negative_hits)}

if len(refnames) > 1:
raise ValueError("Hits are present on more than one reference.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include the refnames here?


amplicons: list[Span] = []
for positive_hit, negative_hit in itertools.product(positive_hits, negative_hits):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is good enough if it meets your performance needs @ameynert. As noted elsewhere, if you had many many hits to one reference, you could speed this up by sorting the positive and negative strand hits, looping over list indices, and avoiding whole swathes of pairs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested this separately and the speed improvement by sorting the pos/neg strand hits is significant when there are a large number of hits returned, e.g. in a scenario where the max mismatches parameters are relaxed, one of the primer sequences is getting 100+ hits, and there are multiple highly similar reference contigs in the indexed genome that's being searched.

if (
negative_hit.start > positive_hit.end
and (negative_hit.end - positive_hit.start + 1) <= max_len
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow for situations where the two primers' mappings overlap on the genome? E.g. maybe the check should be:

negative_hit.start > positive_hit.start and negative_hit.end > positive_hit.end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems reasonable. Do you think we'd ever see a situation where a primer is a palindrome and hits the same target both forward and reverse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change causes a number of tests to fail. Since it's a breaking change, I'd like to address it in a separate PR.

FAILED prymer/offtarget/offtarget_detector.py::prymer.offtarget.offtarget_detector
FAILED tests/offtarget/test_offtarget.py::test_ok[True] - AssertionError: assert 1187 == 856
FAILED tests/offtarget/test_offtarget.py::test_ok[False] - AssertionError: assert 1187 == 856
FAILED tests/offtarget/test_offtarget.py::test_check_too_many_primer_pair_hits[at the maximum-204-856-True-True] - AssertionError: at the maximum
FAILED tests/offtarget/test_offtarget.py::test_check_too_many_primer_pair_hits[at the maximum-204-856-True-False] - AssertionError: at the maximum
FAILED tests/offtarget/test_offtarget.py::test_to_amplicons[No mappings - overlapping primers (1bp overlap)-positive0-negative0-+-expected0-True] - AssertionError: No mappings - overlapping primers (1bp overlap)
FAILED tests/offtarget/test_offtarget.py::test_to_amplicons[No mappings - overlapping primers (1bp overlap)-positive0-negative0-+-expected0-False] - AssertionError: No mappings - overlapping primers (1bp overlap)
FAILED tests/offtarget/test_offtarget.py::test_to_amplicons[No mappings - amplicon size too big (1bp too big)-positive1-negative1-+-expected1-True] - AssertionError: No mappings - amplicon size too big (1bp too big)
FAILED tests/offtarget/test_offtarget.py::test_to_amplicons[No mappings - amplicon size too big (1bp too big)-positive1-negative1-+-expected1-False] - AssertionError: No mappings - amplicon size too big (1bp too big)

):
amplicons.append(
Span(
refname=positive_hit.refname,
start=positive_hit.start,
end=negative_hit.end,
strand=strand,
)
)

return amplicons

Expand Down
142 changes: 117 additions & 25 deletions tests/offtarget/test_offtarget.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from prymer.offtarget.bwa import BWA_EXECUTABLE_NAME
from prymer.offtarget.bwa import BwaHit
from prymer.offtarget.bwa import BwaResult
from prymer.offtarget.bwa import Query
from prymer.offtarget.offtarget_detector import OffTargetDetector
from prymer.offtarget.offtarget_detector import OffTargetResult

Expand Down Expand Up @@ -171,68 +172,159 @@ def test_mappings_of(ref_fasta: Path, cache_results: bool) -> None:
assert results_dict[p2.bases].hits[0] == expected_hit2


# Test building an OffTargetResult for a primer pair with left/right hits on different references
# and in different orientations
def test_build_off_target_result(ref_fasta: Path) -> None:
    """_build_off_target_result should pair F/R hits per reference into stranded amplicons.

    The left primer hits chr1 (+ and -), chr2 (+ only), and chr3 (- only); the right primer
    hits chr1 (+ and -) and chr3 (+ only). Only chr1 (left +, right -) and chr3 (right +,
    left -) admit a valid F/R pairing, yielding one positive- and one negative-strand span.
    """
    hits_by_primer: dict[str, BwaResult] = {
        "A" * 100: BwaResult(
            query=Query(
                id="left",
                bases="A" * 100,
            ),
            # NOTE(review): hit_count=3 but four hits are listed below — confirm whether
            # BwaResult intentionally allows hit_count to differ from len(hits).
            hit_count=3,
            hits=[
                BwaHit.build("chr1", 100, False, "100M", 0),
                BwaHit.build("chr1", 400, True, "100M", 0),
                BwaHit.build("chr2", 100, False, "100M", 0),
                BwaHit.build("chr3", 700, True, "100M", 0),
            ],
        ),
        "C" * 100: BwaResult(
            query=Query(
                id="right",
                bases="C" * 100,
            ),
            # NOTE(review): hit_count=2 but three hits are listed below — confirm whether
            # BwaResult intentionally allows hit_count to differ from len(hits).
            hit_count=2,
            hits=[
                BwaHit.build("chr1", 800, False, "100M", 0),
                BwaHit.build("chr1", 200, True, "100M", 0),
                BwaHit.build("chr3", 600, False, "100M", 0),
            ],
        ),
    }

    # The primer pair's own mapping (chr10) is irrelevant here; only the BWA hits above
    # feed into off-target amplicon construction.
    primer_pair = PrimerPair(
        left_primer=Oligo(
            tm=50,
            penalty=0,
            span=Span(refname="chr10", start=100, end=199, strand=Strand.POSITIVE),
            bases="A" * 100,
        ),
        right_primer=Oligo(
            tm=50,
            penalty=0,
            span=Span(refname="chr10", start=300, end=399, strand=Strand.NEGATIVE),
            bases="C" * 100,
        ),
        amplicon_tm=100,
        penalty=0,
    )

    with _build_detector(ref_fasta=ref_fasta) as detector:
        off_target_result: OffTargetResult = detector._build_off_target_result(
            primer_pair=primer_pair,
            hits_by_primer=hits_by_primer,
        )

    assert off_target_result.spans == [
        Span(refname="chr1", start=100, end=299, strand=Strand.POSITIVE),
        Span(refname="chr3", start=600, end=799, strand=Strand.NEGATIVE),
    ]


# Test that using the cache (or not) does not affect the results
@pytest.mark.parametrize("cache_results", [True, False])
@pytest.mark.parametrize(
"test_id, left, right, expected",
"test_id, positive, negative, strand, expected",
[
(
"No mappings - different refnames",
BwaHit.build("chr1", 100, False, "100M", 0),
BwaHit.build("chr2", 100, True, "100M", 0),
[],
),
(
"No mappings - FF pair",
BwaHit.build("chr1", 100, True, "100M", 0),
BwaHit.build("chr1", 100, True, "100M", 0),
[],
),
(
"No mappings - RR pair",
BwaHit.build("chr1", 100, False, "100M", 0),
BwaHit.build("chr1", 100, False, "100M", 0),
[],
),
(
"No mappings - overlapping primers (1bp overlap)",
BwaHit.build("chr1", 100, False, "100M", 0),
BwaHit.build("chr1", 199, True, "100M", 0),
Strand.POSITIVE,
[],
),
(
"No mappings - amplicon size too big (1bp too big)",
BwaHit.build("chr1", 100, False, "100M", 0),
BwaHit.build("chr1", 151, True, "100M", 0),
Strand.POSITIVE,
[],
),
(
"Mappings - FR pair (R1 F)",
BwaHit.build("chr1", 100, False, "100M", 0),
BwaHit.build("chr1", 200, True, "100M", 0),
[Span(refname="chr1", start=100, end=299)],
Strand.POSITIVE,
[Span(refname="chr1", start=100, end=299, strand=Strand.POSITIVE)],
),
(
"Mappings - FR pair (R1 R)",
BwaHit.build("chr1", 200, True, "100M", 0),
BwaHit.build("chr1", 100, False, "100M", 0),
[Span(refname="chr1", start=100, end=299)],
BwaHit.build("chr1", 200, True, "100M", 0),
Strand.NEGATIVE,
[Span(refname="chr1", start=100, end=299, strand=Strand.NEGATIVE)],
),
],
)
def test_to_amplicons(
ref_fasta: Path,
test_id: str,
left: BwaHit,
right: BwaHit,
positive: BwaHit,
negative: BwaHit,
strand: Strand,
expected: list[Span],
cache_results: bool,
) -> None:
with _build_detector(ref_fasta=ref_fasta, cache_results=cache_results) as detector:
actual = detector._to_amplicons(lefts=[left], rights=[right], max_len=250)
actual = detector._to_amplicons(
positive_hits=[positive], negative_hits=[negative], max_len=250, strand=strand
)
assert actual == expected, test_id


@pytest.mark.parametrize("cache_results", [True, False])
@pytest.mark.parametrize(
    "positive, negative, expected_error",
    [
        # Hits on different reference sequences
        (
            BwaHit.build("chr1", 100, False, "100M", 0),
            BwaHit.build("chr2", 100, True, "100M", 0),
            "Hits are present on more than one reference",
        ),
        # Both hits on the negative strand (F/F)
        (
            BwaHit.build("chr1", 100, True, "100M", 0),
            BwaHit.build("chr1", 100, True, "100M", 0),
            "Positive hits must be on the positive strand",
        ),
        # Both hits on the positive strand (R/R)
        (
            BwaHit.build("chr1", 100, False, "100M", 0),
            BwaHit.build("chr1", 100, False, "100M", 0),
            "Negative hits must be on the negative strand",
        ),
    ],
)
def test_to_amplicons_value_error(
    ref_fasta: Path,
    positive: BwaHit,
    negative: BwaHit,
    expected_error: str,
    cache_results: bool,
) -> None:
    """_to_amplicons should raise ValueError for mis-stranded or multi-reference hit lists."""
    with _build_detector(ref_fasta=ref_fasta, cache_results=cache_results) as detector:
        with pytest.raises(ValueError, match=expected_error):
            detector._to_amplicons(
                positive_hits=[positive],
                negative_hits=[negative],
                max_len=250,
                strand=Strand.POSITIVE,
            )


def test_generic_filter(ref_fasta: Path) -> None:
"""
This test isn't intended to validate any runtime assertions, but is a minimal example for the
Expand Down
Loading