Skip to content

Commit d9e19ef

Browse files
committed
Memory Connector: optimize split/logicalpart allocation and remove memory.splits-per-node usage
1 parent 25732a8 commit d9e19ef

File tree

19 files changed

+108
-185
lines changed

19 files changed

+108
-185
lines changed

hetu-hive-functions/src/test/java/io/hetu/core/hive/dynamicfunctions/TestDynamicHiveScalarFunction.java

+1 -2
Original file line number | Diff line number | Diff line change
@@ -94,8 +94,7 @@ public void setUpClass()
9494
metastoreConfig.put("hetu.metastore.hetufilesystem.path", folder.newFolder("metastore").getAbsolutePath());
9595
queryRunner.getCoordinator().loadMetastore(metastoreConfig);
9696
queryRunner.createCatalog("memory", "memory",
97-
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath(),
98-
"memory.splits-per-node", "1"));
97+
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath()));
9998
}
10099
catch (Exception e) {
101100
closeAllSuppress(e, queryRunner);

presto-geospatial/src/test/java/io/prestosql/plugin/geospatial/BenchmarkGeometryAggregations.java

+1 -2
Original file line number | Diff line number | Diff line change
@@ -86,8 +86,7 @@ public void setUp()
8686
metastoreConfig.put("hetu.metastore.cache.type", "local");
8787
queryRunner.loadMetastore(metastoreConfig);
8888
queryRunner.createCatalog("memory", new MemoryConnectorFactory(),
89-
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath(),
90-
"memory.splits-per-node", "1"));
89+
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath()));
9190

9291
Path path = Paths.get(BenchmarkGeometryAggregations.class.getClassLoader().getResource("us-states.tsv").getPath());
9392
String polygonValues = Files.lines(path)

presto-geospatial/src/test/java/io/prestosql/plugin/geospatial/BenchmarkSpatialJoin.java

+1 -2
Original file line number | Diff line number | Diff line change
@@ -98,8 +98,7 @@ public void setUp()
9898
metastoreConfig.put("hetu.metastore.cache.type", "local");
9999
queryRunner.loadMetastore(metastoreConfig);
100100
queryRunner.createCatalog("memory", new MemoryConnectorFactory(),
101-
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath(),
102-
"memory.splits-per-node", "1"));
101+
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath()));
103102

104103
Path path = Paths.get(BenchmarkSpatialJoin.class.getClassLoader().getResource("us-states.tsv").getPath());
105104
String polygonValues = Files.lines(path)

presto-geospatial/src/test/java/io/prestosql/plugin/geospatial/TestSpatialJoinPlanning.java

+1 -2
Original file line number | Diff line number | Diff line change
@@ -90,8 +90,7 @@ private static LocalQueryRunner createQueryRunner()
9090
metastoreConfig.put("hetu.metastore.cache.type", "local");
9191
queryRunner.loadMetastore(metastoreConfig);
9292
queryRunner.createCatalog("memory", new MemoryConnectorFactory(),
93-
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath(),
94-
"memory.splits-per-node", "1"));
93+
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath()));
9594

9695
queryRunner.execute(format("CREATE TABLE kdb_tree AS SELECT '%s' AS v", KDB_TREE_JSON));
9796
queryRunner.execute("CREATE TABLE points (lng, lat, name) AS (VALUES (2.1e0, 2.1e0, 'x'))");

presto-jdbc/src/test/java/io/hetu/core/jdbc/TestHetuConnection.java

+1 -2
Original file line number | Diff line number | Diff line change
@@ -54,8 +54,7 @@ public void setupServer()
5454
server.installPlugin(new MemoryPlugin());
5555
server.loadMetastore(metastoreConfig);
5656
server.createCatalog("memory", "memory",
57-
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath(),
58-
"memory.splits-per-node", "1"));
57+
ImmutableMap.of("memory.spill-path", folder.newFolder("memory-connector").getAbsolutePath()));
5958

6059
try (Connection connection = createConnection();
6160
Statement statement = connection.createStatement()) {

presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryConfig.java

-16
Original file line number | Diff line number | Diff line change
@@ -37,29 +37,13 @@
3737

3838
public class MemoryConfig
3939
{
40-
private int splitsPerNode = Math.max(Runtime.getRuntime().availableProcessors(), 1);
4140
private DataSize maxDataPerNode = new DataSize(256, DataSize.Unit.MEGABYTE);
4241
private DataSize maxLogicalPartSize = new DataSize(256, MEGABYTE);
4342
private DataSize maxPageSize = new DataSize(512, DataSize.Unit.KILOBYTE);
4443
private Duration processingDelay = new Duration(5, TimeUnit.SECONDS);
4544
private Path spillRoot;
4645
private int threadPoolSize = Math.max((Runtime.getRuntime().availableProcessors() / 2), 1);
4746

48-
public int getSplitsPerNode()
49-
{
50-
return splitsPerNode;
51-
}
52-
53-
@Mandatory(name = "memory.splits-per-node",
54-
description = "Number of splits to create per node. Default value is number of available processors on the coordinator." +
55-
" Value is ignored on the workers.")
56-
@Config("memory.splits-per-node")
57-
public MemoryConfig setSplitsPerNode(int splitsPerNode)
58-
{
59-
this.splitsPerNode = splitsPerNode;
60-
return this;
61-
}
62-
6347
@NotNull
6448
public Path getSpillRoot()
6549
{

presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryDataFragment.java

+11 -2
Original file line number | Diff line number | Diff line change
@@ -30,15 +30,18 @@ public class MemoryDataFragment
3030

3131
private final HostAddress hostAddress;
3232
private final long rows;
33+
private final int logicalPartCount;
3334

3435
@JsonCreator
3536
public MemoryDataFragment(
3637
@JsonProperty("hostAddress") HostAddress hostAddress,
37-
@JsonProperty("rows") long rows)
38+
@JsonProperty("rows") long rows,
39+
@JsonProperty("logicalPartCount") int logicalPartCount)
3840
{
3941
this.hostAddress = requireNonNull(hostAddress, "hostAddress is null");
4042
checkArgument(rows >= 0, "Rows number can not be negative");
4143
this.rows = rows;
44+
this.logicalPartCount = logicalPartCount;
4245
}
4346

4447
@JsonProperty
@@ -53,6 +56,12 @@ public long getRows()
5356
return rows;
5457
}
5558

59+
@JsonProperty
60+
public int getLogicalPartCount()
61+
{
62+
return logicalPartCount;
63+
}
64+
5665
public Slice toSlice()
5766
{
5867
return Slices.wrappedBuffer(MEMORY_DATA_FRAGMENT_CODEC.toJsonBytes(this));
@@ -66,6 +75,6 @@ public static MemoryDataFragment fromSlice(Slice fragment)
6675
public static MemoryDataFragment merge(MemoryDataFragment a, MemoryDataFragment b)
6776
{
6877
checkArgument(a.getHostAddress().equals(b.getHostAddress()), "Can not merge fragments from different hosts");
69-
return new MemoryDataFragment(a.getHostAddress(), a.getRows() + b.getRows());
78+
return new MemoryDataFragment(a.getHostAddress(), a.getRows() + b.getRows(), Math.max(a.getLogicalPartCount(), b.getLogicalPartCount()));
7079
}
7180
}

presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryMetadata.java

+2 -4
Original file line number | Diff line number | Diff line change
@@ -283,8 +283,7 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
283283
oldTableHandle.getActiveTableIds(),
284284
oldTableHandle.getColumns(),
285285
oldTableHandle.getSortedBy(),
286-
oldTableHandle.getIndexColumns(),
287-
oldTableHandle.getSplitsPerNode());
286+
oldTableHandle.getIndexColumns());
288287

289288
metastore.alterTable(MEM_KEY, oldInfo.getSchemaName(), oldInfo.getTableName(),
290289
TableEntity.builder()
@@ -398,8 +397,7 @@ public synchronized MemoryWriteTableHandle beginCreateTable(ConnectorSession ses
398397
getTableIdSet(nextId),
399398
columnHandles,
400399
sortedBy,
401-
indexColumns,
402-
config.getSplitsPerNode());
400+
indexColumns);
403401
}
404402

405403
private void checkSchemaExists(String schemaName, boolean expectExist)

presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryPageSinkProvider.java

+2 -3
Original file line number | Diff line number | Diff line change
@@ -73,7 +73,6 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
7373
pagesStore.refreshTables(memoryOutputTableHandle.getActiveTableIds());
7474
pagesStore.initialize(tableId,
7575
memoryOutputTableHandle.isCompressionEnabled(),
76-
memoryOutputTableHandle.getSplitsPerNode(),
7776
memoryOutputTableHandle.getColumns(),
7877
memoryOutputTableHandle.getSortedBy(),
7978
memoryOutputTableHandle.getIndexColumns());
@@ -114,7 +113,6 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
114113
//
115114
pagesStore.initialize(tableId,
116115
memoryOutputTableHandle.isCompressionEnabled(),
117-
memoryOutputTableHandle.getSplitsPerNode(),
118116
memoryOutputTableHandle.getColumns(),
119117
memoryOutputTableHandle.getSortedBy(),
120118
memoryOutputTableHandle.getIndexColumns());
@@ -160,7 +158,8 @@ public CompletableFuture<?> appendPage(Page page)
160158
public CompletableFuture<Collection<Slice>> finish()
161159
{
162160
tablesManager.finishUpdatingTable(tableId);
163-
return completedFuture(ImmutableList.of(new MemoryDataFragment(currentHostAddress, addedRows).toSlice()));
161+
int lpCount = tablesManager.getTableLpCount(tableId);
162+
return completedFuture(ImmutableList.of(new MemoryDataFragment(currentHostAddress, addedRows, lpCount).toSlice()));
164163
}
165164

166165
@Override

presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryPageSourceProvider.java

+2 -4
Original file line number | Diff line number | Diff line change
@@ -74,8 +74,7 @@ public ConnectorPageSource createPageSource(
7474
{
7575
MemorySplit memorySplit = (MemorySplit) split;
7676
long tableId = memorySplit.getTable();
77-
int partNumber = memorySplit.getPartNumber();
78-
int totalParts = memorySplit.getTotalPartsPerWorker();
77+
int lpNum = memorySplit.getLogicalPartNumber();
7978
long expectedRows = memorySplit.getExpectedRows();
8079
MemoryTableHandle memoryTable = (MemoryTableHandle) table;
8180
OptionalDouble sampleRatio = memoryTable.getSampleRatio();
@@ -88,8 +87,7 @@ public ConnectorPageSource createPageSource(
8887
.map(MemoryColumnHandle::getColumnIndex).collect(toList());
8988
List<Page> pages = pagesStore.getPages(
9089
tableId,
91-
partNumber,
92-
totalParts,
90+
lpNum,
9391
columnIndexes,
9492
expectedRows,
9593
memorySplit.getLimit(),

presto-memory/src/main/java/io/prestosql/plugin/memory/MemorySplit.java

+9 -22
Original file line number | Diff line number | Diff line change
@@ -30,28 +30,23 @@ public class MemorySplit
3030
implements ConnectorSplit
3131
{
3232
private final long table;
33-
private final int totalPartsPerWorker; // how many concurrent reads there will be from one worker
34-
private final int partNumber; // part of the pages on one worker that this splits is responsible
33+
private final int lpNum; // the order of the logicalPart (equivalent to a split) in each node, starting from 0 (1 split, 0 is the available index)
3534
private final HostAddress address;
3635
private final long expectedRows;
3736
private final OptionalLong limit;
3837

3938
@JsonCreator
4039
public MemorySplit(
4140
@JsonProperty("table") long table,
42-
@JsonProperty("partNumber") int partNumber,
43-
@JsonProperty("totalPartsPerWorker") int totalPartsPerWorker,
41+
@JsonProperty("lpNum") int lpNum,
4442
@JsonProperty("address") HostAddress address,
4543
@JsonProperty("expectedRows") long expectedRows,
4644
@JsonProperty("limit") OptionalLong limit)
4745
{
48-
checkState(partNumber >= 0, "partNumber must be >= 0");
49-
checkState(totalPartsPerWorker >= 1, "totalPartsPerWorker must be >= 1");
50-
checkState(totalPartsPerWorker > partNumber, "totalPartsPerWorker must be > partNumber");
46+
checkState(lpNum >= 0, "lpCount must be >= 0");
5147

5248
this.table = requireNonNull(table, "table is null");
53-
this.partNumber = partNumber;
54-
this.totalPartsPerWorker = totalPartsPerWorker;
49+
this.lpNum = lpNum;
5550
this.address = requireNonNull(address, "address is null");
5651
this.expectedRows = expectedRows;
5752
this.limit = limit;
@@ -64,15 +59,9 @@ public long getTable()
6459
}
6560

6661
@JsonProperty
67-
public int getTotalPartsPerWorker()
62+
public int getLogicalPartNumber()
6863
{
69-
return totalPartsPerWorker;
70-
}
71-
72-
@JsonProperty
73-
public int getPartNumber()
74-
{
75-
return partNumber;
64+
return lpNum;
7665
}
7766

7867
@Override
@@ -122,23 +111,21 @@ public boolean equals(Object obj)
122111
}
123112
MemorySplit other = (MemorySplit) obj;
124113
return Objects.equals(this.table, other.table) &&
125-
Objects.equals(this.totalPartsPerWorker, other.totalPartsPerWorker) &&
126-
Objects.equals(this.partNumber, other.partNumber);
114+
Objects.equals(this.lpNum, other.lpNum);
127115
}
128116

129117
@Override
130118
public int hashCode()
131119
{
132-
return Objects.hash(table, totalPartsPerWorker, partNumber);
120+
return Objects.hash(table, lpNum);
133121
}
134122

135123
@Override
136124
public String toString()
137125
{
138126
return "MemorySplit{" +
139127
"table=" + table +
140-
", totalPartsPerWorker=" + totalPartsPerWorker +
141-
", partNumber=" + partNumber +
128+
", logicalPart number=" + lpNum +
142129
'}';
143130
}
144131
}

presto-memory/src/main/java/io/prestosql/plugin/memory/MemorySplitManager.java

+4 -7
Original file line number | Diff line number | Diff line change
@@ -30,13 +30,11 @@
3030
public final class MemorySplitManager
3131
implements ConnectorSplitManager
3232
{
33-
private final int splitsPerNode;
3433
private final MemoryMetadata metadata;
3534

3635
@Inject
3736
public MemorySplitManager(MemoryConfig config, MemoryMetadata metadata)
3837
{
39-
this.splitsPerNode = config.getSplitsPerNode();
4038
this.metadata = metadata;
4139
}
4240

@@ -47,16 +45,15 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand
4745

4846
List<MemoryDataFragment> dataFragments = metadata.getDataFragments(table.getId());
4947

50-
int totalRows = 0;
51-
48+
long totalRows = 0;
5249
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
5350

5451
for (MemoryDataFragment dataFragment : dataFragments) {
5552
long rows = dataFragment.getRows();
53+
int lpCount = dataFragment.getLogicalPartCount();
5654
totalRows += rows;
57-
58-
for (int i = 0; i < splitsPerNode; i++) {
59-
splits.add(new MemorySplit(table.getId(), i, splitsPerNode, dataFragment.getHostAddress(), rows, OptionalLong.empty()));
55+
for (int i = 0; i <= lpCount; i++) {
56+
splits.add(new MemorySplit(table.getId(), i, dataFragment.getHostAddress(), rows, OptionalLong.empty()));
6057
}
6158
}
6259
return new FixedSplitSource(splits.build());

presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryWriteTableHandle.java

+2 -11
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,6 @@ public final class MemoryWriteTableHandle
3838
private final List<String> indexColumns;
3939
private final String schemaName;
4040
private final String tableName;
41-
private final int splitsPerNode;
4241

4342
@JsonCreator
4443
public MemoryWriteTableHandle(
@@ -49,14 +48,12 @@ public MemoryWriteTableHandle(
4948
@JsonProperty("activeTableIds") Set<Long> activeTableIds,
5049
@JsonProperty("columns") List<MemoryColumnHandle> columns,
5150
@JsonProperty("sortedBy") List<SortingColumn> sortedBy,
52-
@JsonProperty("indexColumns") List<String> indexColumns,
53-
@JsonProperty("splitsPerNode") int splitsPerNode)
51+
@JsonProperty("indexColumns") List<String> indexColumns)
5452
{
5553
this.table = table;
5654
this.schemaName = schemaName;
5755
this.tableName = tableName;
5856
this.compressionEnabled = compressionEnabled;
59-
this.splitsPerNode = splitsPerNode;
6057
this.activeTableIds = requireNonNull(activeTableIds, "activeTableIds is null");
6158
this.columns = requireNonNull(columns, "columns is null");
6259
this.sortedBy = requireNonNull(sortedBy, "sortedBy is null");
@@ -66,7 +63,7 @@ public MemoryWriteTableHandle(
6663
@VisibleForTesting
6764
MemoryWriteTableHandle(long table, Set<Long> activeTableIds)
6865
{
69-
this(table, "", "", SPILL_COMPRESSION_DEFAULT_VALUE, activeTableIds, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 1);
66+
this(table, "", "", SPILL_COMPRESSION_DEFAULT_VALUE, activeTableIds, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
7067
}
7168

7269
@JsonProperty
@@ -117,12 +114,6 @@ public String getTableName()
117114
return tableName;
118115
}
119116

120-
@JsonProperty
121-
public int getSplitsPerNode()
122-
{
123-
return splitsPerNode;
124-
}
125-
126117
@Override
127118
public String toString()
128119
{

presto-memory/src/main/java/io/prestosql/plugin/memory/data/LogicalPart.java

+4 -7
Original file line number | Diff line number | Diff line change
@@ -89,8 +89,7 @@ enum LogicalPartState
8989
private final Set<Integer> indexChannels;
9090
private final long maxLogicalPartBytes;
9191
private final int maxPageSizeBytes;
92-
private final int splitNum;
93-
private final int logicalPartNum;
92+
private final int lpNum;
9493
private final boolean compressionEnabled;
9594

9695
// indexes
@@ -144,13 +143,11 @@ public LogicalPart(
144143
int maxPageSizeBytes,
145144
TypeManager typeManager,
146145
PagesSerde pagesSerde,
147-
int splitNum,
148-
int logicalPartNum,
146+
int lpNum,
149147
boolean compressionEnabled)
150148
{
151149
this.tableDataRoot = tableDataRoot;
152-
this.splitNum = splitNum;
153-
this.logicalPartNum = logicalPartNum;
150+
this.lpNum = lpNum;
154151
this.pages = new ArrayList<>();
155152
this.maxLogicalPartBytes = maxLogicalPartBytes;
156153
this.maxPageSizeBytes = maxPageSizeBytes;
@@ -701,7 +698,7 @@ void process()
701698

702699
private String getPageFileName()
703700
{
704-
return "split" + splitNum + "lp" + logicalPartNum;
701+
return "logicalPart" + lpNum;
705702
}
706703

707704
private synchronized void readPages()

0 commit comments

Comments
 (0)