Skip to content

Commit 90331ae

Browse files
authored
[To dev/1.3] Fixed the bugs related to device auto-create alignment ignorance (#16780) (#16782)
* Fixed the bugs related to device auto-create alignment ignorance (#16780) * plan-first * refact * refa * fux * fix * ref * partial-fix * fix * may-final * fix * PBTree * fix * gras * err * bugfix (cherry picked from commit 9fd9d7e) * fix * Fixed the bug related to "Fixed the bugs related to device auto-create alignment ignorance" (#16781) * gsa * ff * ff * fix * fix * fix * f9ix
1 parent db77455 commit 90331ae

File tree

17 files changed

+303
-68
lines changed

17 files changed

+303
-68
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,108 @@ public void testLoad() throws Exception {
262262
}
263263
}
264264

265+
@Test
266+
// Shall succeed with tablet conversion
267+
public void testLoadWithAlignmentMismatch() throws Exception {
268+
registerSchema();
269+
270+
final long writtenPoint1;
271+
// device 0, sg 0
272+
try (final TsFileGenerator generator =
273+
new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
274+
// Wrong, with 04-07 non-exist
275+
generator.registerAlignedTimeseries(
276+
SchemaConfig.DEVICE_0,
277+
Arrays.asList(
278+
SchemaConfig.MEASUREMENT_00,
279+
SchemaConfig.MEASUREMENT_01,
280+
SchemaConfig.MEASUREMENT_02,
281+
SchemaConfig.MEASUREMENT_03,
282+
SchemaConfig.MEASUREMENT_04,
283+
SchemaConfig.MEASUREMENT_05,
284+
SchemaConfig.MEASUREMENT_06,
285+
SchemaConfig.MEASUREMENT_07));
286+
generator.generateData(SchemaConfig.DEVICE_0, 100000, PARTITION_INTERVAL / 10_000, true);
287+
writtenPoint1 = generator.getTotalNumber();
288+
}
289+
290+
final long writtenPoint2;
291+
// device 2, device 3, device4, sg 1
292+
try (final TsFileGenerator generator =
293+
new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
294+
// right
295+
generator.registerTimeseries(
296+
SchemaConfig.DEVICE_2, Collections.singletonList(SchemaConfig.MEASUREMENT_20));
297+
// right
298+
generator.registerTimeseries(
299+
SchemaConfig.DEVICE_3, Collections.singletonList(SchemaConfig.MEASUREMENT_30));
300+
// Wrong, with 06 non-exist
301+
generator.registerTimeseries(
302+
SchemaConfig.DEVICE_4,
303+
Arrays.asList(SchemaConfig.MEASUREMENT_40, SchemaConfig.MEASUREMENT_06));
304+
generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL / 10_000, false);
305+
generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL / 10_000, false);
306+
generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL / 10_000, false);
307+
for (int i = 0; i < 1000; i++) {
308+
generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 10, false);
309+
}
310+
writtenPoint2 = generator.getTotalNumber();
311+
}
312+
313+
try (final Connection connection = EnvFactory.getEnv().getConnection();
314+
final Statement statement = connection.createStatement()) {
315+
316+
try {
317+
statement.execute(
318+
String.format(
319+
"load \"%s\" with ('database-level'='2', 'convert-on-type-mismatch'='false')",
320+
tmpDir.getAbsolutePath() + File.separator + "1-0-0-0.tsfile"));
321+
Assert.fail();
322+
} catch (final Exception e) {
323+
Assert.assertTrue(
324+
e.getMessage()
325+
.contains(
326+
"TimeSeries under this device is not aligned, please use createTimeSeries or change device. (Path: root.sg.test_0.d_0)."));
327+
}
328+
329+
try {
330+
statement.execute(
331+
String.format(
332+
"load \"%s\" with ('database-level'='2', 'convert-on-type-mismatch'='false')",
333+
tmpDir.getAbsolutePath() + File.separator + "2-0-0-0.tsfile"));
334+
Assert.fail();
335+
} catch (final Exception e) {
336+
Assert.assertTrue(
337+
e.getMessage()
338+
.contains(
339+
"TimeSeries under this device is aligned, please use createAlignedTimeSeries or change device. (Path: root.sg.test_1.a_4)."));
340+
}
341+
342+
statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));
343+
344+
try (final ResultSet resultSet =
345+
statement.executeQuery("select count(*) from root.sg.** group by level=1,2")) {
346+
if (resultSet.next()) {
347+
long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
348+
Assert.assertEquals(writtenPoint1, sg1Count);
349+
long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)");
350+
Assert.assertEquals(writtenPoint2, sg2Count);
351+
} else {
352+
Assert.fail("This ResultSet is empty.");
353+
}
354+
}
355+
}
356+
357+
// Try to delete after loading. Expect no deadlock
358+
try (final Connection connection = EnvFactory.getEnv().getConnection();
359+
final Statement statement = connection.createStatement()) {
360+
statement.execute(
361+
String.format(
362+
"delete timeseries %s.%s",
363+
SchemaConfig.DEVICE_0, SchemaConfig.MEASUREMENT_00.getMeasurementId()));
364+
}
365+
}
366+
265367
@Test
266368
public void testLoadAcrossMultipleTimePartitions() throws Exception {
267369
registerSchema();

integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iotdb.db.it.schema;
2020

21+
import org.apache.iotdb.db.it.utils.TestUtils;
2122
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
2223
import org.apache.iotdb.it.env.EnvFactory;
2324
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -34,6 +35,9 @@
3435
import java.sql.ResultSet;
3536
import java.sql.SQLException;
3637
import java.sql.Statement;
38+
import java.util.Collections;
39+
40+
import static org.junit.Assert.fail;
3741

3842
/**
3943
* Notice that, all test begins with "IoTDB" is integration test. All test which will start the
@@ -141,4 +145,22 @@ private void assertTimeseriesEquals(String[] timeSeriesArray) throws SQLExceptio
141145
}
142146
Assert.assertEquals(timeSeriesArray.length, count);
143147
}
148+
149+
@Test
150+
public void testDifferentDeviceAlignment() {
151+
try (Connection connection = EnvFactory.getEnv().getConnection();
152+
Statement statement = connection.createStatement()) {
153+
// Should ignore the alignment difference
154+
statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)");
155+
// Should use the existing alignment
156+
statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
157+
statement.execute("insert into root.sg2.d (time, s4) values (-1, 1)");
158+
TestUtils.assertResultSetEqual(
159+
statement.executeQuery("select * from root.sg2.d"),
160+
"Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
161+
Collections.singleton("-1,null,1.0,null,null,"));
162+
} catch (SQLException ignored) {
163+
fail();
164+
}
165+
}
144166
}

integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.it.schema;
2121

22+
import org.apache.iotdb.db.it.utils.TestUtils;
2223
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
2324
import org.apache.iotdb.it.env.EnvFactory;
2425
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -35,6 +36,7 @@
3536
import java.sql.ResultSet;
3637
import java.sql.SQLException;
3738
import java.sql.Statement;
39+
import java.util.Collections;
3840
import java.util.HashSet;
3941
import java.util.Set;
4042

@@ -308,6 +310,12 @@ public void testDifferentDeviceAlignment() {
308310
statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
309311
// Should ignore the alignment difference
310312
statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)");
313+
// Should use the existing alignment
314+
statement.execute("insert into root.sg2.d (time, s4) aligned values (-1, 1)");
315+
TestUtils.assertResultSetEqual(
316+
statement.executeQuery("select * from root.sg2.d"),
317+
"Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
318+
Collections.singleton("-1,null,1.0,null,null,"));
311319
} catch (SQLException ignored) {
312320
fail();
313321
}

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ public enum TSStatusCode {
9393
WRITE_PROCESS_REJECT(606),
9494
OUT_OF_TTL(607),
9595
COMPACTION_ERROR(608),
96-
@Deprecated
9796
ALIGNED_TIMESERIES_ERROR(609),
9897
WAL_ERROR(610),
9998
DISK_SPACE_INSUFFICIENT(611),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -179,23 +179,23 @@ public TSStatus visitInternalCreateTimeSeries(
179179
final PartialPath devicePath = node.getDevicePath();
180180
final MeasurementGroup measurementGroup = node.getMeasurementGroup();
181181

182-
final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
182+
final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new ArrayList<>();
183183
final List<TSStatus> failingStatus = new ArrayList<>();
184184

185185
if (node.isAligned()) {
186186
executeInternalCreateAlignedTimeSeries(
187187
devicePath,
188188
measurementGroup,
189189
schemaRegion,
190-
alreadyExistingTimeSeries,
190+
existingTimeSeriesAndAlignmentMismatch,
191191
failingStatus,
192192
node.isGeneratedByPipe());
193193
} else {
194194
executeInternalCreateTimeSeries(
195195
devicePath,
196196
measurementGroup,
197197
schemaRegion,
198-
alreadyExistingTimeSeries,
198+
existingTimeSeriesAndAlignmentMismatch,
199199
failingStatus,
200200
node.isGeneratedByPipe());
201201
}
@@ -204,8 +204,8 @@ public TSStatus visitInternalCreateTimeSeries(
204204
return RpcUtils.getStatus(failingStatus);
205205
}
206206

207-
if (!alreadyExistingTimeSeries.isEmpty()) {
208-
return RpcUtils.getStatus(alreadyExistingTimeSeries);
207+
if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
208+
return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
209209
}
210210

211211
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
@@ -217,7 +217,7 @@ public TSStatus visitInternalCreateMultiTimeSeries(
217217
PartialPath devicePath;
218218
MeasurementGroup measurementGroup;
219219

220-
final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
220+
final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new ArrayList<>();
221221
final List<TSStatus> failingStatus = new ArrayList<>();
222222

223223
for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry :
@@ -229,15 +229,15 @@ public TSStatus visitInternalCreateMultiTimeSeries(
229229
devicePath,
230230
measurementGroup,
231231
schemaRegion,
232-
alreadyExistingTimeSeries,
232+
existingTimeSeriesAndAlignmentMismatch,
233233
failingStatus,
234234
node.isGeneratedByPipe());
235235
} else {
236236
executeInternalCreateTimeSeries(
237237
devicePath,
238238
measurementGroup,
239239
schemaRegion,
240-
alreadyExistingTimeSeries,
240+
existingTimeSeriesAndAlignmentMismatch,
241241
failingStatus,
242242
node.isGeneratedByPipe());
243243
}
@@ -247,8 +247,8 @@ public TSStatus visitInternalCreateMultiTimeSeries(
247247
return RpcUtils.getStatus(failingStatus);
248248
}
249249

250-
if (!alreadyExistingTimeSeries.isEmpty()) {
251-
return RpcUtils.getStatus(alreadyExistingTimeSeries);
250+
if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
251+
return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
252252
}
253253

254254
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
@@ -258,11 +258,12 @@ private void executeInternalCreateTimeSeries(
258258
final PartialPath devicePath,
259259
final MeasurementGroup measurementGroup,
260260
final ISchemaRegion schemaRegion,
261-
final List<TSStatus> alreadyExistingTimeSeries,
261+
final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
262262
final List<TSStatus> failingStatus,
263263
final boolean withMerge) {
264264
final int size = measurementGroup.getMeasurements().size();
265265
// todo implement batch creation of one device in SchemaRegion
266+
boolean alignedIsSet = false;
266267
for (int i = 0; i < size; i++) {
267268
try {
268269
final ICreateTimeSeriesPlan createTimeSeriesPlan =
@@ -273,11 +274,17 @@ private void executeInternalCreateTimeSeries(
273274
// Thus the original ones are not altered
274275
((CreateTimeSeriesPlanImpl) createTimeSeriesPlan).setWithMerge(withMerge);
275276
schemaRegion.createTimeSeries(createTimeSeriesPlan, -1);
277+
if (((CreateTimeSeriesPlanImpl) createTimeSeriesPlan).getAligned().get() && !alignedIsSet) {
278+
existingTimeSeriesAndAlignmentMismatch.add(
279+
new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
280+
.setMessage(PartialPath.transformDataToString(devicePath)));
281+
alignedIsSet = true;
282+
}
276283
} catch (final MeasurementAlreadyExistException e) {
277284
// There's no need to internal create time series.
278-
alreadyExistingTimeSeries.add(
285+
existingTimeSeriesAndAlignmentMismatch.add(
279286
RpcUtils.getStatus(
280-
e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath())));
287+
e.getErrorCode(), PartialPath.transformDataToString(e.getMeasurementPath())));
281288
} catch (final MetadataException e) {
282289
logger.warn("{}: MetaData error: ", e.getMessage(), e);
283290
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
@@ -289,7 +296,7 @@ private void executeInternalCreateAlignedTimeSeries(
289296
final PartialPath devicePath,
290297
final MeasurementGroup measurementGroup,
291298
final ISchemaRegion schemaRegion,
292-
final List<TSStatus> alreadyExistingTimeSeries,
299+
final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
293300
final List<TSStatus> failingStatus,
294301
final boolean withMerge) {
295302
final List<String> measurementList = measurementGroup.getMeasurements();
@@ -326,9 +333,9 @@ private void executeInternalCreateAlignedTimeSeries(
326333
// The existence check will be executed before truly creation
327334
// There's no need to internal create time series.
328335
final MeasurementPath measurementPath = e.getMeasurementPath();
329-
alreadyExistingTimeSeries.add(
336+
existingTimeSeriesAndAlignmentMismatch.add(
330337
RpcUtils.getStatus(
331-
e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath())));
338+
e.getErrorCode(), PartialPath.transformDataToString(e.getMeasurementPath())));
332339

333340
// remove the existing time series from plan
334341
final int index = measurementList.indexOf(measurementPath.getMeasurement());
@@ -379,6 +386,11 @@ private void executeInternalCreateAlignedTimeSeries(
379386
shouldRetry = false;
380387
}
381388
}
389+
if (!((CreateAlignedTimeSeriesPlanImpl) createAlignedTimeSeriesPlan).getAligned().get()) {
390+
existingTimeSeriesAndAlignmentMismatch.add(
391+
new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
392+
.setMessage(PartialPath.transformDataToString(devicePath)));
393+
}
382394
}
383395

384396
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ private RegionExecutionResult executeInternalCreateTimeSeries(
584584
alreadyExistingStatus.add(
585585
RpcUtils.getStatus(
586586
metadataException.getErrorCode(),
587-
MeasurementPath.transformDataToString(
587+
PartialPath.transformDataToString(
588588
((MeasurementAlreadyExistException) metadataException)
589589
.getMeasurementPath())));
590590
} else {
@@ -674,7 +674,7 @@ private RegionExecutionResult executeInternalCreateMultiTimeSeries(
674674
alreadyExistingStatus.add(
675675
RpcUtils.getStatus(
676676
metadataException.getErrorCode(),
677-
MeasurementPath.transformDataToString(
677+
PartialPath.transformDataToString(
678678
((MeasurementAlreadyExistException) metadataException)
679679
.getMeasurementPath())));
680680
} else {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,9 @@ private void doAutoCreateAndVerify()
678678
} catch (AuthException | LoadAnalyzeTypeMismatchException e) {
679679
throw e;
680680
} catch (Exception e) {
681+
if (e.getCause() instanceof LoadAnalyzeTypeMismatchException && isConvertOnTypeMismatch) {
682+
throw (LoadAnalyzeTypeMismatchException) e.getCause();
683+
}
681684
LOGGER.warn("Auto create or verify schema error.", e);
682685
throw new SemanticException(
683686
String.format(

0 commit comments

Comments
 (0)