
Commit 55f21ed

stephen-shelby authored and mergify[bot] committed
[Enhancement] support specify the warehouse to execute statistics collect job (#57279)
Signed-off-by: stephen <[email protected]>
(cherry picked from commit e4217b1)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/common/Config.java
#	fe/fe-core/src/main/java/com/starrocks/statistic/hyper/HyperQueryJob.java
1 parent d2b7924 commit 55f21ed

4 files changed: +344, -0 lines changed

fe/fe-core/src/main/java/com/starrocks/common/Config.java (+9)

@@ -2832,6 +2832,15 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static String lake_background_warehouse = "default_warehouse";
 
+<<<<<<< HEAD
+=======
+    @ConfField(mutable = true)
+    public static String statistics_collect_warehouse = "default_warehouse";
+
+    @ConfField(mutable = true)
+    public static int lake_warehouse_max_compute_replica = 3;
+
+>>>>>>> e4217b1e98 ([Enhancement] support specify the warehouse to execute statistics collect job (#57279))
     @ConfField(mutable = true, comment = "time interval to check whether warehouse is idle")
     public static long warehouse_idle_check_interval_seconds = 60;
 
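Note that the cherry-pick committed the conflict markers above verbatim (the commit message records conflicts in Config.java and HyperQueryJob.java), so the file as committed here would not compile until they are resolved. Since both new fields are declared @ConfField(mutable = true), they can be changed at runtime without an FE restart. A minimal sketch of how the new knob might be exercised from any MySQL-protocol client; the JDBC URL, credentials, warehouse name "wh_stats", and table name are illustrative assumptions, not part of this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class StatsWarehouseConfigDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical FE endpoint and credentials -- adjust for your cluster;
        // requires a MySQL JDBC driver on the classpath.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030", "root", "");
             Statement stmt = conn.createStatement()) {
            // statistics_collect_warehouse is a mutable FE config, so it can
            // be flipped at runtime; "wh_stats" is an illustrative name.
            stmt.execute("ADMIN SET FRONTEND CONFIG " +
                    "(\"statistics_collect_warehouse\" = \"wh_stats\")");
            // Subsequent statistics collect jobs should now run in wh_stats.
            stmt.execute("ANALYZE TABLE test.t0_stats");
        }
    }
}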

fe/fe-core/src/main/java/com/starrocks/statistic/StatisticExecutor.java (+1)

@@ -364,6 +364,7 @@ public AnalyzeStatus collectStatistics(ConnectContext statsConnectCtx,
             GlobalStateMgr.getCurrentState().getAnalyzeMgr().replayAddAnalyzeStatus(analyzeStatus);
 
             statsConnectCtx.setStatisticsConnection(true);
+            statsConnectCtx.getSessionVariable().setWarehouseName(Config.statistics_collect_warehouse);
             statsJob.collect(statsConnectCtx, analyzeStatus);
             LOG.info("execute statistics job successfully, duration={}, job={}", watch.toString(), statsJob);
         } catch (Exception e) {
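The effect of the one-line change above is that every statistics job's connection now carries the configured warehouse name, analogous to a session-level override. A toy model of that flow; the Session class and static field below are simplified stand-ins for StarRocks' SessionVariable and Config, not real APIs:

import java.util.HashMap;
import java.util.Map;

public class WarehouseRoutingToy {
    // Stand-in for Config.statistics_collect_warehouse (mutable FE config).
    static volatile String statisticsCollectWarehouse = "default_warehouse";

    // Stand-in for SessionVariable: a per-connection key/value bag.
    static class Session {
        final Map<String, String> vars = new HashMap<>();
        void setWarehouseName(String name) { vars.put("warehouse", name); }
        String getWarehouseName() { return vars.get("warehouse"); }
    }

    public static void main(String[] args) {
        Session statsSession = new Session();
        // Mirrors the added line in StatisticExecutor.collectStatistics:
        statsSession.setWarehouseName(statisticsCollectWarehouse);
        System.out.println("collect job runs in: " + statsSession.getWarehouseName());
    }
}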
fe/fe-core/src/main/java/com/starrocks/statistic/hyper/HyperQueryJob.java (+312)

@@ -0,0 +1,312 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.statistic.hyper;
+
+import com.google.common.collect.Lists;
+import com.starrocks.analysis.Expr;
+import com.starrocks.analysis.FunctionCallExpr;
+import com.starrocks.analysis.IntLiteral;
+import com.starrocks.analysis.StringLiteral;
+import com.starrocks.catalog.Column;
+import com.starrocks.catalog.Database;
+import com.starrocks.catalog.Function;
+import com.starrocks.catalog.FunctionSet;
+import com.starrocks.catalog.OlapTable;
+import com.starrocks.catalog.Partition;
+import com.starrocks.catalog.Table;
+import com.starrocks.catalog.Type;
+import com.starrocks.qe.ConnectContext;
+import com.starrocks.qe.SessionVariable;
+import com.starrocks.statistic.StatisticExecutor;
+import com.starrocks.statistic.StatsConstants;
+import com.starrocks.statistic.base.ColumnClassifier;
+import com.starrocks.statistic.base.ColumnStats;
+import com.starrocks.statistic.base.DefaultColumnStats;
+import com.starrocks.statistic.base.MultiColumnStats;
+import com.starrocks.statistic.base.PartitionSampler;
+import com.starrocks.statistic.sample.TabletSampleManager;
+import com.starrocks.thrift.TStatisticData;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.starrocks.sql.optimizer.statistics.ColumnStatistic.DEFAULT_COLLECTION_SIZE;
+
+public abstract class HyperQueryJob {
+    private static final Logger LOG = LogManager.getLogger(HyperQueryJob.class);
+
+    protected final ConnectContext context;
+    protected final Database db;
+    protected final Table table;
+    protected final List<ColumnStats> columnStats;
+    protected final List<Long> partitionIdList;
+
+    // pipeline dop
+    protected int pipelineDop;
+    // result buffer
+    protected List<String> sqlBuffer = Lists.newArrayList();
+    protected List<List<Expr>> rowsBuffer = Lists.newArrayList();
+
+    protected int failures = 0;
+    protected int totals = 0;
+    protected Throwable lastFailure;
+
+    protected HyperQueryJob(ConnectContext context, Database db, Table table, List<ColumnStats> columnStats,
+                            List<Long> partitionIdList) {
+        this.context = context;
+        this.db = db;
+        this.table = table;
+        this.columnStats = columnStats;
+        this.partitionIdList = partitionIdList;
+        this.pipelineDop = context.getSessionVariable().getStatisticCollectParallelism();
+    }
+
+    public void queryStatistics() {
+        String tableName = StringEscapeUtils.escapeSql(db.getOriginName() + "." + table.getName());
+        List<String> sqlList = buildQuerySQL();
+        for (String sql : sqlList) {
+            // execute sql
+            List<TStatisticData> dataList = executeStatisticsQuery(sql, context);
+
+            for (TStatisticData data : dataList) {
+                Partition partition = table.getPartition(data.getPartitionId());
+                if (partition == null) {
+                    continue;
+                }
+                String partitionName = StringEscapeUtils.escapeSql(partition.getName());
+                sqlBuffer.add(createInsertValueSQL(data, tableName, partitionName));
+                rowsBuffer.add(createInsertValueExpr(data, tableName, partitionName));
+            }
+        }
+    }
+
+    protected List<String> buildQuerySQL() {
+        return Collections.emptyList();
+    }
+
+    public List<List<Expr>> getStatisticsData() {
+        List<List<Expr>> r = rowsBuffer;
+        rowsBuffer = Lists.newArrayList();
+        return r;
+    }
+
+    public List<String> getStatisticsValueSQL() {
+        List<String> s = sqlBuffer;
+        sqlBuffer = Lists.newArrayList();
+        return s;
+    }
+
+    public int getFailures() {
+        return failures;
+    }
+
+    public int getTotals() {
+        return totals;
+    }
+
+    public Throwable getLastFailure() {
+        return lastFailure;
+    }
+
+    protected List<TStatisticData> executeStatisticsQuery(String sql, ConnectContext context) {
+        try {
+            totals++;
+            LOG.debug("statistics collect sql : " + sql);
+            StatisticExecutor executor = new StatisticExecutor();
+            // set default session variables for stats context
+            setDefaultSessionVariable(context);
+            return executor.executeStatisticDQL(context, sql);
+        } catch (Exception e) {
+            failures++;
+            String message = "execute statistics query failed, sql: " + sql + ", error: " + e.getMessage();
+            LOG.error(message, e);
+            lastFailure = new RuntimeException(message, e);
+            return Collections.emptyList();
+        } finally {
+            context.setStartTime();
+        }
+    }
+
+    protected String createInsertValueSQL(TStatisticData data, String tableName, String partitionName) {
+        List<String> params = Lists.newArrayList();
+
+        params.add(String.valueOf(table.getId()));
+        params.add(String.valueOf(data.getPartitionId()));
+        params.add("'" + StringEscapeUtils.escapeSql(data.getColumnName()) + "'");
+        params.add(String.valueOf(db.getId()));
+        params.add("'" + tableName + "'");
+        params.add("'" + partitionName + "'");
+        params.add(String.valueOf(data.getRowCount()));
+        params.add(String.valueOf(data.getDataSize()));
+        params.add("hll_deserialize(unhex('mockData'))");
+        params.add(String.valueOf(data.getNullCount()));
+        params.add("'" + data.getMax() + "'");
+        params.add("'" + data.getMin() + "'");
+        params.add("now()");
+        params.add(String.valueOf(data.getCollectionSize() <= 0 ? DEFAULT_COLLECTION_SIZE : data.getCollectionSize()));
+        return "(" + String.join(", ", params) + ")";
+    }
+
+    protected List<Expr> createInsertValueExpr(TStatisticData data, String tableName, String partitionName) {
+        List<Expr> row = Lists.newArrayList();
+        row.add(new IntLiteral(table.getId(), Type.BIGINT)); // table id, 8 byte
+        row.add(new IntLiteral(data.getPartitionId(), Type.BIGINT)); // partition id, 8 byte
+        row.add(new StringLiteral(data.getColumnName())); // column name, 20 byte
+        row.add(new IntLiteral(db.getId(), Type.BIGINT)); // db id, 8 byte
+        row.add(new StringLiteral(tableName)); // table name, 50 byte
+        row.add(new StringLiteral(partitionName)); // partition name, 10 byte
+        row.add(new IntLiteral(data.getRowCount(), Type.BIGINT)); // row count, 8 byte
+        row.add(new IntLiteral((long) data.getDataSize(), Type.BIGINT)); // data size, 8 byte
+        row.add(hllDeserialize(data.getHll())); // hll, 32 kB mock it now
+        row.add(new IntLiteral(data.getNullCount(), Type.BIGINT)); // null count, 8 byte
+        row.add(new StringLiteral(data.getMax())); // max, 200 byte
+        row.add(new StringLiteral(data.getMin())); // min, 200 byte
+        row.add(nowFn()); // update time, 8 byte
+        row.add(new IntLiteral(data.getCollectionSize() <= 0 ? -1 : data.getCollectionSize(), Type.BIGINT)); // collection size 8 byte
+        return row;
+    }
+
+    public static Expr hllDeserialize(byte[] hll) {
+        String str = new String(hll, StandardCharsets.UTF_8);
+        Function unhex = Expr.getBuiltinFunction("unhex", new Type[] {Type.VARCHAR},
+                Function.CompareMode.IS_IDENTICAL);
+
+        FunctionCallExpr unhexExpr = new FunctionCallExpr("unhex", Lists.newArrayList(new StringLiteral(str)));
+        unhexExpr.setFn(unhex);
+        unhexExpr.setType(unhex.getReturnType());
+
+        Function fn = Expr.getBuiltinFunction("hll_deserialize", new Type[] {Type.VARCHAR},
+                Function.CompareMode.IS_IDENTICAL);
+        FunctionCallExpr fe = new FunctionCallExpr("hll_deserialize", Lists.newArrayList(unhexExpr));
+        fe.setFn(fn);
+        fe.setType(fn.getReturnType());
+        return fe;
+    }
+
+    public static Expr nowFn() {
+        Function fn = Expr.getBuiltinFunction(FunctionSet.NOW, new Type[] {}, Function.CompareMode.IS_IDENTICAL);
+        FunctionCallExpr fe = new FunctionCallExpr("now", Lists.newArrayList());
+        fe.setType(fn.getReturnType());
+        return fe;
+    }
+
+    protected void setDefaultSessionVariable(ConnectContext context) {
+        SessionVariable sessionVariable = context.getSessionVariable();
+        // Statistics collecting is not user-specific, which means response latency is not that important.
+        // Normally, if the page cache is enabled, the page cache must be full. Page cache is used for query
+        // acceleration, then page cache is better filled with the user's data.
+        sessionVariable.setUsePageCache(false);
+        sessionVariable.setEnableMaterializedViewRewrite(false);
+        // set the max task num of connector io tasks per scan operator to 4, default is 16,
+        // to avoid generate too many chunk source for collect stats in BE
+        sessionVariable.setConnectorIoTasksPerScanOperator(4);
+
+        if (table.isTemporaryTable()) {
+            context.setSessionId(((OlapTable) table).getSessionId());
+        }
+        sessionVariable.setEnableAnalyzePhasePruneColumns(true);
+        sessionVariable.setPipelineDop(pipelineDop);
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getSimpleName() +
+                "{table: " + db + "." + table + ", cols: [" +
+                columnStats.stream().map(ColumnStats::getColumnNameStr).collect(Collectors.joining(", ")) +
+                "], pids: " + partitionIdList + '}';
+    }
+
+    public static List<HyperQueryJob> createFullQueryJobs(ConnectContext context, Database db, Table table,
+                                                          List<String> columnNames, List<Type> columnTypes,
+                                                          List<Long> partitionIdList, int batchLimit) {
+        ColumnClassifier classifier = ColumnClassifier.of(columnNames, columnTypes, table);
+
+        List<ColumnStats> supportedStats = classifier.getColumnStats();
+        List<ColumnStats> dataCollectColumns =
+                supportedStats.stream().filter(ColumnStats::supportData).collect(Collectors.toList());
+        List<ColumnStats> unSupportedStats = classifier.getUnSupportCollectColumns();
+
+        List<List<Long>> pids = Lists.partition(partitionIdList, batchLimit);
+        List<HyperQueryJob> jobs = Lists.newArrayList();
+        for (List<Long> pid : pids) {
+            if (!dataCollectColumns.isEmpty()) {
+                jobs.add(new FullQueryJob(context, db, table, dataCollectColumns, pid));
+            }
+            if (!unSupportedStats.isEmpty()) {
+                jobs.add(new ConstQueryJob(context, db, table, unSupportedStats, pid));
+            }
+        }
+        return jobs;
+    }
+
+    public static List<HyperQueryJob> createSampleQueryJobs(ConnectContext context, Database db, Table table,
+                                                            List<String> columnNames, List<Type> columnTypes,
+                                                            List<Long> partitionIdList, int batchLimit,
+                                                            PartitionSampler sampler) {
+        ColumnClassifier classifier = ColumnClassifier.of(columnNames, columnTypes, table);
+        List<ColumnStats> supportedStats = classifier.getColumnStats();
+
+        List<ColumnStats> metaCollectColumns =
+                supportedStats.stream().filter(ColumnStats::supportMeta).collect(Collectors.toList());
+        List<ColumnStats> dataCollectColumns =
+                supportedStats.stream().filter(c -> !c.supportMeta() && c.supportData()).collect(Collectors.toList());
+        List<ColumnStats> unSupportedStats = classifier.getUnSupportCollectColumns();
+
+        List<List<Long>> pids = Lists.partition(partitionIdList, batchLimit);
+        List<HyperQueryJob> jobs = Lists.newArrayList();
+        for (List<Long> pid : pids) {
+            if (!metaCollectColumns.isEmpty()) {
+                jobs.add(new MetaQueryJob(context, db, table, metaCollectColumns, pid, sampler));
+            }
+            if (!dataCollectColumns.isEmpty()) {
+                jobs.add(new SampleQueryJob(context, db, table, dataCollectColumns, pid, sampler));
+            }
+            if (!unSupportedStats.isEmpty()) {
+                jobs.add(new ConstQueryJob(context, db, table, unSupportedStats, pid));
+            }
+        }
+        return jobs;
+    }
+
+    public static List<HyperQueryJob> createMultiColumnQueryJobs(ConnectContext context, Database db, Table table,
+                                                                 List<List<String>> columnGroups,
+                                                                 StatsConstants.AnalyzeType analyzeType,
+                                                                 List<StatsConstants.StatisticsType> statisticsTypes,
+                                                                 Map<String, String> properties) {
+        List<ColumnStats> columnStats = columnGroups.stream()
+                .map(group -> group.stream()
+                        .map(columnName -> {
+                            Column column = table.getColumn(columnName);
+                            return new DefaultColumnStats(column.getName(), column.getType(), column.getUniqueId());
+                        })
+                        .collect(Collectors.toList())
+                )
+                .map(defaultColumnStats -> new MultiColumnStats(defaultColumnStats, statisticsTypes))
+                .collect(Collectors.toList());
+
+        if (analyzeType == StatsConstants.AnalyzeType.FULL) {
+            return List.of(new FullMultiColumnQueryJob(context, db, table, columnStats));
+        } else {
+            TabletSampleManager tabletSampleManager = TabletSampleManager.init(properties, table);
+            return List.of(new SampleMultiColumnQueryJob(context, db, table, columnStats, tabletSampleManager));
+        }
+    }
+}
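To make the batching in createFullQueryJobs and createSampleQueryJobs concrete: Lists.partition slices the partition-id list into chunks of at most batchLimit, and each chunk then yields one job per column category, so the job count grows with both the partition count and the mix of column types. A standalone sketch with made-up values; only Guava is required:

import com.google.common.collect.Lists;

import java.util.List;

public class PartitionBatchingDemo {
    public static void main(String[] args) {
        // 7 partition ids with a batch limit of 3 -> [1,2,3], [4,5,6], [7]
        List<Long> partitionIds = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L);
        List<List<Long>> batches = Lists.partition(partitionIds, 3);
        // If a table had both data-collectable and unsupported columns,
        // createFullQueryJobs would emit a FullQueryJob plus a ConstQueryJob
        // per batch: 3 batches x 2 categories = 6 jobs for this input.
        batches.forEach(batch -> System.out.println("batch: " + batch));
    }
}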

fe/fe-core/src/test/java/com/starrocks/statistic/StatisticsExecutorTest.java (+22)

@@ -24,7 +24,9 @@
 import com.starrocks.catalog.Table;
 import com.starrocks.catalog.Type;
 import com.starrocks.common.AnalysisException;
+import com.starrocks.common.Config;
 import com.starrocks.common.DdlException;
+import com.starrocks.common.FeConstants;
 import com.starrocks.common.jmockit.Deencapsulation;
 import com.starrocks.qe.ConnectContext;
 import com.starrocks.qe.QueryState;
@@ -296,4 +298,24 @@ public void testSessionVariableInStats() {
         ConnectContext statsContext = StatisticUtils.buildConnectContext();
         Assert.assertEquals(1, statsContext.getSessionVariable().getParallelExecInstanceNum());
     }
+
+    @Test
+    public void testSpecifyStatisticsCollectWarehouse() {
+        String sql = "analyze table test.t0_stats";
+        Config.statistics_collect_warehouse = "xxx";
+        FeConstants.enableUnitStatistics = false;
+        AnalyzeStmt stmt = (AnalyzeStmt) analyzeSuccess(sql);
+        StmtExecutor executor = new StmtExecutor(connectContext, stmt);
+        AnalyzeStatus analyzeStatus = new NativeAnalyzeStatus(1, 2, 3, Lists.newArrayList(),
+                StatsConstants.AnalyzeType.FULL, StatsConstants.ScheduleType.SCHEDULE, Maps.newHashMap(), LocalDateTime.MIN);
+
+        Database db = connectContext.getGlobalStateMgr().getMetadataMgr().getDb(connectContext, "default_catalog", "test");
+        Table table =
+                connectContext.getGlobalStateMgr().getLocalMetastore().getTable(connectContext, "test", "t0_stats");
+
+        Deencapsulation.invoke(executor, "executeAnalyze", connectContext, stmt, analyzeStatus, db, table);
+        Assert.assertTrue(analyzeStatus.getReason().contains("Warehouse xxx not exist"));
+        Config.statistics_collect_warehouse = "default_warehouse";
+        FeConstants.enableUnitStatistics = true;
+    }
 }
