Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-29193: Allow ZstdByteBuffDecompressor to take direct ByteBuffer as input and heap ByteBuffer as output, or vice versa #6806

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,8 @@ public class ZstdByteBuffDecompressor implements ByteBuffDecompressor, CanReinit

@Override
public boolean canDecompress(ByteBuff output, ByteBuff input) {
if (!allowByteBuffDecompression) {
return false;
}
if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) {
ByteBuffer nioOutput = output.nioByteBuffers()[0];
ByteBuffer nioInput = input.nioByteBuffers()[0];
if (nioOutput.isDirect() && nioInput.isDirect()) {
return true;
} else if (!nioOutput.isDirect() && !nioInput.isDirect()) {
return true;
}
}

return false;
return allowByteBuffDecompression && output instanceof SingleByteBuff
&& input instanceof SingleByteBuff;
}

@Override
Expand All @@ -80,38 +68,35 @@ private int decompressRaw(ByteBuff output, ByteBuff input, int inputLen) throws
if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) {
ByteBuffer nioOutput = output.nioByteBuffers()[0];
ByteBuffer nioInput = input.nioByteBuffers()[0];
int origOutputPos = nioOutput.position();
int n;
if (nioOutput.isDirect() && nioInput.isDirect()) {
return decompressDirectByteBuffers(nioOutput, nioInput, inputLen);
n = ctx.decompressDirectByteBuffer(nioOutput, nioOutput.position(),
nioOutput.limit() - nioOutput.position(), nioInput, nioInput.position(), inputLen);
} else if (!nioOutput.isDirect() && !nioInput.isDirect()) {
return decompressHeapByteBuffers(nioOutput, nioInput, inputLen);
n = ctx.decompressByteArray(nioOutput.array(),
nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() - nioOutput.position(),
nioInput.array(), nioInput.arrayOffset() + nioInput.position(), inputLen);
} else if (nioOutput.isDirect() && !nioInput.isDirect()) {
n = ctx.decompressByteArrayToDirectByteBuffer(nioOutput, nioOutput.position(),
nioOutput.limit() - nioOutput.position(), nioInput.array(),
nioInput.arrayOffset() + nioInput.position(), inputLen);
} else if (!nioOutput.isDirect() && nioInput.isDirect()) {
n = ctx.decompressDirectByteBufferToByteArray(nioOutput.array(),
nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() - nioOutput.position(),
nioInput, nioInput.position(), inputLen);
} else {
throw new IllegalStateException("Unreachable line");
}
}

throw new IllegalStateException("One buffer is direct and the other is not, "
+ "or one or more not SingleByteBuffs. This is not supported");
}

private int decompressDirectByteBuffers(ByteBuffer output, ByteBuffer input, int inputLen) {
int origOutputPos = output.position();
nioOutput.position(origOutputPos + n);
nioInput.position(input.position() + inputLen);

int n = ctx.decompressDirectByteBuffer(output, output.position(),
output.limit() - output.position(), input, input.position(), inputLen);

output.position(origOutputPos + n);
input.position(input.position() + inputLen);
return n;
}

private int decompressHeapByteBuffers(ByteBuffer output, ByteBuffer input, int inputLen) {
int origOutputPos = output.position();

int n = ctx.decompressByteArray(output.array(), output.arrayOffset() + output.position(),
output.limit() - output.position(), input.array(), input.arrayOffset() + input.position(),
inputLen);

output.position(origOutputPos + n);
input.position(input.position() + inputLen);
return n;
return n;
} else {
throw new IllegalStateException(
"At least one buffer is not a SingleByteBuff, this is not supported");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public void testCapabilities() {
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
assertTrue(decompressor.canDecompress(emptySingleHeapBuff, emptySingleHeapBuff));
assertTrue(decompressor.canDecompress(emptySingleDirectBuff, emptySingleDirectBuff));
assertFalse(decompressor.canDecompress(emptySingleHeapBuff, emptySingleDirectBuff));
assertFalse(decompressor.canDecompress(emptySingleDirectBuff, emptySingleHeapBuff));
assertTrue(decompressor.canDecompress(emptySingleHeapBuff, emptySingleDirectBuff));
assertTrue(decompressor.canDecompress(emptySingleDirectBuff, emptySingleHeapBuff));
assertFalse(decompressor.canDecompress(emptyMultiHeapBuff, emptyMultiHeapBuff));
assertFalse(decompressor.canDecompress(emptyMultiDirectBuff, emptyMultiDirectBuff));
assertFalse(decompressor.canDecompress(emptySingleHeapBuff, emptyMultiHeapBuff));
Expand All @@ -72,7 +72,7 @@ public void testCapabilities() {
}

@Test
public void testDecompressHeap() throws IOException {
public void testDecompressHeapToHeap() throws IOException {
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64));
ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD));
Expand All @@ -83,7 +83,7 @@ public void testDecompressHeap() throws IOException {
}

@Test
public void testDecompressDirect() throws IOException {
public void testDecompressDirectToDirect() throws IOException {
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64));
ByteBuff input = new SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length));
Expand All @@ -95,4 +95,28 @@ public void testDecompressDirect() throws IOException {
}
}

@Test
public void testDecompressDirectToHeap() throws IOException {
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64));
ByteBuff input = new SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length));
input.put(COMPRESSED_PAYLOAD);
input.rewind();
int decompressedSize = decompressor.decompress(output, input, COMPRESSED_PAYLOAD.length);
assertEquals("HBase is fun to use and very fast",
Bytes.toString(output.toBytes(0, decompressedSize)));
}
}

@Test
public void testDecompressHeapToDirect() throws IOException {
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64));
ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD));
int decompressedSize = decompressor.decompress(output, input, COMPRESSED_PAYLOAD.length);
assertEquals("HBase is fun to use and very fast",
Bytes.toString(output.toBytes(0, decompressedSize)));
}
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@
<brotli4j.version>1.11.0</brotli4j.version>
<lz4.version>1.8.0</lz4.version>
<snappy.version>1.1.10.4</snappy.version>
<zstd-jni.version>1.5.5-2</zstd-jni.version>
<zstd-jni.version>1.5.7-2</zstd-jni.version>
<!--
Note that the version of protobuf shipped in hbase-thirdparty must match the version used
in hbase-protocol-shaded and hbase-examples. The version of jackson-[annotations,core,
Expand Down