Remove the logic of memtable flush controlled by memory series size #13653

Open
wants to merge 35 commits into master (base) from split_text_chunk
Changes shown from 34 of 35 commits

Commits:
de38a7d Split non_aligned charge text chunk (HTHou, Apr 25, 2024)
abe456f merge master (HTHou, Jul 5, 2024)
ca7b6ea Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Sep 27, 2024)
5ee3c89 dev non_aligned (HTHou, Sep 27, 2024)
3a994a3 dev aligned chunk split (HTHou, Sep 29, 2024)
85bd62c new type (HTHou, Sep 29, 2024)
2dcd204 dev aligned binary chunk split (HTHou, Sep 29, 2024)
fb8f929 Fix binary size calculatation (HTHou, Sep 29, 2024)
41bc88b Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Sep 29, 2024)
80e63f7 Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Sep 29, 2024)
0b5ad66 fix IT (HTHou, Sep 29, 2024)
7b463c8 update IoTDBDuplicateTimeIT.java (HTHou, Sep 29, 2024)
28aa427 fix pipe IT (HTHou, Sep 29, 2024)
1042cfc Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Oct 10, 2024)
fbf19e5 change method names (HTHou, Oct 10, 2024)
252d6e0 Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Oct 11, 2024)
aeeef0d add ut (HTHou, Oct 11, 2024)
506fceb add UT (HTHou, Oct 11, 2024)
a0d46df remove useless methods (HTHou, Oct 11, 2024)
96eb7e7 fix UT (HTHou, Oct 11, 2024)
082d9f6 fix /FileReaderManagerTest (HTHou, Oct 11, 2024)
b564ca2 fix win UT (HTHou, Oct 12, 2024)
c8207da add binary test (HTHou, Oct 12, 2024)
b56aec9 Add Aligned UTs (HTHou, Oct 12, 2024)
2e84d21 Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Oct 12, 2024)
da3eee2 fix win ut (HTHou, Oct 12, 2024)
99327ff improve coverage (HTHou, Oct 12, 2024)
4434cd5 fix comments (HTHou, Oct 12, 2024)
31e27bf fix windows UT (HTHou, Oct 12, 2024)
e0ac04c fix review (HTHou, Oct 15, 2024)
ef55416 Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Oct 15, 2024)
9881eb1 fix review (HTHou, Oct 16, 2024)
4339899 fix review (HTHou, Oct 16, 2024)
2780865 target chunk size count non binary (HTHou, Oct 16, 2024)
0393c75 Merge branch 'master' into split_text_chunk (HTHou, Oct 24, 2024)
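Taken together, the commits above amount to one change: the flush path that sealed a memtable once a series reached avg_series_point_number_threshold points is removed. In the diff that follows, the integration-test setter setAvgSeriesPointNumberThreshold becomes setTargetChunkPointNum; tests that leaned on the automatic trigger now call flush explicitly; IoTDBConfig loses the avgSeriesPointNumberThreshold field, its accessors, and its property parsing; targetChunkSize and targetChunkPointNum are redocumented to govern flushing as well as compaction, with the targetChunkSize default raised from 1048576 to 1600000 bytes; the series_full_flush_memtable metric is deleted; the unused removeFullyOverlapFiles helpers are dropped; and MemTableFlushTask hands its IO task queue to IWritableMemChunk.encode(...) so an encoded series can be emitted as several sealed chunk writers.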
[File: …]
@@ -149,8 +149,8 @@ public CommonConfig setPrimitiveArraySize(int primitiveArraySize) {
   }
 
   @Override
-  public CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
-    setProperty("avg_series_point_number_threshold", String.valueOf(avgSeriesPointNumberThreshold));
+  public CommonConfig setTargetChunkPointNum(int targetChunkPointNum) {
+    setProperty("target_chunk_point_num", String.valueOf(targetChunkPointNum));
     return this;
   }
 
[File: …]
@@ -146,9 +146,9 @@ public CommonConfig setPrimitiveArraySize(int primitiveArraySize) {
   }
 
   @Override
-  public CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
-    cnConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
-    dnConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
+  public CommonConfig setTargetChunkPointNum(int targetChunkPointNum) {
+    cnConfig.setTargetChunkPointNum(targetChunkPointNum);
+    dnConfig.setTargetChunkPointNum(targetChunkPointNum);
     return this;
   }
 
[File: …]
@@ -105,7 +105,7 @@ public CommonConfig setPrimitiveArraySize(int primitiveArraySize) {
   }
 
   @Override
-  public CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
+  public CommonConfig setTargetChunkPointNum(int targetChunkPointNum) {
     return this;
   }
 
[File: CommonConfig.java]
@@ -56,7 +56,7 @@ public interface CommonConfig {
 
   CommonConfig setPrimitiveArraySize(int primitiveArraySize);
 
-  CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold);
+  CommonConfig setTargetChunkPointNum(int targetChunkPointNum);
 
   CommonConfig setMaxTsBlockLineNumber(int maxTsBlockLineNumber);
 
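For a concrete picture of the renamed API, here is a minimal test-setup sketch. The class name is hypothetical, and it simply mirrors the fluent pattern of the integration tests in this diff; the teardown call is the usual itest counterpart and is assumed, not shown in this PR.

import org.apache.iotdb.it.env.EnvFactory;
import org.junit.After;
import org.junit.Before;

public class TargetChunkPointNumExampleIT {
  @Before
  public void setUp() throws Exception {
    // Renamed setter: was setAvgSeriesPointNumberThreshold(2) before this PR.
    EnvFactory.getEnv().getConfig().getCommonConfig().setTargetChunkPointNum(2);
    EnvFactory.getEnv().initClusterEnvironment();
  }

  @After
  public void tearDown() throws Exception {
    EnvFactory.getEnv().cleanClusterEnvironment();
  }
}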
[File: IoTDBDuplicateTimeIT.java]
@@ -43,8 +43,6 @@ public class IoTDBDuplicateTimeIT {
 
   @Before
   public void setUp() throws Exception {
-    EnvFactory.getEnv().getConfig().getCommonConfig().setAvgSeriesPointNumberThreshold(2);
-    // Adjust memstable threshold size to make it flush automatically
     EnvFactory.getEnv().initClusterEnvironment();
   }
 
@@ -62,16 +60,19 @@ public void testDuplicateTime() throws SQLException {
       // version-1 tsfile
       statement.execute("insert into root.db.d1(time,s1) values (2,2)");
       statement.execute("insert into root.db.d1(time,s1) values (3,3)");
+      statement.execute("flush");
 
       // version-2 unseq work memtable
       statement.execute("insert into root.db.d1(time,s1) values (2,20)");
 
       // version-3 tsfile
       statement.execute("insert into root.db.d1(time,s1) values (5,5)");
       statement.execute("insert into root.db.d1(time,s1) values (6,6)");
+      statement.execute("flush root.db true");
 
       // version-2 unseq work memtable -> unseq tsfile
       statement.execute("insert into root.db.d1(time,s1) values (5,50)");
+      statement.execute("flush");
 
       try (ResultSet set = statement.executeQuery("SELECT s1 FROM root.db.d1 where time = 5")) {
         int cnt = 0;
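With the automatic series-count flush gone, this test now creates its file-version boundaries explicitly: a plain flush seals all working memtables, while flush root.db true flushes only root.db (the trailing boolean selects the sequence space in IoTDB's FLUSH syntax), leaving the unsequence memtable holding time 2 open until the final flush.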
[File: …]
@@ -308,8 +308,6 @@ public void testRecoverWALDeleteSchema() throws Exception {
   @Test
   public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    int avgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
-    config.setAvgSeriesPointNumberThreshold(2);
     long tsFileSize = config.getSeqTsFileSize();
     long unFsFileSize = config.getSeqTsFileSize();
     config.setSeqTsFileSize(10000000);
@@ -320,6 +318,7 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
       statement.execute("create timeseries root.turbine1.d1.s1 with datatype=INT64");
       statement.execute("insert into root.turbine1.d1(timestamp,s1) values(1,1)");
       statement.execute("insert into root.turbine1.d1(timestamp,s1) values(2,1)");
+      statement.execute("flush");
       statement.execute("create timeseries root.turbine1.d1.s2 with datatype=BOOLEAN");
       statement.execute("insert into root.turbine1.d1(timestamp,s2) values(3,true)");
       statement.execute("insert into root.turbine1.d1(timestamp,s2) values(4,true)");
@@ -342,7 +341,6 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
       assertEquals(2, cnt);
     }
 
-    config.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
     config.setSeqTsFileSize(tsFileSize);
     config.setUnSeqTsFileSize(unFsFileSize);
   }
[File: …]
@@ -110,7 +110,7 @@ public void test2() {
         .getConfig()
         .getCommonConfig()
         .setMaxNumberOfPointsInPage(4)
-        .setAvgSeriesPointNumberThreshold(2);
+        .setTargetChunkPointNum(2);
     EnvFactory.getEnv().initClusterEnvironment();
     String[] expectedHeader = new String[] {TIMESTAMP_STR, count("root.sg2.d1.s1")};
     String[] retArray = new String[] {"5,1,", "10,1,", "15,2,", "20,0,", "25,1,"};
[File: …]
@@ -336,8 +336,6 @@ public void testRecoverWALDeleteSchema() throws Exception {
   @Test
   public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    int avgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
-    config.setAvgSeriesPointNumberThreshold(2);
     long tsFileSize = config.getSeqTsFileSize();
     long unFsFileSize = config.getSeqTsFileSize();
     config.setSeqTsFileSize(10000000);
@@ -351,6 +349,7 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
          "create table turbine1 (id1 string id, s1 int64 measurement, s2 boolean measurement)");
       statement.execute("insert into turbine1(time,id1,s1) values(1,\'d1\',1)");
       statement.execute("insert into turbine1(time,id1,s1) values(2,\'d1\',2)");
+      statement.execute("flush");
       statement.execute("insert into turbine1(time,id1,s2) values(3,\'d1\',true)");
       statement.execute("insert into turbine1(time,id1,s2) values(4,\'d1\',true)");
     }
@@ -373,7 +372,6 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
       assertEquals(2, cnt);
     }
 
-    config.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
     config.setSeqTsFileSize(tsFileSize);
     config.setUnSeqTsFileSize(unFsFileSize);
   }
[File: IoTDBConfig.java]
@@ -427,9 +427,6 @@ public class IoTDBConfig {
   /** The sort algorithm used in TVList */
   private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;
 
-  /** When average series point number reaches this, flush the memtable to disk */
-  private int avgSeriesPointNumberThreshold = 100000;
-
   /** Enable inner space compaction for sequence files */
   private volatile boolean enableSeqSpaceCompaction = true;
 
@@ -488,10 +485,10 @@ public class IoTDBConfig {
   /** The target tsfile size in compaction, 2 GB by default */
   private long targetCompactionFileSize = 2147483648L;
 
-  /** The target chunk size in compaction. */
-  private long targetChunkSize = 1048576L;
+  /** The target chunk size in compaction and flushing. */
+  private long targetChunkSize = 1600000L;
 
-  /** The target chunk point num in compaction. */
+  /** The target chunk point num in compaction and flushing. */
   private long targetChunkPointNum = 100000L;
 
   /**
@@ -2279,14 +2276,6 @@ public void setTvListSortAlgorithm(TVListSortAlgorithm tvListSortAlgorithm) {
     this.tvListSortAlgorithm = tvListSortAlgorithm;
   }
 
-  public int getAvgSeriesPointNumberThreshold() {
-    return avgSeriesPointNumberThreshold;
-  }
-
-  public void setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
-    this.avgSeriesPointNumberThreshold = avgSeriesPointNumberThreshold;
-  }
-
   public boolean isRpcThriftCompressionEnable() {
     return rpcThriftCompressionEnable;
   }
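Two things happen in IoTDBConfig: the dedicated trigger field avgSeriesPointNumberThreshold (default 100000) disappears along with its getter and setter, and the existing compaction limits take over at flush time; the Javadoc now reads "compaction and flushing", and the default targetChunkSize rises from 1048576 (1 MiB) to 1600000 bytes.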
[File: …]
@@ -398,12 +398,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
             properties.getProperty(
                 "tvlist_sort_algorithm", conf.getTvListSortAlgorithm().toString())));
 
-    conf.setAvgSeriesPointNumberThreshold(
-        Integer.parseInt(
-            properties.getProperty(
-                "avg_series_point_number_threshold",
-                Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
-
     conf.setCheckPeriodWhenInsertBlocked(
         Integer.parseInt(
             properties.getProperty(
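Because the parsing block is deleted rather than renamed, an avg_series_point_number_threshold entry left in an existing properties file is now silently ignored. Flush-time chunk sizing is instead driven by target_chunk_point_num (the key written by the renamed test setter above) and, presumably, a matching target_chunk_size key.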
[File: WritingMetrics.java]
@@ -39,6 +39,7 @@
 import org.apache.iotdb.metrics.utils.MetricType;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 public class WritingMetrics implements IMetricSet {
@@ -363,7 +364,7 @@ private void unbindWALCostMetrics(AbstractMetricService metricService) {
                     MAKE_CHECKPOINT,
                     Tag.TYPE.toString(),
                     type));
-    Arrays.asList(SERIALIZE_WAL_ENTRY_TOTAL)
+    Collections.singletonList(SERIALIZE_WAL_ENTRY_TOTAL)
         .forEach(
             type ->
                 metricService.remove(
@@ -407,7 +408,6 @@ private void unbindWALCostMetrics(AbstractMetricService metricService) {
   public static final String WAL_FLUSH_MEMTABLE_COUNT = "wal_flush_memtable_count";
   public static final String MANUAL_FLUSH_MEMTABLE_COUNT = "manual_flush_memtable_count";
   public static final String MEM_CONTROL_FLUSH_MEMTABLE_COUNT = "mem_control_flush_memtable_count";
-  public static final String SERIES_FULL_FLUSH_MEMTABLE = "series_full_flush_memtable";
 
   private Gauge flushThreholdGauge = DoNothingMetricManager.DO_NOTHING_GAUGE;
   private Gauge rejectThreholdGauge = DoNothingMetricManager.DO_NOTHING_GAUGE;
@@ -416,7 +416,6 @@ private void unbindWALCostMetrics(AbstractMetricService metricService) {
 
   private Counter walFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
   private Counter timedFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
-  private Counter seriesFullFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
   private Counter manualFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
   private Counter memControlFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
 
@@ -429,7 +428,6 @@ public void bindDataRegionMetrics() {
     createActiveTimePartitionCounterMetrics();
     walFlushMemtableCounter = createWalFlushMemTableCounterMetrics();
     timedFlushMemtableCounter = createTimedFlushMemTableCounterMetrics();
-    seriesFullFlushMemtableCounter = createSeriesFullFlushMemTableCounterMetrics();
     manualFlushMemtableCounter = createManualFlushMemTableCounterMetrics();
     memControlFlushMemtableCounter = createMemControlFlushMemTableCounterMetrics();
 
@@ -462,7 +460,6 @@ public void unbindDataRegionMetrics() {
           removeActiveMemtableCounterMetrics(dataRegionId);
         });
     removeActiveTimePartitionCounterMetrics();
-    removeSeriesFullFlushMemTableCounterMetrics();
     removeTimedFlushMemTableCounterMetrics();
     removeWalFlushMemTableCounterMetrics();
     removeManualFlushMemTableCounterMetrics();
@@ -575,15 +572,6 @@ public Counter createTimedFlushMemTableCounterMetrics() {
             TIMED_FLUSH_MEMTABLE_COUNT);
   }
 
-  public Counter createSeriesFullFlushMemTableCounterMetrics() {
-    return MetricService.getInstance()
-        .getOrCreateCounter(
-            Metric.FLUSH_MEMTABLE_COUNT.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.TYPE.toString(),
-            SERIES_FULL_FLUSH_MEMTABLE);
-  }
-
   public Counter createManualFlushMemTableCounterMetrics() {
     return MetricService.getInstance()
         .getOrCreateCounter(
@@ -616,15 +604,6 @@ public void createActiveTimePartitionCounterMetrics() {
         .getOrCreateCounter(Metric.ACTIVE_TIME_PARTITION_COUNT.toString(), MetricLevel.IMPORTANT);
   }
 
-  public void removeSeriesFullFlushMemTableCounterMetrics() {
-    MetricService.getInstance()
-        .remove(
-            MetricType.COUNTER,
-            Metric.FLUSH_MEMTABLE_COUNT.toString(),
-            Tag.TYPE.toString(),
-            SERIES_FULL_FLUSH_MEMTABLE);
-  }
-
   public void removeTimedFlushMemTableCounterMetrics() {
     MetricService.getInstance()
         .remove(
@@ -929,10 +908,6 @@ public void recordWalFlushMemTableCount(int number) {
     walFlushMemtableCounter.inc(number);
   }
 
-  public void recordSeriesFullFlushMemTableCount(int number) {
-    seriesFullFlushMemtableCounter.inc(number);
-  }
-
   public void recordManualFlushMemTableCount(int number) {
     manualFlushMemtableCounter.inc(number);
   }
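Every trace of the series_full_flush_memtable counter goes in one sweep: the tag constant, the counter field, the create/remove/record helpers, and the bind/unbind call sites. That is consistent with the PR title: once no flush is triggered by per-series point counts, the counter could never be incremented again. The Arrays.asList to Collections.singletonList change is an unrelated single-element cleanup picked up along the way.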
[File: …]
@@ -3063,86 +3063,6 @@ private long computeMaxVersion(Long oldVersion, Long newVersion) {
     return Math.max(oldVersion, newVersion);
   }
 
-  /**
-   * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to
-   * reduce unnecessary merge. Only used when the file sender and the receiver share the same file
-   * close policy. Warning: DO NOT REMOVE
-   */
-  @SuppressWarnings("unused")
-  public void removeFullyOverlapFiles(TsFileResource resource) {
-    writeLock("removeFullyOverlapFiles");
-    try {
-      Iterator<TsFileResource> iterator = tsFileManager.getIterator(true);
-      removeFullyOverlapFiles(resource, iterator, true);
-
-      iterator = tsFileManager.getIterator(false);
-      removeFullyOverlapFiles(resource, iterator, false);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  private void removeFullyOverlapFiles(
-      TsFileResource newTsFile, Iterator<TsFileResource> iterator, boolean isSeq) {
-    while (iterator.hasNext()) {
-      TsFileResource existingTsFile = iterator.next();
-      if (newTsFile.isPlanRangeCovers(existingTsFile)
-          && !newTsFile.getTsFile().equals(existingTsFile.getTsFile())
-          && existingTsFile.tryWriteLock()) {
-        logger.info(
-            "{} is covered by {}: [{}, {}], [{}, {}], remove it",
-            existingTsFile,
-            newTsFile,
-            existingTsFile.minPlanIndex,
-            existingTsFile.maxPlanIndex,
-            newTsFile.minPlanIndex,
-            newTsFile.maxPlanIndex);
-        // if we fail to lock the file, it means it is being queried or merged and we will not
-        // wait until it is free, we will just leave it to the next merge
-        try {
-          removeFullyOverlapFile(existingTsFile, iterator, isSeq);
-        } catch (Exception e) {
-          logger.error(
-              "Something gets wrong while removing FullyOverlapFiles: {}",
-              existingTsFile.getTsFile().getAbsolutePath(),
-              e);
-        } finally {
-          existingTsFile.writeUnlock();
-        }
-      }
-    }
-  }
-
-  /**
-   * Remove the given {@link TsFileResource}. If the corresponding {@link TsFileProcessor} is in the
-   * working status, close it before remove the related resource files. maybe time-consuming for
-   * closing a tsfile.
-   */
-  private void removeFullyOverlapFile(
-      TsFileResource tsFileResource, Iterator<TsFileResource> iterator, boolean isSeq) {
-    logger.info(
-        "Removing a covered file {}, closed: {}", tsFileResource, tsFileResource.isClosed());
-    if (!tsFileResource.isClosed()) {
-      try {
-        // also remove the TsFileProcessor if the overlapped file is not closed
-        long timePartition = tsFileResource.getTimePartition();
-        Map<Long, TsFileProcessor> fileProcessorMap =
-            isSeq ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors;
-        TsFileProcessor tsFileProcessor = fileProcessorMap.get(timePartition);
-        if (tsFileProcessor != null && tsFileProcessor.getTsFileResource() == tsFileResource) {
-          // have to take some time to close the tsFileProcessor
-          tsFileProcessor.syncClose();
-          fileProcessorMap.remove(timePartition);
-        }
-      } catch (Exception e) {
-        logger.error("Cannot close {}", tsFileResource, e);
-      }
-    }
-    tsFileManager.remove(tsFileResource, isSeq);
-    iterator.remove();
-    tsFileResource.remove();
-  }
-
   private long getAndSetNewVersion(long timePartitionId, TsFileResource tsFileResource) {
     long version =
         partitionMaxFileVersions.compute(
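These removeFullyOverlapFiles helpers were already annotated @SuppressWarnings("unused") and kept alive only by their "DO NOT REMOVE" comment; commit a0d46df ("remove useless methods") deletes them outright, presumably because the shared file-close policy they depended on no longer exists once series-size-based flushing is gone.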
[File: MemTableFlushTask.java]
@@ -44,6 +44,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -67,8 +68,8 @@ public class MemTableFlushTask {
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
 
-  private final LinkedBlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>();
-  private final LinkedBlockingQueue<Object> ioTaskQueue =
+  private final BlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>();
+  private final BlockingQueue<Object> ioTaskQueue =
       (SystemInfo.getInstance().isEncodingFasterThanIo())
           ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
           : new LinkedBlockingQueue<>();
@@ -247,16 +248,7 @@ public void run() {
         } else {
           long starTime = System.currentTimeMillis();
           IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
-          IChunkWriter seriesWriter = writableMemChunk.createIChunkWriter();
-          writableMemChunk.encode(seriesWriter);
-          seriesWriter.sealCurrentPage();
-          seriesWriter.clearPageWriter();
-          try {
-            ioTaskQueue.put(seriesWriter);
-          } catch (InterruptedException e) {
-            LOGGER.error("Put task into ioTaskQueue Interrupted");
-            Thread.currentThread().interrupt();
-          }
+          writableMemChunk.encode(ioTaskQueue);
           long subTaskTime = System.currentTimeMillis() - starTime;
           WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
           memSerializeTime += subTaskTime;
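This is the core mechanical change. Previously the encoding stage built exactly one IChunkWriter per series, sealed it, and enqueued it; now the mem chunk receives the IO queue itself, so encode(...) can emit as many sealed chunk writers as the target_chunk_point_num / target_chunk_size limits require. The new encode(BlockingQueue) body is not shown in this view, so the following self-contained sketch only illustrates the splitting idea; the point-count limit is the default from the IoTDBConfig hunk above, and every other name in it is invented for the example:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative sketch, not IoTDB code: split one series into size-bounded chunks. */
public class ChunkSplitSketch {

  // Default copied from the IoTDBConfig hunk above (targetChunkPointNum).
  static final long TARGET_CHUNK_POINT_NUM = 100_000L;

  /** Stand-in for encode(BlockingQueue): seal a chunk whenever the point limit is hit. */
  static void encode(long[] timestamps, BlockingQueue<List<Long>> ioTaskQueue)
      throws InterruptedException {
    List<Long> chunk = new ArrayList<>();
    for (long t : timestamps) {
      chunk.add(t);
      if (chunk.size() >= TARGET_CHUNK_POINT_NUM) {
        ioTaskQueue.put(chunk); // hand a "sealed" chunk to the IO stage
        chunk = new ArrayList<>();
      }
    }
    if (!chunk.isEmpty()) {
      ioTaskQueue.put(chunk); // seal the tail chunk
    }
  }

  public static void main(String[] args) throws InterruptedException {
    long[] timestamps = new long[250_000];
    for (int i = 0; i < timestamps.length; i++) {
      timestamps[i] = i;
    }
    BlockingQueue<List<Long>> ioTaskQueue = new LinkedBlockingQueue<>();
    encode(timestamps, ioTaskQueue);
    System.out.println("chunks emitted: " + ioTaskQueue.size()); // prints 3
  }
}

One design note grounded in the diff: widening the field types from LinkedBlockingQueue to the BlockingQueue interface is what lets encode(...) accept the bounded ioTaskQueue (sized by config.getIoTaskQueueSizeForFlushing() when encoding outpaces IO), so back-pressure between the two flush stages now applies per emitted chunk rather than once per series.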