diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java index d8a51b535014..97e05f8768c7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java @@ -262,6 +262,108 @@ public void testLoad() throws Exception { } } + @Test + // Shall succeed with tablet conversion + public void testLoadWithAlignmentMismatch() throws Exception { + registerSchema(); + + final long writtenPoint1; + // device 0, sg 0 + try (final TsFileGenerator generator = + new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) { + // Wrong, with 04-07 non-exist + generator.registerAlignedTimeseries( + SchemaConfig.DEVICE_0, + Arrays.asList( + SchemaConfig.MEASUREMENT_00, + SchemaConfig.MEASUREMENT_01, + SchemaConfig.MEASUREMENT_02, + SchemaConfig.MEASUREMENT_03, + SchemaConfig.MEASUREMENT_04, + SchemaConfig.MEASUREMENT_05, + SchemaConfig.MEASUREMENT_06, + SchemaConfig.MEASUREMENT_07)); + generator.generateData(SchemaConfig.DEVICE_0, 100000, PARTITION_INTERVAL / 10_000, true); + writtenPoint1 = generator.getTotalNumber(); + } + + final long writtenPoint2; + // device 2, device 3, device4, sg 1 + try (final TsFileGenerator generator = + new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) { + // right + generator.registerTimeseries( + SchemaConfig.DEVICE_2, Collections.singletonList(SchemaConfig.MEASUREMENT_20)); + // right + generator.registerTimeseries( + SchemaConfig.DEVICE_3, Collections.singletonList(SchemaConfig.MEASUREMENT_30)); + // Wrong, with 06 non-exist + generator.registerTimeseries( + SchemaConfig.DEVICE_4, + Arrays.asList(SchemaConfig.MEASUREMENT_40, SchemaConfig.MEASUREMENT_06)); + generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL / 10_000, false); + generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL / 10_000, false); + generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL / 10_000, false); + for (int i = 0; i < 1000; i++) { + generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 10, false); + } + writtenPoint2 = generator.getTotalNumber(); + } + + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + + try { + statement.execute( + String.format( + "load \"%s\" with ('database-level'='2', 'convert-on-type-mismatch'='false')", + tmpDir.getAbsolutePath() + File.separator + "1-0-0-0.tsfile")); + Assert.fail(); + } catch (final Exception e) { + Assert.assertTrue( + e.getMessage() + .contains( + "TimeSeries under this device is not aligned, please use createTimeSeries or change device. (Path: root.sg.test_0.d_0).")); + } + + try { + statement.execute( + String.format( + "load \"%s\" with ('database-level'='2', 'convert-on-type-mismatch'='false')", + tmpDir.getAbsolutePath() + File.separator + "2-0-0-0.tsfile")); + Assert.fail(); + } catch (final Exception e) { + Assert.assertTrue( + e.getMessage() + .contains( + "TimeSeries under this device is aligned, please use createAlignedTimeSeries or change device. (Path: root.sg.test_1.a_4).")); + } + + statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath())); + + try (final ResultSet resultSet = + statement.executeQuery("select count(*) from root.sg.** group by level=1,2")) { + if (resultSet.next()) { + long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)"); + Assert.assertEquals(writtenPoint1, sg1Count); + long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)"); + Assert.assertEquals(writtenPoint2, sg2Count); + } else { + Assert.fail("This ResultSet is empty."); + } + } + } + + // Try to delete after loading. Expect no deadlock + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "delete timeseries %s.%s", + SchemaConfig.DEVICE_0, SchemaConfig.MEASUREMENT_00.getMeasurementId())); + } + } + @Test public void testLoadAcrossMultipleTimePartitions() throws Exception { registerSchema(); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java index 6a00ece6783e..23e2b5b99d2a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.it.schema; +import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.itbase.category.ClusterIT; @@ -34,6 +35,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; + +import static org.junit.Assert.fail; /** * Notice that, all test begins with "IoTDB" is integration test. All test which will start the @@ -141,4 +145,22 @@ private void assertTimeseriesEquals(String[] timeSeriesArray) throws SQLExceptio } Assert.assertEquals(timeSeriesArray.length, count); } + + @Test + public void testDifferentDeviceAlignment() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + // Should ignore the alignment difference + statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)"); + // Should use the existing alignment + statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64"); + statement.execute("insert into root.sg2.d (time, s4) values (-1, 1)"); + TestUtils.assertResultSetEqual( + statement.executeQuery("select * from root.sg2.d"), + "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,", + Collections.singleton("-1,null,1.0,null,null,")); + } catch (SQLException ignored) { + fail(); + } + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java index 9aa0d0a132b7..7d550f40214c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.it.schema; +import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.itbase.category.ClusterIT; @@ -35,6 +36,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -308,6 +310,12 @@ public void testDifferentDeviceAlignment() { statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64"); // Should ignore the alignment difference statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)"); + // Should use the existing alignment + statement.execute("insert into root.sg2.d (time, s4) aligned values (-1, 1)"); + TestUtils.assertResultSetEqual( + statement.executeQuery("select * from root.sg2.d"), + "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,", + Collections.singleton("-1,null,1.0,null,null,")); } catch (SQLException ignored) { fail(); } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index b3939048bd6c..deb486ae4665 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -93,7 +93,6 @@ public enum TSStatusCode { WRITE_PROCESS_REJECT(606), OUT_OF_TTL(607), COMPACTION_ERROR(608), - @Deprecated ALIGNED_TIMESERIES_ERROR(609), WAL_ERROR(610), DISK_SPACE_INSUFFICIENT(611), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java index 9698c2a463a5..b392ac0f479e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java @@ -179,7 +179,7 @@ public TSStatus visitInternalCreateTimeSeries( final PartialPath devicePath = node.getDevicePath(); final MeasurementGroup measurementGroup = node.getMeasurementGroup(); - final List alreadyExistingTimeSeries = new ArrayList<>(); + final List existingTimeSeriesAndAlignmentMismatch = new ArrayList<>(); final List failingStatus = new ArrayList<>(); if (node.isAligned()) { @@ -187,7 +187,7 @@ public TSStatus visitInternalCreateTimeSeries( devicePath, measurementGroup, schemaRegion, - alreadyExistingTimeSeries, + existingTimeSeriesAndAlignmentMismatch, failingStatus, node.isGeneratedByPipe()); } else { @@ -195,7 +195,7 @@ public TSStatus visitInternalCreateTimeSeries( devicePath, measurementGroup, schemaRegion, - alreadyExistingTimeSeries, + existingTimeSeriesAndAlignmentMismatch, failingStatus, node.isGeneratedByPipe()); } @@ -204,8 +204,8 @@ public TSStatus visitInternalCreateTimeSeries( return RpcUtils.getStatus(failingStatus); } - if (!alreadyExistingTimeSeries.isEmpty()) { - return RpcUtils.getStatus(alreadyExistingTimeSeries); + if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) { + return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); @@ -217,7 +217,7 @@ public TSStatus visitInternalCreateMultiTimeSeries( PartialPath devicePath; MeasurementGroup measurementGroup; - final List alreadyExistingTimeSeries = new ArrayList<>(); + final List existingTimeSeriesAndAlignmentMismatch = new ArrayList<>(); final List failingStatus = new ArrayList<>(); for (final Map.Entry> deviceEntry : @@ -229,7 +229,7 @@ public TSStatus visitInternalCreateMultiTimeSeries( devicePath, measurementGroup, schemaRegion, - alreadyExistingTimeSeries, + existingTimeSeriesAndAlignmentMismatch, failingStatus, node.isGeneratedByPipe()); } else { @@ -237,7 +237,7 @@ public TSStatus visitInternalCreateMultiTimeSeries( devicePath, measurementGroup, schemaRegion, - alreadyExistingTimeSeries, + existingTimeSeriesAndAlignmentMismatch, failingStatus, node.isGeneratedByPipe()); } @@ -247,8 +247,8 @@ public TSStatus visitInternalCreateMultiTimeSeries( return RpcUtils.getStatus(failingStatus); } - if (!alreadyExistingTimeSeries.isEmpty()) { - return RpcUtils.getStatus(alreadyExistingTimeSeries); + if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) { + return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); @@ -258,11 +258,12 @@ private void executeInternalCreateTimeSeries( final PartialPath devicePath, final MeasurementGroup measurementGroup, final ISchemaRegion schemaRegion, - final List alreadyExistingTimeSeries, + final List existingTimeSeriesAndAlignmentMismatch, final List failingStatus, final boolean withMerge) { final int size = measurementGroup.getMeasurements().size(); // todo implement batch creation of one device in SchemaRegion + boolean alignedIsSet = false; for (int i = 0; i < size; i++) { try { final ICreateTimeSeriesPlan createTimeSeriesPlan = @@ -273,11 +274,17 @@ private void executeInternalCreateTimeSeries( // Thus the original ones are not altered ((CreateTimeSeriesPlanImpl) createTimeSeriesPlan).setWithMerge(withMerge); schemaRegion.createTimeSeries(createTimeSeriesPlan, -1); + if (((CreateTimeSeriesPlanImpl) createTimeSeriesPlan).getAligned().get() && !alignedIsSet) { + existingTimeSeriesAndAlignmentMismatch.add( + new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) + .setMessage(PartialPath.transformDataToString(devicePath))); + alignedIsSet = true; + } } catch (final MeasurementAlreadyExistException e) { // There's no need to internal create time series. - alreadyExistingTimeSeries.add( + existingTimeSeriesAndAlignmentMismatch.add( RpcUtils.getStatus( - e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath()))); + e.getErrorCode(), PartialPath.transformDataToString(e.getMeasurementPath()))); } catch (final MetadataException e) { logger.warn("{}: MetaData error: ", e.getMessage(), e); failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); @@ -289,7 +296,7 @@ private void executeInternalCreateAlignedTimeSeries( final PartialPath devicePath, final MeasurementGroup measurementGroup, final ISchemaRegion schemaRegion, - final List alreadyExistingTimeSeries, + final List existingTimeSeriesAndAlignmentMismatch, final List failingStatus, final boolean withMerge) { final List measurementList = measurementGroup.getMeasurements(); @@ -326,9 +333,9 @@ private void executeInternalCreateAlignedTimeSeries( // The existence check will be executed before truly creation // There's no need to internal create time series. final MeasurementPath measurementPath = e.getMeasurementPath(); - alreadyExistingTimeSeries.add( + existingTimeSeriesAndAlignmentMismatch.add( RpcUtils.getStatus( - e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath()))); + e.getErrorCode(), PartialPath.transformDataToString(e.getMeasurementPath()))); // remove the existing time series from plan final int index = measurementList.indexOf(measurementPath.getMeasurement()); @@ -379,6 +386,11 @@ private void executeInternalCreateAlignedTimeSeries( shouldRetry = false; } } + if (!((CreateAlignedTimeSeriesPlanImpl) createAlignedTimeSeriesPlan).getAligned().get()) { + existingTimeSeriesAndAlignmentMismatch.add( + new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) + .setMessage(PartialPath.transformDataToString(devicePath))); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java index 61472663f56c..5a63a4b4e3b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java @@ -584,7 +584,7 @@ private RegionExecutionResult executeInternalCreateTimeSeries( alreadyExistingStatus.add( RpcUtils.getStatus( metadataException.getErrorCode(), - MeasurementPath.transformDataToString( + PartialPath.transformDataToString( ((MeasurementAlreadyExistException) metadataException) .getMeasurementPath()))); } else { @@ -674,7 +674,7 @@ private RegionExecutionResult executeInternalCreateMultiTimeSeries( alreadyExistingStatus.add( RpcUtils.getStatus( metadataException.getErrorCode(), - MeasurementPath.transformDataToString( + PartialPath.transformDataToString( ((MeasurementAlreadyExistException) metadataException) .getMeasurementPath()))); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 2ab134b3e40d..5a346fdf8e09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -678,6 +678,9 @@ private void doAutoCreateAndVerify() } catch (AuthException | LoadAnalyzeTypeMismatchException e) { throw e; } catch (Exception e) { + if (e.getCause() instanceof LoadAnalyzeTypeMismatchException && isConvertOnTypeMismatch) { + throw (LoadAnalyzeTypeMismatchException) e.getCause(); + } LOGGER.warn("Auto create or verify schema error.", e); throw new SemanticException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java index 34fa662d172c..e7fa45be64a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -190,7 +191,7 @@ void autoCreateTimeSeries( } if (!devicesNeedAutoCreateTimeSeries.isEmpty()) { - internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, context); + internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, context, false); } } @@ -437,7 +438,7 @@ void autoCreateMissingMeasurements( } if (!devicesNeedAutoCreateTimeSeries.isEmpty()) { - internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, context); + internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, context, true); } } @@ -475,11 +476,15 @@ private void internalCreateTimeSeries( List compressors, boolean isAligned, MPPQueryContext context) { + final Map> input = + Collections.singletonMap(devicePath, new Pair<>(isAligned, null)); List measurementPathList = - executeInternalCreateTimeseriesStatement( + executeInternalCreateTimeSeriesStatement( + input, new InternalCreateTimeSeriesStatement( devicePath, measurements, tsDataTypes, encodings, compressors, isAligned), - context); + context, + false); Set alreadyExistingMeasurementIndexSet = measurementPathList.stream() @@ -500,13 +505,16 @@ private void internalCreateTimeSeries( null, null, null, - isAligned); + input.get(devicePath).getLeft()); } } - // Auto create timeseries and return the existing timeseries info - private List executeInternalCreateTimeseriesStatement( - final Statement statement, final MPPQueryContext context) { + // Auto create timeSeries and return the existing timeSeries info + private List executeInternalCreateTimeSeriesStatement( + final Map> devicesNeedAutoCreateTimeSeries, + final Statement statement, + final MPPQueryContext context, + final boolean isLoad) { final TSStatus status = AuthorityChecker.checkAuthority(statement, context.getSession().getUserName()); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -530,7 +538,23 @@ private List executeInternalCreateTimeseriesStatement( for (final TSStatus subStatus : executionResult.status.subStatus) { if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { alreadyExistingMeasurements.add( - MeasurementPath.parseDataFromString(subStatus.getMessage())); + (MeasurementPath) PartialPath.parseDataFromString(subStatus.getMessage())); + } else if (subStatus.code == TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) { + final PartialPath devicePath = PartialPath.parseDataFromString(subStatus.getMessage()); + final Pair pair = + devicesNeedAutoCreateTimeSeries.get(devicePath); + if (!isLoad) { + pair.setLeft(!pair.getLeft()); + } else { + // Load does not tolerate the device alignment mismatch + throw new SemanticException( + new LoadAnalyzeTypeMismatchException( + String.format( + "TimeSeries under this device is%s aligned, please use create%sTimeSeries or change device. (Path: %s)", + !pair.getLeft() ? "" : " not", + !pair.getLeft() ? "Aligned" : "", + devicePath.getFullPath()))); + } } else { failedCreationSet.add(subStatus); } @@ -597,11 +621,15 @@ private void internalActivateTemplate( private void internalCreateTimeSeries( ClusterSchemaTree schemaTree, Map> devicesNeedAutoCreateTimeSeries, - MPPQueryContext context) { + MPPQueryContext context, + final boolean isLoad) { List measurementPathList = - executeInternalCreateTimeseriesStatement( - new InternalCreateMultiTimeSeriesStatement(devicesNeedAutoCreateTimeSeries), context); + executeInternalCreateTimeSeriesStatement( + devicesNeedAutoCreateTimeSeries, + new InternalCreateMultiTimeSeriesStatement(devicesNeedAutoCreateTimeSeries), + context, + isLoad); schemaTree.appendMeasurementPaths(measurementPathList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index a789bda92903..5449d3c48bc1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -586,7 +586,10 @@ public void createTimeSeries(final ICreateTimeSeriesPlan plan, long offset) plan instanceof CreateTimeSeriesPlanImpl && ((CreateTimeSeriesPlanImpl) plan).isWithMerge() || plan instanceof CreateTimeSeriesNode - && ((CreateTimeSeriesNode) plan).isGeneratedByPipe()); + && ((CreateTimeSeriesNode) plan).isGeneratedByPipe(), + plan instanceof CreateTimeSeriesPlanImpl + ? ((CreateTimeSeriesPlanImpl) plan).getAligned() + : null); // Should merge if (Objects.isNull(leafMNode)) { @@ -670,7 +673,10 @@ public void createAlignedTimeSeries(final ICreateAlignedTimeSeriesPlan plan) aliasList, (plan instanceof CreateAlignedTimeSeriesPlanImpl && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()), - existingMeasurementIndexes); + existingMeasurementIndexes, + (plan instanceof CreateAlignedTimeSeriesPlanImpl + ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned() + : null)); // update statistics and schemaDataTypeNumMap regionStatistics.addMeasurement(measurementMNodeList.size()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index 78cae77750b0..045d39400c76 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -634,7 +634,10 @@ public void createTimeSeries(final ICreateTimeSeriesPlan plan, long offset) plan.getProps(), plan.getAlias(), (plan instanceof CreateTimeSeriesPlanImpl - && ((CreateTimeSeriesPlanImpl) plan).isWithMerge())); + && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()), + plan instanceof CreateTimeSeriesPlanImpl + ? ((CreateTimeSeriesPlanImpl) plan).getAligned() + : null); try { // Should merge @@ -745,7 +748,10 @@ public void createAlignedTimeSeries(final ICreateAlignedTimeSeriesPlan plan) aliasList, (plan instanceof CreateAlignedTimeSeriesPlanImpl && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()), - existingMeasurementIndexes); + existingMeasurementIndexes, + (plan instanceof CreateAlignedTimeSeriesPlanImpl + ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned() + : null)); try { // Update statistics and schemaDataTypeNumMap diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java index f8d2bd2316a5..fd281f192a57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java @@ -211,7 +211,8 @@ public IMeasurementMNode createTimeSeries( final CompressionType compressor, final Map props, final String alias, - final boolean withMerge) + final boolean withMerge, + final AtomicBoolean isAligned) throws MetadataException { final String[] nodeNames = path.getNodes(); if (nodeNames.length <= 2) { @@ -226,6 +227,12 @@ public IMeasurementMNode createTimeSeries( synchronized (this) { final IMemMNode device = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent); + if (device.isDevice() + && device.getAsDeviceMNode().isAlignedNullable() != null + && isAligned != null) { + isAligned.set(device.getAsDeviceMNode().isAligned()); + } + MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props); final String leafName = path.getMeasurement(); @@ -301,7 +308,8 @@ public List> createAlignedTimeSeries( final List compressors, final List aliasList, final boolean withMerge, - final Set existingMeasurementIndexes) + final Set existingMeasurementIndexes, + final AtomicBoolean isAligned) throws MetadataException { final List> measurementMNodeList = new ArrayList<>(); MetaFormatUtils.checkSchemaMeasurementNames(measurements); @@ -312,6 +320,12 @@ public List> createAlignedTimeSeries( synchronized (this) { final IMemMNode device = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent); + if (device.isDevice() + && device.getAsDeviceMNode().isAlignedNullable() != null + && isAligned != null) { + isAligned.set(device.getAsDeviceMNode().isAligned()); + } + for (int i = 0; i < measurements.size(); i++) { if (device.hasChild(measurements.get(i))) { final IMemMNode node = device.getChild(measurements.get(i)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java index e00835cff03c..679bf600dfa0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java @@ -264,7 +264,8 @@ public IMeasurementMNode createTimeSeries( String alias) throws MetadataException { IMeasurementMNode measurementMNode = - createTimeSeriesWithPinnedReturn(path, dataType, encoding, compressor, props, alias, false); + createTimeSeriesWithPinnedReturn( + path, dataType, encoding, compressor, props, alias, false, null); unPinMNode(measurementMNode.getAsMNode()); return measurementMNode; } @@ -287,7 +288,8 @@ public IMeasurementMNode createTimeSeriesWithPinnedReturn( final CompressionType compressor, final Map props, final String alias, - final boolean withMerge) + final boolean withMerge, + final AtomicBoolean isAligned) throws MetadataException { final String[] nodeNames = path.getNodes(); if (nodeNames.length <= 2) { @@ -303,6 +305,12 @@ public IMeasurementMNode createTimeSeriesWithPinnedReturn( synchronized (this) { ICachedMNode device = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent); + if (device.isDevice() + && device.getAsDeviceMNode().isAlignedNullable() != null + && isAligned != null) { + isAligned.set(device.getAsDeviceMNode().isAligned()); + } + try { MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props); @@ -385,7 +393,8 @@ public List> createAlignedTimeSeries( final List compressors, final List aliasList, final boolean withMerge, - final Set existingMeasurementIndexes) + final Set existingMeasurementIndexes, + final AtomicBoolean isAligned) throws MetadataException { final List> measurementMNodeList = new ArrayList<>(); MetaFormatUtils.checkSchemaMeasurementNames(measurements); @@ -397,6 +406,12 @@ public List> createAlignedTimeSeries( synchronized (this) { ICachedMNode device = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent); + if (device.isDevice() + && device.getAsDeviceMNode().isAlignedNullable() != null + && isAligned != null) { + isAligned.set(device.getAsDeviceMNode().isAligned()); + } + try { for (int i = 0; i < measurements.size(); i++) { if (store.hasChild(device, measurements.get(i))) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java index 4ffa8aca5b2f..8a2c4c12e8c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; public class CreateAlignedTimeSeriesPlanImpl implements ICreateAlignedTimeSeriesPlan { @@ -45,6 +46,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements ICreateAlignedTimeSeries private List> attributesList; private List tagOffsets = null; private transient boolean withMerge; + private final transient AtomicBoolean aligned = new AtomicBoolean(true); public CreateAlignedTimeSeriesPlanImpl() {} @@ -194,4 +196,12 @@ public void setWithMerge(final boolean withMerge) { tagOffsets = Objects.nonNull(tagOffsets) ? new ArrayList<>(tagOffsets) : null; } } + + public void setAligned(final boolean aligned) { + this.aligned.set(aligned); + } + + public AtomicBoolean getAligned() { + return aligned; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java index 1f5aa8e81eaa..b011dca84355 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan { @@ -41,6 +42,7 @@ public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan { private Map attributes = null; private long tagOffset = -1; private transient boolean withMerge; + private final transient AtomicBoolean aligned = new AtomicBoolean(false); public CreateTimeSeriesPlanImpl() {} @@ -170,4 +172,12 @@ public boolean isWithMerge() { public void setWithMerge(final boolean withMerge) { this.withMerge = withMerge; } + + public void setAligned(final boolean aligned) { + this.aligned.set(aligned); + } + + public AtomicBoolean getAligned() { + return aligned; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java index cf5f8ab57ba7..148a3c56bd4c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java @@ -43,8 +43,8 @@ public void testTransformDataToString() throws IllegalPathException { new MeasurementPath( new PartialPath("root.sg.d.s"), new MeasurementSchema("s", TSDataType.INT32), true); rawPath.setMeasurementAlias("alias"); - String string = MeasurementPath.transformDataToString(rawPath); - MeasurementPath newPath = MeasurementPath.parseDataFromString(string); + String string = PartialPath.transformDataToString(rawPath); + MeasurementPath newPath = (MeasurementPath) PartialPath.parseDataFromString(string); Assert.assertEquals(rawPath.getFullPath(), newPath.getFullPath()); Assert.assertEquals(rawPath.getMeasurementAlias(), newPath.getMeasurementAlias()); Assert.assertEquals(rawPath.getMeasurementSchema(), newPath.getMeasurementSchema()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java index 32443cfe9f7a..59d2d9d04404 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java @@ -33,12 +33,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.rmi.UnexpectedException; import java.util.HashMap; import java.util.Map; @@ -293,27 +290,4 @@ public static MeasurementPath deserialize(ByteBuffer byteBuffer) { public PartialPath transformToPartialPath() { return getDevicePath().concatNode(getTailNode()); } - - /** - * In specific scenarios, like internal create timeseries, the message can only be passed as - * String format. - */ - public static String transformDataToString(MeasurementPath measurementPath) { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - try { - measurementPath.serialize(dataOutputStream); - } catch (IOException ignored) { - // this exception won't happen. - } - byte[] bytes = byteArrayOutputStream.toByteArray(); - // must use single-byte char sets - return new String(bytes, StandardCharsets.ISO_8859_1); - } - - public static MeasurementPath parseDataFromString(String measurementPathData) { - return (MeasurementPath) - PathDeserializeUtil.deserialize( - ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1))); - } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java index fc04c94ba5b1..354724bd4874 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java @@ -36,9 +36,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -964,6 +967,29 @@ public PartialPath transformToPartialPath() { return this; } + /** + * In specific scenarios, like internal create timeseries, the message can only be passed as + * String format. + */ + public static String transformDataToString(PartialPath partialPath) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + try { + partialPath.serialize(dataOutputStream); + } catch (IOException ignored) { + // this exception won't happen. + } + byte[] bytes = byteArrayOutputStream.toByteArray(); + // must use single-byte char sets + return new String(bytes, StandardCharsets.ISO_8859_1); + } + + public static PartialPath parseDataFromString(String measurementPathData) { + return (PartialPath) + PathDeserializeUtil.deserialize( + ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1))); + } + /** Return true if the path ends with ** and no other nodes contain *. Otherwise, return false. */ public boolean isPrefixPath() { if (nodes.length <= 0) {