Skip to content

Commit a6191d9

Browse files
authored
Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. (#16732)
* Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. * update
1 parent db7c801 commit a6191d9

File tree

4 files changed

+54
-65
lines changed

4 files changed

+54
-65
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -539,14 +539,11 @@ private void transferQueuedEventsIfNecessary(final boolean forced) {
539539
if ((retryEventQueue.isEmpty() && retryTsFileQueue.isEmpty())
540540
|| (!forced
541541
&& retryEventQueueEventCounter.getTabletInsertionEventCount()
542-
< PipeConfig.getInstance()
543-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
542+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
544543
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
545-
< PipeConfig.getInstance()
546-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
544+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
547545
&& retryEventQueue.size() + retryTsFileQueue.size()
548-
< PipeConfig.getInstance()
549-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())) {
546+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize())) {
550547
return;
551548
}
552549

@@ -604,14 +601,11 @@ private void transferQueuedEventsIfNecessary(final boolean forced) {
604601
if (System.currentTimeMillis() - retryStartTime
605602
> PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()) {
606603
if (retryEventQueueEventCounter.getTabletInsertionEventCount()
607-
< PipeConfig.getInstance()
608-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
604+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
609605
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
610-
< PipeConfig.getInstance()
611-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
606+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
612607
&& retryEventQueue.size() + retryTsFileQueue.size()
613-
< PipeConfig.getInstance()
614-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold()) {
608+
< PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize()) {
615609
return;
616610
}
617611

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,9 @@ public class CommonConfig {
269269
private long pipeConnectorRetryIntervalMs = 1000L;
270270
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
271271

272-
private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5;
273-
private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20;
274-
private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30;
272+
private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
273+
private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
274+
private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
275275
private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
276276
private int pipeAsyncConnectorSelectorNumber =
277277
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
@@ -1058,7 +1058,7 @@ public void setPipeConnectorHandshakeTimeoutMs(long pipeConnectorHandshakeTimeou
10581058
} finally {
10591059
if (fPipeConnectorHandshakeTimeoutMs != this.pipeConnectorHandshakeTimeoutMs) {
10601060
logger.info(
1061-
"pipeConnectorHandshakeTimeoutMs is set to {}.", fPipeConnectorHandshakeTimeoutMs);
1061+
"pipeConnectorHandshakeTimeoutMs is set to {}.", this.pipeConnectorHandshakeTimeoutMs);
10621062
}
10631063
}
10641064
}
@@ -1127,55 +1127,54 @@ public boolean isPipeConnectorRPCThriftCompressionEnabled() {
11271127
return pipeConnectorRPCThriftCompressionEnabled;
11281128
}
11291129

1130-
public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
1131-
int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
1132-
if (this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold
1133-
== pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
1130+
public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
1131+
int pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
1132+
if (this.pipeAsyncSinkForcedRetryTsFileEventQueueSize
1133+
== pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
11341134
return;
11351135
}
1136-
this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold =
1137-
pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
1136+
this.pipeAsyncSinkForcedRetryTsFileEventQueueSize =
1137+
pipeAsyncSinkForcedRetryTsFileEventQueueSize;
11381138
logger.info(
1139-
"pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold is set to {}.",
1140-
pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold);
1139+
"pipeAsyncSinkForcedRetryTsFileEventQueueSize is set to {}.",
1140+
pipeAsyncSinkForcedRetryTsFileEventQueueSize);
11411141
}
11421142

1143-
public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
1144-
return pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
1143+
public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
1144+
return pipeAsyncSinkForcedRetryTsFileEventQueueSize;
11451145
}
11461146

1147-
public void setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
1148-
int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
1149-
if (this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold
1150-
== pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
1147+
public void setPipeAsyncSinkForcedRetryTabletEventQueueSize(
1148+
int pipeAsyncSinkForcedRetryTabletEventQueueSize) {
1149+
if (this.pipeAsyncSinkForcedRetryTabletEventQueueSize
1150+
== pipeAsyncSinkForcedRetryTabletEventQueueSize) {
11511151
return;
11521152
}
1153-
this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold =
1154-
pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
1153+
this.pipeAsyncSinkForcedRetryTabletEventQueueSize =
1154+
pipeAsyncSinkForcedRetryTabletEventQueueSize;
11551155
logger.info(
1156-
"pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold is set to {}.",
1157-
pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold);
1156+
"pipeAsyncSinkForcedRetryTabletEventQueueSize is set to {}.",
1157+
pipeAsyncSinkForcedRetryTabletEventQueueSize);
11581158
}
11591159

1160-
public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
1161-
return pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
1160+
public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
1161+
return pipeAsyncSinkForcedRetryTabletEventQueueSize;
11621162
}
11631163

1164-
public void setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
1165-
int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
1166-
if (this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold
1167-
== pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
1164+
public void setPipeAsyncSinkForcedRetryTotalEventQueueSize(
1165+
int pipeAsyncSinkForcedRetryTotalEventQueueSize) {
1166+
if (this.pipeAsyncSinkForcedRetryTotalEventQueueSize
1167+
== pipeAsyncSinkForcedRetryTotalEventQueueSize) {
11681168
return;
11691169
}
1170-
this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold =
1171-
pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
1170+
this.pipeAsyncSinkForcedRetryTotalEventQueueSize = pipeAsyncSinkForcedRetryTotalEventQueueSize;
11721171
logger.info(
1173-
"pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold is set to {}.",
1174-
pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold);
1172+
"pipeAsyncSinkForcedRetryTotalEventQueueSize is set to {}.",
1173+
pipeAsyncSinkForcedRetryTotalEventQueueSize);
11751174
}
11761175

1177-
public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
1178-
return pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
1176+
public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
1177+
return pipeAsyncSinkForcedRetryTotalEventQueueSize;
11791178
}
11801179

11811180
public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -183,16 +183,16 @@ public boolean isPipeConnectorRPCThriftCompressionEnabled() {
183183
return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
184184
}
185185

186-
public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
187-
return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
186+
public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
187+
return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTsFileEventQueueSize();
188188
}
189189

190-
public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
191-
return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
190+
public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
191+
return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTabletEventQueueSize();
192192
}
193193

194-
public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
195-
return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
194+
public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
195+
return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
196196
}
197197

198198
public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
@@ -541,13 +541,13 @@ public void printAllConfigs() {
541541

542542
LOGGER.info(
543543
"PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
544-
getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
544+
getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
545545
LOGGER.info(
546546
"PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
547-
getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold());
547+
getPipeAsyncSinkForcedRetryTabletEventQueueSize());
548548
LOGGER.info(
549549
"PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
550-
getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold());
550+
getPipeAsyncSinkForcedRetryTotalEventQueueSize());
551551
LOGGER.info(
552552
"PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
553553
getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -368,36 +368,32 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
368368
"pipe_async_connector_max_retry_execution_time_ms_per_call",
369369
String.valueOf(
370370
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
371-
config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
371+
config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
372372
Integer.parseInt(
373373
Optional.ofNullable(
374374
properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size"))
375375
.orElse(
376376
properties.getProperty(
377377
"pipe_async_connector_forced_retry_tsfile_event_queue_size",
378378
String.valueOf(
379-
config
380-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold())))));
381-
config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
379+
config.getPipeAsyncSinkForcedRetryTsFileEventQueueSize())))));
380+
config.setPipeAsyncSinkForcedRetryTabletEventQueueSize(
382381
Integer.parseInt(
383382
Optional.ofNullable(
384383
properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size"))
385384
.orElse(
386385
properties.getProperty(
387386
"pipe_async_connector_forced_retry_tablet_event_queue_size",
388387
String.valueOf(
389-
config
390-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold())))));
391-
config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
388+
config.getPipeAsyncSinkForcedRetryTabletEventQueueSize())))));
389+
config.setPipeAsyncSinkForcedRetryTotalEventQueueSize(
392390
Integer.parseInt(
393391
Optional.ofNullable(
394392
properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size"))
395393
.orElse(
396394
properties.getProperty(
397395
"pipe_async_connector_forced_retry_total_event_queue_size",
398-
String.valueOf(
399-
config
400-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())))));
396+
String.valueOf(config.getPipeAsyncSinkForcedRetryTotalEventQueueSize())))));
401397
config.setRateLimiterHotReloadCheckIntervalMs(
402398
Integer.parseInt(
403399
properties.getProperty(

0 commit comments

Comments
 (0)