Skip to content

Commit

Permalink
overloading BlockCompressedInputStream.checkTerminator to support NIO (
Browse files Browse the repository at this point in the history
…#890)

* adding overloads of BlockCompressedInputStream.checkTerminator to support java.nio.Path and SeekableByteChannel
* adding additional tests to BlockCompressedTerminatorTest
  • Loading branch information
lbergelson authored Jun 8, 2017
1 parent dcd20ff commit 98c2438
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 34 deletions.
102 changes: 79 additions & 23 deletions src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,15 @@
import htsjdk.samtools.seekablestream.SeekableStream;
import htsjdk.samtools.util.zip.InflaterFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.io.*;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.zip.Inflater;

/**
* Utility class for reading BGZF block compressed files. The caller can treat this file like any other InputStream.
Expand Down Expand Up @@ -587,41 +586,98 @@ private int unpackInt32(final byte[] buffer, final int offset) {

public enum FileTermination {HAS_TERMINATOR_BLOCK, HAS_HEALTHY_LAST_BLOCK, DEFECTIVE}

/**
*
* @param file the file to check
* @return status of the last compressed block
* @throws IOException
*/
public static FileTermination checkTermination(final File file) throws IOException {
final long fileSize = file.length();
return checkTermination(file == null ? null : file.toPath());
}

/**
*
* @param path to the file to check
* @return status of the last compressed block
* @throws IOException
*/
public static FileTermination checkTermination(final Path path) throws IOException {
try( final SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ) ){
return checkTermination(channel);
}
}

/**
* check the status of the final bzgipped block for the given bgzipped resource
*
* @param channel an open channel to read from,
* the channel will remain open and the initial position will be restored when the operation completes
* this makes no guarantee about the state of the channel if an exception is thrown during reading
*
* @return the status of the last compressed black
* @throws IOException
*/
public static FileTermination checkTermination(SeekableByteChannel channel) throws IOException {
final long fileSize = channel.size();
if (fileSize < BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length) {
return FileTermination.DEFECTIVE;
}
final RandomAccessFile raFile = new RandomAccessFile(file, "r");
final long initialPosition = channel.position();
boolean exceptionThrown = false;
try {
raFile.seek(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);
byte[] buf = new byte[BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length];
raFile.readFully(buf);
if (Arrays.equals(buf, BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) {
channel.position(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);

//Check if the end of the file is an empty gzip block which is used as the terminator for a bgzipped file
final ByteBuffer lastBlockBuffer = ByteBuffer.allocate(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);
readFully(channel, lastBlockBuffer);
if (Arrays.equals(lastBlockBuffer.array(), BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) {
return FileTermination.HAS_TERMINATOR_BLOCK;
}
final int bufsize = (int)Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
buf = new byte[bufsize];
raFile.seek(fileSize - bufsize);
raFile.read(buf);
for (int i = buf.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length;
i >= 0; --i) {

//if the last block isn't an empty gzip block, check to see if it is a healthy compressed block or if it's corrupted
final int bufsize = (int) Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE);
final byte[] bufferArray = new byte[bufsize];
channel.position(fileSize - bufsize);
readFully(channel, ByteBuffer.wrap(bufferArray));
for (int i = bufferArray.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length;
i >= 0; --i) {
if (!preambleEqual(BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE,
buf, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) {
bufferArray, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) {
continue;
}
final ByteBuffer byteBuffer = ByteBuffer.wrap(buf, i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, 4);
final ByteBuffer byteBuffer = ByteBuffer.wrap(bufferArray,
i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length,
4);
byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF;
if (buf.length - i == totalBlockSizeMinusOne + 1) {
final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF;
if (bufferArray.length - i == totalBlockSizeMinusOne + 1) {
return FileTermination.HAS_HEALTHY_LAST_BLOCK;
} else {
return FileTermination.DEFECTIVE;
}
}
return FileTermination.DEFECTIVE;
} catch (final Throwable e) {
exceptionThrown = true;
throw e;
} finally {
raFile.close();
//if an exception was thrown we don't want to reset the position because that would be likely to throw again
//and suppress the initial exception
if(!exceptionThrown) {
channel.position(initialPosition);
}
}
}

/**
* read as many bytes as dst's capacity into dst or throw if that's not possible
* @throws EOFException if channel has fewer bytes available than dst's capacity
*/
static void readFully(SeekableByteChannel channel, ByteBuffer dst) throws IOException {
final int bytesRead = channel.read(dst);
if (bytesRead < dst.capacity()){
throw new EOFException();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,103 @@
*/
package htsjdk.samtools.util;

import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import htsjdk.HtsjdkTest;
import htsjdk.samtools.SeekableByteChannelFromBuffer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/**
* @author [email protected]
*/
public class BlockCompressedTerminatorTest extends HtsjdkTest {
private static final File TEST_DATA_DIR = new File("src/test/resources/htsjdk/samtools/util");
private static final File DEFECTIVE = new File(TEST_DATA_DIR, "defective_bgzf.bam");
private static final File NO_TERMINATOR = new File(TEST_DATA_DIR, "no_bgzf_terminator.bam");

@Test
public void testFileWithTerminator() throws Exception {
@DataProvider
public Object[][] getFiles() throws IOException {
return new Object[][]{
{getValidCompressedFile(), BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK},
{NO_TERMINATOR, BlockCompressedInputStream.FileTermination.HAS_HEALTHY_LAST_BLOCK},
{DEFECTIVE, BlockCompressedInputStream.FileTermination.DEFECTIVE}
};
}

@Test( dataProvider = "getFiles")
public void testCheckTerminationForFiles(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException {
Assert.assertEquals(BlockCompressedInputStream.checkTermination(compressedFile), expected);
}

@Test( dataProvider = "getFiles")
public void testCheckTerminationForPaths(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException {
try(FileSystem fs = Jimfs.newFileSystem("test", Configuration.unix())){
final Path compressedFileInJimfs = Files.copy(compressedFile.toPath(), fs.getPath("something"));
Assert.assertEquals(BlockCompressedInputStream.checkTermination(compressedFileInJimfs), expected);
}
}

@Test( dataProvider = "getFiles")
public void testCheckTerminationForSeekableByteChannels(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException {
try(SeekableByteChannel channel = Files.newByteChannel(compressedFile.toPath())){
Assert.assertEquals(BlockCompressedInputStream.checkTermination(channel), expected);
}
}

@Test(dataProvider = "getFiles")
public void testChannelPositionIsRestored(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException {
final long position = 50;
try(SeekableByteChannel channel = Files.newByteChannel(compressedFile.toPath())){
channel.position(position);
Assert.assertEquals(channel.position(), position);
Assert.assertEquals(BlockCompressedInputStream.checkTermination(channel), expected);
Assert.assertEquals(channel.position(), position);
}
}

private static File getValidCompressedFile() throws IOException {
final File tmpCompressedFile = File.createTempFile("test.", ".bgzf");
tmpCompressedFile.deleteOnExit();
final BlockCompressedOutputStream os = new BlockCompressedOutputStream(tmpCompressedFile);
os.write("Hi, Mom!\n".getBytes());
os.close();
Assert.assertEquals(BlockCompressedInputStream.checkTermination(tmpCompressedFile),
BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK);
return tmpCompressedFile;
}

@Test
public void testValidFileWithoutTerminator() throws Exception {
Assert.assertEquals(BlockCompressedInputStream.checkTermination(new File(TEST_DATA_DIR, "no_bgzf_terminator.bam")),
BlockCompressedInputStream.FileTermination.HAS_HEALTHY_LAST_BLOCK);
public void testReadFullyReadsBytesCorrectly() throws IOException {
try(final SeekableByteChannel channel = Files.newByteChannel(DEFECTIVE.toPath())){
final ByteBuffer readBuffer = ByteBuffer.allocate(10);
Assert.assertTrue(channel.size() > readBuffer.capacity());
BlockCompressedInputStream.readFully(channel, readBuffer);

ByteBuffer expected = ByteBuffer.allocate(10);
channel.position(0).read(expected);
Assert.assertEquals(readBuffer.array(), expected.array());
}
}

@Test
public void testDefectiveFile() throws Exception {
Assert.assertEquals(BlockCompressedInputStream.checkTermination(new File(TEST_DATA_DIR, "defective_bgzf.bam")),
BlockCompressedInputStream.FileTermination.DEFECTIVE);
@Test(expectedExceptions = EOFException.class)
public void testReadFullyThrowWhenItCantReadEnough() throws IOException {
try(final SeekableByteChannel channel = Files.newByteChannel(DEFECTIVE.toPath())){
final ByteBuffer readBuffer = ByteBuffer.allocate(1000);
Assert.assertTrue(channel.size() < readBuffer.capacity());
BlockCompressedInputStream.readFully(channel, readBuffer);
}
}



}

0 comments on commit 98c2438

Please sign in to comment.