Remove the logic of memtable flush controlled by memory series size #13653

Open · HTHou wants to merge 35 commits into base: master from split_text_chunk
Changes from 1 commit

Commits (35):
de38a7d · Split non_aligned charge text chunk (HTHou, Apr 25, 2024)
abe456f · merge master (HTHou, Jul 5, 2024)
ca7b6ea · Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Sep 27, 2024)
5ee3c89 · dev non_aligned (HTHou, Sep 27, 2024)
3a994a3 · dev aligned chunk split (HTHou, Sep 29, 2024)
85bd62c · new type (HTHou, Sep 29, 2024)
2dcd204 · dev aligned binary chunk split (HTHou, Sep 29, 2024)
fb8f929 · Fix binary size calculatation (HTHou, Sep 29, 2024)
41bc88b · Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Sep 29, 2024)
80e63f7 · Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Sep 29, 2024)
0b5ad66 · fix IT (HTHou, Sep 29, 2024)
7b463c8 · update IoTDBDuplicateTimeIT.java (HTHou, Sep 29, 2024)
28aa427 · fix pipe IT (HTHou, Sep 29, 2024)
1042cfc · Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Oct 10, 2024)
fbf19e5 · change method names (HTHou, Oct 10, 2024)
252d6e0 · Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Oct 11, 2024)
aeeef0d · add ut (HTHou, Oct 11, 2024)
506fceb · add UT (HTHou, Oct 11, 2024)
a0d46df · remove useless methods (HTHou, Oct 11, 2024)
96eb7e7 · fix UT (HTHou, Oct 11, 2024)
082d9f6 · fix /FileReaderManagerTest (HTHou, Oct 11, 2024)
b564ca2 · fix win UT (HTHou, Oct 12, 2024)
c8207da · add binary test (HTHou, Oct 12, 2024)
b56aec9 · Add Aligned UTs (HTHou, Oct 12, 2024)
2e84d21 · Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Oct 12, 2024)
da3eee2 · fix win ut (HTHou, Oct 12, 2024)
99327ff · improve coverage (HTHou, Oct 12, 2024)
4434cd5 · fix comments (HTHou, Oct 12, 2024)
31e27bf · fix windows UT (HTHou, Oct 12, 2024)
e0ac04c · fix review (HTHou, Oct 15, 2024)
ef55416 · Merge branch 'master' of github.com:apache/iotdb into split_text_chunk (HTHou, Oct 15, 2024)
9881eb1 · fix review (HTHou, Oct 16, 2024)
4339899 · fix review (HTHou, Oct 16, 2024)
2780865 · target chunk size count non binary (HTHou, Oct 16, 2024)
0393c75 · Merge branch 'master' into split_text_chunk (HTHou, Oct 24, 2024)
Files changed:
@@ -149,8 +149,8 @@ public CommonConfig setPrimitiveArraySize(int primitiveArraySize) {
   }
 
   @Override
-  public CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
-    setProperty("avg_series_point_number_threshold", String.valueOf(avgSeriesPointNumberThreshold));
+  public CommonConfig setTargetChunkPointNum(int targetChunkPointNum) {
+    setProperty("target_chunk_point_num", String.valueOf(targetChunkPointNum));
     return this;
   }
--- next file ---
@@ -146,9 +146,9 @@ public CommonConfig setPrimitiveArraySize(int primitiveArraySize) {
   }
 
   @Override
-  public CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
-    cnConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
-    dnConfig.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
+  public CommonConfig setTargetChunkPointNum(int targetChunkPointNum) {
+    cnConfig.setTargetChunkPointNum(targetChunkPointNum);
+    dnConfig.setTargetChunkPointNum(targetChunkPointNum);
     return this;
   }
--- next file ---
@@ -105,7 +105,7 @@ public CommonConfig setPrimitiveArraySize(int primitiveArraySize) {
   }
 
   @Override
-  public CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
+  public CommonConfig setTargetChunkPointNum(int targetChunkPointNum) {
     return this;
   }
--- next file ---
@@ -56,7 +56,7 @@ public interface CommonConfig {
 
   CommonConfig setPrimitiveArraySize(int primitiveArraySize);
 
-  CommonConfig setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold);
+  CommonConfig setTargetChunkPointNum(int targetChunkPointNum);
 
   CommonConfig setMaxTsBlockLineNumber(int maxTsBlockLineNumber);
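Note: call sites migrate mechanically to the renamed setter. A minimal sketch of the new test-side usage, assuming the standard integration-test harness (it mirrors the `test2` diff further below):

```java
// Sketch of migrated test setup; EnvFactory is the integration-test
// utility already used in this PR's diffs.
EnvFactory.getEnv()
    .getConfig()
    .getCommonConfig()
    .setMaxNumberOfPointsInPage(4)
    .setTargetChunkPointNum(2); // was .setAvgSeriesPointNumberThreshold(2)
EnvFactory.getEnv().initClusterEnvironment();
```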
--- next file ---
@@ -308,8 +308,6 @@ public void testRecoverWALDeleteSchema() throws Exception {
   @Test
   public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    int avgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
-    config.setAvgSeriesPointNumberThreshold(2);
     long tsFileSize = config.getSeqTsFileSize();
     long unFsFileSize = config.getSeqTsFileSize();
     config.setSeqTsFileSize(10000000);
@@ -320,6 +318,7 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
       statement.execute("create timeseries root.turbine1.d1.s1 with datatype=INT64");
       statement.execute("insert into root.turbine1.d1(timestamp,s1) values(1,1)");
       statement.execute("insert into root.turbine1.d1(timestamp,s1) values(2,1)");
+      statement.execute("flush");
       statement.execute("create timeseries root.turbine1.d1.s2 with datatype=BOOLEAN");
       statement.execute("insert into root.turbine1.d1(timestamp,s2) values(3,true)");
       statement.execute("insert into root.turbine1.d1(timestamp,s2) values(4,true)");
@@ -342,7 +341,6 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
       assertEquals(2, cnt);
     }
 
-    config.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
     config.setSeqTsFileSize(tsFileSize);
     config.setUnSeqTsFileSize(unFsFileSize);
   }
--- next file ---
@@ -110,7 +110,7 @@ public void test2() {
         .getConfig()
         .getCommonConfig()
         .setMaxNumberOfPointsInPage(4)
-        .setAvgSeriesPointNumberThreshold(2);
+        .setTargetChunkPointNum(2);
     EnvFactory.getEnv().initClusterEnvironment();
     String[] expectedHeader = new String[] {TIMESTAMP_STR, count("root.sg2.d1.s1")};
     String[] retArray = new String[] {"5,1,", "10,1,", "15,2,", "20,0,", "25,1,"};
--- next file ---
@@ -336,8 +336,6 @@ public void testRecoverWALDeleteSchema() throws Exception {
   @Test
   public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    int avgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
-    config.setAvgSeriesPointNumberThreshold(2);
     long tsFileSize = config.getSeqTsFileSize();
     long unFsFileSize = config.getSeqTsFileSize();
     config.setSeqTsFileSize(10000000);
@@ -351,6 +349,7 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
           "create table turbine1 (id1 string id, s1 int64 measurement, s2 boolean measurement)");
       statement.execute("insert into turbine1(time,id1,s1) values(1,'d1',1)");
       statement.execute("insert into turbine1(time,id1,s1) values(2,'d1',2)");
+      statement.execute("flush");
       statement.execute("insert into turbine1(time,id1,s2) values(3,'d1',true)");
       statement.execute("insert into turbine1(time,id1,s2) values(4,'d1',true)");
     }
@@ -373,7 +372,6 @@ public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
       assertEquals(2, cnt);
     }
 
-    config.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
     config.setSeqTsFileSize(tsFileSize);
     config.setUnSeqTsFileSize(unFsFileSize);
   }
--- next file ---
@@ -427,9 +427,6 @@ public class IoTDBConfig {
   /** The sort algorithm used in TVList */
   private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;
 
-  /** When average series point number reaches this, flush the memtable to disk */
-  private int avgSeriesPointNumberThreshold = 100000;
-
   /** Enable inner space compaction for sequence files */
   private volatile boolean enableSeqSpaceCompaction = true;
 
@@ -488,10 +485,10 @@ public class IoTDBConfig {
   /** The target tsfile size in compaction, 2 GB by default */
   private long targetCompactionFileSize = 2147483648L;
 
-  /** The target chunk size in compaction. */
-  private long targetChunkSize = 1048576L;
+  /** The target chunk size in compaction and flushing. */
+  private long targetChunkSize = 1600000L;
 
-  /** The target chunk point num in compaction. */
+  /** The target chunk point num in compaction and flushing. */
   private long targetChunkPointNum = 100000L;
 
   /**
@@ -2279,14 +2276,6 @@ public void setTvListSortAlgorithm(TVListSortAlgorithm tvListSortAlgorithm) {
     this.tvListSortAlgorithm = tvListSortAlgorithm;
   }
 
-  public int getAvgSeriesPointNumberThreshold() {
-    return avgSeriesPointNumberThreshold;
-  }
-
-  public void setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
-    this.avgSeriesPointNumberThreshold = avgSeriesPointNumberThreshold;
-  }
-
   public boolean isRpcThriftCompressionEnable() {
     return rpcThriftCompressionEnable;
  }
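Note: with `avgSeriesPointNumberThreshold` gone, chunk granularity is bounded by the same two knobs in both flushing and compaction, and the new 1,600,000-byte default lines up with the point-num default for fixed-width data. A back-of-the-envelope sketch (values are the defaults above; the computation is illustrative, not PR code):

```java
// Illustrative arithmetic: for an int64 series each point costs
// 8 B timestamp + 8 B value = 16 B, so the two default bounds meet.
long targetChunkPointNum = 100_000L;  // target_chunk_point_num
long targetChunkSize = 1_600_000L;    // target_chunk_size, in bytes

long pointsAllowedBySize = targetChunkSize / 16; // 100_000 for int64
long effectiveCap = Math.min(targetChunkPointNum, pointsAllowedBySize);
// Wider values (e.g. TEXT) shrink pointsAllowedBySize and split sooner.
```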
--- next file ---
@@ -398,12 +398,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
             properties.getProperty(
                 "tvlist_sort_algorithm", conf.getTvListSortAlgorithm().toString())));
 
-    conf.setAvgSeriesPointNumberThreshold(
-        Integer.parseInt(
-            properties.getProperty(
-                "avg_series_point_number_threshold",
-                Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
-
     conf.setCheckPeriodWhenInsertBlocked(
         Integer.parseInt(
             properties.getProperty(
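Note: the descriptor no longer parses `avg_series_point_number_threshold`, so deployments tune `target_chunk_point_num` instead. A hypothetical loading fragment for the surviving property (the real parsing code sits outside this diff, so the exact call is an assumption):

```java
// Hypothetical fragment; the property name matches the IoTDBConfig
// field above, but this parsing code is assumed, not shown in the PR.
conf.setTargetChunkPointNum(
    Long.parseLong(
        properties.getProperty(
            "target_chunk_point_num",
            Long.toString(conf.getTargetChunkPointNum()))));
```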
--- next file ---
@@ -29,7 +29,6 @@
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
@@ -96,8 +95,6 @@ public abstract class AbstractMemTable implements IMemTable {
 
   private boolean shouldFlush = false;
   private volatile FlushStatus flushStatus = FlushStatus.WORKING;
-  private final int avgSeriesPointNumThreshold =
-      IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
 
   /** Memory size of data points, including TEXT values. */
   private long memSize = 0;
@@ -176,7 +173,6 @@ private IWritableMemChunkGroup createMemChunkGroupIfNotExistAndGet(
     for (IMeasurementSchema schema : schemaList) {
       if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) {
         seriesNumber++;
-        totalPointsNumThreshold += avgSeriesPointNumThreshold;
       }
     }
     return memChunkGroup;
@@ -189,14 +185,12 @@ private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet(
         deviceId,
         k -> {
           seriesNumber += schemaList.size();
-          totalPointsNumThreshold += ((long) avgSeriesPointNumThreshold) * schemaList.size();
           return new AlignedWritableMemChunkGroup(
               schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()));
         });
     for (IMeasurementSchema schema : schemaList) {
       if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) {
         seriesNumber++;
-        totalPointsNumThreshold += avgSeriesPointNumThreshold;
       }
     }
     return memChunkGroup;
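Note: the deleted `totalPointsNumThreshold` bookkeeping is what let per-series point counts trigger a memtable flush. A simplified reconstruction of the removed behavior (assumed from the deleted lines; the full pre-PR flush check lives outside this hunk):

```java
// Before (simplified, assumed): each newly registered series raised the
// memtable-wide threshold, and crossing it marked the memtable for flush.
//   totalPointsNumThreshold += avgSeriesPointNumThreshold; // per new series
//   shouldFlush |= totalPointsNum >= totalPointsNumThreshold;
//
// After: flushing is driven by memory accounting alone; chunk granularity
// is enforced later, during encode(), via target_chunk_point_num and
// target_chunk_size (see the mem-chunk diffs below).
```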
--- next file ---
@@ -58,7 +58,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
   private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
-  private final long MAX_NUMBER_OF_POINTS_IN_CHUNK = CONFIG.getAvgSeriesPointNumberThreshold();
+  private long MAX_NUMBER_OF_POINTS_IN_CHUNK = CONFIG.getTargetChunkPointNum();

[Inline review comment, Contributor]: Now that it is not final, its capitalization would better be changed.

   private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
       TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
 
@@ -330,9 +330,9 @@ public IChunkWriter createIChunkWriter() {
   @Override
   public void encode(BlockingQueue<Object> ioTaskQueue) {
     BitMap rowBitMap = list.getRowBitMap();
-    int avgBinaryPointSize = list.getAvgBinaryPointSize();
-    int maxNumberOfPointsInBinaryChunk =
-        avgBinaryPointSize > 0 ? (int) (TARGET_CHUNK_SIZE / avgBinaryPointSize) : Integer.MAX_VALUE;
+    int avgPointSizeOfLargestColumn = list.getAvgPointSizeOfLargestColumn();
+    MAX_NUMBER_OF_POINTS_IN_CHUNK =
+        Math.min(MAX_NUMBER_OF_POINTS_IN_CHUNK, (TARGET_CHUNK_SIZE / avgPointSizeOfLargestColumn));
 
     boolean[] timeDuplicateInfo = null;
 
@@ -355,8 +355,7 @@ public void encode(BlockingQueue<Object> ioTaskQueue) {
         pageRange.add(sortedRowIndex);
         pointNumInPage = 0;
       }
-      if (pointNumInChunk == MAX_NUMBER_OF_POINTS_IN_CHUNK
-          || pointNumInChunk >= maxNumberOfPointsInBinaryChunk) {
+      if (pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK) {
         if (pointNumInPage != 0) {
           pageRange.add(sortedRowIndex);
           pointNumInPage = 0;
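Note: the `Math.min` cap folds the old binary-only size bound into the general point-num bound, so it now applies to the widest column of an aligned series. A minimal sketch with assumed numbers (the 48 B average is a made-up example; the knobs are the defaults from IoTDBConfig above):

```java
// Sketch of the cap computed in encode() above, using the default knobs.
long targetChunkPointNum = 100_000L;   // target_chunk_point_num
long targetChunkSize = 1_600_000L;     // target_chunk_size, in bytes
int avgPointSizeOfLargestColumn = 48;  // e.g. a TEXT column, 48 B/point

long maxPointsInChunk =
    Math.min(targetChunkPointNum, targetChunkSize / avgPointSizeOfLargestColumn);
// -> 33_333: the size bound wins, so wide binary columns split sooner;
//    for 8 B primitives the quotient is 200_000 and the point-num bound wins.
```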
--- next file ---
@@ -52,7 +52,7 @@ public class WritableMemChunk implements IWritableMemChunk {
 
   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
   private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
-  private final long MAX_NUMBER_OF_POINTS_IN_CHUNK = CONFIG.getAvgSeriesPointNumberThreshold();
+  private final long MAX_NUMBER_OF_POINTS_IN_CHUNK = CONFIG.getTargetChunkPointNum();
 
   public WritableMemChunk(IMeasurementSchema schema) {
     this.schema = schema;
@@ -337,7 +337,7 @@ public void encode(BlockingQueue<Object> ioTaskQueue) {
 
     TSDataType tsDataType = schema.getType();
     ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
-    long binarySizeInCurrentChunk = 0;
+    long dataSizeInCurrentChunk = 0;
     int pointNumInCurrentChunk = 0;
     for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) {
       long time = list.getTime(sortedRowIndex);
@@ -355,35 +355,40 @@ public void encode(BlockingQueue<Object> ioTaskQueue) {
       switch (tsDataType) {
         case BOOLEAN:
           chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex));
+          dataSizeInCurrentChunk += 8L + 1L;
           break;
         case INT32:
         case DATE:
           chunkWriterImpl.write(time, list.getInt(sortedRowIndex));
+          dataSizeInCurrentChunk += 8L + 4L;
           break;
         case INT64:
        case TIMESTAMP:
           chunkWriterImpl.write(time, list.getLong(sortedRowIndex));
+          dataSizeInCurrentChunk += 8L + 8L;
           break;
         case FLOAT:
           chunkWriterImpl.write(time, list.getFloat(sortedRowIndex));
+          dataSizeInCurrentChunk += 8L + 4L;
           break;
         case DOUBLE:
           chunkWriterImpl.write(time, list.getDouble(sortedRowIndex));
+          dataSizeInCurrentChunk += 8L + 8L;
           break;
         case TEXT:
         case BLOB:
         case STRING:
           Binary value = list.getBinary(sortedRowIndex);
           chunkWriterImpl.write(time, value);
-          binarySizeInCurrentChunk += getBinarySize(value);
+          dataSizeInCurrentChunk += 8L + getBinarySize(value);
           break;
         default:
           LOGGER.error("WritableMemChunk does not support data type: {}", tsDataType);
           break;
       }
       pointNumInCurrentChunk++;
       if (pointNumInCurrentChunk > MAX_NUMBER_OF_POINTS_IN_CHUNK
-          || binarySizeInCurrentChunk > TARGET_CHUNK_SIZE) {
+          || dataSizeInCurrentChunk > TARGET_CHUNK_SIZE) {
         chunkWriterImpl.sealCurrentPage();
         chunkWriterImpl.clearPageWriter();
         try {
@@ -392,7 +397,7 @@ public void encode(BlockingQueue<Object> ioTaskQueue) {
           Thread.currentThread().interrupt();
         }
         chunkWriterImpl = createIChunkWriter();
-        binarySizeInCurrentChunk = 0;
+        dataSizeInCurrentChunk = 0;
         pointNumInCurrentChunk = 0;
       }
     }
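Note: `dataSizeInCurrentChunk` now charges every point 8 bytes of timestamp plus its value width, so non-binary series also respect `target_chunk_size` rather than only point counts. A standalone sketch of the same accounting (import paths assume the tsfile 1.x layout; `getLength()` stands in for the class's `getBinarySize()` helper):

```java
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;

// Standalone sketch, not PR code: per-point cost as accumulated above.
static long pointSizeBytes(TSDataType type, Binary binaryValue) {
  switch (type) {
    case BOOLEAN:
      return 8L + 1L;
    case INT32:
    case DATE:
      return 8L + 4L;
    case INT64:
    case TIMESTAMP:
      return 8L + 8L;
    case FLOAT:
      return 8L + 4L;
    case DOUBLE:
      return 8L + 8L;
    case TEXT:
    case BLOB:
    case STRING:
      return 8L + binaryValue.getLength(); // stands in for getBinarySize(value)
    default:
      throw new IllegalArgumentException("unsupported type: " + type);
  }
}
```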
--- next file ---
@@ -1365,20 +1365,22 @@ public BitMap getRowBitMap() {
     return new BitMap(rowCount, rowBitsArr);
   }
 
-  public int getAvgBinaryPointSize() {
-    long maxSize = 0;
-    int index = 0;
+  public int getAvgPointSizeOfLargestColumn() {
+    int largestPrimitivePointSize = 8; // TimeColumn or int64,double ValueColumn
+    long largestBinaryChunkSize = 0;
+    int largestBinaryColumnIndex = 0;
     for (int i = 0; i < memoryBinaryChunkSize.length; i++) {
-      if (memoryBinaryChunkSize[i] > maxSize) {
-        maxSize = memoryBinaryChunkSize[i];
-        index = i;
+      if (memoryBinaryChunkSize[i] > largestBinaryChunkSize) {
+        largestBinaryChunkSize = memoryBinaryChunkSize[i];
+        largestBinaryColumnIndex = i;
       }
     }
-    if (maxSize == 0) {
-      return 0;
+    if (largestBinaryChunkSize == 0) {
+      return largestPrimitivePointSize;
     }
-    int pointNum = getColumnValueCnt(index);
-    return (int) (maxSize / pointNum);
+    int avgPointSizeOfLargestBinaryColumn =
+        (int) largestBinaryChunkSize / getColumnValueCnt(largestBinaryColumnIndex);
+    return Math.max(avgPointSizeOfLargestBinaryColumn, largestPrimitivePointSize);
   }
 
   public int getColumnValueCnt(int columnIndex) {
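Note: returning the 8-byte primitive floor instead of 0 matters for the caller: `AlignedWritableMemChunk.encode()` now divides by this value directly, so the floor both avoids a divide-by-zero and keeps the cap finite. A worked example with assumed column contents:

```java
// Assumed numbers, not PR data: memoryBinaryChunkSize tracks bytes per
// binary column of an aligned series.
// Column 0: 48_000 B over 1_000 values -> 48 B/point (largest, chosen)
// Column 1: 10_000 B over 1_000 values -> ignored (smaller total)
// Result: max(48, 8) = 48, which encode() turns into a cap of
// 1_600_000 / 48 = 33_333 points per chunk.
// With no binary columns, the method returns 8, giving 200_000, so the
// 100_000-point target_chunk_point_num bound wins instead.
```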
--- next file ---
@@ -1050,8 +1050,6 @@ public void testInsertUnSequenceRows()
           QueryProcessException,
           DataRegionException,
           TsFileProcessorException {
-    int defaultAvgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
-    config.setAvgSeriesPointNumberThreshold(2);
     DataRegion dataRegion1 = new DummyDataRegion(systemDir, "root.Rows");
     long[] time = new long[] {3, 4, 1, 2};
     List<Integer> indexList = new ArrayList<>();
@@ -1081,7 +1079,6 @@ tmpDeviceId, new MeasurementSchema(measurementId, TSDataType.INT32))),
       Assert.assertTrue(resource.isClosed());
     }
     dataRegion1.syncDeleteDataFiles();
-    config.setAvgSeriesPointNumberThreshold(defaultAvgSeriesPointNumberThreshold);
   }
 
   @Test
--- next file ---
@@ -96,7 +96,7 @@ public class TsFileProcessorTest {
   private final Map<String, String> props = Collections.emptyMap();
   private QueryContext context;
   private final String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
-  private int defaultAvgSeriesPointNumberThreshold;
+  private long defaultTargetChunkPointNum;
   private long defaultTargetChunkSize;
   private static final Logger logger = LoggerFactory.getLogger(TsFileProcessorTest.class);
 
@@ -108,7 +108,7 @@ public void setUp() throws DataRegionException {
     if (!file.getParentFile().exists()) {
       Assert.assertTrue(file.getParentFile().mkdirs());
     }
-    defaultAvgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
+    defaultTargetChunkPointNum = config.getTargetChunkPointNum();
     defaultTargetChunkSize = config.getTargetChunkSize();
     EnvironmentUtils.envSetUp();
     sgInfo = new DataRegionInfo(new DataRegionTest.DummyDataRegion(systemDir, storageGroup));
@@ -128,7 +128,7 @@ public void tearDown() throws Exception {
       }
     } catch (IOException ignored) {
     }
-    config.setAvgSeriesPointNumberThreshold(defaultAvgSeriesPointNumberThreshold);
+    config.setTargetChunkPointNum(defaultTargetChunkPointNum);
     config.setTargetChunkSize(defaultTargetChunkSize);
   }
 
@@ -210,7 +210,7 @@ public void testWriteAndFlush()
   @Test
   public void testFlushMultiChunks()
       throws IOException, WriteProcessException, MetadataException, ExecutionException {
-    config.setAvgSeriesPointNumberThreshold(40);
+    config.setTargetChunkPointNum(40);
     processor =
         new TsFileProcessor(
             storageGroup,
@@ -364,7 +364,7 @@ public void testFlushMultiBinaryChunks()
   @Test
   public void testFlushMultiAlignedChunks()
       throws IOException, WriteProcessException, MetadataException, ExecutionException {
-    config.setAvgSeriesPointNumberThreshold(40);
+    config.setTargetChunkPointNum(40);
     processor =
         new TsFileProcessor(
             storageGroup,