[cleanup](load) remove unused load properties "use_new_load_scan_node" #43989

Open · wants to merge 1 commit into base: master
Changes from all commits
@@ -98,9 +98,6 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser {
public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
public static final String PRIORITY = "priority";
public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
// temp property, just make regression test happy.
// should remove when Config.enable_new_load_scan_node is set to true by default.
public static final String USE_NEW_LOAD_SCAN_NODE = "use_new_load_scan_node";

// for load data from Baidu Object Store(BOS)
public static final String BOS_ENDPOINT = "bos_endpoint";
@@ -224,12 +221,6 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser {
return Boolean.valueOf(s);
}
})
.put(USE_NEW_LOAD_SCAN_NODE, new Function<String, Boolean>() {
@Override
public @Nullable Boolean apply(@Nullable String s) {
return Boolean.valueOf(s);
}
})
.put(KEY_SKIP_LINES, new Function<String, Integer>() {
@Override
public @Nullable Integer apply(@Nullable String s) {
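Context for the LoadStmt hunks above: load properties are registered in a builder-style map that pairs each property name with a string-to-value parser, so dropping "use_new_load_scan_node" only means deleting its .put(...) entry plus the consumers removed in the files below. The following is a minimal standalone sketch of that registration pattern; the class name, the "skip_lines" constant value, and the plain HashMap are assumptions for illustration, not the actual Doris code.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal sketch (names assumed): each supported load property maps to a parser
// that converts the user-supplied string into a typed value. Removing a property
// therefore means deleting its entry here and any code that reads the parsed value.
public class LoadPropertyRegistrySketch {
    static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
    static final String KEY_SKIP_LINES = "skip_lines"; // assumed property string

    static final Map<String, Function<String, Object>> PARSERS = new HashMap<>();
    static {
        PARSERS.put(LOAD_TO_SINGLE_TABLET, Boolean::valueOf);
        PARSERS.put(KEY_SKIP_LINES, Integer::valueOf);
        // "use_new_load_scan_node" used to be registered with a Boolean parser;
        // this PR drops that entry along with every consumer of the flag.
    }

    public static void main(String[] args) {
        System.out.println(PARSERS.get(KEY_SKIP_LINES).apply("3")); // prints 3
    }
}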
@@ -154,7 +154,7 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
getLoadParallelism(), getSendBatchParallelism(),
getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
useNewLoadScanNode(), getPriority(), isEnableMemtableOnSinkNode, batchSize, cloudClusterId);
getPriority(), isEnableMemtableOnSinkNode, batchSize, cloudClusterId);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());

@@ -47,11 +47,11 @@ public CloudLoadLoadingTask(Database db, OlapTable table,
long txnId, LoadTaskCallback callback, String timezone,
long timeoutS, int loadParallelism, int sendBatchParallelism,
boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink,
boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode, int batchSize,
Priority priority, boolean enableMemTableOnSinkNode, int batchSize,
String clusterId) {
super(db, table, brokerDesc, fileGroups, jobDeadlineMs, execMemLimit, strictMode, isPartialUpdate,
txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, loadZeroTolerance,
jobProfile, singleTabletLoadPerSink, useNewLoadScanNode, priority, enableMemTableOnSinkNode, batchSize);
jobProfile, singleTabletLoadPerSink, priority, enableMemTableOnSinkNode, batchSize);
this.cloudClusterId = clusterId;
}

@@ -227,7 +227,7 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
getLoadParallelism(), getSendBatchParallelism(),
getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
useNewLoadScanNode(), getPriority(), isEnableMemtableOnSinkNode, batchSize);
getPriority(), isEnableMemtableOnSinkNode, batchSize);

UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
@@ -1228,10 +1228,6 @@ public boolean isSingleTabletLoadPerSink() {
return (boolean) jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET);
}

public boolean useNewLoadScanNode() {
return (boolean) jobProperties.getOrDefault(LoadStmt.USE_NEW_LOAD_SCAN_NODE, false);
}

// Return true if this job is finished for a long time
public boolean isExpired(long currentTimeMs) {
if (!isCompleted()) {
@@ -74,7 +74,6 @@ public class LoadLoadingTask extends LoadTask {
private final int sendBatchParallelism;
private final boolean loadZeroTolerance;
private final boolean singleTabletLoadPerSink;
private final boolean useNewLoadScanNode;

private final boolean enableMemTableOnSinkNode;
private final int batchSize;
@@ -92,7 +91,7 @@ public LoadLoadingTask(Database db, OlapTable table,
long txnId, LoadTaskCallback callback, String timezone,
long timeoutS, int loadParallelism, int sendBatchParallelism,
boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink,
boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode, int batchSize) {
Priority priority, boolean enableMemTableOnSinkNode, int batchSize) {
super(callback, TaskType.LOADING, priority);
this.db = db;
this.table = table;
@@ -112,7 +111,6 @@ public LoadLoadingTask(Database db, OlapTable table,
this.loadZeroTolerance = loadZeroTolerance;
this.jobProfile = jobProfile;
this.singleTabletLoadPerSink = singleTabletLoadPerSink;
this.useNewLoadScanNode = useNewLoadScanNode;
this.enableMemTableOnSinkNode = enableMemTableOnSinkNode;
this.batchSize = batchSize;
}
@@ -122,7 +120,7 @@ public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList,
this.loadId = loadId;
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups,
strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism,
this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode);
userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode);
planner.plan(loadId, fileStatusList, fileNum);
}

@@ -71,7 +71,6 @@ public class LoadingTaskPlanner {
private final long timeoutS; // timeout of load job, in second
private final int loadParallelism;
private final int sendBatchParallelism;
private final boolean useNewLoadScanNode;
private final boolean singleTabletLoadPerSink;
private final boolean enableMemtableOnSinkNode;
private UserIdentity userInfo;
@@ -89,7 +88,7 @@ public class LoadingTaskPlanner {
public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism,
int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo,
int sendBatchParallelism, UserIdentity userInfo,
boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode) {
this.loadJobId = loadJobId;
this.txnId = txnId;
@@ -103,7 +102,6 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table
this.timeoutS = timeoutS;
this.loadParallelism = loadParallelism;
this.sendBatchParallelism = sendBatchParallelism;
this.useNewLoadScanNode = useNewLoadScanNode;
this.userInfo = userInfo;
this.singleTabletLoadPerSink = singleTabletLoadPerSink;
this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
@@ -65,9 +65,6 @@ suite("test_segcompaction_correctness", "nonConcurrent,p2") {
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""

def max_try_milli_secs = 3600000
@@ -65,9 +65,6 @@ suite("test_too_many_segments", "nonConcurrent,p2") { // the epic -238 case
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""

Thread.sleep(2000)
@@ -161,7 +161,6 @@ class LoadAttributes {
this.isExceptFailed = isExceptFailed

properties = new HashMap<>()
properties.put("use_new_load_scan_node", "true")
}

LoadAttributes addProperties(String k, String v) {
@@ -52,10 +52,6 @@ suite("test_etl_failed", "load_p0") {
"AWS_REGION" = "${s3Region}",
"provider" = "${getS3Provider()}"
)
PROPERTIES(
"use_new_load_scan_node" = "true",
"max_filter_ratio" = "0.1"
);
"""

def max_try_milli_secs = 600000
@@ -137,7 +137,6 @@ suite("test_multi_table_load", "load_p0") {
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true",
"max_filter_ratio" = "1.0"
)
"""
@@ -186,7 +185,6 @@ suite("test_multi_table_load", "load_p0") {
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true",
"max_filter_ratio" = "1.0"
)
"""
@@ -103,9 +103,6 @@ suite("test_seq_load", "load_p0") {
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_REGION" = "${getS3Region()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""
logger.info("submit sql: ${sql_str}");
sql """${sql_str}"""
@@ -132,4 +129,4 @@ }
}

qt_sql """ SELECT COUNT(*) FROM ${tableName} """
}
}
@@ -411,9 +411,6 @@ suite("test_broker_load_p2", "p2") {
"AWS_REGION" = "${s3Region}",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""
logger.info("submit sql: ${sql_str}");
sql """${sql_str}"""
@@ -621,7 +621,6 @@ class LoadAttributes {
this.isExceptFailed = isExceptFailed

properties = new HashMap<>()
properties.put("use_new_load_scan_node", "true")
}

LoadAttributes addProperties(String k, String v) {
@@ -164,7 +164,6 @@ class LoadAttributes {
this.isExceptFailed = isExceptFailed

properties = new HashMap<>()
properties.put("use_new_load_scan_node", "true")
}

LoadAttributes addProperties(String k, String v) {
@@ -218,7 +218,6 @@ class LoadAttributes {
this.isExceptFailed = isExceptFailed

properties = new HashMap<>()
properties.put("use_new_load_scan_node", "true")
}

LoadAttributes addProperties(String k, String v) {
@@ -66,9 +66,6 @@ suite("test_segcompaction_agg_keys") {
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""

def max_try_milli_secs = 3600000
@@ -69,9 +69,6 @@ suite("test_segcompaction_agg_keys_index") {
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""

def max_try_milli_secs = 3600000
@@ -66,9 +66,6 @@ suite("test_segcompaction_dup_keys") {
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""

def max_try_milli_secs = 3600000
@@ -69,9 +69,6 @@ suite("test_segcompaction_dup_keys_index") {
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""

def max_try_milli_secs = 3600000
@@ -66,9 +66,6 @@ suite("test_segcompaction_unique_keys") {
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""

def max_try_milli_secs = 3600000
@@ -70,9 +70,6 @@ suite("test_segcompaction_unique_keys_mow") {
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""

def max_try_milli_secs = 3600000
@@ -74,9 +74,6 @@ suite("test_segcompaction_unique_keys_mow_index") {
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
properties(
"use_new_load_scan_node" = "true"
)
"""

def max_try_milli_secs = 3600000