Skip to content

Commit abc5876

Browse files
committed
Scenario:
Scenario: the number of data rows does not change, but the data in each column keeps increasing/changing. In this scenario it is faster to load the full data set each time with the load command, and a query then only needs to read the latest segment. We need a way to configure a table to behave like this.
1 parent 1e2fc4c commit abc5876

File tree

6 files changed

+492
-14
lines changed

6 files changed

+492
-14
lines changed

core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java

+3
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,9 @@ private CarbonCommonConstants() {
484484
// default blocklet size value in MB
485485
public static final String TABLE_BLOCKLET_SIZE_DEFAULT = "64";
486486

487+
// whether queries on this table should read only the latest segment
488+
public static final String TABLE_QUERY_LATEST_SEGMENT = "query_latest_segment";
489+
487490
/**
488491
* set in column level to disable inverted index
489492
* @Deprecated :This property is deprecated, it is kept just for compatibility

hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java

+62-8
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,7 @@
1818
package org.apache.carbondata.hadoop.api;
1919

2020
import java.io.IOException;
21-
import java.util.ArrayList;
22-
import java.util.Arrays;
23-
import java.util.HashMap;
24-
import java.util.HashSet;
25-
import java.util.LinkedList;
26-
import java.util.List;
27-
import java.util.Map;
21+
import java.util.*;
2822
import java.util.concurrent.ExecutionException;
2923
import java.util.stream.Collectors;
3024

@@ -222,7 +216,9 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
222216
*/
223217
private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments,
224218
boolean validationRequired, ReadCommittedScope readCommittedScope) throws IOException {
225-
Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope);
219+
// first check for mapreduce.input.carboninputformat.segmentnumbers
220+
// second check for table property of latest_segment for query
221+
Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope, validSegments);
226222
if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) {
227223
return validSegments;
228224
}
@@ -421,9 +417,11 @@ public void updateLoadMetaDataDetailsToSegments(List<Segment> validSegments,
421417

422418
/**
423419
* return valid segment to access
420+
* check for SET carbon.input.segments.<database_name>.<table_name>
424421
*/
425422
public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope) {
426423
String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
424+
427425
if (segmentString.trim().isEmpty()) {
428426
return new Segment[0];
429427
}
@@ -601,4 +599,60 @@ public String getSegmentIdFromFilePath(String filePath) {
601599
}
602600
return CarbonCommonConstants.INVALID_SEGMENT_ID;
603601
}
602+
603+
/**
604+
* return valid segment to access
605+
* first check for mapreduce.input.carboninputformat.segmentnumbers"
606+
* second check for table property of latest_segment for query
607+
*/
608+
public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope,
609+
List<Segment> validSegments) {
610+
String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
611+
boolean queryLatestSegment = false;
612+
if (null != carbonTable) {
613+
queryLatestSegment = Boolean.parseBoolean(carbonTable.getTableInfo()
614+
.getFactTable().getTableProperties()
615+
.getOrDefault(CarbonCommonConstants.TABLE_QUERY_LATEST_SEGMENT, "false"));
616+
}
617+
if (segmentString.trim().isEmpty()) {
618+
if (!queryLatestSegment) {
619+
return new Segment[0];
620+
} else {
621+
List<Segment> segments = getLatestSegment(validSegments);
622+
return segments.toArray(new Segment[0]);
623+
}
624+
} else {
625+
List<Segment> segments = Segment.toSegmentList(segmentString.split(","), readCommittedScope);
626+
if (!queryLatestSegment) {
627+
return segments.toArray(new Segment[0]);
628+
} else {
629+
List<Segment> latestSegment;
630+
if (segments.size() > 0 && segments.get(0).getSegmentNo().equalsIgnoreCase("*")) {
631+
latestSegment = getLatestSegment(validSegments);
632+
} else {
633+
latestSegment = getLatestSegment(segments);
634+
}
635+
return latestSegment.toArray(new Segment[0]);
636+
}
637+
}
638+
}
639+
640+
/**
641+
* get the latest segment
642+
* @param validSegments the in put segment for search
643+
* @return the latest segment for query
644+
*/
645+
public List<Segment> getLatestSegment(List<Segment> validSegments) {
646+
if (validSegments.isEmpty()) {
647+
return Collections.emptyList();
648+
} else {
649+
Segment segment = validSegments.stream().max((a, b) -> {
650+
double aNo = Double.parseDouble(a.getSegmentNo());
651+
double bNo = Double.parseDouble(b.getSegmentNo());
652+
return Double.compare(aNo, bNo);
653+
}).get();
654+
return Collections.singletonList(segment);
655+
}
656+
}
657+
604658
}

integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala

+8-6
Original file line numberDiff line numberDiff line change
@@ -273,17 +273,19 @@ object BroadCastSIFilterPushJoin {
273273
val identifier: AbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
274274
val readCommittedScope: ReadCommittedScope = carbonTableInputFormat.getReadCommitted(job,
275275
identifier)
276-
val segmentsToAccess: Array[Segment] = carbonTableInputFormat.getSegmentsToAccess(job,
277-
readCommittedScope)
278-
val segmentsToAccessSet: util.Set[Segment] = new util.HashSet[Segment]
279-
for (segId <- segmentsToAccess) {
280-
segmentsToAccessSet.add(segId)
281-
}
282276
// get all valid segments and set them into the configuration
283277
val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
284278
val segments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = segmentStatusManager
285279
.getValidAndInvalidSegments(carbonTable.isMV)
286280
val validSegments: util.List[Segment] = segments.getValidSegments
281+
282+
val segmentsToAccess: Array[Segment] = carbonTableInputFormat.getSegmentsToAccess (job,
283+
readCommittedScope, validSegments)
284+
val segmentsToAccessSet: util.Set[Segment] = new util.HashSet[Segment]
285+
for (segId <- segmentsToAccess) {
286+
segmentsToAccessSet.add(segId)
287+
}
288+
287289
// if no segments in table
288290
val validSegmentsToAccess: util.List[Segment] = new util.ArrayList[Segment]
289291
if (validSegments.size == 0) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData
2+
1,10,1100,48.4,,,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world'
3+
5,17,1140,43.4,,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world'
4+
1,11,1100,44.4,,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world'
5+
1,10,1150,43.4,spark,,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world'
6+
1,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world'
7+
3,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world'
8+
2,,,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world'
9+
,10,1100,43.4,spark,,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world'
10+
4,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,
11+
1,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,,'foo'#'bar'#'world'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData
2+
1,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world'
3+
5,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world'
4+
1,11,1100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world'
5+
1,10,1150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world'
6+
1,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world'
7+
3,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world'
8+
2,10,1100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world'
9+
1,10,1100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world'
10+
4,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,'foo'#'bar'#'world'
11+
1,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,2.5,'foo'#'bar'#'world'

0 commit comments

Comments
 (0)