Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAK-11154: Read partial segments from SegmentWriter #1746

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
import org.apache.jackrabbit.oak.segment.RecordWriters.RecordWriter;
import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
import org.apache.jackrabbit.oak.segment.data.PartialSegmentState;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff;
Expand Down Expand Up @@ -156,6 +157,11 @@ public void flush() throws IOException {
writeOperationHandler.flush(store);
}

@Override
public @Nullable PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) {
    // The in-flight segment buffer is owned by the write operation handler, so delegate to it.
    final PartialSegmentState partialState = writeOperationHandler.readPartialSegmentState(sid);
    return partialState;
}

@NotNull
RecordId writeMap(@Nullable final MapRecord base, @NotNull final Map<String, RecordId> changes) throws IOException {
return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,18 @@
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.commons.io.HexDump;
import org.apache.jackrabbit.oak.segment.RecordNumbers.Entry;
import org.apache.jackrabbit.oak.segment.data.PartialSegmentState;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -65,7 +71,9 @@
* The behaviour of this class is undefined should the pre-allocated buffer be
overrun by calling any of the write methods.
* <p>
* Instances of this class are <em>not thread safe</em>
* It is safe to call {@link #readPartialSegmentState(SegmentId)} concurrently with the write operations of a single
* writer (including several concurrent calls of this method). However, it is not safe to have concurrent writers,
* notably because the order in which {@code prepare} and {@code writeXYZ} methods are called matters.
*/
public class SegmentBufferWriter implements WriteOperationHandler {

Expand Down Expand Up @@ -150,7 +158,7 @@ public SegmentBufferWriter(@NotNull SegmentIdProvider idProvider,

@NotNull
@Override
public RecordId execute(@NotNull GCGeneration gcGeneration,
public synchronized RecordId execute(@NotNull GCGeneration gcGeneration,
@NotNull WriteOperation writeOperation)
throws IOException {
checkState(gcGeneration.equals(this.gcGeneration));
Expand All @@ -159,7 +167,7 @@ public RecordId execute(@NotNull GCGeneration gcGeneration,

@Override
@NotNull
public GCGeneration getGCGeneration() {
public synchronized GCGeneration getGCGeneration() {
    // Synchronized: the generation may be queried concurrently with write operations on this buffer.
    return gcGeneration;
}

Expand Down Expand Up @@ -220,22 +228,22 @@ private void newSegment(SegmentStore store) throws IOException {
dirty = false;
}

public void writeByte(byte value) {
public synchronized void writeByte(byte value) {
    // Write one byte at the current position, advance the position, and mark the buffer dirty.
    final int newPosition = BinaryUtils.writeByte(buffer, position, value);
    position = newPosition;
    dirty = true;
}

public void writeShort(short value) {
public synchronized void writeShort(short value) {
    // Write two bytes at the current position, advance the position, and mark the buffer dirty.
    final int newPosition = BinaryUtils.writeShort(buffer, position, value);
    position = newPosition;
    dirty = true;
}

public void writeInt(int value) {
public synchronized void writeInt(int value) {
    // Write four bytes at the current position, advance the position, and mark the buffer dirty.
    final int newPosition = BinaryUtils.writeInt(buffer, position, value);
    position = newPosition;
    dirty = true;
}

public void writeLong(long value) {
public synchronized void writeLong(long value) {
    // Write eight bytes at the current position, advance the position, and mark the buffer dirty.
    final int newPosition = BinaryUtils.writeLong(buffer, position, value);
    position = newPosition;
    dirty = true;
}
Expand All @@ -245,7 +253,7 @@ public void writeLong(long value) {
*
* @param recordId the record ID.
*/
public void writeRecordId(RecordId recordId) {
public synchronized void writeRecordId(RecordId recordId) {
requireNonNull(recordId);
checkState(segmentReferences.size() + 1 < 0xffff,
"Segment cannot have more than 0xffff references");
Expand Down Expand Up @@ -278,7 +286,7 @@ private static String info(Segment segment) {
return info;
}

public void writeBytes(byte[] data, int offset, int length) {
public synchronized void writeBytes(byte[] data, int offset, int length) {
arraycopy(data, offset, buffer, position, length);
position += length;
dirty = true;
Expand Down Expand Up @@ -308,7 +316,7 @@ private String dumpSegmentBuffer() {
* enough space for a record. It can also be called explicitly.
*/
@Override
public void flush(@NotNull SegmentStore store) throws IOException {
public synchronized void flush(@NotNull SegmentStore store) throws IOException {
if (dirty) {
int referencedSegmentIdCount = segmentReferences.size();
BinaryUtils.writeInt(buffer, Segment.REFERENCED_SEGMENT_ID_COUNT_OFFSET, referencedSegmentIdCount);
Expand Down Expand Up @@ -381,7 +389,7 @@ public void flush(@NotNull SegmentStore store) throws IOException {
* @param store the {@code SegmentStore} instance to write full segments to
* @return a new record id
*/
public RecordId prepare(RecordType type, int size, Collection<RecordId> ids, SegmentStore store) throws IOException {
public synchronized RecordId prepare(RecordType type, int size, Collection<RecordId> ids, SegmentStore store) throws IOException {
checkArgument(size >= 0);
requireNonNull(ids);

Expand Down Expand Up @@ -459,4 +467,65 @@ public RecordId prepare(RecordType type, int size, Collection<RecordId> ids, Seg
return new RecordId(segment.getSegmentId(), recordNumber);
}

@Override
public synchronized @Nullable PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) {
    // Partial state can only be served for the segment this writer is currently buffering.
    if (segment == null || !segment.getSegmentId().equals(sid)) {
        return null;
    }

    // Collect the IDs of all segments referenced from the buffered segment so far.
    List<PartialSegmentState.SegmentReference> references = new ArrayList<>();
    for (SegmentId referencedSegmentId : segmentReferences) {
        references.add(new PartialSegmentState.SegmentReference(
                referencedSegmentId.getMostSignificantBits(),
                referencedSegmentId.getLeastSignificantBits()));
    }

    // Assemble the header fields together with a snapshot of the records written so far.
    // List.copyOf yields an unmodifiable list, matching the original collector semantics.
    return new PartialSegmentState(
            SegmentVersion.asByte(LATEST_VERSION),
            gcGeneration.getGeneration(),
            gcGeneration.getFullGeneration(),
            gcGeneration.isCompacted(),
            List.copyOf(references),
            getCurrentRecords()
    );
}

/**
 * Get the current records in the buffer, in descending order of their offset.
 *
 * <p>
 * The contents of the record currently being written to can be incomplete. In this case,
 * {@link PartialSegmentState.Record#contents()} will only contain the data that has been written so far.
 */
private @NotNull List<PartialSegmentState.Record> getCurrentRecords() {
    List<PartialSegmentState.Record> result = new ArrayList<>();

    Entry previousEntry = null;
    for (Entry entry : recordNumbers) {
        int currentRecordStart = entry.getOffset();

        // Records in recordNumbers are sorted in descending order of offset
        assert previousEntry == null || previousEntry.getOffset() >= currentRecordStart;

        // A record normally extends up to the start of the previously iterated (higher-offset)
        // record; the first iterated record extends to the end of the segment buffer.
        int nextRecordStart = previousEntry == null ? MAX_SEGMENT_SIZE : previousEntry.getOffset();
        // If the write position lies at or beyond this record's start, this is the record
        // currently being written: expose only the bytes written so far (up to `position`)
        // instead of the full span up to the next record.
        boolean isPartiallyWrittenRecord = position >= currentRecordStart;
        int currentRecordEnd = isPartiallyWrittenRecord ? position : nextRecordStart;
        result.add(
                new PartialSegmentState.Record(
                        entry.getRecordNumber(),
                        entry.getType(),
                        currentRecordStart,
                        Arrays.copyOfRange(buffer, currentRecordStart, currentRecordEnd)
                )
        );

        previousEntry = entry;
    }

    // Return an unmodifiable snapshot so callers cannot mutate the internal list.
    return List.copyOf(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.data.PartialSegmentState;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -33,6 +34,19 @@ public interface SegmentWriter {

void flush() throws IOException;

/**
 * Get the {@link PartialSegmentState partial state} of a segment
 * that has started being written to but hasn’t been flushed yet.
 *
 * @param sid The ID of the segment
 * @return The partial state or {@code null} if no partial state was found for the given segment ID.
 * @throws UnsupportedOperationException if reading partial segment states is not supported.
 */
@Nullable
default PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) {
    // Default: implementations that do not buffer partial segments opt out by throwing.
    throw new UnsupportedOperationException("Trying to read partial segment state from a SegmentWriter that doesn’t support it.");
}

/**
* Write a blob (as list of block records)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import java.io.IOException;

import org.apache.jackrabbit.oak.segment.data.PartialSegmentState;
import org.jetbrains.annotations.NotNull;

import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.jetbrains.annotations.Nullable;

/**
* A {@code WriteOperationHandler} executes {@link WriteOperation
Expand Down Expand Up @@ -73,4 +75,10 @@ RecordId execute(@NotNull GCGeneration gcGeneration, @NotNull WriteOperation wri
* @throws IOException
*/
void flush(@NotNull SegmentStore store) throws IOException;

/**
 * Get the partial state of a segment that this handler has started writing but not yet flushed.
 *
 * @see SegmentWriter#readPartialSegmentState(SegmentId)
 */
@Nullable
default PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) {
    // Default: handlers that do not expose an in-progress segment buffer opt out by throwing.
    throw new UnsupportedOperationException("Trying to read partial segment state from a WriteOperationHandler that doesn’t support it.");
}
}
Loading