Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.variants;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Timeout;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Microbenchmark of Variant serialization, looking at shredding and nesting.
 *
 * <pre>
 * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=VariantSerializationBenchmark
 * </pre>
 */
// NOTE(review): @Fork(0) runs the benchmark inside the harness JVM; JMH docs generally
// recommend at least one fork so JIT/profiling state does not bleed between trials --
// confirm this is intentional (e.g. for easier debugging).
@Fork(0)
@State(Scope.Benchmark)
@Warmup(iterations = 100)
@Measurement(iterations = 200)
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
public class VariantSerializationBenchmark {

/** Whether to use deep or shallow. Used over a boolean for meaningful reports. */
public enum Depth {
Shallow,
Nested
}

private static final Logger LOG = LoggerFactory.getLogger(VariantSerializationBenchmark.class);

/** Number of fields in each nested variant object. */
private static final int NESTED_FIELD_COUNT = 5;

/** Percentage of fields that are shredded (0 = none, 100 = all). */
@Param({"0", "33", "67", "100"})
private int shreddedPercent;

/**
 * Total number of fields in the variant object. Must be at least as big as {@link
 * #NESTED_FIELD_COUNT}.
 */
@Param({"1000", "10000"})
private int fieldCount;

/** Whether to include nested variant objects in the field values. */
@Param({"Shallow", "Nested"})
private Depth depth;

/** Metadata covering every field name (shredded + unshredded). */
private VariantMetadata metadata;

/** Ordered list of all field names. */
private List<String> fieldNames;

/** Typed values for each field. Index i corresponds to {@code fieldNames.get(i)}. */
private VariantValue[] fieldValues;

/**
 * Number of fields that are placed into the shredded portion of the object, derived from {@link
 * #shreddedPercent} and {@link #fieldCount}.
 */
private int shreddedFieldCount;

/**
 * Pre-serialized object containing the unshredded fields, or {@code null} when all fields are
 * shredded.
 */
private VariantObject unshreddedObject;

/** Recycled output buffer. Must be large enough for the largest value of {@link #fieldCount}. */
private ByteBuffer outputBuffer;

/** For benchmarking cost of writing, ignoring creation cost. */
private ShreddedObject preShreddedObject;

/** Pre-shredded object marshalled to a byte buffer. */
private ByteBuffer marshalledVariant;

/** Drives choice of field types and more. Also captured by value-generator lambdas in setup. */
private Random random;

/**
 * Initializes all benchmark state for the current parameter combination.
 *
 * <p>Populates {@link #fieldNames} and {@link #fieldValues} with a seeded-random mix of ints,
 * doubles, strings of several lengths, and UUIDs. When {@link #depth} is {@link Depth#Nested},
 * null placeholders are replaced with nested objects built from the string-valued fields. The
 * shredded/unshredded split and the pre-serialized buffers are then derived from those values.
 */
@Setup(Level.Trial)
public void setupBench() {
  LOG.info(
      "Setting up benchmark shreddedPercent={}% fields={} depth={}",
      shreddedPercent, fieldCount, depth);
  // Fixed seed so every run sees the same value mix; other benchmarks use a mix of fixed
  // and time-based seeds.
  random = new Random(0x1ceb1cebL);

  fieldNames = Lists.newArrayListWithCapacity(fieldCount);
  for (int i = 0; i < fieldCount; i++) {
    fieldNames.add("field_" + i);
  }

  ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(fieldNames, true /* sorted */);
  metadata = VariantMetadata.from(metadataBuffer);

  final boolean isNested = depth == Depth.Nested;

  // Track which indexes produced strings/nulls so nested objects can be patched in later;
  // nested variants reuse the names of top-level string fields.
  List<Integer> nullFields = Lists.newArrayList();
  List<Integer> stringFields = Lists.newArrayList();

  // Value generators; string-producing entries appear more often to bias toward strings.
  List<Function<Integer, VariantValue>> generators = Lists.newArrayList();
  generators.add(i -> Variants.of("string-" + i));
  generators.add(i -> Variants.of("longer string-" + i));
  generators.add(
      i -> Variants.of("a longer string assuming these will be more common #" + i));
  generators.add(i -> Variants.of('a'));
  generators.add(i -> Variants.of(random.nextInt()));
  generators.add(i -> Variants.of(random.nextDouble()));
  // as an example of a byte sequence other than string.
  generators.add(i -> Variants.ofUUID(UUID.randomUUID()));
  if (isNested) {
    // placeholder: nulls are replaced with nested objects further down
    generators.add(i -> Variants.ofNull());
  } else {
    // add a string on shallow runs to keep the distribution of other values similar;
    // leaving the null made serialization faster.
    generators.add(i -> Variants.of(Integer.toString(i)));
  }

  final int generatorCount = generators.size();
  fieldValues = new VariantValue[fieldCount];

  // Build the field values, recording string and null positions as we go.
  for (int i = 0; i < fieldCount; i++) {
    final VariantValue value = generators.get(random.nextInt(generatorCount)).apply(i);
    fieldValues[i] = value;
    if (value.type() == PhysicalType.STRING) {
      stringFields.add(i);
    } else if (value.type() == PhysicalType.NULL) {
      nullFields.add(i);
    }
  }

  // On a nested run, replace every null placeholder with a nested variant object.
  if (isNested) {
    Preconditions.checkState(!stringFields.isEmpty(), "No string fields generated");
    for (Integer index : nullFields) {
      fieldValues[index] = buildNestedValue(index, stringFields);
    }
  }

  // Split the fields: the first shreddedFieldCount are shredded, the rest pre-serialized.
  shreddedFieldCount = fieldCount * shreddedPercent / 100;
  unshreddedObject =
      shreddedFieldCount < fieldCount
          ? buildSerializedObject(shreddedFieldCount, fieldCount, metadataBuffer)
          : null;

  // Pre-build a shredded object for the serialization-only benchmarks and size the buffers
  // from it; buffers are recycled to keep allocation out of the measurement.
  preShreddedObject = buildShreddedObject();
  final int size = preShreddedObject.sizeInBytes();
  outputBuffer = ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN);

  // A fully marshalled object used to measure deserialization performance.
  marshalledVariant = ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN);
  preShreddedObject.writeTo(marshalledVariant, 0);

  LOG.info("Setup complete");
}

/**
 * Creates a nested variant object with {@link #NESTED_FIELD_COUNT} string fields, whose field
 * names are drawn at random from the top-level string-valued fields.
 *
 * @param index top-level field index, embedded in the nested string values
 * @param stringFields indexes of top-level fields whose values are strings
 * @return a nested object value for the given field
 */
private VariantValue buildNestedValue(int index, List<Integer> stringFields) {
  ShreddedObject nestedObject = Variants.object(metadata);
  final int candidateCount = stringFields.size();

  for (int j = 0; j < NESTED_FIELD_COUNT; j++) {
    String name = fieldNames.get(stringFields.get(random.nextInt(candidateCount)));
    nestedObject.put(name, Variants.of("nested_" + index + "_" + j));
  }

  return nestedObject;
}

/**
 * Serializes the fields in {@code [from, to)} into a {@link VariantObject} backed by a
 * {@link ByteBuffer}, so it can serve as the unshredded remainder of the benchmark object.
 *
 * @param from inclusive start index into {@link #fieldNames}
 * @param to exclusive end index into {@link #fieldNames}
 * @param metaBuf the shared metadata buffer
 * @return a parsed object view over the serialized fields
 */
private VariantObject buildSerializedObject(int from, int to, ByteBuffer metaBuf) {
  LOG.info("serialize {}-{}", from, to);

  ImmutableMap.Builder<String, VariantValue> fieldsBuilder = ImmutableMap.builder();
  for (int i = from; i < to; i++) {
    fieldsBuilder.put(fieldNames.get(i), fieldValues[i]);
  }

  ByteBuffer valueBuf = VariantTestUtil.createObject(metaBuf, fieldsBuilder.build());
  return (VariantObject) Variants.value(metadata, valueBuf);
}

/**
 * Assembles a {@link ShreddedObject} from the benchmark's current fields: the first
 * {@link #shreddedFieldCount} fields are put directly, and the remainder (when present) comes
 * from the pre-serialized {@link #unshreddedObject}.
 *
 * @return a newly built shredded object
 */
private ShreddedObject buildShreddedObject() {
  ShreddedObject shredded;
  if (unshreddedObject == null) {
    // all fields are shredded; no serialized remainder to carry along
    shredded = Variants.object(metadata);
  } else {
    shredded = Variants.object(metadata, unshreddedObject);
  }

  for (int i = 0; i < shreddedFieldCount; i++) {
    shredded.put(fieldNames.get(i), fieldValues[i]);
  }

  return shredded;
}

/**
 * Full write path: create a {@link ShreddedObject} from scratch, populate all shredded fields,
 * and write it into the recycled output buffer.
 */
// @Benchmark -- currently disabled; build() and serialize() measure the two halves separately
public void buildAndSerialize(Blackhole bh) {
  ShreddedObject shredded = buildShreddedObject();
  // sizeInBytes() creates the serialization state
  int size = shredded.sizeInBytes();
  outputBuffer.clear();
  shredded.writeTo(outputBuffer, 0);
  // keep both results alive so neither phase is optimized away
  bh.consume(size);
  bh.consume(outputBuffer);
}

/**
 * Measures building a {@link ShreddedObject} and creating its serialization state, without
 * writing any bytes. Isolates construction cost from buffer-writing cost.
 */
@Benchmark
public void build(Blackhole bh) {
  // sizeInBytes() forces creation of the serialization state
  bh.consume(buildShreddedObject().sizeInBytes());
}

/**
 * Serialize-only path: reuse the pre-built {@link ShreddedObject} and measure only the cost of
 * {@link ShreddedObject#writeTo(ByteBuffer, int)} into the recycled output buffer (the
 * serialization state was already created during setup).
 */
@Benchmark
public void serialize(Blackhole bh) {
// recycled buffer: clear() just resets position/limit, no reallocation per invocation
outputBuffer.clear();
preShreddedObject.writeTo(outputBuffer, 0);
bh.consume(outputBuffer);
}

/**
 * Deserialization path: parse the marshalled variant bytes and look up every field by name,
 * feeding each value to the black hole.
 */
@Benchmark
public void deserialize(Blackhole bh) {
// NOTE(review): assumes Variants.value reads marshalledVariant at an absolute offset and
// does not depend on the buffer's position -- confirm, since writeTo(buffer, 0) in setup
// may leave the position advanced.
VariantObject parsed = (VariantObject) Variants.value(metadata, marshalledVariant);
for (String name : fieldNames) {
bh.consume(parsed.get(name));
}
}
}