-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Eliminate temporary buffer allocations during bson serialization #1628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a584c9c
d0c95db
2ad8385
0385da6
aa1c2f6
969079d
de504ee
1eaffc2
0ca525d
9c3aae3
9a187bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,11 +18,14 @@ | |
|
||
import org.bson.ByteBuf; | ||
import org.bson.ByteBufNIO; | ||
import org.bson.types.ObjectId; | ||
|
||
import java.io.IOException; | ||
import java.io.OutputStream; | ||
import java.nio.Buffer; | ||
import java.nio.ByteBuffer; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
import static java.lang.String.format; | ||
|
@@ -32,8 +35,12 @@ | |
* A BSON output stream that stores the output in a single, un-pooled byte array. | ||
*/ | ||
public class BasicOutputBuffer extends OutputBuffer { | ||
private byte[] buffer; | ||
private int position; | ||
|
||
/** | ||
* This ByteBuffer allows us to write ObjectIDs without allocating a temporary array per object, and enables us | ||
* to leverage JVM intrinsics for writing little-endian numeric values. | ||
*/ | ||
private ByteBuffer buffer; | ||
|
||
/** | ||
* Construct an instance with a default initial byte array size. | ||
|
@@ -48,7 +55,8 @@ public BasicOutputBuffer() { | |
* @param initialSize the initial size of the byte array | ||
*/ | ||
public BasicOutputBuffer(final int initialSize) { | ||
buffer = new byte[initialSize]; | ||
// Allocate heap buffer to ensure we can access underlying array | ||
buffer = ByteBuffer.allocate(initialSize).order(LITTLE_ENDIAN); | ||
} | ||
|
||
/** | ||
|
@@ -58,50 +66,76 @@ public BasicOutputBuffer(final int initialSize) { | |
* @since 3.3 | ||
*/ | ||
public byte[] getInternalBuffer() { | ||
return buffer; | ||
return buffer.array(); | ||
} | ||
|
||
@Override | ||
public void write(final byte[] b) { | ||
writeBytes(b, 0, b.length); | ||
} | ||
|
||
@Override | ||
public byte[] toByteArray() { | ||
ensureOpen(); | ||
return Arrays.copyOf(buffer.array(), buffer.position()); | ||
} | ||
|
||
@Override | ||
public void writeInt32(final int value) { | ||
ensureOpen(); | ||
ensure(4); | ||
buffer.putInt(value); | ||
} | ||
|
||
@Override | ||
public void writeInt32(final int position, final int value) { | ||
ensureOpen(); | ||
checkPosition(position, 4); | ||
buffer.putInt(position, value); | ||
} | ||
|
||
@Override | ||
public void writeInt64(final long value) { | ||
ensureOpen(); | ||
ensure(8); | ||
buffer.putLong(value); | ||
} | ||
|
||
@Override | ||
public void writeObjectId(final ObjectId value) { | ||
ensureOpen(); | ||
write(b, 0, b.length); | ||
ensure(12); | ||
value.putToByteBuffer(buffer); | ||
} | ||
|
||
@Override | ||
public void writeBytes(final byte[] bytes, final int offset, final int length) { | ||
ensureOpen(); | ||
|
||
ensure(length); | ||
System.arraycopy(bytes, offset, buffer, position, length); | ||
position += length; | ||
buffer.put(bytes, offset, length); | ||
} | ||
|
||
@Override | ||
public void writeByte(final int value) { | ||
ensureOpen(); | ||
|
||
ensure(1); | ||
buffer[position++] = (byte) (0xFF & value); | ||
buffer.put((byte) (0xFF & value)); | ||
} | ||
|
||
@Override | ||
protected void write(final int absolutePosition, final int value) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, updated in 9a187bd |
||
ensureOpen(); | ||
checkPosition(absolutePosition, 1); | ||
|
||
if (absolutePosition < 0) { | ||
throw new IllegalArgumentException(format("position must be >= 0 but was %d", absolutePosition)); | ||
} | ||
if (absolutePosition > position - 1) { | ||
throw new IllegalArgumentException(format("position must be <= %d but was %d", position - 1, absolutePosition)); | ||
} | ||
|
||
buffer[absolutePosition] = (byte) (0xFF & value); | ||
buffer.put(absolutePosition, (byte) (0xFF & value)); | ||
} | ||
|
||
@Override | ||
public int getPosition() { | ||
ensureOpen(); | ||
return position; | ||
return buffer.position(); | ||
} | ||
|
||
/** | ||
|
@@ -110,29 +144,32 @@ public int getPosition() { | |
@Override | ||
public int getSize() { | ||
ensureOpen(); | ||
return position; | ||
return buffer.position(); | ||
} | ||
|
||
@Override | ||
public int pipe(final OutputStream out) throws IOException { | ||
ensureOpen(); | ||
out.write(buffer, 0, position); | ||
return position; | ||
out.write(buffer.array(), 0, buffer.position()); | ||
return buffer.position(); | ||
} | ||
|
||
@Override | ||
public void truncateToPosition(final int newPosition) { | ||
ensureOpen(); | ||
if (newPosition > position || newPosition < 0) { | ||
if (newPosition > buffer.position() || newPosition < 0) { | ||
throw new IllegalArgumentException(); | ||
} | ||
position = newPosition; | ||
// The cast is required for compatibility with JDK 9+ where ByteBuffer's position method is inherited from Buffer. | ||
((Buffer) buffer).position(newPosition); | ||
Edarke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@Override | ||
public List<ByteBuf> getByteBuffers() { | ||
ensureOpen(); | ||
return Arrays.asList(new ByteBufNIO(ByteBuffer.wrap(buffer, 0, position).duplicate().order(LITTLE_ENDIAN))); | ||
// Create a flipped copy of the buffer for reading. Note that ByteBufNIO overwrites the endian-ness. | ||
ByteBuffer flipped = ByteBuffer.wrap(buffer.array(), 0, buffer.position()); | ||
return Collections.singletonList(new ByteBufNIO(flipped)); | ||
} | ||
|
||
@Override | ||
|
@@ -147,19 +184,32 @@ private void ensureOpen() { | |
} | ||
|
||
private void ensure(final int more) { | ||
int need = position + more; | ||
if (need <= buffer.length) { | ||
int length = buffer.position(); | ||
int need = length + more; | ||
if (need <= buffer.capacity()) { | ||
return; | ||
} | ||
|
||
int newSize = buffer.length * 2; | ||
int newSize = length * 2; | ||
if (newSize < need) { | ||
newSize = need + 128; | ||
} | ||
|
||
byte[] n = new byte[newSize]; | ||
System.arraycopy(buffer, 0, n, 0, position); | ||
buffer = n; | ||
ByteBuffer tmp = ByteBuffer.allocate(newSize).order(LITTLE_ENDIAN); | ||
tmp.put(buffer.array(), 0, length); // Avoids covariant call to flip on jdk8 | ||
this.buffer = tmp; | ||
} | ||
|
||
/** | ||
* Ensures that `absolutePosition` is a valid index in `this.buffer` and there is room to write at | ||
* least `bytesToWrite` bytes. | ||
*/ | ||
private void checkPosition(final int absolutePosition, final int bytesToWrite) { | ||
if (absolutePosition < 0) { | ||
throw new IllegalArgumentException(format("position must be >= 0 but was %d", absolutePosition)); | ||
} | ||
if (absolutePosition > buffer.position() - bytesToWrite) { | ||
throw new IllegalArgumentException(format("position must be <= %d but was %d", buffer.position() - bytesToWrite, absolutePosition)); | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.