
Commit 667e58c

Making the IcebergWriter always use 1 thread for writing to the data file (#130)
Co-authored-by: Sundaram Ananthanarayanan <[email protected]>
1 parent 28ce730 commit 667e58c

12 files changed (+335 -15)

mantis-connectors/mantis-connector-iceberg/build.gradle

Lines changed: 15 additions & 1 deletion
@@ -20,11 +20,24 @@ apply plugin: 'mantis'
 ext {
     hadoopVersion = '2.7.3'
     icebergVersion = '0.9.+'
-    junitVersion = '5.3.+'
+    junitVersion = '5.4.+'
     mockitoVersion = '2.18.+'
+    parquetVersion = '1.12.0'
 }
 
 dependencies {
+    configurations {
+        // we need the parquet dependency to be present in the testing classpath,
+        // hence we extend from the shadow configuration
+        testImplementation.extendsFrom shadow
+        all {
+            // we want parquet version 1.12.0 or above because we need this fix:
+            // https://issues.apache.org/jira/browse/PARQUET-1851
+            resolutionStrategy {
+                force "org.apache.parquet:parquet-hadoop:${parquetVersion}"
+            }
+        }
+    }
     implementation project(":mantis-runtime")
 
     // We only need the Configuration interface. Users can bring their own hadoop-common version.
@@ -41,6 +54,7 @@ dependencies {
     shadow "org.slf4j:slf4j-log4j12:$slf4jVersion"
 
     testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
+    testImplementation "org.junit.jupiter:junit-jupiter-params:$junitVersion"
     testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
     testImplementation "org.mockito:mockito-core:$mockitoVersion"
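A quick way to confirm the forced resolution took effect is Gradle's dependency report, e.g. `./gradlew dependencies --configuration testRuntimeClasspath` run in the connector module, which should list org.apache.parquet:parquet-hadoop at 1.12.0 regardless of what Iceberg pulls in transitively.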

mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/config/SinkConfig.java

Lines changed: 2 additions & 0 deletions
@@ -24,10 +24,12 @@
 import io.mantisrx.connector.iceberg.sink.committer.config.CommitterConfig;
 import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig;
 import io.mantisrx.runtime.parameter.Parameters;
+import lombok.RequiredArgsConstructor;
 
 /**
  * Convenient base config used by {@link WriterConfig} and {@link CommitterConfig}.
  */
+@RequiredArgsConstructor
 public class SinkConfig {
 
     private final String catalog;
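For readers who don't use Lombok: @RequiredArgsConstructor generates a constructor taking every final (and @NonNull) field of the class. Only the catalog field is visible in this hunk, but the super(catalog, database, table) call added to WriterConfig below implies SinkConfig has three such fields, so the generated constructor is roughly equivalent to this sketch (the database and table names are inferred, not shown here):

// Rough equivalent of the Lombok-generated SinkConfig constructor.
public SinkConfig(String catalog, String database, String table) {
    this.catalog = catalog;
    this.database = database;
    this.table = table;
}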

mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage.java

Lines changed: 21 additions & 5 deletions
@@ -16,6 +16,7 @@
 
 package io.mantisrx.connector.iceberg.sink.writer;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.mantisrx.connector.iceberg.sink.codecs.IcebergCodecs;
 import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig;
 import io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties;
@@ -34,6 +35,8 @@
 import io.mantisrx.runtime.parameter.type.IntParameter;
 import io.mantisrx.runtime.parameter.type.StringParameter;
 import io.mantisrx.runtime.parameter.validator.Validators;
+import io.mantisrx.runtime.scheduler.MantisRxSingleThreadScheduler;
+import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -75,6 +78,7 @@ public static ScalarToScalar.Config<Record, DataFile> config() {
         return new ScalarToScalar.Config<Record, DataFile>()
                 .description("")
                 .codec(IcebergCodecs.dataFile())
+                .serialInput()
                 .withParameters(parameters());
     }
 
@@ -119,15 +123,27 @@ public static Transformer newTransformer(Context context) {
 
         LocationProvider locationProvider = context.getServiceLocator().service(LocationProvider.class);
         IcebergWriterFactory factory = new DefaultIcebergWriterFactory(config, workerInfo, table, locationProvider);
-        IcebergWriterPool writerPool = new FixedIcebergWriterPool(
-                factory,
-                config.getWriterFlushFrequencyBytes(),
-                config.getWriterMaximumPoolSize());
+        IcebergWriterPool writerPool = new FixedIcebergWriterPool(factory, config);
         WriterMetrics metrics = new WriterMetrics();
         PartitionerFactory partitionerFactory = context.getServiceLocator().service(PartitionerFactory.class);
         Partitioner partitioner = partitionerFactory.getPartitioner(table);
 
-        return new Transformer(config, metrics, writerPool, partitioner, Schedulers.computation(), Schedulers.io());
+        return newTransformer(config, metrics, writerPool, partitioner, context.getWorkerInfo());
+    }
+
+    @VisibleForTesting
+    static Transformer newTransformer(
+            WriterConfig writerConfig,
+            WriterMetrics writerMetrics,
+            IcebergWriterPool writerPool,
+            Partitioner partitioner,
+            WorkerInfo workerInfo) {
+        int workerIdx = workerInfo.getWorkerIndex();
+        String nameFormat = "IcebergWriter (" + (workerIdx + 1) + ")-%d";
+        Scheduler executingService = new MantisRxSingleThreadScheduler(
+                new ThreadFactoryBuilder().setNameFormat(nameFormat).build());
+        return new Transformer(writerConfig, writerMetrics, writerPool, partitioner,
+                Schedulers.computation(), executingService);
     }
 
     public IcebergWriterStage() {
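This is the core of the change. Previously the Transformer wrote data files on the shared Schedulers.io() pool, where writes could end up on different threads; now each worker gets exactly one dedicated, named writer thread via MantisRxSingleThreadScheduler, and the .serialInput() added to the stage config keeps input processing serial to match. MantisRxSingleThreadScheduler is Mantis-internal; a minimal sketch of the same single-writer-thread idea using plain java.util.concurrent plus Guava's ThreadFactoryBuilder (the class name here is mine, not from this commit):

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class SingleWriterThreadSketch {
    public static void main(String[] args) {
        int workerIdx = 0; // stands in for workerInfo.getWorkerIndex()
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("IcebergWriter (" + (workerIdx + 1) + ")-%d")
                .build();

        // A single-threaded executor runs every submitted task on the same thread,
        // in submission order, so writes to a data file can never interleave.
        ExecutorService writerThread = Executors.newSingleThreadExecutor(threadFactory);
        writerThread.submit(() ->
                System.out.println("writing on " + Thread.currentThread().getName()));
        writerThread.shutdown();
    }
}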

mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/config/WriterConfig.java

Lines changed: 13 additions & 0 deletions
@@ -52,6 +52,19 @@ public WriterConfig(Parameters parameters, Configuration hadoopConfig) {
         this.hadoopConfig = hadoopConfig;
     }
 
+    public WriterConfig(String catalog, String database, String table, int writerRowGroupSize,
+                        long writerFlushFrequencyBytes, long writerFlushFrequencyMsec,
+                        String writerFileFormat, int writerMaximumPoolSize,
+                        Configuration hadoopConfig) {
+        super(catalog, database, table);
+        this.writerRowGroupSize = writerRowGroupSize;
+        this.writerFlushFrequencyBytes = writerFlushFrequencyBytes;
+        this.writerFlushFrequencyMsec = writerFlushFrequencyMsec;
+        this.writerFileFormat = writerFileFormat;
+        this.writerMaximumPoolSize = writerMaximumPoolSize;
+        this.hadoopConfig = hadoopConfig;
+    }
+
     /**
      * Returns an int representing maximum number of rows that should exist in a file.
      */
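The new all-args constructor bypasses Parameters parsing, which makes it straightforward to build a WriterConfig directly in unit tests. A hypothetical sketch (the argument values are illustrative, not defaults from the source):

import org.apache.hadoop.conf.Configuration;

class WriterConfigExample {
    static WriterConfig newTestConfig() {
        return new WriterConfig(
                "catalog", "database", "table",
                1000,         // writerRowGroupSize: rows per row group
                134217728L,   // writerFlushFrequencyBytes: flush after ~128 MiB
                60000L,       // writerFlushFrequencyMsec: flush at least once a minute
                "parquet",    // writerFileFormat
                5,            // writerMaximumPoolSize
                new Configuration());
    }
}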

mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/pool/FixedIcebergWriterPool.java

Lines changed: 4 additions & 0 deletions
@@ -43,6 +43,10 @@ public class FixedIcebergWriterPool implements IcebergWriterPool {
     private final long flushFrequencyBytes;
     private final int maximumPoolSize;
 
+    public FixedIcebergWriterPool(IcebergWriterFactory factory, WriterConfig writerConfig) {
+        this(factory, writerConfig.getWriterFlushFrequencyBytes(), writerConfig.getWriterMaximumPoolSize());
+    }
+
     public FixedIcebergWriterPool(IcebergWriterFactory factory, long flushFrequencyBytes, int maximumPoolSize) {
         this.factory = factory;
         this.flushFrequencyBytes = flushFrequencyBytes;
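The new overload simply delegates to the existing three-argument constructor, so after this change the following two constructions are equivalent:

IcebergWriterPool viaConfig = new FixedIcebergWriterPool(factory, writerConfig);
IcebergWriterPool explicit = new FixedIcebergWriterPool(
        factory,
        writerConfig.getWriterFlushFrequencyBytes(),
        writerConfig.getWriterMaximumPoolSize());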

mantis-connectors/mantis-connector-iceberg/src/main/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
@@ -19,4 +19,4 @@ log4j.rootLogger=INFO, stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n
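The added [%t] conversion includes the logging thread's name in each line, which makes the IcebergWriter (n)-m writer threads introduced above visible in the logs. Purely as an illustration of the shape (not actual output from this commit), a line formatted with this pattern would look like:

2021-05-01 12:00:00 INFO  [IcebergWriter (1)-0] IcebergWriterStage:42 - ...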
mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergTableExtension.java

Lines changed: 109 additions & 0 deletions

@@ -0,0 +1,109 @@
+/*
+ * Copyright 2021 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mantisrx.connector.iceberg.sink.writer;
+
+import com.google.common.io.Files;
+import java.io.File;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.LocationProvider;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * JUnit Jupiter extension to create Iceberg tables for unit testing with a specified schema,
+ * properties, etc., and to clean up the table files after each unit test.
+ * <p>
+ * To use the IcebergTableExtension, add the following code to your test class. This
+ * creates the table before each test is executed.
+ * <pre>
+ *   @RegisterExtension
+ *   static IcebergTableExtension tableExtension =
+ *       IcebergTableExtension.builder()
+ *           .schema(SCHEMA)
+ *           .spec(SPEC)
+ *           .build();
+ * </pre>
+ *
+ * <p> The created table can be obtained via the {@link IcebergTableExtension#getTable()} method.
+ */
+@Slf4j
+@Builder
+public class IcebergTableExtension implements BeforeAllCallback, BeforeEachCallback,
+        AfterEachCallback {
+
+    private File rootDir;
+
+    @Getter
+    @Builder.Default
+    private String catalog = "catalog";
+
+    @Getter
+    @Builder.Default
+    private String database = "database";
+
+    @Getter
+    @Builder.Default
+    private String tableName = "table";
+
+    @Getter
+    private Schema schema;
+    private PartitionSpec spec;
+
+    @Getter
+    private Table table;
+
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        log.info("Before All");
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        log.info("Before Each");
+        if (rootDir == null) {
+            rootDir = Files.createTempDir();
+        }
+
+        final File tableDir = new File(rootDir, getTableIdentifier().toString());
+        final HadoopTables tables = new HadoopTables();
+        table = tables.create(schema, spec, tableDir.getPath());
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        FileUtils.deleteDirectory(rootDir);
+        rootDir = null;
+    }
+
+    public LocationProvider getLocationProvider() {
+        return table.locationProvider();
+    }
+
+    public TableIdentifier getTableIdentifier() {
+        return TableIdentifier.of(catalog, database, tableName);
+    }
+}
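A hypothetical test class wired up with the extension, fleshing out the Javadoc snippet; the schema and partition spec here are made up for illustration:

import static org.apache.iceberg.types.Types.NestedField.required;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class IcebergWriterPoolTest {

    // Hypothetical single-column schema and an unpartitioned spec.
    static final Schema SCHEMA = new Schema(required(1, "id", Types.LongType.get()));
    static final PartitionSpec SPEC = PartitionSpec.unpartitioned();

    @RegisterExtension
    static IcebergTableExtension tableExtension = IcebergTableExtension.builder()
            .schema(SCHEMA)
            .spec(SPEC)
            .build();

    @Test
    void startsWithAFreshTable() {
        // beforeEach created a new HadoopTables table under a temp dir;
        // afterEach deletes the directory, so every test starts clean.
        Table table = tableExtension.getTable();
        // ... exercise the writer against `table` ...
    }
}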
