Skip to content

Commit

Permalink
Merge pull request #3 from broadinstitute/dp-zstd
Browse files Browse the repository at this point in the history
more zstd support
  • Loading branch information
dpark01 committed Nov 7, 2019
2 parents e3859eb + 7a326b0 commit 94281be
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 51 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
[![Docker Repository on Quay](https://quay.io/repository/broadinstitute/viral-core/status "Docker Repository on Quay")](https://quay.io/repository/broadinstitute/viral-core)
[![Build Status](https://travis-ci.com/broadinstitute/viral-core.svg?branch=master)](https://travis-ci.com/broadinstitute/viral-core)
[![Coverage Status](https://coveralls.io/repos/github/broadinstitute/viral-core/badge.svg?branch=master)](https://coveralls.io/github/broadinstitute/viral-core?branch=master)
[![Documentation Status](https://readthedocs.org/projects/viral-core/badge/?version=latest)](http://viral-core.readthedocs.io/en/latest/?badge=latest)
<!--
[![Coverage Status](https://coveralls.io/repos/github/broadinstitute/viral-core/badge.svg?branch=master)](https://coveralls.io/github/broadinstitute/viral-core?branch=master)
[![broad-viral-badge](https://img.shields.io/badge/install%20from-broad--viral-green.svg?style=flat-square)](https://anaconda.org/broad-viral/viral-ngs)
[![Documentation Status](https://readthedocs.org/projects/viral-ngs/badge/?version=latest)](http://viral-ngs.readthedocs.io/en/latest/?badge=latest)
[![Code Health](https://landscape.io/github/broadinstitute/viral-ngs/master/landscape.svg?style=flat)](https://landscape.io/github/broadinstitute/viral-ngs)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.252549.svg)](https://doi.org/10.5281/zenodo.252549)
-->

viral-ngs
viral-core
=========

A set of scripts and tools for the analysis of viral NGS data.

More detailed command line documentation can be found at [readthedocs](http://viral-core.readthedocs.org/)

Developer documentation can be found [here](DEVELOPMENT_NOTES.md).

More detailed documentation can be found at http://viral-ngs.readthedocs.org/
This includes installation instructions,
usage instructions for the command line tools,
and usage of the pipeline infrastructure.
Higher-level pipelines and workflows for viral-ngs can be found in the [pipelines GitHub repo](https://github.com/broadinstitute/viral-pipelines).
4 changes: 2 additions & 2 deletions file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def merge_tarballs(out_tarball, in_tarballs, threads=None, extract_to_disk_path=
def parser_merge_tarballs(parser=argparse.ArgumentParser()):
parser.add_argument(
'out_tarball',
help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|-);
help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|*.tar.zst|-);
compression is inferred by the file extension.
Note: if "-" is used, output will be written to stdout and
--pipeOutHint must be provided to indicate compression type
when compression type is not gzip (gzip is used by default).
''')
parser.add_argument(
'in_tarballs', nargs='+',
help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2)')
help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2|*.tar.zst)')
)
parser.add_argument('--extractToDiskPath',
dest="extract_to_disk_path",
Expand Down
8 changes: 7 additions & 1 deletion requirements-conda.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ cd-hit=4.6.8
cd-hit-auxtools=4.6.8
fastqc=0.11.7
gatk=3.8
lbzip2=2.5
lz4-c=1.9.1
mvicuna=1.0
novoalign=3.07.00
parallel=20190922
picard-slim=2.21.1
pigz=2.4
prinseq=0.20.4
#r-base=3.5.1
samtools=1.9
trimmomatic=0.38
unzip=6.0
zstd=1.3.8
# Python packages below
arrow=0.12.1
bedtools=2.28.0
biopython=1.72
matplotlib=2.2.4
pysam=0.15.0
pybedtools=0.7.10
zstandard=0.11.0
88 changes: 47 additions & 41 deletions util/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import contextlib
import os
import gzip
import bz2
import zstd
import io
import tempfile
import subprocess
Expand Down Expand Up @@ -256,7 +258,7 @@ def extract_tarball(tarfile, out_dir=None, threads=None, compression='auto', pip
elif compression == 'lz4':
decompressor = ['lz4', '-d']
elif compression == 'zst':
decompressor = ['zstd', '-d']
decompressor = ['zstd', '-dc']
elif compression == 'none':
decompressor = ['cat']
untar_cmd = ['tar', '-C', out_dir, '-x']
Expand Down Expand Up @@ -328,53 +330,60 @@ def touch_p(path, times=None):
touch(path, times=times)


def open_or_gzopen(fname, *opts, **kwargs):
mode = 'r'
open_opts = list(opts)
@contextlib.contextmanager
def zstd_open(fname, mode='r', **kwargs):
    '''Context manager yielding a file-like object over a zstd file.

    Handles both text and byte (de)compression of the file.

    Args:
        fname: path of the zstd-compressed file to read or write.
        mode: open mode. Any mode containing 'r' reads; every other mode
            writes (the file is opened with 'wb', so existing content is
            truncated). A text stream is yielded unless 'b' is in mode.
        **kwargs: options used only when writing. 'level' is the zstd
            compression level (default 10). 'threads' is passed through
            util.misc.sanitize_thread_count() and the result is handed to
            the compressor.

    Yields:
        When reading: the decompressing stream reader, wrapped in a
        TextIOWrapper (encoding 'utf-8-sig', so a leading BOM is dropped)
        for text modes.
        When writing: the compressing stream writer, wrapped in a
        TextIOWrapper (encoding 'utf-8') for text modes.
    '''
    text_mode = 'b' not in mode

    if 'r' in mode:
        with open(fname, 'rb') as fh:
            stream_reader = zstd.ZstdDecompressor().stream_reader(fh)
            if text_mode:
                yield io.TextIOWrapper(stream_reader, encoding='utf-8-sig')
            else:
                yield stream_reader
    else:
        with open(fname, 'wb') as fh:
            cctx = zstd.ZstdCompressor(
                level=kwargs.get('level', 10),
                threads=util.misc.sanitize_thread_count(kwargs.get('threads', None)))
            # Using the writer as a context manager finalizes the zstd frame
            # on exit, before the underlying file handle is closed.
            with cctx.stream_writer(fh) as stream_writer:
                if text_mode:
                    # The text layer must wrap the *writer*; the previous
                    # code referenced stream_reader here, which does not
                    # exist in this branch and raised NameError.
                    text_stream = io.TextIOWrapper(stream_writer, encoding='utf-8')
                    yield text_stream
                    # Push any text still buffered in the wrapper into the
                    # compressor while it is still open, then detach so the
                    # wrapper's finalizer does not touch the closed writer.
                    text_stream.flush()
                    text_stream.detach()
                else:
                    yield stream_writer

def open_or_gzopen(fname, mode='r', **kwargs):
assert type(mode) == str, "open mode must be of type str"

# 'U' mode is deprecated in py3 and may be unsupported in future versions,
# so use newline=None when 'U' is specified
if len(open_opts) > 0:
mode = open_opts[0]
if sys.version_info[0] == 3:
if 'U' in mode:
if 'newline' not in kwargs:
kwargs['newline'] = None
open_opts[0] = mode.replace("U","")

# if this is a gzip file
if 'U' in mode:
if 'newline' not in kwargs:
kwargs['newline'] = None
mode = mode.replace("U","t")

if fname.endswith('.gz'):
# if text read mode is desired (by spec or default)
if ('b' not in mode) and (len(open_opts)==0 or 'r' in mode):
# if python 2
if sys.version_info[0] == 2:
# gzip.open() under py2 does not support universal newlines
# so we need to wrap it with something that does
# By ignoring errors in BufferedReader, errors should be handled by TextIoWrapper
return io.TextIOWrapper(io.BufferedReader(gzip.open(fname)))

# if 't' for text mode is not explicitly included,
# replace "U" with "t" since under gzip "rb" is the
# default and "U" depends on "rt"
gz_mode = str(mode).replace("U","" if "t" in mode else "t")
gz_opts = [gz_mode]+list(opts)[1:]
return gzip.open(fname, *gz_opts, **kwargs)
# Allow using 'level' kwarg as an alias for gzip files.
if 'level' in kwargs:
kwargs['compresslevel'] = kwargs.pop('level')
return gzip.open(fname, mode=mode, **kwargs)
elif fname.endswith('.bz2'):
return bz2.open(fname, mode=mode, **kwargs)
elif fname.endswith('.zst'):
return zstd_open(fname, mode=mode, **kwargs)
else:
return open(fname, *open_opts, **kwargs)
return open(fname, mode=mode, **kwargs)


def read_tabfile_dict(inFile, header_prefix="#", skip_prefix=None, rowcount_limit=None):
''' Read a tab text file (possibly gzipped) and return contents as an
iterator of dicts.
'''
with open_or_gzopen(inFile, 'rU') as inf:
with open_or_gzopen(inFile, 'rt', encoding='utf-8-sig', newline=None) as inf:
header = None
lines_read=0
for line_no,line in enumerate(inf):
if line_no==0:
# remove BOM, if present
line = line.replace('\ufeff','')
lines_read+=1
row = [item.strip() for item in line.rstrip('\r\n').split('\t')]
# skip empty lines/rows
Expand Down Expand Up @@ -402,17 +411,14 @@ def read_tabfile(inFile):
''' Read a tab text file (possibly gzipped) and return contents as an
iterator of arrays.
'''
with open_or_gzopen(inFile, 'rU') as inf:
with open_or_gzopen(inFile, 'rt', encoding='utf-8-sig', newline=None) as inf:
for line_no,line in enumerate(inf):
if line_no==0:
# remove BOM, if present
line = line.replace('\ufeff','')
if not line.startswith('#'):
yield list(item.strip() for item in line.rstrip('\r\n').split('\t'))


def readFlatFileHeader(filename, headerPrefix='#', delim='\t'):
with open_or_gzopen(filename, 'rt') as inf:
with open_or_gzopen(filename, 'rt', encoding='utf-8-sig', newline=None) as inf:
header = inf.readline().rstrip('\n').split(delim)
if header and header[0].startswith(headerPrefix):
header[0] = header[0][len(headerPrefix):]
Expand Down Expand Up @@ -986,8 +992,8 @@ def choose_compressor(filepath, threads=8):
return_obj["compress_cmd"] = compressor + ["-c"]
elif re.search(r'\.?zst$', filepath):
compressor = ['zstd']
return_obj["decompress_cmd"] = compressor + ["-d"]
return_obj["compress_cmd"] = compressor + ["-19"]
return_obj["decompress_cmd"] = compressor + ["-dc"]
return_obj["compress_cmd"] = compressor + ["-c19", "-T"+str(threads)]
elif re.search(r'\.?tar$', filepath):
compressor = ['cat']
return_obj["decompress_cmd"] = compressor
Expand Down Expand Up @@ -1031,7 +1037,7 @@ def read(self, size):
compressor = choose_compressor(pipe_hint_out)["compress_cmd"]
outfile = None
else:
compressor =choose_compressor(out_compressed_tarball)["compress_cmd"]
compressor = choose_compressor(out_compressed_tarball)["compress_cmd"]
outfile = open(out_compressed_tarball, "w")

out_compress_ps = subprocess.Popen(compressor, stdout=sys.stdout if out_compressed_tarball == "-" else outfile, stdin=subprocess.PIPE)
Expand Down

0 comments on commit 94281be

Please sign in to comment.