Skip to content

Commit c7ee924

Browse files
authored
[To dev/1.3] Load: Fixed the issue of TSFile parent directory being null and TSFile resource being updated during the Load process. (#16751) (#16790)
1 parent 90331ae commit c7ee924

File tree

6 files changed

+99
-11
lines changed

6 files changed

+99
-11
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.db.it.utils.TestUtils;
2424
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
2525
import org.apache.iotdb.it.env.EnvFactory;
26+
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
2627
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2728
import org.apache.iotdb.it.utils.TsFileGenerator;
2829
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -66,6 +67,7 @@
6667
import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQuery;
6768
import static org.apache.iotdb.db.it.utils.TestUtils.grantUserSeriesPrivilege;
6869
import static org.apache.iotdb.db.it.utils.TestUtils.grantUserSystemPrivileges;
70+
import static org.apache.iotdb.it.env.cluster.ClusterConstant.USER_DIR;
6971

7072
@RunWith(IoTDBTestRunner.class)
7173
@Category({LocalStandaloneIT.class, ClusterIT.class})
@@ -929,6 +931,47 @@ public void testLoadWithOnNonStandardTsFileName() throws Exception {
929931
}
930932
}
931933

934+
@Test
935+
public void testLoadWithRelativePathName() throws Exception {
936+
DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
937+
938+
registerSchema();
939+
940+
final long writtenPoint1;
941+
// device 0, device 1, sg 0
942+
File relativePathFile = new File(System.getProperty(USER_DIR), "1-0-0-0.tsfile");
943+
try {
944+
try (final TsFileGenerator generator = new TsFileGenerator(relativePathFile)) {
945+
generator.registerTimeseries(
946+
SchemaConfig.DEVICE_0, Collections.singletonList(SchemaConfig.MEASUREMENT_00));
947+
generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL / 10_000, false);
948+
writtenPoint1 = generator.getTotalNumber();
949+
}
950+
951+
try (final Connection connection =
952+
EnvFactory.getEnv().getConnectionWithSpecifiedDataNode(dataNodeWrapper);
953+
final Statement statement = connection.createStatement()) {
954+
955+
statement.execute(String.format("load \"%s\" sglevel=2", "1-0-0-0.tsfile"));
956+
957+
try (final ResultSet resultSet =
958+
statement.executeQuery("select count(*) from root.sg.** group by level=1,2")) {
959+
if (resultSet.next()) {
960+
final long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
961+
Assert.assertEquals(writtenPoint1, sg1Count);
962+
} else {
963+
Assert.fail("This ResultSet is empty.");
964+
}
965+
}
966+
}
967+
968+
} finally {
969+
if (relativePathFile.exists()) {
970+
relativePathFile.delete();
971+
}
972+
}
973+
}
974+
932975
@Test
933976
public void testLoadWithMods() throws Exception {
934977
final long writtenPoint1;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,15 +170,22 @@ public void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDi
170170
}
171171
} else if (planNode instanceof LoadSingleTsFileNode) { // do not need to split
172172
final TsFileResource tsFileResource = ((LoadSingleTsFileNode) planNode).getTsFileResource();
173+
final String filePath = tsFileResource.getTsFile().getAbsolutePath();
173174
try {
174175
PipeDataNodeAgent.runtime().assignProgressIndexForTsFileLoad(tsFileResource);
175176
tsFileResource.setGeneratedByPipe(isGeneratedByPipe);
176177
tsFileResource.serialize();
178+
TsFileResource cloneTsFileResource = null;
179+
try {
180+
cloneTsFileResource = tsFileResource.shallowCloneForNative();
181+
} catch (CloneNotSupportedException e) {
182+
cloneTsFileResource = tsFileResource.shallowClone();
183+
}
177184

178185
StorageEngine.getInstance()
179186
.getDataRegion((DataRegionId) groupId)
180187
.loadNewTsFile(
181-
tsFileResource,
188+
cloneTsFileResource,
182189
((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
183190
isGeneratedByPipe,
184191
false);
@@ -189,8 +196,7 @@ public void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDi
189196
resultStatus.setMessage(e.getMessage());
190197
throw new FragmentInstanceDispatchException(resultStatus);
191198
} catch (IOException e) {
192-
LOGGER.warn(
193-
"Serialize TsFileResource {} error.", tsFileResource.getTsFile().getAbsolutePath(), e);
199+
LOGGER.warn("Serialize TsFileResource {} error.", filePath, e);
194200
TSStatus resultStatus = new TSStatus();
195201
resultStatus.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
196202
resultStatus.setMessage(e.getMessage());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class LoadTsFileStatement extends Statement {
5656
private List<Long> writePointCountList;
5757

5858
public LoadTsFileStatement(String filePath) throws FileNotFoundException {
59-
this.file = new File(filePath);
59+
this.file = new File(filePath).getAbsoluteFile();
6060
this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel();
6161
this.verifySchema = true;
6262
this.deleteAfterLoad = false;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3071,10 +3071,12 @@ public void loadNewTsFile(
30713071
final boolean isGeneratedByPipe,
30723072
final boolean isFromConsensus)
30733073
throws LoadFileException {
3074-
final File tsfileToBeInserted = newTsFileResource.getTsFile();
3074+
final File tsfileToBeInserted = newTsFileResource.getTsFile().getAbsoluteFile();
30753075
final long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
30763076

3077-
if (!TsFileValidator.getInstance().validateTsFile(newTsFileResource)) {
3077+
if (!TsFileValidator.getInstance().validateTsFile(newTsFileResource)
3078+
|| !tsfileToBeInserted.exists()
3079+
|| tsfileToBeInserted.getParentFile() == null) {
30783080
throw new LoadFileException(
30793081
"tsfile validate failed, " + newTsFileResource.getTsFile().getName());
30803082
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
import static org.apache.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
7979

8080
@SuppressWarnings("java:S1135") // ignore todos
81-
public class TsFileResource {
81+
public class TsFileResource implements Cloneable {
8282

8383
private static final long INSTANCE_SIZE =
8484
RamUsageEstimator.shallowSizeOfInstance(TsFileResource.class)
@@ -1254,4 +1254,39 @@ public Map<IDeviceID, List<Pair<String, TimeValuePair>>> getLastValues() {
12541254
public void setLastValues(Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues) {
12551255
this.lastValues = lastValues;
12561256
}
1257+
1258+
public TsFileResource shallowClone() {
1259+
TsFileResource cloned = new TsFileResource();
1260+
cloned.file = this.file;
1261+
cloned.timeIndex = this.timeIndex;
1262+
cloned.maxPlanIndex = this.maxPlanIndex;
1263+
cloned.minPlanIndex = this.minPlanIndex;
1264+
cloned.compactionModFile = this.compactionModFile;
1265+
cloned.isSeq = this.isSeq;
1266+
cloned.tsFileRepairStatus = this.tsFileRepairStatus;
1267+
cloned.settleTsFileCallBack = this.settleTsFileCallBack;
1268+
cloned.deviceTimeIndexRamSize = this.deviceTimeIndexRamSize;
1269+
cloned.tsFileSize = this.tsFileSize;
1270+
cloned.processor = this.processor;
1271+
cloned.originTsFileResource = this.originTsFileResource;
1272+
cloned.isGeneratedByPipeConsensus = this.isGeneratedByPipeConsensus;
1273+
cloned.isGeneratedByPipe = this.isGeneratedByPipe;
1274+
cloned.insertionCompactionCandidateStatus = this.insertionCompactionCandidateStatus;
1275+
cloned.tierLevel = this.tierLevel;
1276+
cloned.pathToChunkMetadataListMap = this.pathToChunkMetadataListMap;
1277+
cloned.pathToReadOnlyMemChunkMap = this.pathToReadOnlyMemChunkMap;
1278+
cloned.pathToTimeSeriesMetadataMap = this.pathToTimeSeriesMetadataMap;
1279+
cloned.lastValues = this.lastValues;
1280+
cloned.maxProgressIndex.set(this.maxProgressIndex.get());
1281+
cloned.atomicStatus.set(this.atomicStatus.get());
1282+
cloned.isEmpty.set(this.isEmpty.get());
1283+
cloned.tsFileID = this.tsFileID;
1284+
cloned.prev = null;
1285+
cloned.next = null;
1286+
return cloned;
1287+
}
1288+
1289+
public TsFileResource shallowCloneForNative() throws CloneNotSupportedException {
1290+
return (TsFileResource) clone();
1291+
}
12571292
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,12 @@ public File selectTargetDirectory(
8181
throws DiskSpaceInsufficientException, LoadFileException {
8282
String fileDirRoot = null;
8383
try {
84-
fileDirRoot =
85-
Optional.ofNullable(FileStoreUtils.getFileStore(sourceDirectory.getCanonicalPath()))
86-
.map(Object::toString)
87-
.orElse(null);
84+
if (sourceDirectory != null) {
85+
fileDirRoot =
86+
Optional.ofNullable(FileStoreUtils.getFileStore(sourceDirectory.getCanonicalPath()))
87+
.map(Object::toString)
88+
.orElse(null);
89+
}
8890
} catch (Exception e) {
8991
logger.warn(
9092
"Exception occurs when reading target file's mount point {}",

0 commit comments

Comments
 (0)