diff --git a/src/main/java/org/seqdoop/hadoop_bam/BCFSplitGuesser.java b/src/main/java/org/seqdoop/hadoop_bam/BCFSplitGuesser.java
index 70cb533..ced7fb6 100644
--- a/src/main/java/org/seqdoop/hadoop_bam/BCFSplitGuesser.java
+++ b/src/main/java/org/seqdoop/hadoop_bam/BCFSplitGuesser.java
@@ -364,6 +364,9 @@ private long getUInt(final int idx) {
     private short getUByte(final int idx) {
         return (short)((short)buf.get(idx) & 0xff);
     }
+    private short getUShort(final int idx) {
+        return (short)(buf.getShort(idx) & 0xffff);
+    }
 
     public static void main(String[] args) throws IOException {
         final GenericOptionsParser parser;
diff --git a/src/main/java/org/seqdoop/hadoop_bam/BaseSplitGuesser.java b/src/main/java/org/seqdoop/hadoop_bam/BaseSplitGuesser.java
index 6f96b39..5c19bb3 100644
--- a/src/main/java/org/seqdoop/hadoop_bam/BaseSplitGuesser.java
+++ b/src/main/java/org/seqdoop/hadoop_bam/BaseSplitGuesser.java
@@ -6,11 +6,14 @@
 import java.nio.ByteOrder;
 import org.apache.hadoop.io.IOUtils;
 
-class BaseSplitGuesser {
+public class BaseSplitGuesser {
 
-    protected final static int BGZF_MAGIC = 0x04088b1f;
-    protected final static int BGZF_MAGIC_SUB = 0x00024342;
-    protected final static int BGZF_SUB_SIZE = 4 + 2;
+    private final static int BGZF_MAGIC_0 = 0x1f;
+    private final static int BGZF_MAGIC_1 = 0x8b;
+    private final static int BGZF_MAGIC_2 = 0x08;
+    private final static int BGZF_MAGIC_3 = 0x04;
+    protected final static int BGZF_MAGIC = 0x04088b1f;
+    private final static int BGZF_MAGIC_SUB = 0x00024342;
 
     protected SeekableStream in;
     protected final ByteBuffer buf;
@@ -29,85 +32,60 @@ protected static class PosSize {
     // Gives the compressed size on the side. Returns null if it doesn't find
     // anything.
     protected PosSize guessNextBGZFPos(int p, int end) {
-        try { for (;;) {
-            for (;;) {
-                in.seek(p);
-                IOUtils.readFully(in, buf.array(), 0, 4);
-                int n = buf.getInt(0);
-
-                if (n == BGZF_MAGIC)
-                    break;
-
-                // Skip ahead a bit more than 1 byte if you can.
-                if (n >>> 8 == BGZF_MAGIC << 8 >>> 8)
-                    ++p;
-                else if (n >>> 16 == BGZF_MAGIC << 16 >>> 16)
-                    p += 2;
-                else
-                    p += 3;
-
-                if (p >= end)
-                    return null;
-            }
-            // Found what looks like a gzip block header: now get XLEN and
-            // search for the BGZF subfield.
-            final int p0 = p;
-            p += 10;
-            in.seek(p);
-            IOUtils.readFully(in, buf.array(), 0, 2);
-            p += 2;
-            final int xlen = getUShort(0);
-            final int subEnd = p + xlen;
-
-            while (p < subEnd) {
-                IOUtils.readFully(in, buf.array(), 0, 4);
-
-                if (buf.getInt(0) != BGZF_MAGIC_SUB) {
-                    p += 4 + getUShort(2);
-                    in.seek(p);
-                    continue;
-                }
-
-                // Found it: this is close enough to a BGZF block, make it
-                // our guess.
-
-                // But find out the size before returning. First, grab bsize:
-                // we'll need it later.
-                IOUtils.readFully(in, buf.array(), 0, 2);
-                int bsize = getUShort(0);
-
-                // Then skip the rest of the subfields.
-                p += BGZF_SUB_SIZE;
-                while (p < subEnd) {
+        try {
+            while(true) {
+                boolean found_block_start = false;
+                boolean in_magic = false;
                 in.seek(p);
-                    IOUtils.readFully(in, buf.array(), 0, 4);
-                    p += 4 + getUShort(2);
-                }
-                if (p != subEnd) {
-                    // Cancel our guess because the xlen field didn't match the
-                    // data.
-                    break;
-                }
-
-                // Now skip past the compressed data and the CRC-32.
-                p += bsize - xlen - 19 + 4;
-                in.seek(p);
-                IOUtils.readFully(in, buf.array(), 0, 4);
-                return new PosSize(p0, buf.getInt(0));
+                while(!found_block_start) {
+                    int n = in.read();
+
+                    if (n == BGZF_MAGIC_0) {
+                        in_magic = true;
+                    } else if (n == BGZF_MAGIC_3 && in_magic) {
+                        found_block_start = true;
+                    } else if (p >= end) {
+                        return null;
+                    } else if (!((n == BGZF_MAGIC_1 && in_magic) ||
+                                 (n == BGZF_MAGIC_2 && in_magic))) {
+                        in_magic = false;
+                    }
+                    p++;
+                }
+
+                // after the magic number:
+                // skip 6 unspecified bytes (MTIME, XFL, OS)
+                // XLEN = 6 (little endian, so 0x0600)
+                // SI1 = 0x42
+                // SI2 = 0x43
+                // SLEN = 0x02
+                in.seek(p + 6);
+                int n = in.read();
+                if (0x06 != n) {
+                    continue;
+                }
+                n = in.read();
+                if (0x00 != n) {
+                    continue;
+                }
+                n = in.read();
+                if (0x42 != n) {
+                    continue;
+                }
+                n = in.read();
+                if (0x43 != n) {
+                    continue;
+                }
+                n = in.read();
+                if (0x02 != n) {
+                    continue;
+                }
+                int blockSize = (in.read() << 8) + in.read();
+
+                return new PosSize(p - 4, blockSize);
             }
-            // No luck: look for the next gzip block header. Start right after
-            // where we last saw the identifiers, although we could probably
-            // safely skip further ahead. (If we find the correct one right
-            // now, the previous block contained 0x1f8b0804 bytes of data: that
-            // seems... unlikely.)
-            p = p0 + 4;
-
-        }} catch (IOException e) {
+        } catch (IOException e) {
             return null;
         }
     }
-
-    protected int getUShort(final int idx) {
-        return (int)buf.getShort(idx) & 0xffff;
-    }
 }
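
For reference (not part of the patch): the new BGZF_MAGIC_0..BGZF_MAGIC_3 constants and the field checks in the rewritten guessNextBGZFPos correspond to the fixed portion of a BGZF block header as laid out in the SAM/BGZF specification. The sketch below spells out that layout over a plain byte array; the helper class and method names are hypothetical and exist only to make the offsets explicit.

    // Hypothetical helpers illustrating the fixed BGZF header bytes the scan above tests.
    // Offsets are relative to the start of a block; multi-byte fields are little-endian.
    final class BgzfHeaderSketch {
        static boolean looksLikeBgzfBlockStart(byte[] b, int off) {
            return off + 18 <= b.length
                && (b[off]      & 0xff) == 0x1f  // gzip ID1
                && (b[off + 1]  & 0xff) == 0x8b  // gzip ID2
                && (b[off + 2]  & 0xff) == 0x08  // CM = deflate
                && (b[off + 3]  & 0xff) == 0x04  // FLG with FEXTRA set
                // bytes 4-9 (MTIME, XFL, OS) are unspecified and skipped
                && (b[off + 10] & 0xff) == 0x06  // XLEN = 6, low byte
                && (b[off + 11] & 0xff) == 0x00  // XLEN = 6, high byte
                && (b[off + 12] & 0xff) == 0x42  // SI1 = 'B'
                && (b[off + 13] & 0xff) == 0x43  // SI2 = 'C'
                && (b[off + 14] & 0xff) == 0x02  // SLEN = 2, low byte
                && (b[off + 15] & 0xff) == 0x00; // SLEN = 2, high byte
        }

        // BSIZE, the total block size minus one, follows at offsets 16-17, little-endian.
        static int bgzfBlockSize(byte[] b, int off) {
            return ((b[off + 16] & 0xff) | ((b[off + 17] & 0xff) << 8)) + 1;
        }
    }
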
diff --git a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFCodec.java b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFCodec.java
index 669e2ac..e453fb9 100644
--- a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFCodec.java
+++ b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFCodec.java
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -11,6 +12,7 @@
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.seqdoop.hadoop_bam.util.WrapSeekable;
 
 /**
  * A Hadoop {@link CompressionCodec} for the
@@ -56,7 +58,7 @@ public Compressor createCompressor() {
   @Override
   public SplitCompressionInputStream createInputStream(InputStream seekableIn, Decompressor decompressor,
       long start, long end, READ_MODE readMode) throws IOException {
-    BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn);
+    BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(new WrapSeekable((FSDataInputStream)seekableIn, end, null));
     long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end);
     ((Seekable)seekableIn).seek(adjustedStart);
     return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end);
diff --git a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFEnhancedGzipCodec.java b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFEnhancedGzipCodec.java
index 04112a7..0df0c77 100644
--- a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFEnhancedGzipCodec.java
+++ b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFEnhancedGzipCodec.java
@@ -4,6 +4,7 @@
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -11,6 +12,7 @@
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.seqdoop.hadoop_bam.util.WrapSeekable;
 
 /**
  * A Hadoop {@link CompressionCodec} for the
@@ -66,7 +68,7 @@ public int read() throws IOException {
         }
       };
     }
-    BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn);
+    BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(new WrapSeekable((FSDataInputStream)seekableIn, end, null));
     long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end);
     ((Seekable)seekableIn).seek(adjustedStart);
     return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end);
diff --git a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitFileInputFormat.java b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitFileInputFormat.java
index 09eedbb..044fea2 100644
--- a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitFileInputFormat.java
+++ b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitFileInputFormat.java
@@ -36,6 +36,8 @@
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
+import org.seqdoop.hadoop_bam.util.WrapSeekable;
+
 /** An {@link org.apache.hadoop.mapreduce.InputFormat} for BGZF-compressed
  * files.
  *
@@ -130,9 +132,7 @@ private int addProbabilisticSplits(
         throws IOException
     {
         final Path path = ((FileSplit)splits.get(i)).getPath();
-        final FSDataInputStream in = path.getFileSystem(cfg).open(path);
-
-        final BGZFSplitGuesser guesser = new BGZFSplitGuesser(in);
+        final BGZFSplitGuesser guesser = new BGZFSplitGuesser(WrapSeekable.openPath(cfg, path));
 
         FileSplit fspl;
         do {
@@ -149,7 +149,6 @@ private int addProbabilisticSplits(
             ++i;
         } while (i < splits.size() && fspl.getPath().equals(path));
 
-        in.close();
         return i;
     }
 
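
For reference (not part of the patch): the codec and input-format changes above all route construction of BGZFSplitGuesser through an htsjdk SeekableStream rather than a raw FSDataInputStream. The two wrapping patterns they use are summarized in the sketch below; WrapSeekable.openPath and the WrapSeekable constructor are the calls taken from the patch, while the wrapper class and method names here are illustrative only.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;
    import org.seqdoop.hadoop_bam.util.BGZFSplitGuesser;
    import org.seqdoop.hadoop_bam.util.WrapSeekable;

    final class SplitGuesserWiringSketch {
        // When only a Path is available (BGZFSplitFileInputFormat): open and wrap in one call.
        static BGZFSplitGuesser fromPath(Configuration conf, Path path) throws IOException {
            return new BGZFSplitGuesser(WrapSeekable.openPath(conf, path));
        }

        // When an already-open seekable stream must be reused (both codecs): wrap it,
        // passing the split's end offset as the apparent stream length.
        static BGZFSplitGuesser fromOpenStream(FSDataInputStream in, long end) {
            return new BGZFSplitGuesser(new WrapSeekable<FSDataInputStream>(in, end, null));
        }
    }
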
diff --git a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitGuesser.java b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitGuesser.java
index 9835ff5..b944930 100644
--- a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitGuesser.java
+++ b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitGuesser.java
@@ -22,6 +22,7 @@
 
 package org.seqdoop.hadoop_bam.util;
 
+import htsjdk.samtools.seekablestream.SeekableStream;
 import htsjdk.samtools.seekablestream.ByteArraySeekableStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -34,30 +35,14 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Seekable;
 
-public class BGZFSplitGuesser {
-    private InputStream inFile;
-    private Seekable seekableInFile;
-    private ByteArraySeekableStream in;
-    private final ByteBuffer buf;
+import org.seqdoop.hadoop_bam.BaseSplitGuesser;
 
-    private final static int BGZF_MAGIC = 0x04088b1f;
-    private final static int BGZF_MAGIC_SUB = 0x00024342;
-    private final static int BGZF_SUB_SIZE = 4 + 2;
+public class BGZFSplitGuesser extends BaseSplitGuesser {
 
-    public BGZFSplitGuesser(InputStream is) {
-        inFile = is;
-        seekableInFile = (Seekable) is;
-
-        buf = ByteBuffer.allocate(8);
-        buf.order(ByteOrder.LITTLE_ENDIAN);
-    }
+    private SeekableStream inFile;
 
-    public BGZFSplitGuesser(FSDataInputStream is) {
+    public BGZFSplitGuesser(SeekableStream is) {
         inFile = is;
-        seekableInFile = is;
-
-        buf = ByteBuffer.allocate(8);
-        buf.order(ByteOrder.LITTLE_ENDIAN);
-    }
+    }
 
     /// Looks in the range [beg,end). Returns end if no BAM record was found.
@@ -69,17 +54,23 @@ public long guessNextBGZFBlockStart(long beg, long end)
         // the previous one, we need 0xfffe bytes for the start, and then 0xffff
         // for the block we're looking for.
 
-        byte[] arr = new byte[2*0xffff - 1];
-
-        this.seekableInFile.seek(beg);
-        int totalRead = 0;
-        for (int left = Math.min((int)(end - beg), arr.length); left > 0;) {
-            final int r = inFile.read(arr, totalRead, left);
-            if (r < 0)
-                break;
-            totalRead += r;
-            left -= r;
-        }
+        byte[] arr = new byte[0xffff];
+
+        this.inFile.seek(beg);
+        int bytesToRead = Math.min((int) (end - beg), arr.length);
+        int totalRead = 0;
+        // previously, this code did a single read and assumed that if it did not
+        // return an error code, then it had filled the array. however, when an
+        // incomplete copy occurs, the split picker will try to read past the end
+        // of the bucket, which will lead to the split picker returning an error
+        // code (-1), which gets mishandled elsewhere...
+        while(totalRead < bytesToRead) {
+            int read = inFile.read(arr, totalRead, bytesToRead - totalRead);
+            if (read == -1) {
+                return -1; // EOF
+            }
+            totalRead += read;
+        }
         arr = Arrays.copyOf(arr, totalRead);
 
         this.in = new ByteArraySeekableStream(arr);
@@ -90,11 +81,13 @@
 
         final int firstBGZFEnd = Math.min((int)(end - beg), 0xffff);
 
+        PosSize p;
         for (int pos = 0;;) {
-            pos = guessNextBGZFPos(pos, firstBGZFEnd);
-            if (pos < 0)
+            p = guessNextBGZFPos(pos, firstBGZFEnd);
+            if (p == null)
                 return end;
-
+            pos = p.pos;
+
             try {
                 // Seek in order to trigger decompression of the block and a CRC
                 // check.
@@ -110,64 +103,4 @@
             return beg + pos;
         }
     }
-
-    // Returns a negative number if it doesn't find anything.
-    private int guessNextBGZFPos(int p, int end)
-        throws IOException
-    {
-        for (;;) {
-            for (;;) {
-                in.seek(p);
-                in.read(buf.array(), 0, 4);
-                int n = buf.getInt(0);
-
-                if (n == BGZF_MAGIC)
-                    break;
-
-                // Skip ahead a bit more than 1 byte if you can.
-                if (n >>> 8 == BGZF_MAGIC << 8 >>> 8)
-                    ++p;
-                else if (n >>> 16 == BGZF_MAGIC << 16 >>> 16)
-                    p += 2;
-                else
-                    p += 3;
-
-                if (p >= end)
-                    return -1;
-            }
-            // Found what looks like a gzip block header: now get XLEN and
-            // search for the BGZF subfield.
-            final int p0 = p;
-            p += 10;
-            in.seek(p);
-            in.read(buf.array(), 0, 2);
-            p += 2;
-            final int xlen = getUShort(0);
-            final int subEnd = p + xlen;
-
-            while (p < subEnd) {
-                in.read(buf.array(), 0, 4);
-
-                if (buf.getInt(0) != BGZF_MAGIC_SUB) {
-                    p += 4 + getUShort(2);
-                    in.seek(p);
-                    continue;
-                }
-
-                // Found it: this is close enough to a BGZF block, make it
-                // our guess.
-                return p0;
-            }
-            // No luck: look for the next gzip block header. Start right after
-            // where we last saw the identifiers, although we could probably
-            // safely skip further ahead. (If we find the correct one right
-            // now, the previous block contained 0x1f8b0804 bytes of data: that
-            // seems... unlikely.)
-            p = p0 + 4;
-        }
-    }
-
-    private int getUShort(final int idx) {
-        return (int)buf.getShort(idx) & 0xffff;
-    }
 }
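
For reference (not part of the patch): the comment block added in the guessNextBGZFBlockStart hunk above explains why a single InputStream.read call cannot be trusted to fill the buffer. The loop it introduces is the standard read-fully pattern (Hadoop's IOUtils.readFully, used elsewhere in this patch, does the same job); the generic sketch below instead reports how many bytes were actually read, whereas the patched method returns -1 as soon as EOF is hit. The helper name and return convention are illustrative only.

    import java.io.IOException;
    import java.io.InputStream;

    final class ReadFullySketch {
        // Keep reading until bytesToRead bytes have arrived or EOF is seen:
        // InputStream.read may legally return fewer bytes than requested.
        static int readUpTo(InputStream in, byte[] dst, int bytesToRead) throws IOException {
            int totalRead = 0;
            while (totalRead < bytesToRead) {
                int read = in.read(dst, totalRead, bytesToRead - totalRead);
                if (read == -1) {
                    break; // EOF before the requested count; caller handles the short read
                }
                totalRead += read;
            }
            return totalRead;
        }
    }
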
diff --git a/src/test/java/org/seqdoop/hadoop_bam/TestBGZFSplitGuesser.java b/src/test/java/org/seqdoop/hadoop_bam/TestBGZFSplitGuesser.java
index d500cdd..1eda5bc 100644
--- a/src/test/java/org/seqdoop/hadoop_bam/TestBGZFSplitGuesser.java
+++ b/src/test/java/org/seqdoop/hadoop_bam/TestBGZFSplitGuesser.java
@@ -14,7 +14,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.seqdoop.hadoop_bam.util.BGZFSplitGuesser;
-
+import org.seqdoop.hadoop_bam.util.WrapSeekable;
 import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
@@ -42,7 +42,7 @@ public void test() throws IOException {
     Configuration conf = new Configuration();
     Path path = new Path(file.toURI());
     FSDataInputStream fsDataInputStream = path.getFileSystem(conf).open(path);
-    BGZFSplitGuesser bgzfSplitGuesser = new BGZFSplitGuesser(fsDataInputStream);
+    BGZFSplitGuesser bgzfSplitGuesser = new BGZFSplitGuesser(WrapSeekable.openPath(conf, path));
     LinkedList<Long> boundaries = new LinkedList<>();
     long start = 1;
     while (true) {