-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'support-assemblies' into 'master'
Support processing of assemblies and clean up folders after correct execution. See merge request tron/covigator-ngs-pipeline!5
- Loading branch information
Showing
9 changed files
with
933 additions
and
270 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
#!/usr/bin/env python | ||
import os | ||
from argparse import ArgumentParser | ||
from dataclasses import dataclass | ||
from Bio import Align, SeqIO | ||
from Bio.Align import PairwiseAlignment | ||
from typing import List | ||
|
||
# Contig name written to the CHROM column of every VCF record.
CHROMOSOME = "MN908947.3"


@dataclass
class Variant:
    """A single variant call (SNV, insertion or deletion) against the reference."""

    # 0-based position of the first reference base of the variant
    position: int
    # reference allele; for indels it includes the anchor base
    reference: str
    # alternate allele; for indels it includes the anchor base
    alternate: str

    def to_vcf_line(self):
        """Return the eight fixed VCF columns of this variant as a tuple of strings.

        The columns are CHROM, POS, ID, REF, ALT, QUAL, FILTER and INFO.
        REF and ALT are coerced to ``str`` so that the result can always be
        joined with ``"\\t".join(...)``, even when the alleles were built from
        sequence objects that are not ``str`` instances.
        """
        return (
            CHROMOSOME,
            # transform 0-based position to 1-based position
            str(self.position + 1),
            ".",
            str(self.reference),
            str(self.alternate),
            ".",
            "PASS",
            ".",
        )
|
||
|
||
class AssemblyVariantCaller:
    """Calls SNVs and short indels from a pairwise alignment of an assembly to a reference."""

    def call_variants(self, sequence: str, reference: str) -> List[Variant]:
        """Align ``sequence`` against ``reference`` and return the variants found.

        :param sequence: the query (assembly) sequence
        :param reference: the reference sequence
        :return: the list of called variants with 0-based positions
        """
        return self._call_mutations(
            self._run_alignment(sequence=sequence, reference=reference))

    def _run_alignment(self, sequence: str, reference: str) -> PairwiseAlignment:
        """Run the pairwise aligner with the reference as target and return the first alignment."""
        aligner = Align.PairwiseAligner()
        # Settings are applied in this exact order.
        # NOTE(review): presumably the end gap scores must be assigned after the
        # general gap scores so that they are not overwritten; confirm against
        # the Biopython documentation before reordering.
        settings = (
            ("mode", 'global'),
            ("match", 2),
            ("mismatch", -1),
            ("open_gap_score", -3),
            ("extend_gap_score", -0.1),
            ("target_end_gap_score", 0.0),
            ("query_end_gap_score", 0.0),
        )
        for name, value in settings:
            setattr(aligner, name, value)
        return aligner.align(reference, sequence)[0]

    def _call_mutations(self, alignment: PairwiseAlignment) -> List[Variant]:
        """Walk the aligned blocks and collect indels (between blocks) and SNVs (within blocks).

        Indels are only looked for between two consecutive aligned blocks, so
        gaps before the first block or after the last block are never called.
        Contiguous SNVs are reported as separate variants.
        """
        alternate = alignment.query
        reference = alignment.target

        calls = []
        previous_ends = None  # (ref_end, alt_end) of the preceding aligned block
        for ref_block, alt_block in zip(alignment.aligned[0], alignment.aligned[1]):
            ref_start, ref_end = ref_block
            alt_start, alt_end = alt_block

            if previous_ends is not None:
                indel = self._call_indel(
                    reference, alternate, previous_ends[0], previous_ends[1], ref_start, alt_start)
                if indel is not None:
                    calls.append(indel)

            calls.extend(self._call_snvs(
                reference, alternate, ref_start, ref_end, alt_start, alt_end))
            previous_ends = (ref_end, alt_end)

        return calls

    def _call_indel(self, reference, alternate, prev_ref_end, prev_alt_end, ref_start, alt_start):
        """Return the indel located between two consecutive aligned blocks, or None.

        A gap in the reference coordinates is reported as a deletion; otherwise
        a gap in the query coordinates is reported as an insertion. When both
        are present only the deletion is considered. Events longer than 50 bp
        or involving an N are not called.
        """
        anchor = prev_ref_end - 1  # base preceding the event, shared by REF and ALT
        deleted_length = ref_start - prev_ref_end
        inserted_length = alt_start - prev_alt_end

        if deleted_length != 0:
            if deleted_length > 50:
                return None
            deleted = reference[anchor: ref_start]
            if 'N' in deleted:
                return None
            return Variant(position=anchor, reference=deleted, alternate=reference[anchor])

        if inserted_length != 0:
            if inserted_length > 50:
                return None
            anchor_base = reference[anchor]
            inserted = alternate[prev_alt_end:alt_start]
            if anchor_base == 'N' or 'N' in inserted:
                return None
            return Variant(position=anchor, reference=anchor_base, alternate=anchor_base + inserted)

        return None

    def _call_snvs(self, reference, alternate, ref_start, ref_end, alt_start, alt_end):
        """Return one SNV per mismatching column of an aligned block, skipping columns with an N."""
        columns = zip(
            range(ref_start, ref_end), reference[ref_start: ref_end], alternate[alt_start: alt_end])
        return [
            Variant(position=position, reference=ref_base, alternate=alt_base)
            for position, ref_base, alt_base in columns
            if ref_base != alt_base and 'N' not in (ref_base, alt_base)
        ]
|
||
|
||
def write_vcf(mutations, output_vcf):
    """Write ``mutations`` to ``output_vcf`` as a minimal VCF file.

    :param mutations: iterable of objects exposing ``to_vcf_line()``, which
        must return the tab-separable columns of one record
    :param output_vcf: path of the VCF file to create or overwrite
    """
    header_lines = (
        "##fileformat=VCFv4.0",
        "##FILTER=<ID=PASS,Description=\"All filters passed\">",
        "##contig=<ID=MN908947.3>",
        "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO"
    )
    with open(output_vcf, "w") as handle:
        # generators keep the output incremental: header first, then one record per line
        handle.writelines(line + "\n" for line in header_lines)
        handle.writelines("\t".join(mutation.to_vcf_line()) + "\n" for mutation in mutations)
|
||
|
||
def main():
    """Command-line entry point: call variants from an assembly and write them to a VCF.

    Reads the first record of the query FASTA and of the reference FASTA,
    calls variants between them and writes the result to the output VCF.

    :raises FileNotFoundError: if either FASTA file does not exist
    :raises ValueError: if either FASTA file contains no sequence
    """
    parser = ArgumentParser(description="Run Pipeline for testing")
    parser.add_argument("--fasta", dest="fasta",
                        help="The fasta file with the query sequence. Only one sequence is expected",
                        required=True)
    parser.add_argument("--reference", dest="reference",
                        help="The fasta file with the reference sequence. Only one sequence is expected",
                        required=True)
    parser.add_argument("--output-vcf", dest="output_vcf",
                        help="The path to the output VCF",
                        required=True)
    args = parser.parse_args()

    # explicit checks instead of assert: assertions are stripped when running with -O
    for fasta_path in (args.fasta, args.reference):
        if not os.path.exists(fasta_path):
            raise FileNotFoundError("Fasta file {} does not exist!".format(fasta_path))

    # only the first record of each file is used; an empty file is reported explicitly
    # instead of leaking a bare StopIteration
    query = next(SeqIO.parse(args.fasta, "fasta"), None)
    if query is None:
        raise ValueError("Fasta file {} does not contain any sequence!".format(args.fasta))
    reference = next(SeqIO.parse(args.reference, "fasta"), None)
    if reference is None:
        raise ValueError("Fasta file {} does not contain any sequence!".format(args.reference))

    variant_caller = AssemblyVariantCaller()
    # pass plain strings so that the called alleles are str and can be joined into VCF lines
    variants = variant_caller.call_variants(sequence=str(query.seq), reference=str(reference.seq))
    write_vcf(mutations=variants, output_vcf=args.output_vcf)
|
||
|
||
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
from unittest import TestCase | ||
|
||
from .assembly_variant_caller import AssemblyVariantCaller | ||
|
||
|
||
class TestCountryParser(TestCase):
    """Checks the variants called by AssemblyVariantCaller on short synthetic sequences."""

    def _check_variant(self, variant, reference, alternate, position):
        """Assert the alleles and the 0-based position of a single called variant."""
        self.assertEqual(variant.reference, reference)
        self.assertEqual(variant.alternate, alternate)
        self.assertEqual(variant.position, position)

    def test_assembly_variant_caller(self):
        caller = AssemblyVariantCaller()
        long_reference = "CTGGTGTGAGCCTGGTCACCAGGGTGGTAGGACAGACCCTCCTCTGGAGGCAAAGTGACG"

        # identical sequences yield no calls
        calls = caller.call_variants(sequence="ACGTACGT", reference="ACGTACGT")
        self.assertEqual(len(calls), 0)

        # a single mismatch is called as an SNV
        calls = caller.call_variants(sequence="ACGTCCGT", reference="ACGTACGT")
        self.assertEqual(len(calls), 1)
        self._check_variant(calls[0], reference="A", alternate="C", position=4)

        # three bases missing from the query are called as a deletion
        calls = caller.call_variants(
            reference=long_reference,
            sequence="CTGGTGTGAGCCTGGTCACCAGGGTGGTAGGACAGACCCTCCTCTGGCAAAGTGACG")
        self.assertEqual(len(calls), 1)
        self._check_variant(calls[0], reference="TGGA", alternate="T", position=44)

        # three extra bases in the query are called as an insertion
        calls = caller.call_variants(
            sequence="CTGGTGTGAGCCTGGTCACCAGGGTGGTAGGACAGACCCTCCTCTGCCCGAGGCAAAGTGACG",
            reference=long_reference)
        self.assertEqual(len(calls), 1)
        self._check_variant(calls[0], reference="G", alternate="GCCC", position=45)

        # two insertions in the same query are both called, in reference order
        calls = caller.call_variants(
            sequence="CTGGTGTGAGTCCTGGTCACCAGGGTGGTAGGACAGACCCTCCTCTGCCCGAGGCAAAGTGACG",
            reference=long_reference)
        self.assertEqual(len(calls), 2)
        self._check_variant(calls[1], reference="G", alternate="GCCC", position=45)
        self._check_variant(calls[0], reference="G", alternate="GT", position=9)
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.