Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public final class Platform {

public static final int DOUBLE_ARRAY_OFFSET;

// Field offset for java.nio.Buffer.address — used to read the native memory address
// of a DirectByteBuffer without going through sun.nio.ch.DirectBuffer.
private static final long DIRECT_BUFFER_ADDRESS_OFFSET;

private static final boolean unaligned;

// Split java.version on non-digit chars:
Expand Down Expand Up @@ -230,6 +234,14 @@ public static ByteBuffer allocateDirectBuffer(int size) {
throw new IllegalStateException("unreachable");
}

/**
* Returns the native memory address of a direct {@link ByteBuffer}.
* The buffer must be direct; passing a heap buffer produces an undefined result.
*/
public static long getDirectBufferAddress(ByteBuffer buffer) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this pr is accepted, a corresponding benchmark can be added to PlatformBenchmark at a later time.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. It'd be great to have some benchmark results!

return _UNSAFE.getLong(buffer, DIRECT_BUFFER_ADDRESS_OFFSET);
}

/**
 * Fills {@code size} bytes starting at {@code offset} within {@code object} with {@code value}.
 * Per the {@code Unsafe.setMemory} contract, a {@code null} base object treats {@code offset}
 * as an absolute off-heap address.
 */
public static void setMemory(Object object, long offset, long size, byte value) {
_UNSAFE.setMemory(object, offset, size, value);
}
Expand Down Expand Up @@ -296,6 +308,12 @@ public static void throwException(Throwable t) {
LONG_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(long[].class);
FLOAT_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(float[].class);
DOUBLE_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(double[].class);
try {
DIRECT_BUFFER_ADDRESS_OFFSET =
_UNSAFE.objectFieldOffset(java.nio.Buffer.class.getDeclaredField("address"));
} catch (NoSuchFieldException e) {
throw new IllegalStateException(e);
}
} else {
BOOLEAN_ARRAY_OFFSET = 0;
BYTE_ARRAY_OFFSET = 0;
Expand All @@ -304,6 +322,7 @@ public static void throwException(Throwable t) {
LONG_ARRAY_OFFSET = 0;
FLOAT_ARRAY_OFFSET = 0;
DOUBLE_ARRAY_OFFSET = 0;
DIRECT_BUFFER_ADDRESS_OFFSET = 0;
}
}

Expand Down
218 changes: 115 additions & 103 deletions core/benchmarks/PlatformBenchmark-jdk21-results.txt

Large diffs are not rendered by default.

264 changes: 138 additions & 126 deletions core/benchmarks/PlatformBenchmark-jdk25-results.txt

Large diffs are not rendered by default.

212 changes: 112 additions & 100 deletions core/benchmarks/PlatformBenchmark-results.txt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.unsafe

import java.nio.ByteBuffer

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}

/**
Expand Down Expand Up @@ -55,6 +57,7 @@ object PlatformBenchmark extends BenchmarkBase {
runFloatAccess(count8m, iterations)
runDoubleAccess(count8m, iterations)
runBooleanAccess(count8m, iterations)
runDirectBufferAddressAccess(count8m, iterations)

val counts = Seq((count4k, str4k), (count16k, str16k), (count256k, str256k),
(count1m, str1m), (count8m, str8m), (count32m, str32m))
Expand Down Expand Up @@ -441,6 +444,52 @@ object PlatformBenchmark extends BenchmarkBase {
}
}

/**
 * Benchmarks `Platform.getDirectBufferAddress` against a raw off-heap `getLong` baseline.
 * Off-heap allocations are released in the `finally` block; the direct buffers are left
 * to the garbage collector.
 */
private def runDirectBufferAddressAccess(count: Long, iterations: Long): Unit = {
  val n = count.toInt
  // Off-heap 8-byte slots for the raw-address baseline case.
  val addresses = Array.fill(n)(Platform.allocateMemory(8))
  // Minimal direct buffers whose native address the new API reads.
  val buffers = Array.fill[ByteBuffer](n)(ByteBuffer.allocateDirect(1))
  // NOTE(review): the mask-based indexing assumes `count` is a power of two — the caller
  // passes count8m, which presumably is; confirm if other counts are ever used.
  val mask = count - 1
  try {
    runBenchmark("Platform DirectBuffer Address Access") {
      val benchmark = new Benchmark("DirectBuffer Address Access", iterations, output = output)

      benchmark.addCase("getLong (baseline)") { _ =>
        var sum = 0L
        var i = 0L
        while (i < iterations) {
          sum += Platform.getLong(null, addresses((i & mask).toInt))
          i += 1
        }
      }

      benchmark.addCase("getDirectBufferAddress") { _ =>
        var sum = 0L
        var i = 0L
        while (i < iterations) {
          sum += Platform.getDirectBufferAddress(buffers((i & mask).toInt))
          i += 1
        }
      }
      benchmark.run()
    }
  } finally {
    addresses.foreach(Platform.freeMemory)
  }
}

private def runBulkOperations(count: Long, countStr: String): Unit = {
val size = count * 4
val address = Platform.allocateMemory(size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,9 @@ public final void readBinary(int total, WritableColumnVector v, int rowId) {
if (buffer.hasArray()) {
v.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(), len);
} else {
byte[] bytes = new byte[len];
buffer.get(bytes);
v.putByteArray(rowId + i, bytes);
// Copy directly from the ByteBuffer into the column vector's backing storage,
// bypassing any intermediate byte[] allocation.
v.putByteArray(rowId + i, buffer, buffer.position(), len);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,22 @@ public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId, count);
}

@Override
public void putBytes(int rowId, int count, ByteBuffer src, int srcIndex) {
if (src.hasArray()) {
Platform.copyMemory(src.array(), Platform.BYTE_ARRAY_OFFSET + src.arrayOffset() + srcIndex,
null, data + rowId, count);
} else if (src.isDirect()) {
long srcAddr = Platform.getDirectBufferAddress(src) + srcIndex;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think hasArray() does not necessarily mean the buffer is a direct buffer. We can perhaps strengthen this by also check src.isDirect?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

d2e8bb3 address this

Platform.copyMemory(null, srcAddr, null, data + rowId, count);
} else {
// Fallback for non-heap, non-direct buffers (e.g., read-only wrappers).
byte[] tmp = new byte[count];
src.get(srcIndex, tmp, 0, count);
Platform.copyMemory(tmp, Platform.BYTE_ARRAY_OFFSET, null, data + rowId, count);
}
}

@Override
public byte getByte(int rowId) {
if (dictionary == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
System.arraycopy(src, srcIndex, byteData, rowId, count);
}

/**
 * Copies {@code count} bytes from {@code src} starting at absolute index {@code srcIndex}
 * into {@code byteData} at {@code rowId}.
 */
@Override
public void putBytes(int rowId, int count, ByteBuffer src, int srcIndex) {
// Absolute bulk get: single copy, does not modify src's position.
src.get(srcIndex, byteData, rowId, count);
}

@Override
public byte getByte(int rowId) {
if (dictionary == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ public void putBooleans(int rowId, int count, byte src, int srcIndex) {
*/
public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex);

/**
 * Copies {@code count} bytes from a {@link ByteBuffer} starting at absolute position
 * {@code srcIndex} into this column at {@code rowId}. Does not modify the buffer's position.
 *
 * @param rowId first destination row in this column
 * @param count number of bytes to copy
 * @param src source buffer; may be heap-backed, direct, or a read-only wrapper
 * @param srcIndex absolute index of the first byte to read from {@code src}
 */
public abstract void putBytes(int rowId, int count, ByteBuffer src, int srcIndex);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, can we add some test cases for this new API?


/**
* Sets `value` to the value at rowId.
*/
Expand Down Expand Up @@ -435,6 +441,25 @@ public final int putByteArray(int rowId, byte[] value) {
return putByteArray(rowId, value, 0, value.length);
}

/**
 * Stores bytes from a {@link ByteBuffer} as a variable-length byte array at {@code rowId}.
 * Copies {@code length} bytes starting at absolute position {@code srcPosition} in the buffer.
 * Does not modify the buffer's position.
 *
 * @return the offset within the child data vector where the bytes were appended
 */
public final int putByteArray(int rowId, ByteBuffer src, int srcPosition, int length) {
  // Append the payload to the child data vector, then record (offset, length) at rowId.
  final int offset = arrayData().appendBytes(length, src, srcPosition);
  putArray(rowId, offset, length);
  return offset;
}

/**
 * Appends {@code length} bytes from {@code src} (read at absolute position
 * {@code srcPosition}) to the end of this vector and returns the element index at which
 * they begin. Reserves capacity first so the bulk put cannot overflow.
 */
final int appendBytes(int length, ByteBuffer src, int srcPosition) {
reserve(elementsAppended + length);
int result = elementsAppended;
putBytes(elementsAppended, length, src, srcPosition);
elementsAppended += length;
return result;
}

@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
if (isNullAt(rowId)) return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.vectorized

import java.nio.ByteBuffer

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.YearUDT
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
Expand Down Expand Up @@ -262,6 +264,71 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
}
}

testVectors("putByteArray from ByteBuffer", 10, BinaryType) { testVector =>
  // Shared check: every row must hold the UTF-8 bytes of "str<i>".
  def assertAllRows(vector: WritableColumnVector): Unit = {
    for (i <- 0 until 10) {
      assert(vector.getBinary(i) === s"str$i".getBytes("utf8"))
    }
  }

  // Heap-backed buffer (hasArray = true).
  for (i <- 0 until 10) {
    val payload = s"str$i".getBytes("utf8")
    testVector.putByteArray(i, ByteBuffer.wrap(payload), 0, payload.length)
  }
  assertAllRows(testVector)

  // Direct buffer (isDirect = true); srcPosition is absolute, so the advanced
  // position left by put() must not matter.
  testVector.reset()
  for (i <- 0 until 10) {
    val payload = s"str$i".getBytes("utf8")
    val direct = ByteBuffer.allocateDirect(payload.length)
    direct.put(payload)
    testVector.putByteArray(i, direct, 0, payload.length)
  }
  assertAllRows(testVector)

  // Read-only heap wrapper: hasArray and isDirect are both false (fallback path).
  testVector.reset()
  for (i <- 0 until 10) {
    val payload = s"str$i".getBytes("utf8")
    testVector.putByteArray(i, ByteBuffer.wrap(payload).asReadOnlyBuffer(), 0, payload.length)
  }
  assertAllRows(testVector)
}

testVectors("putBytes from ByteBuffer", 16, ByteType) { testVector =>
  val data = Array[Byte](10, 20, 30, 40, 50, 60, 70, 80)

  // Heap ByteBuffer (hasArray = true)
  testVector.putBytes(0, data.length, ByteBuffer.wrap(data), 0)
  (0 until data.length).foreach { i =>
    assert(testVector.getByte(i) === data(i))
  }

  // Direct ByteBuffer (isDirect = true); srcIndex is absolute, so the position
  // advanced by put() must be ignored.
  val directBuf = ByteBuffer.allocateDirect(data.length)
  directBuf.put(data)
  testVector.putBytes(0, data.length, directBuf, 0)
  (0 until data.length).foreach { i =>
    assert(testVector.getByte(i) === data(i))
  }

  // Read-only ByteBuffer (hasArray = false, isDirect = false): fallback path.
  val readOnlyBuf = ByteBuffer.wrap(data).asReadOnlyBuffer()
  testVector.putBytes(0, data.length, readOnlyBuf, 0)
  (0 until data.length).foreach { i =>
    assert(testVector.getByte(i) === data(i))
  }

  // With a non-zero srcIndex offset.
  testVector.putBytes(0, 4, ByteBuffer.wrap(data), 4)
  (0 until 4).foreach { i =>
    assert(testVector.getByte(i) === data(i + 4))
  }
}

DataTypeTestUtils.yearMonthIntervalTypes.foreach {
dt =>
testVectors(dt.typeName,
Expand Down