diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java index ec5315aa4c02..d71d46e2946e 100644 --- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java +++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java @@ -55,20 +55,8 @@ public class ZstdByteBuffDecompressor implements ByteBuffDecompressor, CanReinit @Override public boolean canDecompress(ByteBuff output, ByteBuff input) { - if (!allowByteBuffDecompression) { - return false; - } - if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) { - ByteBuffer nioOutput = output.nioByteBuffers()[0]; - ByteBuffer nioInput = input.nioByteBuffers()[0]; - if (nioOutput.isDirect() && nioInput.isDirect()) { - return true; - } else if (!nioOutput.isDirect() && !nioInput.isDirect()) { - return true; - } - } - - return false; + return allowByteBuffDecompression && output instanceof SingleByteBuff + && input instanceof SingleByteBuff; } @Override @@ -80,38 +68,35 @@ private int decompressRaw(ByteBuff output, ByteBuff input, int inputLen) throws if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) { ByteBuffer nioOutput = output.nioByteBuffers()[0]; ByteBuffer nioInput = input.nioByteBuffers()[0]; + int origOutputPos = nioOutput.position(); + int n; if (nioOutput.isDirect() && nioInput.isDirect()) { - return decompressDirectByteBuffers(nioOutput, nioInput, inputLen); + n = ctx.decompressDirectByteBuffer(nioOutput, nioOutput.position(), + nioOutput.limit() - nioOutput.position(), nioInput, nioInput.position(), inputLen); } else if (!nioOutput.isDirect() && !nioInput.isDirect()) { - return decompressHeapByteBuffers(nioOutput, nioInput, inputLen); + n = ctx.decompressByteArray(nioOutput.array(), + nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() - nioOutput.position(), + nioInput.array(), nioInput.arrayOffset() + nioInput.position(), inputLen); + } else if (nioOutput.isDirect() && !nioInput.isDirect()) { + n = ctx.decompressByteArrayToDirectByteBuffer(nioOutput, nioOutput.position(), + nioOutput.limit() - nioOutput.position(), nioInput.array(), + nioInput.arrayOffset() + nioInput.position(), inputLen); + } else if (!nioOutput.isDirect() && nioInput.isDirect()) { + n = ctx.decompressDirectByteBufferToByteArray(nioOutput.array(), + nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() - nioOutput.position(), + nioInput, nioInput.position(), inputLen); + } else { + throw new IllegalStateException("Unreachable line"); } - } - - throw new IllegalStateException("One buffer is direct and the other is not, " - + "or one or more not SingleByteBuffs. This is not supported"); - } - private int decompressDirectByteBuffers(ByteBuffer output, ByteBuffer input, int inputLen) { - int origOutputPos = output.position(); + nioOutput.position(origOutputPos + n); + nioInput.position(input.position() + inputLen); - int n = ctx.decompressDirectByteBuffer(output, output.position(), - output.limit() - output.position(), input, input.position(), inputLen); - - output.position(origOutputPos + n); - input.position(input.position() + inputLen); - return n; - } - - private int decompressHeapByteBuffers(ByteBuffer output, ByteBuffer input, int inputLen) { - int origOutputPos = output.position(); - - int n = ctx.decompressByteArray(output.array(), output.arrayOffset() + output.position(), - output.limit() - output.position(), input.array(), input.arrayOffset() + input.position(), - inputLen); - - output.position(origOutputPos + n); - input.position(input.position() + inputLen); - return n; + return n; + } else { + throw new IllegalStateException( + "At least one buffer is not a SingleByteBuff, this is not supported"); + } } @Override diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java index be52d17f1332..223bbc021dbf 100644 --- a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java +++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java @@ -62,8 +62,8 @@ public void testCapabilities() { try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) { assertTrue(decompressor.canDecompress(emptySingleHeapBuff, emptySingleHeapBuff)); assertTrue(decompressor.canDecompress(emptySingleDirectBuff, emptySingleDirectBuff)); - assertFalse(decompressor.canDecompress(emptySingleHeapBuff, emptySingleDirectBuff)); - assertFalse(decompressor.canDecompress(emptySingleDirectBuff, emptySingleHeapBuff)); + assertTrue(decompressor.canDecompress(emptySingleHeapBuff, emptySingleDirectBuff)); + assertTrue(decompressor.canDecompress(emptySingleDirectBuff, emptySingleHeapBuff)); assertFalse(decompressor.canDecompress(emptyMultiHeapBuff, emptyMultiHeapBuff)); assertFalse(decompressor.canDecompress(emptyMultiDirectBuff, emptyMultiDirectBuff)); assertFalse(decompressor.canDecompress(emptySingleHeapBuff, emptyMultiHeapBuff)); @@ -72,7 +72,7 @@ public void testCapabilities() { } @Test - public void testDecompressHeap() throws IOException { + public void testDecompressHeapToHeap() throws IOException { try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) { ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64)); ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD)); @@ -83,7 +83,7 @@ public void testDecompressHeap() throws IOException { } @Test - public void testDecompressDirect() throws IOException { + public void testDecompressDirectToDirect() throws IOException { try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) { ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64)); ByteBuff input = new SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length)); @@ -95,4 +95,28 @@ public void testDecompressDirect() throws IOException { } } + @Test + public void testDecompressDirectToHeap() throws IOException { + try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) { + ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64)); + ByteBuff input = new SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length)); + input.put(COMPRESSED_PAYLOAD); + input.rewind(); + int decompressedSize = decompressor.decompress(output, input, COMPRESSED_PAYLOAD.length); + assertEquals("HBase is fun to use and very fast", + Bytes.toString(output.toBytes(0, decompressedSize))); + } + } + + @Test + public void testDecompressHeapToDirect() throws IOException { + try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) { + ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64)); + ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD)); + int decompressedSize = decompressor.decompress(output, input, COMPRESSED_PAYLOAD.length); + assertEquals("HBase is fun to use and very fast", + Bytes.toString(output.toBytes(0, decompressedSize))); + } + } + } diff --git a/pom.xml b/pom.xml index 220cd936544d..de9ded3a1aa1 100644 --- a/pom.xml +++ b/pom.xml @@ -930,7 +930,7 @@ 1.11.0 1.8.0 1.1.10.4 - 1.5.5-2 + 1.5.7-2