From 98c2438dbd86869e5ac2638ecd5a61021ced9c7e Mon Sep 17 00:00:00 2001 From: Louis Bergelson Date: Thu, 8 Jun 2017 10:49:10 -0400 Subject: [PATCH] overloading BlockCompressedInputStream.checkTermination to support NIO (#890) * adding overloads of BlockCompressedInputStream.checkTermination to support java.nio.file.Path and SeekableByteChannel * adding additional tests to BlockCompressedTerminatorTest --- .../util/BlockCompressedInputStream.java | 102 ++++++++++++++---- .../util/BlockCompressedTerminatorTest.java | 87 +++++++++++++-- 2 files changed, 155 insertions(+), 34 deletions(-) diff --git a/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java b/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java index 066a0c0016..e108d1bb3c 100755 --- a/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java +++ b/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java @@ -32,16 +32,15 @@ import htsjdk.samtools.seekablestream.SeekableStream; import htsjdk.samtools.util.zip.InflaterFactory; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.RandomAccessFile; +import java.io.*; import java.net.URL; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.Arrays; -import java.util.zip.Inflater; /** * Utility class for reading BGZF block compressed files. The caller can treat this file like any other InputStream. 
@@ -587,41 +586,98 @@ private int unpackInt32(final byte[] buffer, final int offset) { public enum FileTermination {HAS_TERMINATOR_BLOCK, HAS_HEALTHY_LAST_BLOCK, DEFECTIVE} + /** + * check the status of the final bgzipped block of the given file + * @param file the file to check + * @return status of the last compressed block + * @throws IOException if the file cannot be opened or read + */ public static FileTermination checkTermination(final File file) throws IOException { - final long fileSize = file.length(); + return checkTermination(file == null ? null : file.toPath()); + } + + /** + * check the status of the final bgzipped block of the file at the given path + * @param path path to the file to check + * @return status of the last compressed block + * @throws IOException if the file cannot be opened or read + */ + public static FileTermination checkTermination(final Path path) throws IOException { + try( final SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ) ){ + return checkTermination(channel); + } + } + + /** + * check the status of the final bgzipped block for the given bgzipped resource + * + * @param channel an open channel to read from, + * the channel will remain open and the initial position will be restored when the operation completes + * this makes no guarantee about the state of the channel if an exception is thrown during reading + * + * @return the status of the last compressed block + * @throws IOException if reading from or repositioning the channel fails + */ + public static FileTermination checkTermination(SeekableByteChannel channel) throws IOException { + final long fileSize = channel.size(); if (fileSize < BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length) { return FileTermination.DEFECTIVE; } - final RandomAccessFile raFile = new RandomAccessFile(file, "r"); + final long initialPosition = channel.position(); + boolean exceptionThrown = false; try { - raFile.seek(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); - byte[] buf = new byte[BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length]; - raFile.readFully(buf); - if (Arrays.equals(buf, BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) { + channel.position(fileSize - 
BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); + + //Check if the end of the file is an empty gzip block which is used as the terminator for a bgzipped file + final ByteBuffer lastBlockBuffer = ByteBuffer.allocate(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); + readFully(channel, lastBlockBuffer); + if (Arrays.equals(lastBlockBuffer.array(), BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) { return FileTermination.HAS_TERMINATOR_BLOCK; } - final int bufsize = (int)Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE); - buf = new byte[bufsize]; - raFile.seek(fileSize - bufsize); - raFile.read(buf); - for (int i = buf.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length; - i >= 0; --i) { + + //if the last block isn't an empty gzip block, check to see if it is a healthy compressed block or if it's corrupted + final int bufsize = (int) Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE); + final byte[] bufferArray = new byte[bufsize]; + channel.position(fileSize - bufsize); + readFully(channel, ByteBuffer.wrap(bufferArray)); + for (int i = bufferArray.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length; + i >= 0; --i) { if (!preambleEqual(BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE, - buf, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) { + bufferArray, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) { continue; } - final ByteBuffer byteBuffer = ByteBuffer.wrap(buf, i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, 4); + final ByteBuffer byteBuffer = ByteBuffer.wrap(bufferArray, + i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, + 4); byteBuffer.order(ByteOrder.LITTLE_ENDIAN); - final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF; - if (buf.length - i == totalBlockSizeMinusOne + 1) { + final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF; + if (bufferArray.length - i == 
totalBlockSizeMinusOne + 1) { return FileTermination.HAS_HEALTHY_LAST_BLOCK; } else { return FileTermination.DEFECTIVE; } } return FileTermination.DEFECTIVE; + } catch (final Throwable e) { + exceptionThrown = true; + throw e; } finally { - raFile.close(); + //if an exception was thrown we don't want to reset the position because that would be likely to throw again + //and suppress the initial exception + if(!exceptionThrown) { + channel.position(initialPosition); + } + } + } + + /** + * read from channel until dst has no space remaining; a single read may return fewer bytes than requested, so reading is repeated until dst is full + * @throws EOFException if channel reaches end-of-stream before dst is full + */ + static void readFully(SeekableByteChannel channel, ByteBuffer dst) throws IOException { + while (dst.hasRemaining()) { + final int bytesRead = channel.read(dst); + if (bytesRead < 0) { throw new EOFException(); } } } diff --git a/src/test/java/htsjdk/samtools/util/BlockCompressedTerminatorTest.java b/src/test/java/htsjdk/samtools/util/BlockCompressedTerminatorTest.java index d9d20ccef0..4a14bd9204 100644 --- a/src/test/java/htsjdk/samtools/util/BlockCompressedTerminatorTest.java +++ b/src/test/java/htsjdk/samtools/util/BlockCompressedTerminatorTest.java @@ -23,38 +23,103 @@ */ package htsjdk.samtools.util; +import com.google.common.jimfs.Configuration; +import com.google.common.jimfs.Jimfs; import htsjdk.HtsjdkTest; +import htsjdk.samtools.SeekableByteChannelFromBuffer; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.EOFException; import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; /** * @author alecw@broadinstitute.org */ public class BlockCompressedTerminatorTest extends HtsjdkTest { private static final File TEST_DATA_DIR = new 
File("src/test/resources/htsjdk/samtools/util"); + private static final File DEFECTIVE = new File(TEST_DATA_DIR, "defective_bgzf.bam"); + private static final File NO_TERMINATOR = new File(TEST_DATA_DIR, "no_bgzf_terminator.bam"); - @Test - public void testFileWithTerminator() throws Exception { + @DataProvider + public Object[][] getFiles() throws IOException { + return new Object[][]{ + {getValidCompressedFile(), BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK}, + {NO_TERMINATOR, BlockCompressedInputStream.FileTermination.HAS_HEALTHY_LAST_BLOCK}, + {DEFECTIVE, BlockCompressedInputStream.FileTermination.DEFECTIVE} + }; + } + + @Test( dataProvider = "getFiles") + public void testCheckTerminationForFiles(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException { + Assert.assertEquals(BlockCompressedInputStream.checkTermination(compressedFile), expected); + } + + @Test( dataProvider = "getFiles") + public void testCheckTerminationForPaths(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException { + try(FileSystem fs = Jimfs.newFileSystem("test", Configuration.unix())){ + final Path compressedFileInJimfs = Files.copy(compressedFile.toPath(), fs.getPath("something")); + Assert.assertEquals(BlockCompressedInputStream.checkTermination(compressedFileInJimfs), expected); + } + } + + @Test( dataProvider = "getFiles") + public void testCheckTerminationForSeekableByteChannels(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException { + try(SeekableByteChannel channel = Files.newByteChannel(compressedFile.toPath())){ + Assert.assertEquals(BlockCompressedInputStream.checkTermination(channel), expected); + } + } + + @Test(dataProvider = "getFiles") + public void testChannelPositionIsRestored(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException { + final long position = 50; + try(SeekableByteChannel channel = 
Files.newByteChannel(compressedFile.toPath())){ + channel.position(position); + Assert.assertEquals(channel.position(), position); + Assert.assertEquals(BlockCompressedInputStream.checkTermination(channel), expected); + Assert.assertEquals(channel.position(), position); + } + } + + private static File getValidCompressedFile() throws IOException { final File tmpCompressedFile = File.createTempFile("test.", ".bgzf"); tmpCompressedFile.deleteOnExit(); final BlockCompressedOutputStream os = new BlockCompressedOutputStream(tmpCompressedFile); os.write("Hi, Mom!\n".getBytes()); os.close(); - Assert.assertEquals(BlockCompressedInputStream.checkTermination(tmpCompressedFile), - BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK); + return tmpCompressedFile; } @Test - public void testValidFileWithoutTerminator() throws Exception { - Assert.assertEquals(BlockCompressedInputStream.checkTermination(new File(TEST_DATA_DIR, "no_bgzf_terminator.bam")), - BlockCompressedInputStream.FileTermination.HAS_HEALTHY_LAST_BLOCK); + public void testReadFullyReadsBytesCorrectly() throws IOException { + try(final SeekableByteChannel channel = Files.newByteChannel(DEFECTIVE.toPath())){ + final ByteBuffer readBuffer = ByteBuffer.allocate(10); + Assert.assertTrue(channel.size() > readBuffer.capacity()); + BlockCompressedInputStream.readFully(channel, readBuffer); + + ByteBuffer expected = ByteBuffer.allocate(10); + channel.position(0).read(expected); + Assert.assertEquals(readBuffer.array(), expected.array()); + } } - @Test - public void testDefectiveFile() throws Exception { - Assert.assertEquals(BlockCompressedInputStream.checkTermination(new File(TEST_DATA_DIR, "defective_bgzf.bam")), - BlockCompressedInputStream.FileTermination.DEFECTIVE); + @Test(expectedExceptions = EOFException.class) + public void testReadFullyThrowWhenItCantReadEnough() throws IOException { + try(final SeekableByteChannel channel = Files.newByteChannel(DEFECTIVE.toPath())){ + final ByteBuffer readBuffer = 
ByteBuffer.allocate(1000); + Assert.assertTrue(channel.size() < readBuffer.capacity()); + BlockCompressedInputStream.readFully(channel, readBuffer); + } } + + + }