Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ public class IoTDBConfig {
private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;

/** time cost(ms) threshold for slow query. Unit: millisecond */
private long slowQueryThreshold = 30000;
private long slowQueryThreshold = 10000;

private int patternMatchingThreshold = 1000000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DeviceLastCache;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
Expand Down Expand Up @@ -2956,7 +2957,6 @@ private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(

@Override
public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
DATA_NODE_SCHEMA_CACHE.cleanUp();
final PartialPath devicePath = node.getDevicePath();
List<Integer> idxOfMeasurementSchemas = node.getIdxOfMeasurementSchemas();
List<Integer> unCachedMeasurementIndexes = new ArrayList<>();
Expand Down Expand Up @@ -2984,7 +2984,7 @@ public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanCon

if (timeValuePair == null) { // last value is not cached
unCachedMeasurementIndexes.add(i);
} else if (timeValuePair.getValue() == null) {
} else if (timeValuePair.getValue() == DeviceLastCache.EMPTY_PRIMITIVE_TYPE) {
// there is no data for this time series, just ignore
} else if (!LastQueryUtil.satisfyFilter(filter, timeValuePair)) {
// cached last value is not satisfied
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ private Object getAlignedValueForQuery(
return getAlignedValueByValueIndex(valueIndex, null, floatPrecision, encodingList);
}

@SuppressWarnings("java:S6541")
private TsPrimitiveType getAlignedValueByValueIndex(
int valueIndex,
int[] validIndexesForTimeDuplicatedRows,
Expand Down Expand Up @@ -920,6 +921,7 @@ public static long valueListArrayMemCost(TSDataType type) {
}

/** Build TsBlock by column. */
@SuppressWarnings("java:S6541")
public TsBlock buildTsBlock(
int floatPrecision, List<TSEncoding> encodingList, List<List<TimeRange>> deletionList) {
TsBlockBuilder builder = new TsBlockBuilder(dataTypes);
Expand Down Expand Up @@ -1155,6 +1157,7 @@ public void serializeToWAL(IWALByteBufferView buffer) {
}
}

@SuppressWarnings("java:S6541")
public static AlignedTVList deserialize(DataInputStream stream) throws IOException {
TSDataType dataType = ReadWriteIOUtils.readDataType(stream);
if (dataType != TSDataType.VECTOR) {
Expand Down Expand Up @@ -1429,6 +1432,7 @@ public AlignedTVListIterator(
}

@Override
@SuppressWarnings("java:S6541")
protected void prepareNext() {
// find the first row that is neither deleted nor empty (all NULL values)
findValidRow = false;
Expand Down Expand Up @@ -1602,6 +1606,7 @@ public TsPrimitiveType getPrimitiveTypeObject(int rowIndex, int columnIndex) {
}

@Override
@SuppressWarnings("java:S6541")
public boolean hasNextBatch() {
if (!paginationController.hasCurLimit()) {
return false;
Expand Down Expand Up @@ -1884,11 +1889,18 @@ private boolean needRebuildTsBlock(boolean[] hasAnyNonNullValue) {
}

@Override
@SuppressWarnings("java:S6541")
public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo, long[] times) {
int maxRowCountOfCurrentBatch =
Math.min(
rows - index,
Math.min(
(int) encodeInfo.maxNumberOfPointsInChunk - encodeInfo.pointNumInChunk, // NOSONAR
encodeInfo.maxNumberOfPointsInPage - encodeInfo.pointNumInPage));
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;

// duplicated time or deleted time are all invalid, true if we don't need this row
BitMap timeDuplicateInfo = null;
LazyBitMap timeDuplicateInfo = null;

int startIndex = index;
// time column
Expand All @@ -1914,7 +1926,7 @@ public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo, lo
encodeInfo.pointNumInChunk++;
} else {
if (Objects.isNull(timeDuplicateInfo)) {
timeDuplicateInfo = new BitMap(rows);
timeDuplicateInfo = new LazyBitMap(index, maxRowCountOfCurrentBatch, rows - 1);
}
timeDuplicateInfo.mark(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;

import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
import org.apache.iotdb.db.utils.datastructure.TVList;

Expand All @@ -41,6 +43,7 @@
import org.apache.tsfile.read.filter.operator.LongFilterOperators;
import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
import org.apache.tsfile.read.reader.series.PaginationController;
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.VectorMeasurementSchema;
import org.junit.AfterClass;
Expand Down Expand Up @@ -895,4 +898,57 @@ private void testSkipTimeRange(
}
Assert.assertEquals(expectedTimestamps, resultTimestamps);
}

@Test
public void testEncodeBatch() {
  // Exercise batch encoding against each TVList layout; all three fixtures are
  // presumably built with 400000 total points — TODO confirm against the field
  // initializers elsewhere in this test class.
  List<Map<TVList, Integer>> fixtures =
      Arrays.asList(largeSingleTvListMap, largeOrderedMultiTvListMap, largeMergeSortMultiTvListMap);
  for (Map<TVList, Integer> tvListMap : fixtures) {
    testEncodeBatch(tvListMap, 400000);
  }
}

/**
 * Drains {@code tvListMap} through {@link MemPointIterator#encodeBatch} and asserts that the
 * total number of time points written across all sealed chunks equals {@code expectedCount}.
 *
 * <p>The loop mirrors the flush path: pages are flushed to the chunk writer whenever
 * {@code pointNumInPage} reaches the page limit, and a chunk is sealed (and its point count
 * accumulated) whenever {@code pointNumInChunk} reaches the chunk limit. Leftover points in the
 * final unsealed page/chunk are flushed after the iterator is exhausted.
 *
 * @param tvListMap source TVLists (keyed by list, valued by row count) to read points from
 * @param expectedCount expected total point count summed over all sealed time chunks
 */
private void testEncodeBatch(Map<TVList, Integer> tvListMap, long expectedCount) {
  AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema());
  // Read all three value columns; empty deletion lists mean no points are filtered out.
  List<Integer> columnIdxList = Arrays.asList(0, 1, 2);
  IMeasurementSchema measurementSchema = getMeasurementSchema();
  AlignedReadOnlyMemChunk chunk =
      new AlignedReadOnlyMemChunk(
          fragmentInstanceContext,
          columnIdxList,
          measurementSchema,
          tvListMap,
          Arrays.asList(
              Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
  chunk.sortTvLists();
  chunk.initChunkMetaFromTVListsWithFakeStatistics();
  MemPointIterator memPointIterator = chunk.createMemPointIterator(Ordering.ASC, null);
  // Limits: 10000 points per page, 100000 per chunk; counters start at zero.
  BatchEncodeInfo encodeInfo =
      new BatchEncodeInfo(
          0, 0, 0, 10000, 100000, IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
  // Scratch buffer for timestamps; sized to one full page.
  long[] times = new long[10000];
  long count = 0;
  while (memPointIterator.hasNextBatch()) {
    memPointIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
    // Page full: flush buffered timestamps into the current page and reset the page counter.
    if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) {
      alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
      encodeInfo.pointNumInPage = 0;
    }

    // Chunk full: seal it, accumulate its time-chunk point count, and start a fresh writer.
    if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) {
      alignedChunkWriter.sealCurrentPage();
      count += alignedChunkWriter.getTimeChunkWriter().getPointNum();
      alignedChunkWriter.clearPageWriter();
      alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema());
      encodeInfo.reset();
    }
  }
  // Handle remaining data in the final unsealed chunk
  if (encodeInfo.pointNumInChunk > 0 || encodeInfo.pointNumInPage > 0) {
    if (encodeInfo.pointNumInPage > 0) {
      alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
    }
    alignedChunkWriter.sealCurrentPage();
    count += alignedChunkWriter.getTimeChunkWriter().getPointNum();
  }
  Assert.assertEquals(expectedCount, count);
}
}
Loading