Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,108 @@ public void testLoad() throws Exception {
}
}

@Test
// Shall succeed with tablet conversion
public void testLoadWithAlignmentMismatch() throws Exception {
registerSchema();

final long writtenPoint1;
// device 0, sg 0
try (final TsFileGenerator generator =
new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
// Wrong, with 04-07 non-exist
generator.registerAlignedTimeseries(
SchemaConfig.DEVICE_0,
Arrays.asList(
SchemaConfig.MEASUREMENT_00,
SchemaConfig.MEASUREMENT_01,
SchemaConfig.MEASUREMENT_02,
SchemaConfig.MEASUREMENT_03,
SchemaConfig.MEASUREMENT_04,
SchemaConfig.MEASUREMENT_05,
SchemaConfig.MEASUREMENT_06,
SchemaConfig.MEASUREMENT_07));
generator.generateData(SchemaConfig.DEVICE_0, 100000, PARTITION_INTERVAL / 10_000, true);
writtenPoint1 = generator.getTotalNumber();
}

final long writtenPoint2;
// device 2, device 3, device4, sg 1
try (final TsFileGenerator generator =
new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
// right
generator.registerTimeseries(
SchemaConfig.DEVICE_2, Collections.singletonList(SchemaConfig.MEASUREMENT_20));
// right
generator.registerTimeseries(
SchemaConfig.DEVICE_3, Collections.singletonList(SchemaConfig.MEASUREMENT_30));
// Wrong, with 06 non-exist
generator.registerTimeseries(
SchemaConfig.DEVICE_4,
Arrays.asList(SchemaConfig.MEASUREMENT_40, SchemaConfig.MEASUREMENT_06));
generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL / 10_000, false);
generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL / 10_000, false);
generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL / 10_000, false);
for (int i = 0; i < 1000; i++) {
generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 10, false);
}
writtenPoint2 = generator.getTotalNumber();
}

try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {

try {
statement.execute(
String.format(
"load \"%s\" with ('database-level'='2', 'convert-on-type-mismatch'='false')",
tmpDir.getAbsolutePath() + File.separator + "1-0-0-0.tsfile"));
Assert.fail();
} catch (final Exception e) {
Assert.assertTrue(
e.getMessage()
.contains(
"TimeSeries under this device is not aligned, please use createTimeSeries or change device. (Path: root.sg.test_0.d_0)."));
}

try {
statement.execute(
String.format(
"load \"%s\" with ('database-level'='2', 'convert-on-type-mismatch'='false')",
tmpDir.getAbsolutePath() + File.separator + "2-0-0-0.tsfile"));
Assert.fail();
} catch (final Exception e) {
Assert.assertTrue(
e.getMessage()
.contains(
"TimeSeries under this device is aligned, please use createAlignedTimeSeries or change device. (Path: root.sg.test_1.a_4)."));
}

statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));

try (final ResultSet resultSet =
statement.executeQuery("select count(*) from root.sg.** group by level=1,2")) {
if (resultSet.next()) {
long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
Assert.assertEquals(writtenPoint1, sg1Count);
long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)");
Assert.assertEquals(writtenPoint2, sg2Count);
} else {
Assert.fail("This ResultSet is empty.");
}
}
}

// Try to delete after loading. Expect no deadlock
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"delete timeseries %s.%s",
SchemaConfig.DEVICE_0, SchemaConfig.MEASUREMENT_00.getMeasurementId()));
}
}

@Test
public void testLoadAcrossMultipleTimePartitions() throws Exception {
registerSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.it.schema;

import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.itbase.category.ClusterIT;
Expand All @@ -34,6 +35,9 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;

import static org.junit.Assert.fail;

/**
* Notice that, all test begins with "IoTDB" is integration test. All test which will start the
Expand Down Expand Up @@ -141,4 +145,22 @@ private void assertTimeseriesEquals(String[] timeSeriesArray) throws SQLExceptio
}
Assert.assertEquals(timeSeriesArray.length, count);
}

@Test
public void testDifferentDeviceAlignment() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
// Should ignore the alignment difference
statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)");
// Should use the existing alignment
statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
statement.execute("insert into root.sg2.d (time, s4) values (-1, 1)");
TestUtils.assertResultSetEqual(
statement.executeQuery("select * from root.sg2.d"),
"Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
Collections.singleton("-1,null,1.0,null,null,"));
} catch (SQLException ignored) {
fail();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.it.schema;

import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.itbase.category.ClusterIT;
Expand All @@ -35,6 +36,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

Expand Down Expand Up @@ -308,6 +310,12 @@ public void testDifferentDeviceAlignment() {
statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
// Should ignore the alignment difference
statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)");
// Should use the existing alignment
statement.execute("insert into root.sg2.d (time, s4) aligned values (-1, 1)");
TestUtils.assertResultSetEqual(
statement.executeQuery("select * from root.sg2.d"),
"Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
Collections.singleton("-1,null,1.0,null,null,"));
} catch (SQLException ignored) {
fail();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public enum TSStatusCode {
WRITE_PROCESS_REJECT(606),
OUT_OF_TTL(607),
COMPACTION_ERROR(608),
@Deprecated
ALIGNED_TIMESERIES_ERROR(609),
WAL_ERROR(610),
DISK_SPACE_INSUFFICIENT(611),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,23 @@ public TSStatus visitInternalCreateTimeSeries(
final PartialPath devicePath = node.getDevicePath();
final MeasurementGroup measurementGroup = node.getMeasurementGroup();

final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new ArrayList<>();
final List<TSStatus> failingStatus = new ArrayList<>();

if (node.isAligned()) {
executeInternalCreateAlignedTimeSeries(
devicePath,
measurementGroup,
schemaRegion,
alreadyExistingTimeSeries,
existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
} else {
executeInternalCreateTimeSeries(
devicePath,
measurementGroup,
schemaRegion,
alreadyExistingTimeSeries,
existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
}
Expand All @@ -204,8 +204,8 @@ public TSStatus visitInternalCreateTimeSeries(
return RpcUtils.getStatus(failingStatus);
}

if (!alreadyExistingTimeSeries.isEmpty()) {
return RpcUtils.getStatus(alreadyExistingTimeSeries);
if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
}

return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
Expand All @@ -217,7 +217,7 @@ public TSStatus visitInternalCreateMultiTimeSeries(
PartialPath devicePath;
MeasurementGroup measurementGroup;

final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new ArrayList<>();
final List<TSStatus> failingStatus = new ArrayList<>();

for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry :
Expand All @@ -229,15 +229,15 @@ public TSStatus visitInternalCreateMultiTimeSeries(
devicePath,
measurementGroup,
schemaRegion,
alreadyExistingTimeSeries,
existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
} else {
executeInternalCreateTimeSeries(
devicePath,
measurementGroup,
schemaRegion,
alreadyExistingTimeSeries,
existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
}
Expand All @@ -247,8 +247,8 @@ public TSStatus visitInternalCreateMultiTimeSeries(
return RpcUtils.getStatus(failingStatus);
}

if (!alreadyExistingTimeSeries.isEmpty()) {
return RpcUtils.getStatus(alreadyExistingTimeSeries);
if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
}

return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
Expand All @@ -258,11 +258,12 @@ private void executeInternalCreateTimeSeries(
final PartialPath devicePath,
final MeasurementGroup measurementGroup,
final ISchemaRegion schemaRegion,
final List<TSStatus> alreadyExistingTimeSeries,
final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
final List<TSStatus> failingStatus,
final boolean withMerge) {
final int size = measurementGroup.getMeasurements().size();
// todo implement batch creation of one device in SchemaRegion
boolean alignedIsSet = false;
for (int i = 0; i < size; i++) {
try {
final ICreateTimeSeriesPlan createTimeSeriesPlan =
Expand All @@ -273,11 +274,17 @@ private void executeInternalCreateTimeSeries(
// Thus the original ones are not altered
((CreateTimeSeriesPlanImpl) createTimeSeriesPlan).setWithMerge(withMerge);
schemaRegion.createTimeSeries(createTimeSeriesPlan, -1);
if (((CreateTimeSeriesPlanImpl) createTimeSeriesPlan).getAligned().get() && !alignedIsSet) {
existingTimeSeriesAndAlignmentMismatch.add(
new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
.setMessage(PartialPath.transformDataToString(devicePath)));
alignedIsSet = true;
}
} catch (final MeasurementAlreadyExistException e) {
// There's no need to internal create time series.
alreadyExistingTimeSeries.add(
existingTimeSeriesAndAlignmentMismatch.add(
RpcUtils.getStatus(
e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath())));
e.getErrorCode(), PartialPath.transformDataToString(e.getMeasurementPath())));
} catch (final MetadataException e) {
logger.warn("{}: MetaData error: ", e.getMessage(), e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
Expand All @@ -289,7 +296,7 @@ private void executeInternalCreateAlignedTimeSeries(
final PartialPath devicePath,
final MeasurementGroup measurementGroup,
final ISchemaRegion schemaRegion,
final List<TSStatus> alreadyExistingTimeSeries,
final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
final List<TSStatus> failingStatus,
final boolean withMerge) {
final List<String> measurementList = measurementGroup.getMeasurements();
Expand Down Expand Up @@ -326,9 +333,9 @@ private void executeInternalCreateAlignedTimeSeries(
// The existence check will be executed before truly creation
// There's no need to internal create time series.
final MeasurementPath measurementPath = e.getMeasurementPath();
alreadyExistingTimeSeries.add(
existingTimeSeriesAndAlignmentMismatch.add(
RpcUtils.getStatus(
e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath())));
e.getErrorCode(), PartialPath.transformDataToString(e.getMeasurementPath())));

// remove the existing time series from plan
final int index = measurementList.indexOf(measurementPath.getMeasurement());
Expand Down Expand Up @@ -379,6 +386,11 @@ private void executeInternalCreateAlignedTimeSeries(
shouldRetry = false;
}
}
if (!((CreateAlignedTimeSeriesPlanImpl) createAlignedTimeSeriesPlan).getAligned().get()) {
existingTimeSeriesAndAlignmentMismatch.add(
new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
.setMessage(PartialPath.transformDataToString(devicePath)));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ private RegionExecutionResult executeInternalCreateTimeSeries(
alreadyExistingStatus.add(
RpcUtils.getStatus(
metadataException.getErrorCode(),
MeasurementPath.transformDataToString(
PartialPath.transformDataToString(
((MeasurementAlreadyExistException) metadataException)
.getMeasurementPath())));
} else {
Expand Down Expand Up @@ -674,7 +674,7 @@ private RegionExecutionResult executeInternalCreateMultiTimeSeries(
alreadyExistingStatus.add(
RpcUtils.getStatus(
metadataException.getErrorCode(),
MeasurementPath.transformDataToString(
PartialPath.transformDataToString(
((MeasurementAlreadyExistException) metadataException)
.getMeasurementPath())));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,9 @@ private void doAutoCreateAndVerify()
} catch (AuthException | LoadAnalyzeTypeMismatchException e) {
throw e;
} catch (Exception e) {
if (e.getCause() instanceof LoadAnalyzeTypeMismatchException && isConvertOnTypeMismatch) {
throw (LoadAnalyzeTypeMismatchException) e.getCause();
}
LOGGER.warn("Auto create or verify schema error.", e);
throw new SemanticException(
String.format(
Expand Down
Loading
Loading