diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 1760301ac737..523b1a0aa25a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -149,8 +149,8 @@ public CommonConfig setPrimitiveArraySize(int primitiveArraySize) { } @Override - public CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) { - setProperty("avg_series_point_number_threshold", String.valueOf(avgSeriesPointNumberThreshold)); + public CommonConfig setTargetChunkPointNum(int targetChunkPointNum) { + setProperty("target_chunk_point_num", String.valueOf(targetChunkPointNum)); return this; } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 969d4bb8d41c..6412c37b5ee4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -146,9 +146,9 @@ public CommonConfig setPrimitiveArraySize(int primitiveArraySize) { } @Override - public CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) { - cnConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold); - dnConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold); + public CommonConfig setTargetChunkPointNum(int targetChunkPointNum) { + cnConfig.setTargetChunkPointNum(targetChunkPointNum); + dnConfig.setTargetChunkPointNum(targetChunkPointNum); return this; } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index cee8aadf05f9..10bfcb734e93 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -105,7 +105,7 @@ public CommonConfig setPrimitiveArraySize(int primitiveArraySize) { } @Override - public CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) { + public CommonConfig setTargetChunkPointNum(int targetChunkPointNum) { return this; } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index d806be8db1dc..0efc35738a06 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -56,7 +56,7 @@ public interface CommonConfig { CommonConfig setPrimitiveArraySize(int primitiveArraySize); - CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold); + CommonConfig setTargetChunkPointNum(int targetChunkPointNum); CommonConfig setMaxTsBlockLineNumber(int maxTsBlockLineNumber); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java index 209657e51618..047bb10bfd3c 100644 --- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBDuplicateTimeIT.java @@ -43,8 +43,6 @@ public class IoTDBDuplicateTimeIT { @Before public void setUp() throws Exception { - EnvFactory.getEnv().getConfig().getCommonConfig().setAvgSeriesPointNumberThreshold(2); - // Adjust memstable threshold size to make it flush automatically EnvFactory.getEnv().initClusterEnvironment(); } @@ -62,6 +60,7 @@ public void testDuplicateTime() throws SQLException { // version-1 tsfile statement.execute("insert into root.db.d1(time,s1) values (2,2)"); statement.execute("insert into root.db.d1(time,s1) values (3,3)"); + statement.execute("flush"); // version-2 unseq work memtable statement.execute("insert into root.db.d1(time,s1) values (2,20)"); @@ -69,9 +68,11 @@ public void testDuplicateTime() throws SQLException { // version-3 tsfile statement.execute("insert into root.db.d1(time,s1) values (5,5)"); statement.execute("insert into root.db.d1(time,s1) values (6,6)"); + statement.execute("flush root.db true"); // version-2 unseq work memtable -> unseq tsfile statement.execute("insert into root.db.d1(time,s1) values (5,50)"); + statement.execute("flush"); try (ResultSet set = statement.executeQuery("SELECT s1 FROM root.db.d1 where time = 5")) { int cnt = 0; diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java index f689ab0e7c6b..dc7355ee87c5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java @@ -308,8 +308,6 @@ public void testRecoverWALDeleteSchema() throws Exception { @Test public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception { IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - int avgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold(); - config.setAvgSeriesPointNumberThreshold(2); long tsFileSize = config.getSeqTsFileSize(); long unFsFileSize = config.getSeqTsFileSize(); config.setSeqTsFileSize(10000000); @@ -320,6 +318,7 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception { statement.execute("create timeseries root.turbine1.d1.s1 with datatype=INT64"); statement.execute("insert into root.turbine1.d1(timestamp,s1) values(1,1)"); statement.execute("insert into root.turbine1.d1(timestamp,s1) values(2,1)"); + statement.execute("flush"); statement.execute("create timeseries root.turbine1.d1.s2 with datatype=BOOLEAN"); statement.execute("insert into root.turbine1.d1(timestamp,s2) values(3,true)"); statement.execute("insert into root.turbine1.d1(timestamp,s2) values(4,true)"); @@ -342,7 +341,6 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception { assertEquals(2, cnt); } - config.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold); config.setSeqTsFileSize(tsFileSize); config.setUnSeqTsFileSize(unFsFileSize); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByUnseqIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByUnseqIT.java index 111217e56857..a15991c48768 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByUnseqIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByUnseqIT.java @@ -110,7 +110,7 @@ public void test2() { .getConfig() 
.getCommonConfig() .setMaxNumberOfPointsInPage(4) - .setAvgSeriesPointNumberThreshold(2); + .setTargetChunkPointNum(2); EnvFactory.getEnv().initClusterEnvironment(); String[] expectedHeader = new String[] {TIMESTAMP_STR, count("root.sg2.d1.s1")}; String[] retArray = new String[] {"5,1,", "10,1,", "15,2,", "20,0,", "25,1,"}; diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBRestartTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBRestartTableIT.java index 18feaf6eb198..98d1fc606862 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBRestartTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBRestartTableIT.java @@ -336,8 +336,6 @@ public void testRecoverWALDeleteSchema() throws Exception { @Test public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception { IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - int avgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold(); - config.setAvgSeriesPointNumberThreshold(2); long tsFileSize = config.getSeqTsFileSize(); long unFsFileSize = config.getSeqTsFileSize(); config.setSeqTsFileSize(10000000); @@ -351,6 +349,7 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception { "create table turbine1 (id1 string id, s1 int64 measurement, s2 boolean measurement)"); statement.execute("insert into turbine1(time,id1,s1) values(1,\'d1\',1)"); statement.execute("insert into turbine1(time,id1,s1) values(2,\'d1\',2)"); + statement.execute("flush"); statement.execute("insert into turbine1(time,id1,s2) values(3,\'d1\',true)"); statement.execute("insert into turbine1(time,id1,s2) values(4,\'d1\',true)"); } @@ -373,7 +372,6 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception { assertEquals(2, cnt); } - config.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold); config.setSeqTsFileSize(tsFileSize); config.setUnSeqTsFileSize(unFsFileSize); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index eb046f03989d..4b7623c22f6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -430,9 +430,6 @@ public class IoTDBConfig { /** The sort algorithm used in TVList */ private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM; - /** When average series point number reaches this, flush the memtable to disk */ - private int avgSeriesPointNumberThreshold = 100000; - /** Enable inner space compaction for sequence files */ private volatile boolean enableSeqSpaceCompaction = true; @@ -491,10 +488,10 @@ public class IoTDBConfig { /** The target tsfile size in compaction, 2 GB by default */ private long targetCompactionFileSize = 2147483648L; - /** The target chunk size in compaction. */ - private long targetChunkSize = 1048576L; + /** The target chunk size in compaction and flushing. */ + private long targetChunkSize = 1600000L; - /** The target chunk point num in compaction. */ + /** The target chunk point num in compaction and flushing. 
*/ private long targetChunkPointNum = 100000L; /** @@ -2308,14 +2305,6 @@ public void setTvListSortAlgorithm(TVListSortAlgorithm tvListSortAlgorithm) { this.tvListSortAlgorithm = tvListSortAlgorithm; } - public int getAvgSeriesPointNumberThreshold() { - return avgSeriesPointNumberThreshold; - } - - public void setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) { - this.avgSeriesPointNumberThreshold = avgSeriesPointNumberThreshold; - } - public boolean isRpcThriftCompressionEnable() { return rpcThriftCompressionEnable; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 3376a4132395..675b79bdaf91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -398,12 +398,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO properties.getProperty( "tvlist_sort_algorithm", conf.getTvListSortAlgorithm().toString()))); - conf.setAvgSeriesPointNumberThreshold( - Integer.parseInt( - properties.getProperty( - "avg_series_point_number_threshold", - Integer.toString(conf.getAvgSeriesPointNumberThreshold())))); - conf.setCheckPeriodWhenInsertBlocked( Integer.parseInt( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java index e7194126c1b6..6cab02a522d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java @@ -39,6 +39,7 @@ import org.apache.iotdb.metrics.utils.MetricType; import java.util.Arrays; +import java.util.Collections; import java.util.List; public class WritingMetrics implements IMetricSet { @@ -363,7 +364,7 @@ private void unbindWALCostMetrics(AbstractMetricService metricService) { MAKE_CHECKPOINT, Tag.TYPE.toString(), type)); - Arrays.asList(SERIALIZE_WAL_ENTRY_TOTAL) + Collections.singletonList(SERIALIZE_WAL_ENTRY_TOTAL) .forEach( type -> metricService.remove( @@ -407,7 +408,6 @@ private void unbindWALCostMetrics(AbstractMetricService metricService) { public static final String WAL_FLUSH_MEMTABLE_COUNT = "wal_flush_memtable_count"; public static final String MANUAL_FLUSH_MEMTABLE_COUNT = "manual_flush_memtable_count"; public static final String MEM_CONTROL_FLUSH_MEMTABLE_COUNT = "mem_control_flush_memtable_count"; - public static final String SERIES_FULL_FLUSH_MEMTABLE = "series_full_flush_memtable"; private Gauge flushThreholdGauge = DoNothingMetricManager.DO_NOTHING_GAUGE; private Gauge rejectThreholdGauge = DoNothingMetricManager.DO_NOTHING_GAUGE; @@ -416,7 +416,6 @@ private void unbindWALCostMetrics(AbstractMetricService metricService) { private Counter walFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private Counter timedFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - private Counter seriesFullFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private Counter manualFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private Counter memControlFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; @@ -429,7 +428,6 @@ public void bindDataRegionMetrics() { createActiveTimePartitionCounterMetrics(); 
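Aside on the Arrays.asList to Collections.singletonList change in WritingMetrics above: for a single element, Collections.singletonList returns a dedicated immutable one-element list instead of wrapping a freshly allocated varargs array, which is the usual idiom this diff adopts. A minimal standalone sketch of the difference (demo code, not part of the patch):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SingletonListDemo {
  public static void main(String[] args) {
    // Arrays.asList allocates and wraps a varargs array, even for one element.
    List<String> viaAsList = Arrays.asList("serialize_wal_entry_total");
    // Collections.singletonList returns an immutable one-element list directly.
    List<String> viaSingleton = Collections.singletonList("serialize_wal_entry_total");
    System.out.println(viaAsList.equals(viaSingleton)); // true: element-wise equal
  }
}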
walFlushMemtableCounter = createWalFlushMemTableCounterMetrics(); timedFlushMemtableCounter = createTimedFlushMemTableCounterMetrics(); - seriesFullFlushMemtableCounter = createSeriesFullFlushMemTableCounterMetrics(); manualFlushMemtableCounter = createManualFlushMemTableCounterMetrics(); memControlFlushMemtableCounter = createMemControlFlushMemTableCounterMetrics(); @@ -462,7 +460,6 @@ public void unbindDataRegionMetrics() { removeActiveMemtableCounterMetrics(dataRegionId); }); removeActiveTimePartitionCounterMetrics(); - removeSeriesFullFlushMemTableCounterMetrics(); removeTimedFlushMemTableCounterMetrics(); removeWalFlushMemTableCounterMetrics(); removeManualFlushMemTableCounterMetrics(); @@ -575,15 +572,6 @@ public Counter createTimedFlushMemTableCounterMetrics() { TIMED_FLUSH_MEMTABLE_COUNT); } - public Counter createSeriesFullFlushMemTableCounterMetrics() { - return MetricService.getInstance() - .getOrCreateCounter( - Metric.FLUSH_MEMTABLE_COUNT.toString(), - MetricLevel.IMPORTANT, - Tag.TYPE.toString(), - SERIES_FULL_FLUSH_MEMTABLE); - } - public Counter createManualFlushMemTableCounterMetrics() { return MetricService.getInstance() .getOrCreateCounter( @@ -616,15 +604,6 @@ public void createActiveTimePartitionCounterMetrics() { .getOrCreateCounter(Metric.ACTIVE_TIME_PARTITION_COUNT.toString(), MetricLevel.IMPORTANT); } - public void removeSeriesFullFlushMemTableCounterMetrics() { - MetricService.getInstance() - .remove( - MetricType.COUNTER, - Metric.FLUSH_MEMTABLE_COUNT.toString(), - Tag.TYPE.toString(), - SERIES_FULL_FLUSH_MEMTABLE); - } - public void removeTimedFlushMemTableCounterMetrics() { MetricService.getInstance() .remove( @@ -929,10 +908,6 @@ public void recordWalFlushMemTableCount(int number) { walFlushMemtableCounter.inc(number); } - public void recordSeriesFullFlushMemTableCount(int number) { - seriesFullFlushMemtableCounter.inc(number); - } - public void recordManualFlushMemTableCount(int number) { manualFlushMemtableCounter.inc(number); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 16654ea8b165..30f4036d70b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3030,86 +3030,6 @@ private long computeMaxVersion(Long oldVersion, Long newVersion) { return Math.max(oldVersion, newVersion); } - /** - * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to - * reduce unnecessary merge. Only used when the file sender and the receiver share the same file - * close policy. 
Warning: DO NOT REMOVE - */ - @SuppressWarnings("unused") - public void removeFullyOverlapFiles(TsFileResource resource) { - writeLock("removeFullyOverlapFiles"); - try { - Iterator iterator = tsFileManager.getIterator(true); - removeFullyOverlapFiles(resource, iterator, true); - - iterator = tsFileManager.getIterator(false); - removeFullyOverlapFiles(resource, iterator, false); - } finally { - writeUnlock(); - } - } - - private void removeFullyOverlapFiles( - TsFileResource newTsFile, Iterator iterator, boolean isSeq) { - while (iterator.hasNext()) { - TsFileResource existingTsFile = iterator.next(); - if (newTsFile.isPlanRangeCovers(existingTsFile) - && !newTsFile.getTsFile().equals(existingTsFile.getTsFile()) - && existingTsFile.tryWriteLock()) { - logger.info( - "{} is covered by {}: [{}, {}], [{}, {}], remove it", - existingTsFile, - newTsFile, - existingTsFile.minPlanIndex, - existingTsFile.maxPlanIndex, - newTsFile.minPlanIndex, - newTsFile.maxPlanIndex); - // if we fail to lock the file, it means it is being queried or merged and we will not - // wait until it is free, we will just leave it to the next merge - try { - removeFullyOverlapFile(existingTsFile, iterator, isSeq); - } catch (Exception e) { - logger.error( - "Something gets wrong while removing FullyOverlapFiles: {}", - existingTsFile.getTsFile().getAbsolutePath(), - e); - } finally { - existingTsFile.writeUnlock(); - } - } - } - } - - /** - * Remove the given {@link TsFileResource}. If the corresponding {@link TsFileProcessor} is in the - * working status, close it before remove the related resource files. maybe time-consuming for - * closing a tsfile. - */ - private void removeFullyOverlapFile( - TsFileResource tsFileResource, Iterator iterator, boolean isSeq) { - logger.info( - "Removing a covered file {}, closed: {}", tsFileResource, tsFileResource.isClosed()); - if (!tsFileResource.isClosed()) { - try { - // also remove the TsFileProcessor if the overlapped file is not closed - long timePartition = tsFileResource.getTimePartition(); - Map fileProcessorMap = - isSeq ? 
workSequenceTsFileProcessors : workUnsequenceTsFileProcessors; - TsFileProcessor tsFileProcessor = fileProcessorMap.get(timePartition); - if (tsFileProcessor != null && tsFileProcessor.getTsFileResource() == tsFileResource) { - // have to take some time to close the tsFileProcessor - tsFileProcessor.syncClose(); - fileProcessorMap.remove(timePartition); - } - } catch (Exception e) { - logger.error("Cannot close {}", tsFileResource, e); - } - } - tsFileManager.remove(tsFileResource, isSeq); - iterator.remove(); - tsFileResource.remove(); - } - private long getAndSetNewVersion(long timePartitionId, TsFileResource tsFileResource) { long version = partitionMaxFileVersions.compute( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 59633610b711..3ff757182595 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -67,8 +68,8 @@ public class MemTableFlushTask { private final Future ioTaskFuture; private RestorableTsFileIOWriter writer; - private final LinkedBlockingQueue encodingTaskQueue = new LinkedBlockingQueue<>(); - private final LinkedBlockingQueue ioTaskQueue = + private final BlockingQueue encodingTaskQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue ioTaskQueue = (SystemInfo.getInstance().isEncodingFasterThanIo()) ? 
new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing()) : new LinkedBlockingQueue<>(); @@ -247,16 +248,7 @@ public void run() { } else { long starTime = System.currentTimeMillis(); IWritableMemChunk writableMemChunk = (IWritableMemChunk) task; - IChunkWriter seriesWriter = writableMemChunk.createIChunkWriter(); - writableMemChunk.encode(seriesWriter); - seriesWriter.sealCurrentPage(); - seriesWriter.clearPageWriter(); - try { - ioTaskQueue.put(seriesWriter); - } catch (InterruptedException e) { - LOGGER.error("Put task into ioTaskQueue Interrupted"); - Thread.currentThread().interrupt(); - } + writableMemChunk.encode(ioTaskQueue); long subTaskTime = System.currentTimeMillis() - starTime; WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime); memSerializeTime += subTaskTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 97db8205ce83..d4f2b2ee4ab1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -21,7 +21,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.IFullPath; import org.apache.iotdb.commons.path.NonAlignedFullPath; @@ -30,7 +29,6 @@ import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; @@ -96,10 +94,7 @@ public abstract class AbstractMemTable implements IMemTable { private static final DeviceIDFactory deviceIDFactory = DeviceIDFactory.getInstance(); private boolean shouldFlush = false; - private boolean reachChunkSizeOrPointNumThreshold = false; private volatile FlushStatus flushStatus = FlushStatus.WORKING; - private final int avgSeriesPointNumThreshold = - IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold(); /** Memory size of data points, including TEXT values. 
*/ private long memSize = 0; @@ -178,7 +173,6 @@ private IWritableMemChunkGroup createMemChunkGroupIfNotExistAndGet( for (IMeasurementSchema schema : schemaList) { if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) { seriesNumber++; - totalPointsNumThreshold += avgSeriesPointNumThreshold; } } return memChunkGroup; @@ -191,14 +185,12 @@ private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet( deviceId, k -> { seriesNumber += schemaList.size(); - totalPointsNumThreshold += ((long) avgSeriesPointNumThreshold) * schemaList.size(); return new AlignedWritableMemChunkGroup( schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList())); }); for (IMeasurementSchema schema : schemaList) { if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) { seriesNumber++; - totalPointsNumThreshold += avgSeriesPointNumThreshold; } } return memChunkGroup; @@ -423,9 +415,7 @@ public void write( Object[] objectValue) { IWritableMemChunkGroup memChunkGroup = createMemChunkGroupIfNotExistAndGet(deviceId, schemaList); - if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, schemaList)) { - reachChunkSizeOrPointNumThreshold = true; - } + memChunkGroup.writeRow(insertTime, objectValue, schemaList); } @Override @@ -436,9 +426,7 @@ public void writeAlignedRow( Object[] objectValue) { IWritableMemChunkGroup memChunkGroup = createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList); - if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, schemaList)) { - reachChunkSizeOrPointNumThreshold = true; - } + memChunkGroup.writeRow(insertTime, objectValue, schemaList); } public void writeTabletNode(InsertTabletNode insertTabletNode, int start, int end) { @@ -452,16 +440,14 @@ public void writeTabletNode(InsertTabletNode insertTabletNode, int start, int en } IWritableMemChunkGroup memChunkGroup = createMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(), schemaList); - if (memChunkGroup.writeValuesWithFlushCheck( + memChunkGroup.writeTablet( insertTabletNode.getTimes(), insertTabletNode.getColumns(), insertTabletNode.getBitMaps(), schemaList, start, end, - null)) { - reachChunkSizeOrPointNumThreshold = true; - } + null); } public void writeAlignedTablet( @@ -488,16 +474,14 @@ public void writeAlignedTablet( int splitEnd = pair.right; IWritableMemChunkGroup memChunkGroup = createAlignedMemChunkGroupIfNotExistAndGet(deviceID, schemaList); - if (memChunkGroup.writeValuesWithFlushCheck( + memChunkGroup.writeTablet( insertTabletNode.getTimes(), insertTabletNode.getColumns(), insertTabletNode.getBitMaps(), schemaList, splitStart, splitEnd, - results)) { - reachChunkSizeOrPointNumThreshold = true; - } + results); splitStart = splitEnd; } } @@ -544,11 +528,6 @@ public long memSize() { return memSize; } - @Override - public boolean reachChunkSizeOrPointNumThreshold() { - return reachChunkSizeOrPointNumThreshold; - } - @Override public void clear() { memTableMap.clear(); @@ -634,8 +613,7 @@ public void queryForDeviceRegionScan( long ttlLowerBound, Map> chunkMetadataMap, Map> memChunkHandleMap, - List> modsToMemTabled) - throws MetadataException { + List> modsToMemTabled) { Map memTableMap = getMemTableMap(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index c083b541f276..39c1ecdab231 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; @@ -46,6 +48,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.BlockingQueue; public class AlignedWritableMemChunk implements IWritableMemChunk { @@ -54,7 +57,10 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { private AlignedTVList list; private boolean ignoreAllNullRows; - private static final int MAX_NUMBER_OF_POINTS_IN_PAGE = + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize(); + private long MAX_NUMBER_OF_POINTS_IN_CHUNK = CONFIG.getTargetChunkPointNum(); + private final int MAX_NUMBER_OF_POINTS_IN_PAGE = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); private static final String UNSUPPORTED_TYPE = "Unsupported data type:"; @@ -88,101 +94,97 @@ public boolean containsMeasurement(String measurementId) { } @Override - public boolean putLongWithFlushCheck(long t, long v) { + public void putLong(long t, long v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putIntWithFlushCheck(long t, int v) { + public void putInt(long t, int v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putFloatWithFlushCheck(long t, float v) { + public void putFloat(long t, float v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putDoubleWithFlushCheck(long t, double v) { + public void putDouble(long t, double v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putBinaryWithFlushCheck(long t, Binary v) { + public void putBinary(long t, Binary v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putBooleanWithFlushCheck(long t, boolean v) { + public void putBoolean(long t, boolean v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putAlignedValueWithFlushCheck(long t, Object[] v) { + public void putAlignedRow(long t, Object[] v) { list.putAlignedValue(t, v); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putLongsWithFlushCheck(long[] t, long[] v, BitMap bitMap, int start, int end) { + public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putIntsWithFlushCheck(long[] t, int[] v, BitMap bitMap, int start, int end) { + public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean 
putFloatsWithFlushCheck(long[] t, float[] v, BitMap bitMap, int start, int end) { + public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putDoublesWithFlushCheck(long[] t, double[] v, BitMap bitMap, int start, int end) { + public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putBinariesWithFlushCheck( - long[] t, Binary[] v, BitMap bitMap, int start, int end) { + public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putBooleansWithFlushCheck( - long[] t, boolean[] v, BitMap bitMap, int start, int end) { + public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean putAlignedValuesWithFlushCheck( + public void putAlignedTablet( long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[] results) { list.putAlignedValues(t, v, bitMaps, start, end, results); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean writeWithFlushCheck(long insertTime, Object objectValue) { + public void writeNonAlignedPoint(long insertTime, Object objectValue) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean writeWithFlushCheck( + public void writeNonAlignedTablet( long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public boolean writeAlignedValueWithFlushCheck( + public void writeAlignedPoints( long insertTime, Object[] objectValue, List schemaList) { Object[] reorderedValue = checkAndReorderColumnValuesInInsertPlan(schemaList, objectValue, null).left; - return putAlignedValueWithFlushCheck(insertTime, reorderedValue); + putAlignedRow(insertTime, reorderedValue); } @Override - public boolean writeAlignedValuesWithFlushCheck( + public void writeAlignedTablet( long[] times, Object[] valueList, BitMap[] bitMaps, @@ -194,8 +196,7 @@ public boolean writeAlignedValuesWithFlushCheck( checkAndReorderColumnValuesInInsertPlan(schemaList, valueList, bitMaps); Object[] reorderedColumnValues = pair.left; BitMap[] reorderedBitMaps = pair.right; - return putAlignedValuesWithFlushCheck( - times, reorderedColumnValues, reorderedBitMaps, start, end, results); + putAlignedTablet(times, reorderedColumnValues, reorderedBitMaps, start, end, results); } /** @@ -329,22 +330,41 @@ public IChunkWriter createIChunkWriter() { @SuppressWarnings({"squid:S6541", "squid:S3776"}) @Override - public void encode(IChunkWriter chunkWriter) { - AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter; - + public void encode(BlockingQueue ioTaskQueue) { BitMap rowBitMap = ignoreAllNullRows ? list.getRowBitMap() : null; + int avgPointSizeOfLargestColumn = list.getAvgPointSizeOfLargestColumn(); + MAX_NUMBER_OF_POINTS_IN_CHUNK = + Math.min(MAX_NUMBER_OF_POINTS_IN_CHUNK, (TARGET_CHUNK_SIZE / avgPointSizeOfLargestColumn)); + boolean[] timeDuplicateInfo = null; + + List> chunkRange = new ArrayList<>(); + // Eg. 
chunkRange: ((0,9,10,12),(13,15)) means this TVList contains 2 chunks, + // 1st chunk contains 2 pages, 2nd chunk contains 1 page. List pageRange = new ArrayList<>(); - int range = 0; + + int pointNumInPage = 0; + int pointNumInChunk = 0; + for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { long time = list.getTime(sortedRowIndex); - if (range == 0) { + if (pointNumInPage == 0) { pageRange.add(sortedRowIndex); } - range++; - if (range == MAX_NUMBER_OF_POINTS_IN_PAGE) { + pointNumInPage++; + pointNumInChunk++; + if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE) { pageRange.add(sortedRowIndex); - range = 0; + pointNumInPage = 0; + } + if (pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK) { + if (pointNumInPage != 0) { + pageRange.add(sortedRowIndex); + pointNumInPage = 0; + } + chunkRange.add(pageRange); + pageRange = new ArrayList<>(); + pointNumInChunk = 0; } int nextRowIndex = sortedRowIndex + 1; @@ -362,21 +382,121 @@ public void encode(IChunkWriter chunkWriter) { sortedRowIndex = nextRowIndex - 1; } - if (range != 0) { + if (pointNumInPage != 0) { pageRange.add(list.rowCount() - 1); } + if (pointNumInChunk != 0) { + chunkRange.add(pageRange); + } + handleEncoding(ioTaskQueue, chunkRange, timeDuplicateInfo, rowBitMap); + } + + private void handleEncoding( + BlockingQueue ioTaskQueue, + List> chunkRange, + boolean[] timeDuplicateInfo, + BitMap rowBitMap) { List dataTypes = list.getTsDataTypes(); Pair[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()]; + for (List pageRange : chunkRange) { + AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(schemaList); + for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) { + for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) { + // Pair of Time and Index + if (Objects.nonNull(timeDuplicateInfo) + && lastValidPointIndexForTimeDupCheck[columnIndex] == null) { + lastValidPointIndexForTimeDupCheck[columnIndex] = new Pair<>(Long.MIN_VALUE, null); + } + TSDataType tsDataType = dataTypes.get(columnIndex); + for (int sortedRowIndex = pageRange.get(pageNum * 2); + sortedRowIndex <= pageRange.get(pageNum * 2 + 1); + sortedRowIndex++) { + // skip empty row + if (rowBitMap != null && rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) { + continue; + } + // skip time duplicated rows + long time = list.getTime(sortedRowIndex); + if (Objects.nonNull(timeDuplicateInfo)) { + if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) { + lastValidPointIndexForTimeDupCheck[columnIndex].left = time; + lastValidPointIndexForTimeDupCheck[columnIndex].right = + list.getValueIndex(sortedRowIndex); + } + if (timeDuplicateInfo[sortedRowIndex]) { + continue; + } + } + + // The part of code solves the following problem: + // Time: 1,2,2,3 + // Value: 1,2,null,null + // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, write(T:1,V:1) + // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing value + // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2==pair.left:2, write(T:2,V:2) + // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, T:3!=pair.left:2, + // write(T:3,V:null) + + int originRowIndex; + if (Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex]) + && (time == lastValidPointIndexForTimeDupCheck[columnIndex].left)) { + originRowIndex = lastValidPointIndexForTimeDupCheck[columnIndex].right; + } else { + originRowIndex = list.getValueIndex(sortedRowIndex); + } - for (int pageNum = 0; pageNum < pageRange.size() / 
2; pageNum += 1) { - for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) { - // Pair of Time and Index - if (Objects.nonNull(timeDuplicateInfo) - && lastValidPointIndexForTimeDupCheck[columnIndex] == null) { - lastValidPointIndexForTimeDupCheck[columnIndex] = new Pair<>(Long.MIN_VALUE, null); + boolean isNull = list.isNullValue(originRowIndex, columnIndex); + switch (tsDataType) { + case BOOLEAN: + alignedChunkWriter.writeByColumn( + time, + !isNull && list.getBooleanByValueIndex(originRowIndex, columnIndex), + isNull); + break; + case INT32: + case DATE: + alignedChunkWriter.writeByColumn( + time, + isNull ? 0 : list.getIntByValueIndex(originRowIndex, columnIndex), + isNull); + break; + case INT64: + case TIMESTAMP: + alignedChunkWriter.writeByColumn( + time, + isNull ? 0 : list.getLongByValueIndex(originRowIndex, columnIndex), + isNull); + break; + case FLOAT: + alignedChunkWriter.writeByColumn( + time, + isNull ? 0 : list.getFloatByValueIndex(originRowIndex, columnIndex), + isNull); + break; + case DOUBLE: + alignedChunkWriter.writeByColumn( + time, + isNull ? 0 : list.getDoubleByValueIndex(originRowIndex, columnIndex), + isNull); + break; + case TEXT: + case STRING: + case BLOB: + alignedChunkWriter.writeByColumn( + time, + isNull ? null : list.getBinaryByValueIndex(originRowIndex, columnIndex), + isNull); + break; + default: + break; + } + } + alignedChunkWriter.nextColumn(); } - TSDataType tsDataType = dataTypes.get(columnIndex); + + long[] times = new long[Math.min(MAX_NUMBER_OF_POINTS_IN_PAGE, list.rowCount())]; + int pointsInPage = 0; for (int sortedRowIndex = pageRange.get(pageNum * 2); sortedRowIndex <= pageRange.get(pageNum * 2 + 1); sortedRowIndex++) { @@ -384,88 +504,19 @@ public void encode(IChunkWriter chunkWriter) { if (rowBitMap != null && rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) { continue; } - // skip time duplicated rows - long time = list.getTime(sortedRowIndex); - if (Objects.nonNull(timeDuplicateInfo)) { - if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) { - lastValidPointIndexForTimeDupCheck[columnIndex].left = time; - lastValidPointIndexForTimeDupCheck[columnIndex].right = - list.getValueIndex(sortedRowIndex); - } - if (timeDuplicateInfo[sortedRowIndex]) { - continue; - } - } - - // The part of code solves the following problem: - // Time: 1,2,2,3 - // Value: 1,2,null,null - // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, write(T:1,V:1) - // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing value - // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2!=air.left:2, write(T:2,V:2) - // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, T:3!=pair.left:2, - // write(T:3,V:null) - - int originRowIndex; - if (Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex]) - && (time == lastValidPointIndexForTimeDupCheck[columnIndex].left)) { - originRowIndex = lastValidPointIndexForTimeDupCheck[columnIndex].right; - } else { - originRowIndex = list.getValueIndex(sortedRowIndex); - } - - boolean isNull = list.isNullValue(originRowIndex, columnIndex); - switch (tsDataType) { - case BOOLEAN: - alignedChunkWriter.writeByColumn( - time, list.getBooleanByValueIndex(originRowIndex, columnIndex), isNull); - break; - case INT32: - case DATE: - alignedChunkWriter.writeByColumn( - time, list.getIntByValueIndex(originRowIndex, columnIndex), isNull); - break; - case INT64: - case TIMESTAMP: - alignedChunkWriter.writeByColumn( - time, list.getLongByValueIndex(originRowIndex, columnIndex), 
isNull); - break; - case FLOAT: - alignedChunkWriter.writeByColumn( - time, list.getFloatByValueIndex(originRowIndex, columnIndex), isNull); - break; - case DOUBLE: - alignedChunkWriter.writeByColumn( - time, list.getDoubleByValueIndex(originRowIndex, columnIndex), isNull); - break; - case TEXT: - case BLOB: - case STRING: - alignedChunkWriter.writeByColumn( - time, list.getBinaryByValueIndex(originRowIndex, columnIndex), isNull); - break; - default: - break; + if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) { + times[pointsInPage++] = list.getTime(sortedRowIndex); } } - alignedChunkWriter.nextColumn(); + alignedChunkWriter.write(times, pointsInPage, 0); } - - long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE]; - int pointsInPage = 0; - for (int sortedRowIndex = pageRange.get(pageNum * 2); - sortedRowIndex <= pageRange.get(pageNum * 2 + 1); - sortedRowIndex++) { - // skip empty row - if (rowBitMap != null && rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) { - continue; - } - if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) { - times[pointsInPage++] = list.getTime(sortedRowIndex); - } + alignedChunkWriter.sealCurrentPage(); + alignedChunkWriter.clearPageWriter(); + try { + ioTaskQueue.put(alignedChunkWriter); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - - alignedChunkWriter.write(times, pointsInPage, 0); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java index 6eae996d4c58..8a8421da80c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java @@ -50,7 +50,7 @@ private AlignedWritableMemChunkGroup() { } @Override - public boolean writeValuesWithFlushCheck( + public void writeTablet( long[] times, Object[] columns, BitMap[] bitMaps, @@ -58,8 +58,7 @@ public boolean writeValuesWithFlushCheck( int start, int end, TSStatus[] results) { - return memChunk.writeAlignedValuesWithFlushCheck( - times, columns, bitMaps, schemaList, start, end, results); + memChunk.writeAlignedTablet(times, columns, bitMaps, schemaList, start, end, results); } @Override @@ -86,9 +85,8 @@ public boolean contains(String measurement) { } @Override - public boolean writeWithFlushCheck( - long insertTime, Object[] objectValue, List schemaList) { - return memChunk.writeAlignedValueWithFlushCheck(insertTime, objectValue, schemaList); + public void writeRow(long insertTime, Object[] objectValue, List schemaList) { + memChunk.writeAlignedPoints(insertTime, objectValue, schemaList); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index 958f9fdad9ac..9eb67b32d6ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -84,8 +84,6 @@ void writeAlignedRow( /** only used when mem control enabled */ long getTVListsRamCost(); - boolean reachChunkSizeOrPointNumThreshold(); - 
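The IWritableMemChunk renames in this file capture the core behavioral change of the patch: write methods no longer return a boolean asking the memtable to flush once a series passes avg_series_point_number_threshold; instead, chunk sizing is deferred to flush time, where encode(BlockingQueue) splits the accumulated points into chunks bounded by target_chunk_point_num and hands each sealed writer to the IO queue. A toy, self-contained sketch of that shape (the class, the List-based "chunk", and the tiny threshold are illustrative stand-ins, not IoTDB's types):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MemChunkSketch {
  // Stand-in for target_chunk_point_num; kept tiny so the demo splits visibly.
  private static final int TARGET_CHUNK_POINT_NUM = 3;
  private final List<Long> times = new ArrayList<>();

  // Was: boolean putLongWithFlushCheck(long t, long v) — callers checked the
  // return value and flagged the memtable for flushing. Now a plain void write.
  public void putLong(long t, long v) {
    times.add(t);
  }

  // Was: void encode(IChunkWriter) filling one pre-made writer. Now chunk
  // boundaries are decided here, and each sealed chunk goes to the IO queue.
  public void encode(BlockingQueue<List<Long>> ioTaskQueue) throws InterruptedException {
    List<Long> chunk = new ArrayList<>();
    for (long t : times) {
      chunk.add(t);
      if (chunk.size() >= TARGET_CHUNK_POINT_NUM) {
        ioTaskQueue.put(chunk); // seal and queue a full chunk
        chunk = new ArrayList<>();
      }
    }
    if (!chunk.isEmpty()) {
      ioTaskQueue.put(chunk); // seal the tail chunk
    }
  }

  public static void main(String[] args) throws InterruptedException {
    MemChunkSketch c = new MemChunkSketch();
    for (long i = 0; i < 7; i++) {
      c.putLong(i, i * 10);
    }
    BlockingQueue<List<Long>> q = new LinkedBlockingQueue<>();
    c.encode(q);
    System.out.println(q.size()); // 3: chunks of 3, 3, and 1 points
  }
}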
int getSeriesNumber(); long getTotalPointsNum(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index 38ba0b017239..52f7557786eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -29,51 +29,52 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import java.util.List; +import java.util.concurrent.BlockingQueue; public interface IWritableMemChunk extends WALEntryValue { - boolean putLongWithFlushCheck(long t, long v); + void putLong(long t, long v); - boolean putIntWithFlushCheck(long t, int v); + void putInt(long t, int v); - boolean putFloatWithFlushCheck(long t, float v); + void putFloat(long t, float v); - boolean putDoubleWithFlushCheck(long t, double v); + void putDouble(long t, double v); - boolean putBinaryWithFlushCheck(long t, Binary v); + void putBinary(long t, Binary v); - boolean putBooleanWithFlushCheck(long t, boolean v); + void putBoolean(long t, boolean v); - boolean putAlignedValueWithFlushCheck(long t, Object[] v); + void putAlignedRow(long t, Object[] v); - boolean putLongsWithFlushCheck(long[] t, long[] v, BitMap bitMap, int start, int end); + void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end); - boolean putIntsWithFlushCheck(long[] t, int[] v, BitMap bitMap, int start, int end); + void putInts(long[] t, int[] v, BitMap bitMap, int start, int end); - boolean putFloatsWithFlushCheck(long[] t, float[] v, BitMap bitMap, int start, int end); + void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end); - boolean putDoublesWithFlushCheck(long[] t, double[] v, BitMap bitMap, int start, int end); + void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end); - boolean putBinariesWithFlushCheck(long[] t, Binary[] v, BitMap bitMap, int start, int end); + void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end); - boolean putBooleansWithFlushCheck(long[] t, boolean[] v, BitMap bitMap, int start, int end); + void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end); - boolean putAlignedValuesWithFlushCheck( + void putAlignedTablet( long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[] results); - boolean writeWithFlushCheck(long insertTime, Object objectValue); + void writeNonAlignedPoint(long insertTime, Object objectValue); - boolean writeAlignedValueWithFlushCheck( + void writeAlignedPoints( long insertTime, Object[] objectValue, List schemaList); /** * write data in the range [start, end). 
Null value in the valueList will be replaced by the * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5} */ - boolean writeWithFlushCheck( + void writeNonAlignedTablet( long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end); - boolean writeAlignedValuesWithFlushCheck( + void writeAlignedTablet( long[] times, Object[] valueList, BitMap[] bitMaps, @@ -143,7 +144,7 @@ default long getMaxTime() { IChunkWriter createIChunkWriter(); - void encode(IChunkWriter chunkWriter); + void encode(BlockingQueue ioTaskQueue); void release(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java index 883779c23443..b72ad25cf15e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java @@ -31,7 +31,9 @@ public interface IWritableMemChunkGroup extends WALEntryValue { - boolean writeValuesWithFlushCheck( + void writeRow(long insertTime, Object[] objectValue, List schemaList); + + void writeTablet( long[] times, Object[] columns, BitMap[] bitMaps, @@ -46,9 +48,6 @@ boolean writeValuesWithFlushCheck( boolean contains(String measurement); - boolean writeWithFlushCheck( - long insertTime, Object[] objectValue, List schemaList); - Map getMemChunkMap(); int delete( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 56b2efc4c4c5..58fc222d717f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -106,6 +106,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; @@ -1200,10 +1201,6 @@ public boolean shouldFlush() { WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1); return true; } - if (workMemTable.reachChunkSizeOrPointNumThreshold()) { - WritingMetrics.getInstance().recordSeriesFullFlushMemTableCount(1); - return true; - } return false; } @@ -1213,38 +1210,25 @@ public boolean shouldClose() { return fileSize >= fileSizeThreshold; } - public void syncClose() { + @TestOnly + public void syncClose() throws ExecutionException { logger.info( "Sync close file: {}, will firstly async close it", tsFileResource.getTsFile().getAbsolutePath()); if (shouldClose) { return; } - synchronized (flushingMemTables) { - try { - asyncClose(); - logger.info("Start to wait until file {} is closed", tsFileResource); - long startTime = System.currentTimeMillis(); - while (!flushingMemTables.isEmpty()) { - flushingMemTables.wait(60_000); - if (System.currentTimeMillis() - startTime > 60_000 && !flushingMemTables.isEmpty()) { - logger.warn( - "{} has spent {}s for waiting flushing one memtable; {} left (first: {}). 
FlushingManager info: {}", - this.tsFileResource.getTsFile().getAbsolutePath(), - (System.currentTimeMillis() - startTime) / 1000, - flushingMemTables.size(), - flushingMemTables.getFirst(), - FlushManager.getInstance()); - } - } - } catch (InterruptedException e) { - logger.error( - "{}: {} wait close interrupted", - dataRegionName, - tsFileResource.getTsFile().getName(), - e); - Thread.currentThread().interrupt(); + try { + asyncClose().get(); + logger.info("Start to wait until file {} is closed", tsFileResource); + // if this TsFileProcessor is closing, asyncClose().get() of this thread will return quickly, + // but the TsFileProcessor may be not closed. Therefore, we need to check whether the writer + // is null. + while (writer != null) { + TimeUnit.MILLISECONDS.sleep(10); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } logger.info("File {} is closed synchronously", tsFileResource.getTsFile().getAbsolutePath()); } @@ -1327,6 +1311,7 @@ public Future asyncClose() { * TODO if the flushing thread is too fast, the tmpMemTable.wait() may never wakeup Tips: I am * trying to solve this issue by checking whether the table exist before wait() */ + @TestOnly public void syncFlush() throws IOException { IMemTable tmpMemTable; flushQueryLock.writeLock().lock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 97c95e7f13ad..19249b668512 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -29,7 +29,6 @@ import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.UnSupportedDataTypeException; import org.apache.tsfile.write.chunk.ChunkWriterImpl; -import org.apache.tsfile.write.chunk.IChunkWriter; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; @@ -39,15 +38,21 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.BlockingQueue; + +import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; public class WritableMemChunk implements IWritableMemChunk { private IMeasurementSchema schema; private TVList list; private static final String UNSUPPORTED_TYPE = "Unsupported data type:"; + private static final Logger LOGGER = LoggerFactory.getLogger(WritableMemChunk.class); private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize(); + private final long MAX_NUMBER_OF_POINTS_IN_CHUNK = CONFIG.getTargetChunkPointNum(); public WritableMemChunk(IMeasurementSchema schema) { this.schema = schema; @@ -57,68 +62,80 @@ public WritableMemChunk(IMeasurementSchema schema) { private WritableMemChunk() {} @Override - public boolean writeWithFlushCheck(long insertTime, Object objectValue) { + public void writeNonAlignedPoint(long insertTime, Object objectValue) { switch (schema.getType()) { case BOOLEAN: - return putBooleanWithFlushCheck(insertTime, (boolean) objectValue); + putBoolean(insertTime, (boolean) objectValue); + break; case INT32: case DATE: - return putIntWithFlushCheck(insertTime, (int) objectValue); + putInt(insertTime, (int) objectValue); + break; case 
INT64: case TIMESTAMP: - return putLongWithFlushCheck(insertTime, (long) objectValue); + putLong(insertTime, (long) objectValue); + break; case FLOAT: - return putFloatWithFlushCheck(insertTime, (float) objectValue); + putFloat(insertTime, (float) objectValue); + break; case DOUBLE: - return putDoubleWithFlushCheck(insertTime, (double) objectValue); + putDouble(insertTime, (double) objectValue); + break; case TEXT: case BLOB: case STRING: - return putBinaryWithFlushCheck(insertTime, (Binary) objectValue); + putBinary(insertTime, (Binary) objectValue); + break; default: throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType().name()); } } @Override - public boolean writeAlignedValueWithFlushCheck( + public void writeAlignedPoints( long insertTime, Object[] objectValue, List schemaList) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType()); } @Override - public boolean writeWithFlushCheck( + public void writeNonAlignedTablet( long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) { switch (dataType) { case BOOLEAN: boolean[] boolValues = (boolean[]) valueList; - return putBooleansWithFlushCheck(times, boolValues, bitMap, start, end); + putBooleans(times, boolValues, bitMap, start, end); + break; case INT32: case DATE: int[] intValues = (int[]) valueList; - return putIntsWithFlushCheck(times, intValues, bitMap, start, end); + putInts(times, intValues, bitMap, start, end); + break; case INT64: case TIMESTAMP: long[] longValues = (long[]) valueList; - return putLongsWithFlushCheck(times, longValues, bitMap, start, end); + putLongs(times, longValues, bitMap, start, end); + break; case FLOAT: float[] floatValues = (float[]) valueList; - return putFloatsWithFlushCheck(times, floatValues, bitMap, start, end); + putFloats(times, floatValues, bitMap, start, end); + break; case DOUBLE: double[] doubleValues = (double[]) valueList; - return putDoublesWithFlushCheck(times, doubleValues, bitMap, start, end); + putDoubles(times, doubleValues, bitMap, start, end); + break; case TEXT: case BLOB: case STRING: Binary[] binaryValues = (Binary[]) valueList; - return putBinariesWithFlushCheck(times, binaryValues, bitMap, start, end); + putBinaries(times, binaryValues, bitMap, start, end); + break; default: throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType.name()); } } @Override - public boolean writeAlignedValuesWithFlushCheck( + public void writeAlignedTablet( long[] times, Object[] valueList, BitMap[] bitMaps, @@ -130,86 +147,72 @@ public boolean writeAlignedValuesWithFlushCheck( } @Override - public boolean putLongWithFlushCheck(long t, long v) { + public void putLong(long t, long v) { list.putLong(t, v); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putIntWithFlushCheck(long t, int v) { + public void putInt(long t, int v) { list.putInt(t, v); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putFloatWithFlushCheck(long t, float v) { + public void putFloat(long t, float v) { list.putFloat(t, v); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putDoubleWithFlushCheck(long t, double v) { + public void putDouble(long t, double v) { list.putDouble(t, v); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putBinaryWithFlushCheck(long t, Binary v) { + public void putBinary(long t, Binary v) { list.putBinary(t, v); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean 
putBooleanWithFlushCheck(long t, boolean v) { + public void putBoolean(long t, boolean v) { list.putBoolean(t, v); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putAlignedValueWithFlushCheck(long t, Object[] v) { + public void putAlignedRow(long t, Object[] v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); } @Override - public boolean putLongsWithFlushCheck(long[] t, long[] v, BitMap bitMap, int start, int end) { + public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) { list.putLongs(t, v, bitMap, start, end); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putIntsWithFlushCheck(long[] t, int[] v, BitMap bitMap, int start, int end) { + public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) { list.putInts(t, v, bitMap, start, end); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putFloatsWithFlushCheck(long[] t, float[] v, BitMap bitMap, int start, int end) { + public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) { list.putFloats(t, v, bitMap, start, end); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putDoublesWithFlushCheck(long[] t, double[] v, BitMap bitMap, int start, int end) { + public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) { list.putDoubles(t, v, bitMap, start, end); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putBinariesWithFlushCheck( - long[] t, Binary[] v, BitMap bitMap, int start, int end) { + public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) { list.putBinaries(t, v, bitMap, start, end); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putBooleansWithFlushCheck( - long[] t, boolean[] v, BitMap bitMap, int start, int end) { + public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) { list.putBooleans(t, v, bitMap, start, end); - return list.reachChunkSizeOrPointNumThreshold(); } @Override - public boolean putAlignedValuesWithFlushCheck( + public void putAlignedTablet( long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[] results) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); } @@ -293,7 +296,7 @@ public int delete(long lowerBound, long upperBound) { } @Override - public IChunkWriter createIChunkWriter() { + public ChunkWriterImpl createIChunkWriter() { return new ChunkWriterImpl(schema); } @@ -330,15 +333,15 @@ public String toString() { } @Override - public void encode(IChunkWriter chunkWriter) { - - ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter; + public void encode(BlockingQueue ioTaskQueue) { + TSDataType tsDataType = schema.getType(); + ChunkWriterImpl chunkWriterImpl = createIChunkWriter(); + long dataSizeInCurrentChunk = 0; + int pointNumInCurrentChunk = 0; for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { long time = list.getTime(sortedRowIndex); - TSDataType tsDataType = schema.getType(); - // skip duplicated data if ((sortedRowIndex + 1 < list.rowCount() && (time == list.getTime(sortedRowIndex + 1)))) { continue; @@ -352,30 +355,60 @@ public void encode(IChunkWriter chunkWriter) { switch (tsDataType) { case BOOLEAN: chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex)); + dataSizeInCurrentChunk += 8L + 1L; break; case INT32: case DATE: chunkWriterImpl.write(time, 
list.getInt(sortedRowIndex)); + dataSizeInCurrentChunk += 8L + 4L; break; case INT64: case TIMESTAMP: chunkWriterImpl.write(time, list.getLong(sortedRowIndex)); + dataSizeInCurrentChunk += 8L + 8L; break; case FLOAT: chunkWriterImpl.write(time, list.getFloat(sortedRowIndex)); + dataSizeInCurrentChunk += 8L + 4L; break; case DOUBLE: chunkWriterImpl.write(time, list.getDouble(sortedRowIndex)); + dataSizeInCurrentChunk += 8L + 8L; break; case TEXT: case BLOB: case STRING: - chunkWriterImpl.write(time, list.getBinary(sortedRowIndex)); + Binary value = list.getBinary(sortedRowIndex); + chunkWriterImpl.write(time, value); + dataSizeInCurrentChunk += 8L + getBinarySize(value); break; default: LOGGER.error("WritableMemChunk does not support data type: {}", tsDataType); break; } + pointNumInCurrentChunk++; + if (pointNumInCurrentChunk > MAX_NUMBER_OF_POINTS_IN_CHUNK + || dataSizeInCurrentChunk > TARGET_CHUNK_SIZE) { + chunkWriterImpl.sealCurrentPage(); + chunkWriterImpl.clearPageWriter(); + try { + ioTaskQueue.put(chunkWriterImpl); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + chunkWriterImpl = createIChunkWriter(); + dataSizeInCurrentChunk = 0; + pointNumInCurrentChunk = 0; + } + } + if (pointNumInCurrentChunk != 0) { + chunkWriterImpl.sealCurrentPage(); + chunkWriterImpl.clearPageWriter(); + try { + ioTaskQueue.put(chunkWriterImpl); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java index b8a868cecb09..d40e4f99e36f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java @@ -46,7 +46,7 @@ public WritableMemChunkGroup() { } @Override - public boolean writeValuesWithFlushCheck( + public void writeTablet( long[] times, Object[] columns, BitMap[] bitMaps, @@ -54,22 +54,19 @@ public boolean writeValuesWithFlushCheck( int start, int end, TSStatus[] results) { - boolean flushFlag = false; for (int i = 0; i < columns.length; i++) { if (columns[i] == null) { continue; } IWritableMemChunk memChunk = createMemChunkIfNotExistAndGet(schemaList.get(i)); - flushFlag |= - memChunk.writeWithFlushCheck( - times, - columns[i], - bitMaps == null ? null : bitMaps[i], - schemaList.get(i).getType(), - start, - end); + memChunk.writeNonAlignedTablet( + times, + columns[i], + bitMaps == null ? 
null : bitMaps[i], + schemaList.get(i).getType(), + start, + end); } - return flushFlag; } private IWritableMemChunk createMemChunkIfNotExistAndGet(IMeasurementSchema schema) { @@ -99,17 +96,14 @@ public boolean contains(String measurement) { } @Override - public boolean writeWithFlushCheck( - long insertTime, Object[] objectValue, List schemaList) { - boolean flushFlag = false; + public void writeRow(long insertTime, Object[] objectValue, List schemaList) { for (int i = 0; i < objectValue.length; i++) { if (objectValue[i] == null) { continue; } IWritableMemChunk memChunk = createMemChunkIfNotExistAndGet(schemaList.get(i)); - flushFlag |= memChunk.writeWithFlushCheck(insertTime, objectValue[i]); + memChunk.writeNonAlignedPoint(insertTime, objectValue[i]); } - return flushFlag; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 676831bd2311..2de6a79d7e8f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -77,9 +77,6 @@ public abstract class AlignedTVList extends TVList { // Index relation: columnIndex(dataTypeIndex) -> arrayIndex -> elementIndex protected List> bitMaps; - // If a sensor chunk size of Text datatype reaches the threshold, this flag will be set true - boolean reachMaxChunkSizeFlag; - // not null when constructed by queries BitMap rowBitMap; @@ -88,7 +85,6 @@ public abstract class AlignedTVList extends TVList { indices = new ArrayList<>(types.size()); dataTypes = types; memoryBinaryChunkSize = new long[dataTypes.size()]; - reachMaxChunkSizeFlag = false; values = new ArrayList<>(types.size()); for (int i = 0; i < types.size(); i++) { @@ -199,9 +195,6 @@ public void putAlignedValue(long timestamp, Object[] value) { columnValue != null ? getBinarySize((Binary) columnValue) : getBinarySize(Binary.EMPTY_VALUE); - if (memoryBinaryChunkSize[i] >= TARGET_CHUNK_SIZE) { - reachMaxChunkSizeFlag = true; - } break; case FLOAT: ((float[]) columnValues.get(arrayIndex))[elementIndex] = @@ -721,11 +714,6 @@ protected void releaseLastValueArray() { } } - @Override - public boolean reachChunkSizeOrPointNumThreshold() { - return reachMaxChunkSizeFlag || rowCount >= MAX_SERIES_POINT_NUMBER; - } - @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @Override public void putAlignedValues( @@ -800,9 +788,6 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex memoryBinaryChunkSize[i] += arrayT[elementIndex + i1] != null ? 
getBinarySize(arrayT[elementIndex + i1]) : 0; } - if (memoryBinaryChunkSize[i] > TARGET_CHUNK_SIZE) { - reachMaxChunkSizeFlag = true; - } break; case FLOAT: float[] arrayF = ((float[]) columnValues.get(arrayIndex)); @@ -1020,7 +1005,7 @@ public TsBlock buildTsBlock( // Value: 1,2,null,null // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, write(T:1,V:1) // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing value - // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2!=air.left:2, write(T:2,V:2) + // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2==pair.left:2, write(T:2,V:2) // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, T:3!=pair.left:2, write(T:3,V:null) int originRowIndex; if (Objects.nonNull(lastValidPointIndexForTimeDupCheck) @@ -1380,6 +1365,44 @@ public BitMap getRowBitMap() { return new BitMap(rowCount, rowBitsArr); } + public int getAvgPointSizeOfLargestColumn() { + int largestPrimitivePointSize = 8; // TimeColumn or int64,double ValueColumn + long largestBinaryChunkSize = 0; + int largestBinaryColumnIndex = 0; + for (int i = 0; i < memoryBinaryChunkSize.length; i++) { + if (memoryBinaryChunkSize[i] > largestBinaryChunkSize) { + largestBinaryChunkSize = memoryBinaryChunkSize[i]; + largestBinaryColumnIndex = i; + } + } + if (largestBinaryChunkSize == 0) { + return largestPrimitivePointSize; + } + int avgPointSizeOfLargestBinaryColumn = + (int) largestBinaryChunkSize / getColumnValueCnt(largestBinaryColumnIndex); + return Math.max(avgPointSizeOfLargestBinaryColumn, largestPrimitivePointSize); + } + + public int getColumnValueCnt(int columnIndex) { + int pointNum = 0; + if (bitMaps == null || bitMaps.get(columnIndex) == null) { + pointNum = rowCount; + } else { + for (int i = 0; i < rowCount; i++) { + int arrayIndex = i / ARRAY_SIZE; + if (bitMaps.get(columnIndex).get(arrayIndex) == null) { + pointNum++; + } else { + int elementIndex = i % ARRAY_SIZE; + if (!bitMaps.get(columnIndex).get(arrayIndex).isMarked(elementIndex)) { + pointNum++; + } + } + } + } + return pointNum; + } + public List> getBitMaps() { return bitMaps; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index bb8cd6e513cc..8985f6496337 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -39,7 +39,6 @@ import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; -import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; public abstract class BinaryTVList extends TVList { @@ -47,13 +46,9 @@ public abstract class BinaryTVList extends TVList { // index relation: arrayIndex -> elementIndex protected List values; - // record total memory size of binary tvlist - long memoryBinaryChunkSize; - BinaryTVList() { super(); values = new ArrayList<>(); - memoryBinaryChunkSize = 0; } public static BinaryTVList newList() { @@ -71,7 +66,6 @@ public static BinaryTVList newList() { public TimBinaryTVList clone() { TimBinaryTVList cloneList = new TimBinaryTVList(); cloneAs(cloneList); - cloneList.memoryBinaryChunkSize = memoryBinaryChunkSize; for (Binary[] valueArray : 
values) { cloneList.values.add(cloneValue(valueArray)); } @@ -96,12 +90,6 @@ public void putBinary(long timestamp, Binary value) { if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) { sorted = false; } - memoryBinaryChunkSize += getBinarySize(value); - } - - @Override - public boolean reachChunkSizeOrPointNumThreshold() { - return memoryBinaryChunkSize >= TARGET_CHUNK_SIZE || rowCount >= MAX_SERIES_POINT_NUMBER; } @Override @@ -113,8 +101,6 @@ public int delete(long lowerBound, long upperBound) { if (time < lowerBound || time > upperBound) { set(i, newSize++); maxTime = Math.max(maxTime, time); - } else { - memoryBinaryChunkSize -= getBinarySize(getBinary(i)); } } int deletedNumber = rowCount - newSize; @@ -160,7 +146,6 @@ void clearValue() { } values.clear(); } - memoryBinaryChunkSize = 0; } @Override @@ -223,11 +208,6 @@ public void putBinaries(long[] time, Binary[] value, BitMap bitMap, int start, i updateMaxTimeAndSorted(time, start, end); } - // update raw size - for (int i = idx; i < end; i++) { - memoryBinaryChunkSize += getBinarySize(value[i]); - } - while (idx < end) { int inputRemaining = end - idx; int arrayIdx = rowCount / ARRAY_SIZE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index d75fc857c26c..d9b6bc4e12d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -21,7 +21,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager; import org.apache.iotdb.db.utils.MathUtils; @@ -49,10 +48,6 @@ public abstract class TVList implements WALEntryValue { protected static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent"; - protected static final long TARGET_CHUNK_SIZE = - IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); - protected static final long MAX_SERIES_POINT_NUMBER = - IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold(); // list of timestamp array, add 1 when expanded -> data point timestamp array // index relation: arrayIndex -> elementIndex protected List timestamps; @@ -156,10 +151,6 @@ public void putBinary(long time, Binary value) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } - public boolean reachChunkSizeOrPointNumThreshold() { - return rowCount >= MAX_SERIES_POINT_NUMBER; - } - public void putBoolean(long time, boolean value) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index d7ca89f7637e..19c13529e1e5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -1054,8 +1054,6 @@ public void testInsertUnSequenceRows() QueryProcessException, DataRegionException, TsFileProcessorException { - int defaultAvgSeriesPointNumberThreshold = 
config.getAvgSeriesPointNumberThreshold(); - config.setAvgSeriesPointNumberThreshold(2); DataRegion dataRegion1 = new DummyDataRegion(systemDir, "root.Rows"); long[] time = new long[] {3, 4, 1, 2}; List indexList = new ArrayList<>(); @@ -1085,7 +1083,6 @@ tmpDeviceId, new MeasurementSchema(measurementId, TSDataType.INT32))), Assert.assertTrue(resource.isClosed()); } dataRegion1.syncDeleteDataFiles(); - config.setAvgSeriesPointNumberThreshold(defaultAvgSeriesPointNumberThreshold); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index 290acc353bda..bee7ed03a54d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -102,7 +102,7 @@ public void memSeriesSortIteratorTest() throws IOException { new WritableMemChunk(new MeasurementSchema("s1", dataType, TSEncoding.PLAIN)); int count = 1000; for (int i = 0; i < count; i++) { - series.writeWithFlushCheck(i, i); + series.writeNonAlignedPoint(i, i); } IPointReader it = series.getSortedTvListForQuery().buildTsBlock().getTsBlockSingleColumnIterator(); @@ -121,11 +121,11 @@ public void memSeriesToStringTest() throws IOException { new WritableMemChunk(new MeasurementSchema("s1", dataType, TSEncoding.PLAIN)); int count = 100; for (int i = 0; i < count; i++) { - series.writeWithFlushCheck(i, i); + series.writeNonAlignedPoint(i, i); } - series.writeWithFlushCheck(0, 21); - series.writeWithFlushCheck(99, 20); - series.writeWithFlushCheck(20, 21); + series.writeNonAlignedPoint(0, 21); + series.writeNonAlignedPoint(99, 20); + series.writeNonAlignedPoint(20, 21); String str = series.toString(); Assert.assertFalse(series.getTVList().isSorted()); Assert.assertEquals( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java index 7daa2ef837f3..d00ed4d6276f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java @@ -22,8 +22,11 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.NonAlignedFullPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DataRegionException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; @@ -40,12 +43,19 @@ import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.commons.io.FileUtils; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import 
org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.TsFileReader; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.read.expression.QueryExpression; +import org.apache.tsfile.read.query.dataset.QueryDataSet; import org.apache.tsfile.read.reader.IPointReader; import org.apache.tsfile.write.record.TSRecord; import org.apache.tsfile.write.record.datapoint.DataPoint; @@ -64,6 +74,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import static junit.framework.TestCase.assertTrue; import static org.apache.iotdb.db.storageengine.dataregion.DataRegionTest.buildInsertRowNodeByTSRecord; @@ -72,6 +83,8 @@ public class TsFileProcessorTest { + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private TsFileProcessor processor; private final String storageGroup = "root.vehicle"; private DataRegionInfo sgInfo; @@ -83,6 +96,8 @@ public class TsFileProcessorTest { private final Map props = Collections.emptyMap(); private QueryContext context; private final String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info"); + private long defaultTargetChunkPointNum; + private long defaultTargetChunkSize; private static final Logger logger = LoggerFactory.getLogger(TsFileProcessorTest.class); public TsFileProcessorTest() {} @@ -93,6 +108,8 @@ public void setUp() throws DataRegionException { if (!file.getParentFile().exists()) { Assert.assertTrue(file.getParentFile().mkdirs()); } + defaultTargetChunkPointNum = config.getTargetChunkPointNum(); + defaultTargetChunkSize = config.getTargetChunkSize(); EnvironmentUtils.envSetUp(); sgInfo = new DataRegionInfo(new DataRegionTest.DummyDataRegion(systemDir, storageGroup)); context = EnvironmentUtils.TEST_QUERY_CONTEXT; @@ -102,10 +119,22 @@ public void setUp() throws DataRegionException { public void tearDown() throws Exception { EnvironmentUtils.cleanEnv(); EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR); + File file = new File(filePath); + File resource = new File(filePath + ".resource"); + try { + FileUtils.delete(file); + if (resource.exists()) { + FileUtils.delete(resource); + } + } catch (IOException ignored) { + } + config.setTargetChunkPointNum(defaultTargetChunkPointNum); + config.setTargetChunkSize(defaultTargetChunkSize); } @Test - public void testWriteAndFlush() throws IOException, WriteProcessException, MetadataException { + public void testWriteAndFlush() + throws IOException, WriteProcessException, MetadataException, ExecutionException { logger.info("testWriteAndFlush begin.."); processor = new TsFileProcessor( @@ -158,12 +187,358 @@ public void testWriteAndFlush() throws IOException, WriteProcessException, Metad tsfileResourcesForQuery.clear(); processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty()); + assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size()); + processor.syncClose(); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath); + TsFileReader readTsFile = new TsFileReader(reader)) { + QueryExpression queryExpression = + QueryExpression.create( + Collections.singletonList(new Path(deviceId, measurementId, 
false)), null); + QueryDataSet queryDataSet = readTsFile.query(queryExpression); + int num = 1; + while (queryDataSet.hasNext()) { + RowRecord rowRecord = queryDataSet.next(); + assertEquals(num, rowRecord.getTimestamp()); + assertEquals(num, rowRecord.getFields().get(0).getIntV()); + num++; + } + assertEquals(101, num); + } + } + + @Test + public void testFlushMultiChunks() + throws IOException, WriteProcessException, MetadataException, ExecutionException { + config.setTargetChunkPointNum(40); + processor = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + + TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); + processor.setTsFileProcessorInfo(tsFileProcessorInfo); + this.sgInfo.initTsFileProcessorInfo(processor); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); + List tsfileResourcesForQuery = new ArrayList<>(); + NonAlignedFullPath fullPath = + new NonAlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId), + new MeasurementSchema( + measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props)); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + assertTrue(tsfileResourcesForQuery.isEmpty()); + + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + } + + // query data in memory + tsfileResourcesForQuery.clear(); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + + TsFileResource tsFileResource = tsfileResourcesForQuery.get(0); + assertFalse(tsFileResource.getReadOnlyMemChunk(fullPath).isEmpty()); + List memChunks = tsFileResource.getReadOnlyMemChunk(fullPath); + for (ReadOnlyMemChunk chunk : memChunks) { + IPointReader iterator = chunk.getPointReader(); + for (int num = 1; num <= 100; num++) { + iterator.hasNextTimeValuePair(); + TimeValuePair timeValuePair = iterator.nextTimeValuePair(); + assertEquals(num, timeValuePair.getTimestamp()); + assertEquals(num, timeValuePair.getValue().getInt()); + } + } + + // flush synchronously + processor.syncFlush(); + + tsfileResourcesForQuery.clear(); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty()); + assertEquals(3, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size()); + processor.syncClose(); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath); + TsFileReader readTsFile = new TsFileReader(reader)) { + QueryExpression queryExpression = + QueryExpression.create( + Collections.singletonList(new Path(deviceId, measurementId, false)), null); + QueryDataSet queryDataSet = readTsFile.query(queryExpression); + int num = 1; + while (queryDataSet.hasNext()) { + RowRecord rowRecord = queryDataSet.next(); + assertEquals(num, rowRecord.getTimestamp()); + assertEquals(num, rowRecord.getFields().get(0).getIntV()); + num++; + } + assertEquals(101, num); + } + } + + @Test + public void testFlushMultiBinaryChunks() + throws IOException, WriteProcessException, MetadataException, ExecutionException { + config.setTargetChunkSize(1536L); + processor = + new TsFileProcessor( + storageGroup, + 
SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + + TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); + processor.setTsFileProcessorInfo(tsFileProcessorInfo); + this.sgInfo.initTsFileProcessorInfo(processor); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); + List tsfileResourcesForQuery = new ArrayList<>(); + NonAlignedFullPath fullPath = + new NonAlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId), + new MeasurementSchema( + measurementId, + TSDataType.TEXT, + TSEncoding.PLAIN, + CompressionType.UNCOMPRESSED, + props)); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + assertTrue(tsfileResourcesForQuery.isEmpty()); + + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, deviceId); + record.addTuple(DataPoint.getDataPoint(TSDataType.TEXT, measurementId, String.valueOf(i))); + processor.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + } + + // query data in memory + tsfileResourcesForQuery.clear(); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + + TsFileResource tsFileResource = tsfileResourcesForQuery.get(0); + assertFalse(tsFileResource.getReadOnlyMemChunk(fullPath).isEmpty()); + List memChunks = tsFileResource.getReadOnlyMemChunk(fullPath); + for (ReadOnlyMemChunk chunk : memChunks) { + IPointReader iterator = chunk.getPointReader(); + for (int num = 1; num <= 100; num++) { + iterator.hasNextTimeValuePair(); + TimeValuePair timeValuePair = iterator.nextTimeValuePair(); + assertEquals(num, timeValuePair.getTimestamp()); + assertEquals(String.valueOf(num), timeValuePair.getValue().getStringValue()); + } + } + + // flush synchronously + processor.syncFlush(); + + tsfileResourcesForQuery.clear(); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty()); + assertEquals(3, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size()); processor.syncClose(); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath); + TsFileReader readTsFile = new TsFileReader(reader)) { + QueryExpression queryExpression = + QueryExpression.create( + Collections.singletonList(new Path(deviceId, measurementId, false)), null); + QueryDataSet queryDataSet = readTsFile.query(queryExpression); + int num = 1; + while (queryDataSet.hasNext()) { + RowRecord rowRecord = queryDataSet.next(); + assertEquals(num, rowRecord.getTimestamp()); + assertEquals(String.valueOf(num), rowRecord.getFields().get(0).getStringValue()); + num++; + } + assertEquals(101, num); + } + } + + @Test + public void testFlushMultiAlignedChunks() + throws IOException, WriteProcessException, MetadataException, ExecutionException { + config.setTargetChunkPointNum(40); + processor = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + + TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); + processor.setTsFileProcessorInfo(tsFileProcessorInfo); + this.sgInfo.initTsFileProcessorInfo(processor); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); + List tsfileResourcesForQuery = new ArrayList<>(); + AlignedFullPath fullPath = + new AlignedFullPath( + 
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId), + Collections.singletonList(measurementId), + Collections.singletonList( + new MeasurementSchema( + measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props))); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + assertTrue(tsfileResourcesForQuery.isEmpty()); + + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); + rowNode.setAligned(true); + processor.insert(rowNode, new long[4]); + } + + // add another point time = 1, value = 1 + TSRecord record = new TSRecord(1, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1")); + InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); + rowNode.setAligned(true); + processor.insert(rowNode, new long[4]); + + // query data in memory + tsfileResourcesForQuery.clear(); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + + TsFileResource tsFileResource = tsfileResourcesForQuery.get(0); + assertFalse(tsFileResource.getReadOnlyMemChunk(fullPath).isEmpty()); + List memChunks = tsFileResource.getReadOnlyMemChunk(fullPath); + for (ReadOnlyMemChunk chunk : memChunks) { + IPointReader iterator = chunk.getPointReader(); + for (int num = 1; num <= 100; num++) { + iterator.hasNextTimeValuePair(); + TimeValuePair timeValuePair = iterator.nextTimeValuePair(); + assertEquals(num, timeValuePair.getTimestamp()); + assertEquals(num, timeValuePair.getValue().getVector()[0].getInt()); + } + } + + // flush synchronously + processor.syncFlush(); + + tsfileResourcesForQuery.clear(); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty()); + assertEquals(3, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size()); + processor.syncClose(); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath); + TsFileReader readTsFile = new TsFileReader(reader)) { + QueryExpression queryExpression = + QueryExpression.create( + Collections.singletonList(new Path(deviceId, measurementId, false)), null); + QueryDataSet queryDataSet = readTsFile.query(queryExpression); + int num = 1; + while (queryDataSet.hasNext()) { + RowRecord rowRecord = queryDataSet.next(); + assertEquals(num, rowRecord.getTimestamp()); + assertEquals(num, rowRecord.getFields().get(0).getIntV()); + num++; + } + assertEquals(101, num); + } + } + + @Test + public void testFlushMultiAlignedBinaryChunks() + throws IOException, WriteProcessException, MetadataException, ExecutionException { + config.setTargetChunkSize(1536L); + processor = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + + TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); + processor.setTsFileProcessorInfo(tsFileProcessorInfo); + this.sgInfo.initTsFileProcessorInfo(processor); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); + List tsfileResourcesForQuery = new ArrayList<>(); + AlignedFullPath fullPath = + new AlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId), + Collections.singletonList(measurementId), + Collections.singletonList( + new 
MeasurementSchema( + measurementId, + TSDataType.TEXT, + encoding, + CompressionType.UNCOMPRESSED, + props))); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + assertTrue(tsfileResourcesForQuery.isEmpty()); + + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, deviceId); + record.addTuple(DataPoint.getDataPoint(TSDataType.TEXT, measurementId, String.valueOf(i))); + InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); + rowNode.setAligned(true); + processor.insert(rowNode, new long[4]); + } + // add another point time = 1, value = "1" + TSRecord record = new TSRecord(1, deviceId); + record.addTuple(DataPoint.getDataPoint(TSDataType.TEXT, measurementId, "1")); + InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); + rowNode.setAligned(true); + processor.insert(rowNode, new long[4]); + + // query data in memory + tsfileResourcesForQuery.clear(); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + + TsFileResource tsFileResource = tsfileResourcesForQuery.get(0); + assertFalse(tsFileResource.getReadOnlyMemChunk(fullPath).isEmpty()); + List memChunks = tsFileResource.getReadOnlyMemChunk(fullPath); + for (ReadOnlyMemChunk chunk : memChunks) { + IPointReader iterator = chunk.getPointReader(); + for (int num = 1; num <= 100; num++) { + iterator.hasNextTimeValuePair(); + TimeValuePair timeValuePair = iterator.nextTimeValuePair(); + assertEquals(num, timeValuePair.getTimestamp()); + assertEquals(String.valueOf(num), timeValuePair.getValue().getVector()[0].getStringValue()); + } + } + + // flush synchronously + processor.syncFlush(); + + tsfileResourcesForQuery.clear(); + processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery); + assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty()); + assertEquals(3, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size()); + processor.syncClose(); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath); + TsFileReader readTsFile = new TsFileReader(reader)) { + QueryExpression queryExpression = + QueryExpression.create( + Collections.singletonList(new Path(deviceId, measurementId, false)), null); + QueryDataSet queryDataSet = readTsFile.query(queryExpression); + int num = 1; + while (queryDataSet.hasNext()) { + RowRecord rowRecord = queryDataSet.next(); + assertEquals(num, rowRecord.getTimestamp()); + assertEquals(String.valueOf(num), rowRecord.getFields().get(0).getStringValue()); + num++; + } + assertEquals(101, num); + } } @Test public void testWriteAndRestoreMetadata() - throws IOException, WriteProcessException, MetadataException { + throws IOException, WriteProcessException, MetadataException, ExecutionException { logger.info("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor( @@ -246,8 +621,8 @@ public void testWriteAndRestoreMetadata() } @Test - public void testMultiFlush() throws IOException, WriteProcessException, MetadataException { - logger.info("testWriteAndRestoreMetadata begin.."); + public void testMultiFlush() + throws IOException, WriteProcessException, MetadataException, ExecutionException { processor = new TsFileProcessor( storageGroup, @@ -803,8 +1178,8 @@ public void testRamCostInsertSameDataBy2Ways2() } @Test - public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException { - logger.info("testWriteAndRestoreMetadata begin.."); + public void testWriteAndClose() + throws IOException, 
WriteProcessException, MetadataException, ExecutionException { processor = new TsFileProcessor( storageGroup, @@ -852,11 +1227,6 @@ public void testWriteAndClose() throws IOException, WriteProcessException, Metad // close synchronously processor.syncClose(); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } assertTrue(processor.getTsFileResource().isClosed()); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/control/FileReaderManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/control/FileReaderManagerTest.java index eda5b70d4bd8..11dcf4193e2a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/control/FileReaderManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/control/FileReaderManagerTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.tsfile.write.writer.TsFileIOWriter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -64,7 +65,9 @@ public void test() throws IOException, InterruptedException { for (int i = 1; i <= MAX_FILE_SIZE; i++) { File file = SystemFileFactory.INSTANCE.getFile(filePath + i); - file.createNewFile(); + TsFileIOWriter writer = new TsFileIOWriter(file); + writer.endFile(); + writer.close(); tsFileResources[i] = new TsFileResource(file); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index bef179c8aa4d..e4d133a81cd5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -259,10 +259,6 @@ public static void cleanDir(String dir) throws IOException { public static void envSetUp() { logger.debug("EnvironmentUtil setup..."); config.setThriftServerAwaitTimeForStopService(60); - // we do not start 9091 port in test. 
- config.setAvgSeriesPointNumberThreshold(Integer.MAX_VALUE); - // use async wal mode in test - config.setAvgSeriesPointNumberThreshold(Integer.MAX_VALUE); createAllDir(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java index 1208ce4878d6..0ac18c375c3c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java @@ -107,35 +107,5 @@ public void testClone() { Assert.assertEquals(tvList.getBinary((int) i), clonedTvList.getBinary((int) i)); Assert.assertEquals(tvList.getTime((int) i), clonedTvList.getTime((int) i)); } - Assert.assertEquals(tvList.memoryBinaryChunkSize, clonedTvList.memoryBinaryChunkSize); - } - - @Test - public void testCalculateChunkSize() { - BinaryTVList tvList = BinaryTVList.newList(); - for (int i = 0; i < 10; i++) { - tvList.putBinary(i, BytesUtils.valueOf(String.valueOf(i))); - } - Assert.assertEquals(tvList.memoryBinaryChunkSize, 360); - - Binary[] binaryList = new Binary[10]; - List timeList = new ArrayList<>(); - BitMap bitMap = new BitMap(10); - for (int i = 0; i < 10; i++) { - timeList.add((long) i + 10); - binaryList[i] = BytesUtils.valueOf(String.valueOf(i)); - if (i % 2 == 0) { - bitMap.mark(i); - } - } - tvList.putBinaries( - ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), binaryList, bitMap, 0, 10); - Assert.assertEquals(tvList.memoryBinaryChunkSize, 540); - - tvList.delete(5, 15); - Assert.assertEquals(tvList.memoryBinaryChunkSize, 252); - - tvList.clear(); - Assert.assertEquals(tvList.memoryBinaryChunkSize, 0); } } diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 531891e456da..e396c31161ef 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1124,10 +1124,18 @@ unseq_memtable_flush_check_interval_in_ms=30000 # effectiveMode: restart tvlist_sort_algorithm=TIM -# When the average point number of timeseries in memtable exceeds this, the memtable is flushed to disk. The default threshold is 100000. +# The target number of points in one chunk during flushing and compaction. +# If the point number of a timeseries in a memtable exceeds this value, the data will be flushed into multiple chunks. # effectiveMode: restart -# Datatype: int -avg_series_point_number_threshold=100000 +# Datatype: long +target_chunk_point_num=100000 + +# The target chunk size during flushing and compaction. +# If the data size of a timeseries in a memtable exceeds this value, the data will be flushed into multiple chunks. +# The default is 1.6 MB. +# effectiveMode: restart +# Datatype: long, Unit: byte +target_chunk_size=1600000 # How many threads can concurrently flush. When <= 0, use CPU core number. # effectiveMode: restart @@ -1258,17 +1266,6 @@ inner_compaction_total_file_num_threshold=100 # Datatype: int max_level_gap_in_inner_compaction=2 -# The target chunk size in compaction and when memtable reaches this threshold, flush the memtable to disk. 
-# default is 1MB -# effectiveMode: restart -# Datatype: long, Unit: byte -target_chunk_size=1048576 - -# The target point nums in one chunk in compaction -# effectiveMode: restart -# Datatype: long -target_chunk_point_num=100000 - # If the chunk size is lower than this threshold, it will be deserialized into points, default is 128 byte # effectiveMode: restart # Datatype: long, Unit:byte
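Reviewer note: the heart of this change is the loop added to WritableMemChunk#encode, which splits one series' memtable data into multiple chunks once either threshold is crossed, instead of flushing the whole memtable when a single series grows large. The following minimal, self-contained sketch reproduces that loop outside IoTDB. SimpleChunkWriter and the queue are illustrative stand-ins (not IoTDB API) for ChunkWriterImpl and the flush pipeline's ioTaskQueue; the threshold constants mirror target_chunk_point_num and target_chunk_size.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ChunkSplitSketch {

  // Illustrative stand-in for org.apache.tsfile.write.chunk.ChunkWriterImpl.
  static final class SimpleChunkWriter {
    final List<long[]> points = new ArrayList<>();

    void write(long time, long value) {
      points.add(new long[] {time, value});
    }
  }

  // Mirror the renamed config items (target_chunk_point_num / target_chunk_size).
  private static final long MAX_NUMBER_OF_POINTS_IN_CHUNK = 40;
  private static final long TARGET_CHUNK_SIZE = 1_600_000;

  // Same shape as the loop added to WritableMemChunk#encode: accumulate points into the
  // current writer and, once either threshold is exceeded, hand the sealed chunk to the
  // IO stage and start a fresh writer, so one series can flush into several chunks.
  static void encode(long[] times, long[] values, BlockingQueue<SimpleChunkWriter> ioTaskQueue)
      throws InterruptedException {
    SimpleChunkWriter writer = new SimpleChunkWriter();
    long dataSizeInCurrentChunk = 0;
    int pointNumInCurrentChunk = 0;
    for (int i = 0; i < times.length; i++) {
      writer.write(times[i], values[i]);
      dataSizeInCurrentChunk += 8L + 8L; // 8-byte timestamp + 8-byte long value
      pointNumInCurrentChunk++;
      if (pointNumInCurrentChunk > MAX_NUMBER_OF_POINTS_IN_CHUNK
          || dataSizeInCurrentChunk > TARGET_CHUNK_SIZE) {
        ioTaskQueue.put(writer); // in IoTDB: sealCurrentPage(), clearPageWriter(), then put
        writer = new SimpleChunkWriter();
        dataSizeInCurrentChunk = 0;
        pointNumInCurrentChunk = 0;
      }
    }
    if (pointNumInCurrentChunk != 0) {
      ioTaskQueue.put(writer); // tail chunk holding the remaining points
    }
  }

  public static void main(String[] args) throws InterruptedException {
    long[] times = new long[100];
    long[] values = new long[100];
    for (int i = 0; i < 100; i++) {
      times[i] = i + 1;
      values[i] = i + 1;
    }
    BlockingQueue<SimpleChunkWriter> queue = new LinkedBlockingQueue<>();
    encode(times, values, queue);
    // With the strict '>' check, 100 points split as 41 + 41 + 18, i.e. 3 chunks,
    // matching the chunk-metadata assertions in testFlushMultiChunks.
    System.out.println("chunks flushed: " + queue.size());
  }
}

For binary series the per-point cost is 8 bytes plus getBinarySize(value) rather than a fixed 16 bytes, which is why testFlushMultiBinaryChunks reaches roughly three chunks through the size threshold (1536 bytes) instead of the point-number one.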
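Tests or tools that need a deterministic chunk layout can follow the same save/override/restore pattern the updated TsFileProcessorTest uses. Below is a hedged sketch of that pattern, using only the getters and setters that appear in this PR; the concrete values are illustrative, not recommended defaults.

import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;

public class TargetChunkConfigExample {

  public static void main(String[] args) {
    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    // Save the defaults so the process-wide config can be restored afterwards,
    // as TsFileProcessorTest#setUp / #tearDown do.
    long defaultTargetChunkPointNum = config.getTargetChunkPointNum();
    long defaultTargetChunkSize = config.getTargetChunkSize();
    try {
      config.setTargetChunkPointNum(40); // 100 points then flush as 3 chunks (41 + 41 + 18)
      config.setTargetChunkSize(1536L); // bytes; forces early sealing for binary-heavy series
      // ... write and flush data here, e.g. as in testFlushMultiChunks ...
    } finally {
      config.setTargetChunkPointNum(defaultTargetChunkPointNum);
      config.setTargetChunkSize(defaultTargetChunkSize);
    }
  }
}

Restoring in a finally block (or tearDown) matters because IoTDBConfig is a process-wide singleton: a leaked override would silently change the chunking behavior of every test that runs afterwards.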