@@ -69,7 +69,7 @@ public void deleteAllFiles() {
*/
public Stream<Row> streamRows(List<String> files) {
return files.stream()
.flatMap(this::getRowsOrThrow);
.flatMap(file -> getRowsOrThrow(file).stream());
}

/**
@@ -82,10 +82,17 @@ public CloseableIterator<Row> openFile(String filename) {
return new WrappedIterator<>(getRowsOrThrow(filename).iterator());
}

private Stream<Row> getRowsOrThrow(String filename) throws NoSuchElementException {
/**
* Retrieves the list of rows in a data file, or throws an exception if it does not exist.
*
* @param filename the filename
* @return the rows
* @throws NoSuchElementException if the file does not exist
*/
public List<Row> getRowsOrThrow(String filename) throws NoSuchElementException {
if (!rowsByFilename.containsKey(filename)) {
throw new NoSuchElementException("File not found: " + filename);
}
return rowsByFilename.get(filename).stream();
return rowsByFilename.get(filename);
}
}
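
For context, the change above can be reduced to this self-contained sketch of the list-or-throw pattern, with a hypothetical store class and plain String rows standing in for the project's Row type:

import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

// Hypothetical stand-in for the in-memory data store touched above.
public class InMemoryFileStoreSketch {
    private final Map<String, List<String>> rowsByFilename = Map.of(
            "file-1.parquet", List.of("row-a", "row-b"),
            "file-2.parquet", List.of("row-c"));

    // Callers flatMap the returned List into a stream themselves...
    public Stream<String> streamRows(List<String> files) {
        return files.stream()
                .flatMap(file -> getRowsOrThrow(file).stream());
    }

    // ...because this now returns the stored List directly instead of a Stream.
    public List<String> getRowsOrThrow(String filename) {
        if (!rowsByFilename.containsKey(filename)) {
            throw new NoSuchElementException("File not found: " + filename);
        }
        return rowsByFilename.get(filename);
    }

    public static void main(String[] args) {
        InMemoryFileStoreSketch store = new InMemoryFileStoreSketch();
        store.streamRows(List.of("file-1.parquet", "file-2.parquet"))
                .forEach(System.out::println); // prints row-a, row-b, row-c
    }
}
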
@@ -57,6 +57,18 @@ public static Supplier<String> supplyNumberedIdsWithPrefix(String prefix) {
.iterator()::next;
}

/**
* Creates a supplier of IDs. Each ID is generated by applying the given format string to an autoincrementing number.
*
* @param format the format string, which must contain a placeholder for the number (e.g. %d)
* @return the supplier
*/
public static Supplier<String> supplyNumberedIdsWithFormat(String format) {
return IntStream.iterate(1, i -> i + 1)
.mapToObj(i -> String.format(format, i))
.iterator()::next;
}
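
As a quick sanity check of what this supplier produces, here is a self-contained sketch using the same pattern; the format string below is an invented example rather than one from the codebase:

import java.util.function.Supplier;
import java.util.stream.IntStream;

public class NumberedIdSupplierExample {
    // Same pattern as supplyNumberedIdsWithFormat: the iterator over an infinite
    // IntStream is created once, and each call to get() pulls the next formatted ID.
    static Supplier<String> supplyNumberedIdsWithFormat(String format) {
        return IntStream.iterate(1, i -> i + 1)
                .mapToObj(i -> String.format(format, i))
                .iterator()::next;
    }

    public static void main(String[] args) {
        Supplier<String> ids = supplyNumberedIdsWithFormat("duplicate-%d.parquet");
        System.out.println(ids.get()); // duplicate-1.parquet
        System.out.println(ids.get()); // duplicate-2.parquet
        System.out.println(ids.get()); // duplicate-3.parquet
    }
}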

/**
* Creates a supplier of IDs that would usually be generated with UUID.randomUUID. The IDs will be generated with
* each number embedded in the digits of the UUID.
@@ -31,6 +31,7 @@
import sleeper.compaction.core.job.CompactionJobSerDe;
import sleeper.compaction.core.job.commit.CompactionCommitMessage;
import sleeper.compaction.core.job.commit.CompactionCommitMessageSerDe;
import sleeper.compaction.core.job.creation.CreateCompactionJobBatches;
import sleeper.compaction.core.job.creation.CreateCompactionJobs;
import sleeper.compaction.core.job.dispatch.CompactionJobDispatchRequest;
import sleeper.compaction.core.job.dispatch.CompactionJobDispatchRequestSerDe;
@@ -130,6 +131,12 @@ public void forceCreateJobs() {
});
}

@Override
public void createJobBatches(List<CompactionJob> jobs) {
CreateCompactionJobBatches createBatches = AwsCreateCompactionJobs.batchesFrom(instance.getInstanceProperties(), instance.getTablePropertiesProvider(), s3Client, sqsClient);
createBatches.createBatches(instance.getTableProperties(), instance.getStateStore(), jobs);
}

@Override
public void scaleToZero() {
EC2Scaler.create(instance.getInstanceProperties(), asClient, ec2Client).scaleTo(0);
@@ -20,25 +20,47 @@
import org.apache.parquet.hadoop.ParquetReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;

import sleeper.core.iterator.closeable.CloseableIterator;
import sleeper.core.row.Row;
import sleeper.core.schema.Schema;
import sleeper.core.table.TableFilePaths;
import sleeper.core.util.S3Filename;
import sleeper.parquet.row.ParquetReaderIterator;
import sleeper.parquet.row.RowReadSupport;
import sleeper.systemtest.drivers.util.SystemTestClients;
import sleeper.systemtest.dsl.instance.DataFilesDriver;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.systemtest.dsl.util.DataFileDuplication;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class AwsDataFilesDriver implements DataFilesDriver {
public static final Logger LOGGER = LoggerFactory.getLogger(AwsDataFilesDriver.class);

private Configuration hadoopConf;
private final S3AsyncClient s3AsyncClient;
private final Configuration hadoopConf;
private final TableFilePaths filePaths;

public AwsDataFilesDriver(SystemTestClients clients) {
hadoopConf = clients.createHadoopConf();
public AwsDataFilesDriver(SystemTestInstanceContext instance, SystemTestClients clients) {
this(clients.getS3Async(), clients.createHadoopConf(),
TableFilePaths.buildDataFilePathPrefix(instance.getInstanceProperties(), instance.getTableProperties()));
}

public AwsDataFilesDriver(S3AsyncClient s3AsyncClient, Configuration hadoopConf, TableFilePaths filePaths) {
this.s3AsyncClient = s3AsyncClient;
this.hadoopConf = hadoopConf;
this.filePaths = filePaths;
}

@Override
@@ -54,4 +76,41 @@ public CloseableIterator<Row> getRows(Schema schema, String filename) {
}
}

@Override
public List<DataFileDuplication> duplicateFiles(int times, Collection<String> files) {
LOGGER.info("Duplicating {} files", files.size());
List<DataFileDuplication> duplications = files.stream()
.map(file -> new DataFileDuplication(file, generateDuplicateFilePaths(times)))
.toList();
CompletableFuture<?>[] futures = duplications.stream()
.flatMap(this::performDuplication)
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
LOGGER.info("Duplicated {} files", files.size());
return duplications;
}

private Stream<CompletableFuture<CopyObjectResponse>> performDuplication(DataFileDuplication duplication) {
S3Filename original = S3Filename.parse(duplication.originalFilename());
return duplication.newFilenames().stream()
.map(newPath -> copy(original, S3Filename.parse(newPath)));
}

private CompletableFuture<CopyObjectResponse> copy(S3Filename original, S3Filename copy) {
return s3AsyncClient.copyObject(CopyObjectRequest.builder()
.sourceBucket(original.bucketName())
.destinationBucket(copy.bucketName())
.sourceKey(original.objectKey())
.destinationKey(copy.objectKey())
.build());
}

private List<String> generateDuplicateFilePaths(int numPaths) {
List<String> paths = new ArrayList<>(numPaths);
for (int i = 0; i < numPaths; i++) {
paths.add(filePaths.constructPartitionParquetFilePath("duplicate", UUID.randomUUID().toString()));
}
return paths;
}

}
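
The duplication approach above boils down to fanning out one asynchronous copy per new file and joining once on the combined future. A self-contained sketch of that pattern follows; the bucket and key names are placeholders, and S3AsyncClient.create() assumes default credentials and region configuration:

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ParallelCopySketch {
    public static void main(String[] args) {
        // Placeholder bucket/keys; in the driver these come from TableFilePaths and S3Filename.
        String bucket = "example-table-bucket";
        String sourceKey = "data/partition-root/original.parquet";
        List<String> duplicateKeys = List.of(
                "data/partition-root/duplicate/copy-1.parquet",
                "data/partition-root/duplicate/copy-2.parquet");

        try (S3AsyncClient s3 = S3AsyncClient.create()) {
            // Start one asynchronous copy per duplicate so they all run concurrently.
            CompletableFuture<?>[] copies = duplicateKeys.stream()
                    .map(destKey -> s3.copyObject(CopyObjectRequest.builder()
                            .sourceBucket(bucket)
                            .sourceKey(sourceKey)
                            .destinationBucket(bucket)
                            .destinationKey(destKey)
                            .build()))
                    .toArray(CompletableFuture[]::new);
            // Block once on the combined future rather than joining each copy in turn.
            CompletableFuture.allOf(copies).join();
        }
    }
}
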
@@ -49,7 +49,7 @@ public void sendJobWithFiles(String jobId, String... files) {
"--platform", "EMRServerless",
"--jobid", jobId,
"--files"),
Stream.of(files).map(sourceFiles::ingestJobFileInBucket))
Stream.of(files).map(filename -> sourceFiles.lastFolderWrittenTo().ingestJobFileInBucket(filename)))
.toArray(String[]::new));
}
}
@@ -47,7 +47,7 @@ public void sendJobWithFiles(String jobId, String... files) {
"--table", instance.getTableName(),
"--jobid", jobId,
"--files"),
Stream.of(files).map(sourceFiles::ingestJobFileInBucket))
Stream.of(files).map(filename -> sourceFiles.lastFolderWrittenTo().ingestJobFileInBucket(filename)))
.toArray(String[]::new));
}
}
@@ -270,7 +270,7 @@ public PurgeQueueDriver purgeQueues(SystemTestContext context) {

@Override
public DataFilesDriver dataFiles(SystemTestContext context) {
return new AwsDataFilesDriver(clients);
return new AwsDataFilesDriver(context.instance(), clients);
}

@Override
@@ -79,7 +79,6 @@ public class SystemTestClients {
private final Supplier<DataFusionAwsConfig> dataFusionAwsConfig;
private final Supplier<Map<String, String>> getAuthEnvVars;
private final UnaryOperator<Configuration> configureHadoop;
private final boolean skipAssumeRole;

private SystemTestClients(Builder builder) {
credentialsProvider = builder.credentialsProvider;
@@ -104,7 +103,6 @@ private SystemTestClients(Builder builder) {
dataFusionAwsConfig = builder.dataFusionAwsConfig;
getAuthEnvVars = builder.getAuthEnvVars;
configureHadoop = builder.configureHadoop;
skipAssumeRole = builder.skipAssumeRole;
}

public static Builder builder() {
@@ -136,9 +134,6 @@ public static SystemTestClients fromDefaults() {
}

public SystemTestClients assumeRole(AssumeSleeperRole assumeRole) {
if (skipAssumeRole) {
return this;
}
AssumeSleeperRoleAwsSdk aws = assumeRole.forAwsSdk(sts);
AssumeSleeperRoleHadoop hadoop = assumeRole.forHadoop();
return builder()
@@ -294,7 +289,6 @@ public static class Builder {
private Supplier<DataFusionAwsConfig> dataFusionAwsConfig;
private Supplier<Map<String, String>> getAuthEnvVars = Map::of;
private UnaryOperator<Configuration> configureHadoop = conf -> conf;
private boolean skipAssumeRole = false;

private Builder() {
}
@@ -413,11 +407,6 @@ public Builder configureHadoopSetter(Consumer<Configuration> configureHadoop) {
return this;
}

public Builder skipAssumeRole(boolean skipAssumeRole) {
this.skipAssumeRole = skipAssumeRole;
return this;
}

public SystemTestClients build() {
return new SystemTestClients(this);
}
@@ -15,30 +15,47 @@
*/
package sleeper.systemtest.drivers.instance;

import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import sleeper.core.iterator.closeable.CloseableIterator;
import sleeper.core.row.Row;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.StringType;
import sleeper.core.statestore.FileReference;
import sleeper.localstack.test.SleeperLocalStackClients;
import sleeper.parquet.row.ParquetReaderIterator;
import sleeper.parquet.row.ParquetRowReaderFactory;
import sleeper.systemtest.drivers.testutil.LocalStackDslTest;
import sleeper.systemtest.dsl.SleeperSystemTest;
import sleeper.systemtest.dsl.SystemTestContext;
import sleeper.systemtest.dsl.util.DataFileDuplications;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.LongStream;

import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.core.schema.SchemaTestHelper.createSchemaWithKey;
import static sleeper.systemtest.drivers.testutil.LocalStackTestInstance.LOCALSTACK_MAIN;
import static sleeper.systemtest.dsl.util.SystemTestSchema.DEFAULT_SCHEMA;

@LocalStackDslTest
public class AwsDataFilesDriverIT {

@BeforeEach
void setUp(SleeperSystemTest sleeper) {
sleeper.connectToInstanceAddOnlineTable(LOCALSTACK_MAIN);
sleeper.connectToInstanceNoTables(LOCALSTACK_MAIN);
}

@Test
void shouldReadFile(SleeperSystemTest sleeper) throws Exception {
// Given
sleeper.tables().create("test", DEFAULT_SCHEMA);
sleeper.sourceFiles().inDataBucket()
.createWithNumberedRows("test.parquet", LongStream.of(1, 3, 2));
sleeper.ingest().toStateStore().addFileOnEveryPartition("test.parquet", 3);
@@ -54,4 +71,41 @@ void shouldReadFile(SleeperSystemTest sleeper) throws Exception {
});
}

@Test
void shouldDuplicateFiles(SleeperSystemTest sleeper, SystemTestContext context) {
// Given
sleeper.tables().create("test", createSchemaWithKey("key", new StringType()));
sleeper.ingest().toStateStore()
.addFileOnPartition("file-1.parquet", "root", new Row(Map.of("key", "value-1")))
.addFileOnPartition("file-2.parquet", "root", new Row(Map.of("key", "value-2")));

// When
DataFileDuplications duplications = sleeper.ingest().toStateStore().duplicateFilesOnSamePartitions(1);

// Then
List<FileReference> results = duplications.streamNewReferences().toList();
assertThat(results).hasSize(2);
assertThat(readRows(context, results.get(0)))
.containsExactly(new Row(Map.of("key", "value-1")));
assertThat(readRows(context, results.get(1)))
.containsExactly(new Row(Map.of("key", "value-2")));
}

private List<Row> readRows(SystemTestContext context, FileReference file) {
try (ParquetReaderIterator iterator = new ParquetReaderIterator(
ParquetRowReaderFactory.parquetRowReaderBuilder(
new Path(file.getFilename()), schema(context))
.withConf(SleeperLocalStackClients.HADOOP_CONF)
.build())) {
List<Row> rows = new ArrayList<>();
iterator.forEachRemaining(rows::add);
return rows;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private Schema schema(SystemTestContext context) {
return context.instance().getTableProperties().getSchema();
}
}
@@ -26,6 +26,7 @@
import sleeper.systemtest.drivers.util.sqs.AwsDrainSqsQueue;
import sleeper.systemtest.dsl.SystemTestContext;
import sleeper.systemtest.dsl.compaction.CompactionDriver;
import sleeper.systemtest.dsl.instance.AssumeAdminRoleDriver;
import sleeper.systemtest.dsl.instance.SleeperInstanceDriver;
import sleeper.systemtest.dsl.instance.SystemTestDeploymentDriver;
import sleeper.systemtest.dsl.instance.SystemTestParameters;
@@ -52,7 +53,6 @@ public static LocalStackSystemTestDrivers fromContainer() {
.sqs(SleeperLocalStackClients.SQS_CLIENT)
.dataFusionAwsConfig(() -> DataFusionAwsConfig.overrideEndpoint(SleeperLocalStackContainer.INSTANCE.getEndpoint().toString()))
.configureHadoopSetter(conf -> configureHadoop(conf, SleeperLocalStackContainer.INSTANCE))
.skipAssumeRole(true)
.build());
}

@@ -82,6 +82,11 @@ public PollWithRetriesDriver pollWithRetries() {
return PollWithRetriesDriver.noWaits();
}

@Override
public AssumeAdminRoleDriver assumeAdminRole() {
return properties -> this;
}

public SystemTestClients clients() {
return clients;
}
@@ -111,7 +111,7 @@ public void updateTableProperties(Map<TableProperty, String> values) {
}

public SystemTestSourceFiles sourceFiles() {
return new SystemTestSourceFiles(context.instance(), context.sourceFiles(), baseDrivers.sourceFiles(context));
return new SystemTestSourceFiles(context, baseDrivers.sourceFiles(context));
}

public SystemTestTableFiles tableFiles() {
@@ -32,6 +32,8 @@ public interface CompactionDriver {

void forceCreateJobs();

void createJobBatches(List<CompactionJob> jobs);

void scaleToZero();

List<CompactionJob> drainJobsQueueForWholeInstance(int expectedJobs);
@@ -28,6 +28,7 @@
import sleeper.systemtest.dsl.SystemTestDrivers;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.systemtest.dsl.sourcedata.IngestSourceFilesContext;
import sleeper.systemtest.dsl.util.DataFileDuplications;
import sleeper.systemtest.dsl.util.PollWithRetriesDriver;
import sleeper.systemtest.dsl.util.WaitForJobs;
import sleeper.systemtest.dsl.util.WaitForTasks;
@@ -115,6 +116,15 @@ public SystemTestCompaction splitFilesAndRunJobs(int expectedJobs) {
return this;
}

public SystemTestCompaction createSeparateCompactionsForOriginalAndDuplicates(DataFileDuplications duplications) {
CompactionJobFactory factory = new CompactionJobFactory(instance.getInstanceProperties(), instance.getTableProperties());
List<CompactionJob> jobs = duplications.createSeparateCompactionsForOriginalAndDuplicates(factory);
lastJobIds = waitForJobCreation.createJobsGetIds(jobs.size(),
pollDriver.pollWithIntervalAndTimeout(Duration.ofSeconds(1), Duration.ofMinutes(1)),
() -> driver.createJobBatches(jobs));
return this;
}

public SystemTestCompaction waitForTasks(int expectedTasks) {
new WaitForTasks(driver.getJobTracker())
.waitUntilNumTasksStartedAJob(expectedTasks, lastJobIds,