Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Samtools #47

Merged
merged 4 commits into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ jobs:
mkdir -p $HOME/bin
cp ./minimap2-2.26_x64-linux/minimap2 $HOME/bin
echo "$HOME/bin" >> $GITHUB_PATH
- name: Install dependencies for samtools
run: |
sudo apt-get update
sudo apt-get install -y libncurses5-dev libbz2-dev liblzma-dev zlib1g-dev
- name: Download and install samtools
run: |
curl -L https://github.com/samtools/samtools/releases/download/1.13/samtools-1.13.tar.bz2 | tar -jxvf -
cd samtools-1.13
./configure --prefix=$HOME/samtools
make
make install
echo "$HOME/samtools/bin" >> $GITHUB_PATH
- name: Test external
run: go test -v ./external/...
openbsd-test:
Expand Down
33 changes: 2 additions & 31 deletions external/minimap2/minimap2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@ For more information on minimap2, please visit Heng Li's git: https://github.com
package minimap2

import (
"context"
"io"
"os"
"os/exec"

"golang.org/x/sync/errgroup"
)

// Minimap2 aligns sequences using minimap2 over the command line. Right
Expand Down Expand Up @@ -69,37 +66,11 @@ func Minimap2(templateFastaInput io.Reader, fastqInput io.Reader, w io.Writer) e

// Start minimap2 pointing to the temporary file and stdin for sequencing data
cmd := exec.Command("minimap2", "-ax", "map-ont", tmpFile.Name(), "-")
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stdin, err := cmd.StdinPipe()
if err != nil {
return err
}
cmd.Stdout = w
cmd.Stdin = fastqInput
if err := cmd.Start(); err != nil {
return err
}

// Create an errgroup group to manage goroutines
g, _ := errgroup.WithContext(context.Background())

// Write data to the stdin of minimap2 (sequencing data) using a goroutine
g.Go(func() error {
defer stdin.Close()
_, err := io.Copy(stdin, fastqInput)
return err
})

// Start copying the output of minimap2 to w using a goroutine
g.Go(func() error {
_, err := io.Copy(w, stdout)
return err
})

// Wait for all goroutines to complete
if err := g.Wait(); err != nil {
return err
}
return cmd.Wait()
}
2 changes: 1 addition & 1 deletion external/minimap2/minimap2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMinimap2(t *testing.T) {
// Prepare the writer to capture the output
var buf bytes.Buffer

// Execute the Minimap2Raw function
// Execute the Minimap2 function
err = minimap2.Minimap2(templateFile, fastqFile, &buf)
if err != nil {
t.Errorf("Minimap2Raw returned an error: %v", err)
Expand Down
63 changes: 63 additions & 0 deletions external/samtools/data/aln.sam

Large diffs are not rendered by default.

236 changes: 236 additions & 0 deletions external/samtools/data/reads.fastq

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions external/samtools/data/template.fasta
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
>Cn_Pp_Killer-alpha-MF-Delta
AATGACCAAACCAACGCAAGTCTTAGTTCGTTCAGTCTCTATTTTATTCTTCATCACACTGTTGCACTTGGTTGTTGCAATGAGATTTCCTAGTATTTTCACTGCTGTGCTATTTGCCGCTAGTTCCGCTCTAGCTGCTCCAGTTAATACTACTACTGAAGATGAATTGGAGGGTGACTTCGATGTTGCTGTTCTGCCTTTTTCCGCTTCTATCGCAGCCAAGGAAGAAGGTGTATCTCTAGAGAAGCGTC
106 changes: 106 additions & 0 deletions external/samtools/samtools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Package samtools wraps the samtools cli to be used with Go.
*/
package samtools

import (
"context"
"io"
"os"
"os/exec"
"syscall"

"golang.org/x/sync/errgroup"
)

// Pileup runs a
func Pileup(templateFastas io.Reader, samAlignments io.Reader, w io.Writer) error {
// Create a temporary file for the template fasta
tmpFile, err := os.CreateTemp("", "template_*.fasta")
if err != nil {
return err
}
defer os.Remove(tmpFile.Name()) // Clean up file afterwards

// Write template fasta data to the temporary file
if _, err := io.Copy(tmpFile, templateFastas); err != nil {
return err
}
tmpFile.Close() // Close the file as it's no longer needed

g, ctx := errgroup.WithContext(context.Background())

// Setup pipe connections between commands
viewSortReader, viewSortWriter := io.Pipe()
sortMpileupReader, sortMpileupWriter := io.Pipe()

// Define commands with context
viewCmd := exec.CommandContext(ctx, "samtools", "view", "-bF", "4")
sortCmd := exec.CommandContext(ctx, "samtools", "sort", "-")
mpileupCmd := exec.CommandContext(ctx, "samtools", "mpileup", "-f", tmpFile.Name(), "-")

// Goroutine for the first command: samtools view
g.Go(func() error {
defer viewSortWriter.Close() // ensure the pipe is closed after this function exits

viewCmd.Stdin = samAlignments
viewCmd.Stdout = viewSortWriter

if err := viewCmd.Start(); err != nil {
return err
}

select {
case <-ctx.Done():
viewCmd.Process.Signal(syscall.SIGTERM)
return ctx.Err()
default:
return viewCmd.Wait()
}
})

// Goroutine for the second command: samtools sort
g.Go(func() error {
defer sortMpileupWriter.Close() // ensure the pipe is closed after this function exits

sortCmd.Stdin = viewSortReader
sortCmd.Stdout = sortMpileupWriter

if err := sortCmd.Start(); err != nil {
return err
}

select {
case <-ctx.Done():
sortCmd.Process.Signal(syscall.SIGTERM)
return ctx.Err()
default:
return sortCmd.Wait()
}
})

// Goroutine for the third command: samtools mpileup
g.Go(func() error {
mpileupCmd.Stdin = sortMpileupReader
mpileupCmd.Stdout = w

if err := mpileupCmd.Start(); err != nil {
return err
}

select {
case <-ctx.Done():
mpileupCmd.Process.Signal(syscall.SIGTERM)
return ctx.Err()
default:
return mpileupCmd.Wait()
}
})

// Wait for all goroutines to complete and return the first non-nil error
if err := g.Wait(); err != nil {
return err
}

return nil
}
47 changes: 47 additions & 0 deletions external/samtools/samtools_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package samtools_test

import (
"bytes"
"os"
"testing"

"github.com/koeng101/dnadesign/external/samtools"
"github.com/koeng101/dnadesign/lib/bio"
)

func TestPileup(t *testing.T) {
// Open the template FASTA file
templateFile, err := os.Open("./data/template.fasta")
if err != nil {
t.Fatalf("Failed to open template FASTA file: %v", err)
}
defer templateFile.Close()

// Open the sam file
samFile, err := os.Open("./data/aln.sam")
if err != nil {
t.Fatalf("Failed to open sam alignment file: %v", err)
}
defer samFile.Close()

// Prepare the writer to capture the output
var buf bytes.Buffer

// Execute the pileup function
err = samtools.Pileup(templateFile, samFile, &buf)
if err != nil {
t.Errorf("Pileup returned error: %s", err)
}

// Read as pileup file
parser := bio.NewPileupParser(&buf)
lines, err := parser.Parse()
if err != nil {
t.Errorf("Failed while parsing: %s", err)
}

expectedQuality := "3555457667367556657568659884340:7"
if lines[5].Quality != expectedQuality {
t.Errorf("Bad quality return. Got: %s Expected: %s", lines[5].Quality, expectedQuality)
}
}
39 changes: 27 additions & 12 deletions lib/bio/bio.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/koeng101/dnadesign/lib/bio/fastq"
"github.com/koeng101/dnadesign/lib/bio/genbank"
"github.com/koeng101/dnadesign/lib/bio/pileup"
"github.com/koeng101/dnadesign/lib/bio/sam"
"github.com/koeng101/dnadesign/lib/bio/slow5"
"github.com/koeng101/dnadesign/lib/bio/uniprot"
"golang.org/x/sync/errgroup"
Expand All @@ -33,6 +34,7 @@ const (
Fastq
Genbank
Slow5
Sam
Pileup
)

Expand All @@ -48,6 +50,7 @@ var DefaultMaxLengths = map[Format]int{
Fastq: 8 * 1024 * 1024, // The longest single nanopore sequencing read so far is 4Mb. A 8mb buffer should be large enough for any sequencing.
Genbank: defaultMaxLineLength,
Slow5: 128 * 1024 * 1024, // 128mb is used because slow5 lines can be massive, since a single read can be many millions of base pairs.
Sam: defaultMaxLineLength,
Pileup: defaultMaxLineLength,
}

Expand Down Expand Up @@ -89,36 +92,36 @@ type Parser[Data io.WriterTo, Header io.WriterTo] struct {
}

// NewFastaParser initiates a new FASTA parser from an io.Reader.
func NewFastaParser(r io.Reader) (*Parser[*fasta.Record, *fasta.Header], error) {
func NewFastaParser(r io.Reader) *Parser[*fasta.Record, *fasta.Header] {
return NewFastaParserWithMaxLineLength(r, DefaultMaxLengths[Fasta])
}

// NewFastaParserWithMaxLineLength initiates a new FASTA parser from an
// io.Reader and a user-given maxLineLength.
func NewFastaParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*fasta.Record, *fasta.Header], error) {
return &Parser[*fasta.Record, *fasta.Header]{parserInterface: fasta.NewParser(r, maxLineLength)}, nil
func NewFastaParserWithMaxLineLength(r io.Reader, maxLineLength int) *Parser[*fasta.Record, *fasta.Header] {
return &Parser[*fasta.Record, *fasta.Header]{parserInterface: fasta.NewParser(r, maxLineLength)}
}

// NewFastqParser initiates a new FASTQ parser from an io.Reader.
func NewFastqParser(r io.Reader) (*Parser[*fastq.Read, *fastq.Header], error) {
func NewFastqParser(r io.Reader) *Parser[*fastq.Read, *fastq.Header] {
return NewFastqParserWithMaxLineLength(r, DefaultMaxLengths[Fastq])
}

// NewFastqParserWithMaxLineLength initiates a new FASTQ parser from an
// io.Reader and a user-given maxLineLength.
func NewFastqParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*fastq.Read, *fastq.Header], error) {
return &Parser[*fastq.Read, *fastq.Header]{parserInterface: fastq.NewParser(r, maxLineLength)}, nil
func NewFastqParserWithMaxLineLength(r io.Reader, maxLineLength int) *Parser[*fastq.Read, *fastq.Header] {
return &Parser[*fastq.Read, *fastq.Header]{parserInterface: fastq.NewParser(r, maxLineLength)}
}

// NewGenbankParser initiates a new Genbank parser form an io.Reader.
func NewGenbankParser(r io.Reader) (*Parser[*genbank.Genbank, *genbank.Header], error) {
func NewGenbankParser(r io.Reader) *Parser[*genbank.Genbank, *genbank.Header] {
return NewGenbankParserWithMaxLineLength(r, DefaultMaxLengths[Genbank])
}

// NewGenbankParserWithMaxLineLength initiates a new Genbank parser from an
// io.Reader and a user-given maxLineLength.
func NewGenbankParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*genbank.Genbank, *genbank.Header], error) {
return &Parser[*genbank.Genbank, *genbank.Header]{parserInterface: genbank.NewParser(r, maxLineLength)}, nil
func NewGenbankParserWithMaxLineLength(r io.Reader, maxLineLength int) *Parser[*genbank.Genbank, *genbank.Header] {
return &Parser[*genbank.Genbank, *genbank.Header]{parserInterface: genbank.NewParser(r, maxLineLength)}
}

// NewSlow5Parser initiates a new SLOW5 parser from an io.Reader.
Expand All @@ -133,15 +136,27 @@ func NewSlow5ParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*s
return &Parser[*slow5.Read, *slow5.Header]{parserInterface: parser}, err
}

// NewSamParser initiates a new SAM parser from an io.Reader.
func NewSamParser(r io.Reader) (*Parser[*sam.Alignment, *sam.Header], error) {
return NewSamParserWithMaxLineLength(r, DefaultMaxLengths[Sam])
}

// NewSamParserWithMaxLineLength initiates a new SAM parser from an io.Reader
// and a user-given maxLineLength.
func NewSamParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*sam.Alignment, *sam.Header], error) {
parser, _, err := sam.NewParser(r, maxLineLength)
return &Parser[*sam.Alignment, *sam.Header]{parserInterface: parser}, err
}

// NewPileupParser initiates a new Pileup parser from an io.Reader.
func NewPileupParser(r io.Reader) (*Parser[*pileup.Line, *pileup.Header], error) {
func NewPileupParser(r io.Reader) *Parser[*pileup.Line, *pileup.Header] {
return NewPileupParserWithMaxLineLength(r, DefaultMaxLengths[Pileup])
}

// NewPileupParserWithMaxLineLength initiates a new Pileup parser from an
// io.Reader and a user-given maxLineLength.
func NewPileupParserWithMaxLineLength(r io.Reader, maxLineLength int) (*Parser[*pileup.Line, *pileup.Header], error) {
return &Parser[*pileup.Line, *pileup.Header]{parserInterface: pileup.NewParser(r, maxLineLength)}, nil
func NewPileupParserWithMaxLineLength(r io.Reader, maxLineLength int) *Parser[*pileup.Line, *pileup.Header] {
return &Parser[*pileup.Line, *pileup.Header]{parserInterface: pileup.NewParser(r, maxLineLength)}
}

// NewUniprotParser initiates a new Uniprot parser from an io.Reader. No
Expand Down
Loading
Loading