|
20 | 20 | package org.apache.iotdb.db.storageengine.dataregion.memtable; |
21 | 21 |
|
22 | 22 | import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; |
| 23 | +import org.apache.iotdb.db.conf.IoTDBDescriptor; |
23 | 24 | import org.apache.iotdb.db.exception.query.QueryProcessException; |
24 | 25 | import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; |
25 | 26 | import org.apache.iotdb.db.queryengine.common.PlanFragmentId; |
26 | 27 | import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; |
27 | 28 | import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; |
28 | 29 | import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; |
29 | 30 | import org.apache.iotdb.db.utils.datastructure.AlignedTVList; |
| 31 | +import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo; |
30 | 32 | import org.apache.iotdb.db.utils.datastructure.MemPointIterator; |
31 | 33 | import org.apache.iotdb.db.utils.datastructure.TVList; |
32 | 34 |
|
|
41 | 43 | import org.apache.tsfile.read.filter.operator.LongFilterOperators; |
42 | 44 | import org.apache.tsfile.read.filter.operator.TimeFilterOperators; |
43 | 45 | import org.apache.tsfile.read.reader.series.PaginationController; |
| 46 | +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; |
44 | 47 | import org.apache.tsfile.write.schema.IMeasurementSchema; |
45 | 48 | import org.apache.tsfile.write.schema.VectorMeasurementSchema; |
46 | 49 | import org.junit.AfterClass; |
@@ -952,4 +955,58 @@ private void testSkipTimeRange( |
952 | 955 | } |
953 | 956 | Assert.assertEquals(expectedTimestamps, resultTimestamps); |
954 | 957 | } |
| 958 | + |
| 959 | + @Test |
| 960 | + public void testEncodeBatch() { |
| 961 | + testEncodeBatch(largeSingleTvListMap, 400000); |
| 962 | + testEncodeBatch(largeOrderedMultiTvListMap, 400000); |
| 963 | + testEncodeBatch(largeMergeSortMultiTvListMap, 400000); |
| 964 | + } |
| 965 | + |
| 966 | + private void testEncodeBatch(Map<TVList, Integer> tvListMap, long expectedCount) { |
| 967 | + AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema()); |
| 968 | + List<Integer> columnIdxList = Arrays.asList(0, 1, 2); |
| 969 | + IMeasurementSchema measurementSchema = getMeasurementSchema(); |
| 970 | + AlignedReadOnlyMemChunk chunk = |
| 971 | + new AlignedReadOnlyMemChunk( |
| 972 | + fragmentInstanceContext, |
| 973 | + columnIdxList, |
| 974 | + measurementSchema, |
| 975 | + tvListMap, |
| 976 | + Collections.emptyList(), |
| 977 | + Arrays.asList( |
| 978 | + Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); |
| 979 | + chunk.sortTvLists(); |
| 980 | + chunk.initChunkMetaFromTVListsWithFakeStatistics(); |
| 981 | + MemPointIterator memPointIterator = chunk.createMemPointIterator(Ordering.ASC, null); |
| 982 | + BatchEncodeInfo encodeInfo = |
| 983 | + new BatchEncodeInfo( |
| 984 | + 0, 0, 0, 10000, 100000, IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()); |
| 985 | + long[] times = new long[10000]; |
| 986 | + long count = 0; |
| 987 | + while (memPointIterator.hasNextBatch()) { |
| 988 | + memPointIterator.encodeBatch(alignedChunkWriter, encodeInfo, times); |
| 989 | + if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) { |
| 990 | + alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0); |
| 991 | + encodeInfo.pointNumInPage = 0; |
| 992 | + } |
| 993 | + |
| 994 | + if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) { |
| 995 | + alignedChunkWriter.sealCurrentPage(); |
| 996 | + alignedChunkWriter.clearPageWriter(); |
| 997 | + count += alignedChunkWriter.getTimeChunkWriter().getStatistics().getCount(); |
| 998 | + alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema()); |
| 999 | + encodeInfo.reset(); |
| 1000 | + } |
| 1001 | + } |
| 1002 | + // Handle remaining data in the final unsealed chunk |
| 1003 | + if (encodeInfo.pointNumInChunk > 0 || encodeInfo.pointNumInPage > 0) { |
| 1004 | + if (encodeInfo.pointNumInPage > 0) { |
| 1005 | + alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0); |
| 1006 | + } |
| 1007 | + alignedChunkWriter.sealCurrentPage(); |
| 1008 | + count += alignedChunkWriter.getTimeChunkWriter().getStatistics().getCount(); |
| 1009 | + } |
| 1010 | + Assert.assertEquals(expectedCount, count); |
| 1011 | + } |
955 | 1012 | } |
0 commit comments