Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -410,4 +410,22 @@ default RecordCursor<DataInKeySpacePath> exportAllData(@Nonnull FDBRecordContext
@Nonnull ScanProperties scanProperties) {
throw new UnsupportedOperationException("exportAllData is not supported");
}

/**
* Imports the provided data exported via {@link #exportAllData} into this {@code KeySpacePath}.
* This will validate that any data provided in {@code dataToImport} has a path that should be in this path,
* or one of the sub-directories, if not the future will complete exceptionally with
* {@link RecordCoreIllegalImportDataException}.
* If there is any data already existing under this path, the new data will overwrite if the keys are the same.
* This will use the logical values in the {@link DataInKeySpacePath#getPath()} and
* {@link DataInKeySpacePath#getRemainder()} to determine the key, rather
* than the raw key, meaning that this will work even if the data was exported from a different cluster.
* @param context the transaction context in which to save the data
* @param dataToImport the data to be saved to the database
* @return a future to be completed once all data has been important.
*/
@API(API.Status.EXPERIMENTAL)
@Nonnull
CompletableFuture<Void> importData(@Nonnull FDBRecordContext context,
@Nonnull Iterable<DataInKeySpacePath> dataToImport);
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,42 @@ public RecordCursor<DataInKeySpacePath> exportAllData(@Nonnull FDBRecordContext
1);
}

@Nonnull
@Override
public CompletableFuture<Void> importData(@Nonnull FDBRecordContext context,
@Nonnull Iterable<DataInKeySpacePath> dataToImport) {
return toTupleAsync(context).thenCompose(targetTuple -> {
List<CompletableFuture<Void>> importFutures = new ArrayList<>();

for (DataInKeySpacePath dataItem : dataToImport) {
CompletableFuture<Void> importFuture = dataItem.getPath().toTupleAsync(context).thenCompose(itemPathTuple -> {
// Validate that this data belongs under this path
if (!TupleHelpers.isPrefix(targetTuple, itemPathTuple)) {
throw new RecordCoreIllegalImportDataException(
"Data item path does not belong under target path",
"target", targetTuple, "item", itemPathTuple);
}

// Reconstruct the key using the path and remainder
Tuple keyTuple = itemPathTuple;
if (dataItem.getRemainder() != null) {
keyTuple = keyTuple.addAll(dataItem.getRemainder());
}

// Store the data
byte[] keyBytes = keyTuple.pack();
byte[] valueBytes = dataItem.getValue();
context.ensureActive().set(keyBytes, valueBytes);

return AsyncUtil.DONE;
});
importFutures.add(importFuture);
}

return AsyncUtil.whenAll(importFutures);
});
}

/**
* Returns this path properly wrapped in whatever implementation the directory the path is contained in dictates.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,11 @@ public RecordCursor<DataInKeySpacePath> exportAllData(@Nonnull FDBRecordContext
@Nonnull ScanProperties scanProperties) {
return inner.exportAllData(context, continuation, scanProperties);
}

@Nonnull
@Override
public CompletableFuture<Void> importData(@Nonnull FDBRecordContext context,
@Nonnull Iterable<DataInKeySpacePath> dataToImport) {
return inner.importData(context, dataToImport);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* RecordCoreIllegalImportDataException.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.provider.foundationdb.keyspace;

import com.apple.foundationdb.record.RecordCoreArgumentException;

import javax.annotation.Nonnull;

public class RecordCoreIllegalImportDataException extends RecordCoreArgumentException {
private static final long serialVersionUID = 1L;

public RecordCoreIllegalImportDataException(@Nonnull final String msg, @Nonnull final Object... keyValue) {
super(msg, keyValue);
}
}

Check warning on line 33 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/keyspace/RecordCoreIllegalImportDataException.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/keyspace/RecordCoreIllegalImportDataException.java#L27-L33

`RecordCoreIllegalImportDataException` has inheritance depth of 5 which is deeper than maximum of 2 https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3578%2FScottDugas%2Fkeyspace-import%3AHEAD&id=3C3B0BC4FA83FF44B4E2CE5FBF7DBFF5
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -69,7 +68,7 @@
final FDBDatabaseExtension dbExtension = new FDBDatabaseExtension();

@Test
void exportAllDataFromSimplePath() throws ExecutionException, InterruptedException {
void exportAllDataFromSimplePath() {
KeySpace root = new KeySpace(
new KeySpaceDirectory("root", KeyType.STRING, UUID.randomUUID().toString())
.addSubdirectory(new KeySpaceDirectory("level1", KeyType.LONG)));
Expand All @@ -83,13 +82,14 @@

// Add data at different levels
for (int i = 0; i < 5; i++) {
Tuple key = basePath.add("level1", (long) i).toTuple(context);
final KeySpacePath path = basePath.add("level1", (long)i);
Tuple key = path.toTuple(context);
tr.set(key.pack(), Tuple.from("value" + i).pack());

// Add some sub-data under each key
for (int j = 0; j < 3; j++) {
Tuple subKey = key.add("sub" + j);
tr.set(subKey.pack(), Tuple.from("subvalue" + i + "_" + j).pack());
tr.set(path.toSubspace(context).pack(Tuple.from("sub" + j)),
Tuple.from("subvalue" + i + "_" + j).pack());
}
}
context.commit();
Expand All @@ -103,16 +103,19 @@
// Should have 5 main entries + 15 sub-entries = 20 total
assertEquals(20, allData.size());

assertThat(allData)
.allSatisfy(data ->
assertThat(data.getPath().getDirectoryName()).isEqualTo("level1"));

// Verify the data is sorted by key
for (int i = 1; i < allData.size(); i++) {
assertTrue(getKey(allData.get(i - 1), context).compareTo(getKey(allData.get(i), context)) < 0);
}
assertThat(allData.stream().map(data -> getKey(data, context)).collect(Collectors.toList()))
.isSorted();
}
}

// `toTuple` does not include the remainder, I'm not sure if that is intentional, or an oversight.
private Tuple getKey(final DataInKeySpacePath dataInKeySpacePath, final FDBRecordContext context) throws ExecutionException, InterruptedException {
final ResolvedKeySpacePath resolvedKeySpacePath = dataInKeySpacePath.getPath().toResolvedPathAsync(context).get();
private Tuple getKey(final DataInKeySpacePath dataInKeySpacePath, final FDBRecordContext context) {
final ResolvedKeySpacePath resolvedKeySpacePath = dataInKeySpacePath.getPath().toResolvedPathAsync(context).join();
if (dataInKeySpacePath.getRemainder() != null) {
return resolvedKeySpacePath.toTuple().addAll(dataInKeySpacePath.getRemainder());
} else {
Expand Down Expand Up @@ -524,9 +527,7 @@
final RecordCursor<DataInKeySpacePath> cursor = pathToExport.exportAllData(context, continuation.toBytes(),
scanProperties);
final AtomicReference<RecordCursorResult<Tuple>> tupleResult = new AtomicReference<>();
final List<Tuple> batch = cursor.map(dataInPath -> {
return Tuple.fromBytes(dataInPath.getValue());
}).asList(tupleResult).join();
final List<Tuple> batch = cursor.map(dataInPath -> Tuple.fromBytes(dataInPath.getValue())).asList(tupleResult).join();
actual.add(batch);
continuation = tupleResult.get().getContinuation();
}
Expand Down Expand Up @@ -578,7 +579,7 @@
}

@Test
void exportAllDataThroughKeySpacePathWrapperResolvedPaths() {
void exportAllDataThroughKeySpacePathWrapperRemainders() {
final FDBDatabase database = dbExtension.getDatabase();
final EnvironmentKeySpace keySpace = EnvironmentKeySpace.setupSampleData(database);

Expand Down Expand Up @@ -635,7 +636,7 @@
final List<DataInKeySpacePath> reversed = pathToExport.exportAllData(context, null, ScanProperties.REVERSE_SCAN)
.asList().join();
Collections.reverse(reversed);
assertDataInKeySpacePathEquals(asSingleExport, reversed);
assertDataInKeySpacePathEquals(context, asSingleExport, reversed);

// Assert continuations work correctly
final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN.with(props -> props.setReturnedRowLimit(1));
Expand All @@ -655,11 +656,12 @@
}
}

assertDataInKeySpacePathEquals(asSingleExport, asContinuations);
assertDataInKeySpacePathEquals(context, asSingleExport, asContinuations);
return asSingleExport;
}

private static void assertDataInKeySpacePathEquals(final List<DataInKeySpacePath> expectedList,
private static void assertDataInKeySpacePathEquals(final FDBRecordContext context,

Check warning on line 663 in fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/keyspace/KeySpacePathDataExportTest.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/keyspace/KeySpacePathDataExportTest.java#L663

The value of parameter `context` is never used https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3578%2FScottDugas%2Fkeyspace-import%3AHEAD&id=E6563484246B65E3FC66083FDA52033E
final List<DataInKeySpacePath> expectedList,
final List<DataInKeySpacePath> actualList) {
assertThat(actualList).zipSatisfy(expectedList,
(actual, other) -> {
Expand Down
Loading
Loading