Skip to content

Commit 91e48f0

Browse files
authored
feat: encode load attributes in active load directories (#16722)
* feat: encode load attributes in active load directories * spotless * fix * fix * fix * fix * fix * update it * update it * fix * update * update
1 parent a6191d9 commit 91e48f0

File tree

10 files changed

+460
-54
lines changed

10 files changed

+460
-54
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,15 @@ private void generateFileWithNewModFile()
8585
// write mods file
8686
resource
8787
.getExclusiveModFile()
88-
.write(new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1"), 1, 2));
88+
.write(new TreeDeletionEntry(new MeasurementPath("root.test.d1.de.s1"), 1, 2));
8989
resource.getExclusiveModFile().close();
9090
}
9191

9292
private void generateFileWithOldModFile()
9393
throws IOException, DataRegionException, WriteProcessException, IllegalPathException {
9494
TsFileResource resource = generateFile();
9595
ModificationFileV1 oldModFile = ModificationFileV1.getNormalMods(resource);
96-
oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.s1"), Long.MAX_VALUE, 1, 2));
96+
oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.de.s1"), Long.MAX_VALUE, 1, 2));
9797
oldModFile.close();
9898
}
9999

@@ -102,11 +102,11 @@ private TsFileResource generateFile()
102102
File tsfile = new File(tmpDir, "1-1-0-0.tsfile");
103103
try (TsFileWriter writer = new TsFileWriter(tsfile)) {
104104
writer.registerAlignedTimeseries(
105-
"root.test.d1",
105+
"root.test.d1.de",
106106
Collections.singletonList(new MeasurementSchema("s1", TSDataType.BOOLEAN)));
107107
Tablet tablet =
108108
new Tablet(
109-
"root.test.d1",
109+
"root.test.d1.de",
110110
Collections.singletonList(new MeasurementSchema("s1", TSDataType.BOOLEAN)));
111111
for (int i = 0; i < 5; i++) {
112112
tablet.addTimestamp(i, i);
@@ -138,13 +138,61 @@ public void testWithNewModFile()
138138
statement.execute(String.format("load \'%s\'", tmpDir.getAbsolutePath()));
139139

140140
try (final ResultSet resultSet =
141-
statement.executeQuery("select count(s1) as c from root.test.d1")) {
141+
statement.executeQuery("select count(s1) as c from root.test.d1.de")) {
142142
Assert.assertTrue(resultSet.next());
143143
Assert.assertEquals(3, resultSet.getLong("c"));
144144
}
145145
}
146146
}
147147

148+
@Test
149+
public void testWithNewModFileAndLoadAttributes()
150+
throws SQLException,
151+
IOException,
152+
DataRegionException,
153+
WriteProcessException,
154+
IllegalPathException {
155+
generateFileWithNewModFile();
156+
final String databaseName = "root.test.d1";
157+
158+
try (final Connection connection = EnvFactory.getEnv().getConnection();
159+
final Statement statement = connection.createStatement()) {
160+
161+
statement.execute(
162+
String.format(
163+
"load \'%s\' with ("
164+
+ "'database-name'='%s',"
165+
+ "'database-level'='2',"
166+
+ "'verify'='true',"
167+
+ "'on-success'='none',"
168+
+ "'async'='true')",
169+
tmpDir.getAbsolutePath(), databaseName));
170+
171+
boolean databaseFound = false;
172+
out:
173+
for (int i = 0; i < 10; i++) {
174+
try (final ResultSet resultSet = statement.executeQuery("show databases")) {
175+
while (resultSet.next()) {
176+
final String currentDatabase = resultSet.getString(1);
177+
if (databaseName.equalsIgnoreCase(currentDatabase)) {
178+
databaseFound = true;
179+
break out;
180+
}
181+
}
182+
183+
try {
184+
Thread.sleep(1000);
185+
} catch (InterruptedException e) {
186+
break;
187+
}
188+
}
189+
}
190+
Assert.assertTrue(
191+
"The `database-level` parameter is not working; the generated database does not contain 'root.test.d1'.",
192+
databaseFound);
193+
}
194+
}
195+
148196
@Test
149197
public void testWithOldModFile()
150198
throws SQLException,
@@ -159,7 +207,7 @@ public void testWithOldModFile()
159207
statement.execute(String.format("load \'%s\'", tmpDir.getAbsolutePath()));
160208

161209
try (final ResultSet resultSet =
162-
statement.executeQuery("select count(s1) as c from root.test.d1")) {
210+
statement.executeQuery("select count(s1) as c from root.test.d1.de")) {
163211
Assert.assertTrue(resultSet.next());
164212
Assert.assertEquals(3, resultSet.getLong("c"));
165213
Assert.assertTrue(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
101101
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
102102
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
103+
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
103104
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
104105
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
105106
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -570,7 +571,16 @@ protected TSStatus loadFileV2(
570571

571572
private TSStatus loadTsFileAsync(final String dataBaseName, final List<String> absolutePaths)
572573
throws IOException {
573-
if (!ActiveLoadUtil.loadFilesToActiveDir(dataBaseName, absolutePaths, true)) {
574+
final Map<String, String> loadAttributes =
575+
ActiveLoadPathHelper.buildAttributes(
576+
dataBaseName,
577+
null,
578+
shouldConvertDataTypeOnTypeMismatch,
579+
validateTsFile.get(),
580+
null,
581+
shouldMarkAsPipeRequest.get());
582+
583+
if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) {
574584
throw new PipeException("Load active listening pipe dir is not set.");
575585
}
576586
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
4343
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
4444
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
45+
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
4546
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
4647
import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
4748
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
@@ -281,7 +282,17 @@ private boolean doAsyncLoad(final IAnalysis analysis) {
281282
} else {
282283
databaseName = null;
283284
}
284-
if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(tsFiles, databaseName, isDeleteAfterLoad)) {
285+
final Map<String, String> activeLoadAttributes =
286+
ActiveLoadPathHelper.buildAttributes(
287+
databaseName,
288+
databaseLevel,
289+
isConvertOnTypeMismatch,
290+
isVerifySchema,
291+
tabletConversionThresholdBytes,
292+
isGeneratedByPipe);
293+
294+
if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(
295+
tsFiles, activeLoadAttributes, isDeleteAfterLoad)) {
285296
analysis.setFinishQueryAfterAnalyze(true);
286297
setRealStatement(analysis);
287298
return true;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE;
4949
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY;
5050
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE;
51+
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.PIPE_GENERATED_KEY;
5152
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY;
5253

5354
public class LoadTsFileStatement extends Statement {
@@ -63,8 +64,6 @@ public class LoadTsFileStatement extends Statement {
6364
private boolean isGeneratedByPipe = false;
6465
private boolean isAsyncLoad = false;
6566

66-
private Map<String, String> loadAttributes;
67-
6867
private List<File> tsFiles;
6968
private List<Boolean> isTableModel;
7069
private List<TsFileResource> resources;
@@ -245,15 +244,14 @@ public long getWritePointCount(int resourceIndex) {
245244
}
246245

247246
public void setLoadAttributes(final Map<String, String> loadAttributes) {
248-
this.loadAttributes = loadAttributes;
249-
initAttributes();
247+
initAttributes(loadAttributes);
250248
}
251249

252250
public boolean isAsyncLoad() {
253251
return isAsyncLoad;
254252
}
255253

256-
private void initAttributes() {
254+
private void initAttributes(final Map<String, String> loadAttributes) {
257255
this.databaseLevel = LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
258256
this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
259257
this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
@@ -263,6 +261,9 @@ private void initAttributes() {
263261
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
264262
this.verifySchema = LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
265263
this.isAsyncLoad = LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
264+
if (LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes)) {
265+
markIsGeneratedByPipe();
266+
}
266267
}
267268

268269
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> isMiniTsFile) {
@@ -314,7 +315,7 @@ public List<PartialPath> getPaths() {
314315
@Override
315316
public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelationalStatement(
316317
MPPQueryContext context) {
317-
loadAttributes = new HashMap<>();
318+
final Map<String, String> loadAttributes = new HashMap<>();
318319

319320
loadAttributes.put(DATABASE_LEVEL_KEY, String.valueOf(databaseLevel));
320321
if (database != null) {
@@ -326,6 +327,9 @@ public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelat
326327
loadAttributes.put(
327328
TABLET_CONVERSION_THRESHOLD_KEY, String.valueOf(tabletConversionThresholdBytes));
328329
loadAttributes.put(ASYNC_LOAD_KEY, String.valueOf(isAsyncLoad));
330+
if (isGeneratedByPipe) {
331+
loadAttributes.put(PIPE_GENERATED_KEY, String.valueOf(true));
332+
}
329333

330334
return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
331335
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.nio.file.SimpleFileVisitor;
4040
import java.nio.file.attribute.BasicFileAttributes;
4141
import java.util.Arrays;
42+
import java.util.Map;
4243
import java.util.Objects;
4344
import java.util.Set;
4445
import java.util.concurrent.CopyOnWriteArraySet;
@@ -117,13 +118,23 @@ private void scan() throws IOException {
117118
.filter(this::isTsFileCompleted)
118119
.limit(currentAllowedPendingSize)
119120
.forEach(
120-
file -> {
121-
final File parentFile = new File(file).getParentFile();
121+
filePath -> {
122+
final File tsFile = new File(filePath);
123+
final Map<String, String> attributes =
124+
ActiveLoadPathHelper.parseAttributes(tsFile, listeningDirFile);
125+
126+
final File parentFile = tsFile.getParentFile();
127+
final boolean isTableModel =
128+
ActiveLoadPathHelper.containsDatabaseName(attributes)
129+
|| (parentFile != null
130+
&& !Objects.equals(
131+
parentFile.getAbsoluteFile(),
132+
listeningDirFile.getAbsoluteFile()));
133+
122134
activeLoadTsFileLoader.tryTriggerTsFileLoad(
123-
file,
124-
parentFile != null
125-
&& !Objects.equals(
126-
parentFile.getAbsoluteFile(), listeningDirFile.getAbsoluteFile()),
135+
tsFile.getAbsolutePath(),
136+
listeningDirFile.getAbsolutePath(),
137+
isTableModel,
127138
isGeneratedByPipe);
128139
});
129140
} catch (UncheckedIOException e) {

0 commit comments

Comments
 (0)