2424import org .apache .doris .catalog .Table ;
2525import org .apache .doris .common .AnalysisException ;
2626import org .apache .doris .common .Config ;
27+ import org .apache .doris .common .Pair ;
2728import org .apache .doris .common .UserException ;
2829import org .apache .doris .common .util .BrokerUtil ;
2930import org .apache .doris .common .util .Util ;
5354
5455import java .net .URI ;
5556import java .util .ArrayList ;
57+ import java .util .Comparator ;
5658import java .util .List ;
59+ import java .util .PriorityQueue ;
60+ import java .util .stream .Collectors ;
61+ import java .util .stream .IntStream ;
5762
5863/**
5964 * FileTable encapsulates a set of files to be scanned into a Table like structure,
@@ -84,6 +89,7 @@ public enum JobType {
8489 private boolean strictMode ;
8590 private int loadParallelism ;
8691 // set by getFileStatusAndCalcInstance
92+ private int numInstances = 1 ;
8793 private long bytesPerInstance = 0 ;
8894 // used for stream load, FILE_LOCAL or FILE_STREAM
8995 private TFileType fileType ;
@@ -189,7 +195,6 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
189195 throw new UserException ("No source file in this table(" + targetTable .getName () + ")." );
190196 }
191197
192- int numInstances = 1 ;
193198 if (jobType == JobType .BULK_LOAD ) {
194199 long totalBytes = 0 ;
195200 for (TBrokerFileStatus fileStatus : fileStatuses ) {
@@ -208,6 +213,7 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
208213 }
209214 } else {
210215 // stream load, not need to split
216+ numInstances = 1 ;
211217 bytesPerInstance = Long .MAX_VALUE ;
212218 }
213219 LOG .info ("number instance of file scan node is: {}, bytes per instance: {}" , numInstances , bytesPerInstance );
@@ -216,6 +222,75 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
216222 public void createScanRangeLocations (FileLoadScanNode .ParamCreateContext context ,
217223 FederationBackendPolicy backendPolicy ,
218224 List <TScanRangeLocations > scanRangeLocations ) throws UserException {
225+ // Currently, we do not support mixed file types (or compress types).
226+ // If any of the file is unsplittable, all files will be treated as unsplittable.
227+ boolean isSplittable = true ;
228+ for (TBrokerFileStatus fileStatus : fileStatuses ) {
229+ TFileFormatType formatType = formatType (context .fileGroup .getFileFormat (), fileStatus .path );
230+ TFileCompressType compressType =
231+ Util .getOrInferCompressType (context .fileGroup .getCompressType (), fileStatus .path );
232+ // Now only support split plain text
233+ if (compressType == TFileCompressType .PLAIN
234+ && ((formatType == TFileFormatType .FORMAT_CSV_PLAIN && fileStatus .isSplitable )
235+ || formatType == TFileFormatType .FORMAT_JSON )) {
236+ // is splittable
237+ } else {
238+ isSplittable = false ;
239+ break ;
240+ }
241+ }
242+
243+ if (isSplittable ) {
244+ createScanRangeLocationsSplittable (context , backendPolicy , scanRangeLocations );
245+ } else {
246+ createScanRangeLocationsUnsplittable (context , backendPolicy , scanRangeLocations );
247+ }
248+ }
249+
250+ public void createScanRangeLocationsUnsplittable (FileLoadScanNode .ParamCreateContext context ,
251+ FederationBackendPolicy backendPolicy ,
252+ List <TScanRangeLocations > scanRangeLocations )
253+ throws UserException {
254+ List <Long > fileSizes = fileStatuses .stream ().map (x -> x .size ).collect (Collectors .toList ());
255+ List <List <Integer >> groups = assignFilesToInstances (fileSizes , numInstances );
256+ for (List <Integer > group : groups ) {
257+ TScanRangeLocations locations = newLocations (context .params , brokerDesc , backendPolicy );
258+ for (int i : group ) {
259+ TBrokerFileStatus fileStatus = fileStatuses .get (i );
260+ TFileFormatType formatType = formatType (context .fileGroup .getFileFormat (), fileStatus .path );
261+ context .params .setFormatType (formatType );
262+ TFileCompressType compressType =
263+ Util .getOrInferCompressType (context .fileGroup .getCompressType (), fileStatus .path );
264+ context .params .setCompressType (compressType );
265+ List <String > columnsFromPath = BrokerUtil .parseColumnsFromPath (fileStatus .path ,
266+ context .fileGroup .getColumnNamesFromPath ());
267+ TFileRangeDesc rangeDesc = createFileRangeDesc (0 , fileStatus , fileStatus .size , columnsFromPath );
268+ locations .getScanRange ().getExtScanRange ().getFileScanRange ().addToRanges (rangeDesc );
269+ }
270+ scanRangeLocations .add (locations );
271+ }
272+ }
273+
274+ public static List <List <Integer >> assignFilesToInstances (List <Long > fileSizes , int instances ) {
275+ int n = Math .min (fileSizes .size (), instances );
276+ PriorityQueue <Pair <Long , List <Integer >>> pq = new PriorityQueue <>(n , Comparator .comparingLong (Pair ::key ));
277+ for (int i = 0 ; i < n ; i ++) {
278+ pq .add (Pair .of (0L , new ArrayList <>()));
279+ }
280+ List <Integer > index = IntStream .range (0 , fileSizes .size ()).boxed ().collect (Collectors .toList ());
281+ index .sort ((i , j ) -> Long .compare (fileSizes .get (j ), fileSizes .get (i )));
282+ for (int i : index ) {
283+ Pair <Long , List <Integer >> p = pq .poll ();
284+ p .value ().add (i );
285+ pq .add (Pair .of (p .key () + fileSizes .get (i ), p .value ()));
286+ }
287+ return pq .stream ().map (Pair ::value ).collect (Collectors .toList ());
288+ }
289+
290+ public void createScanRangeLocationsSplittable (FileLoadScanNode .ParamCreateContext context ,
291+ FederationBackendPolicy backendPolicy ,
292+ List <TScanRangeLocations > scanRangeLocations ) throws UserException {
293+
219294 TScanRangeLocations curLocations = newLocations (context .params , brokerDesc , backendPolicy );
220295 long curInstanceBytes = 0 ;
221296 long curFileOffset = 0 ;
@@ -234,27 +309,16 @@ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context
234309 // Assign scan range locations only for broker load.
235310 // stream load has only one file, and no need to set multi scan ranges.
236311 if (tmpBytes > bytesPerInstance && jobType != JobType .STREAM_LOAD ) {
237- // Now only support split plain text
238- if (compressType == TFileCompressType .PLAIN
239- && ((formatType == TFileFormatType .FORMAT_CSV_PLAIN && fileStatus .isSplitable )
240- || formatType == TFileFormatType .FORMAT_JSON )) {
241- long rangeBytes = bytesPerInstance - curInstanceBytes ;
242- TFileRangeDesc rangeDesc = createFileRangeDesc (curFileOffset , fileStatus , rangeBytes ,
243- columnsFromPath );
244- curLocations .getScanRange ().getExtScanRange ().getFileScanRange ().addToRanges (rangeDesc );
245- curFileOffset += rangeBytes ;
246- } else {
247- TFileRangeDesc rangeDesc = createFileRangeDesc (0 , fileStatus , leftBytes ,
248- columnsFromPath );
249- curLocations .getScanRange ().getExtScanRange ().getFileScanRange ().addToRanges (rangeDesc );
250- i ++;
251- }
312+ long rangeBytes = bytesPerInstance - curInstanceBytes ;
313+ TFileRangeDesc rangeDesc = createFileRangeDesc (curFileOffset , fileStatus , rangeBytes ,
314+ columnsFromPath );
315+ curLocations .getScanRange ().getExtScanRange ().getFileScanRange ().addToRanges (rangeDesc );
316+ curFileOffset += rangeBytes ;
252317
253318 // New one scan
254319 scanRangeLocations .add (curLocations );
255320 curLocations = newLocations (context .params , brokerDesc , backendPolicy );
256321 curInstanceBytes = 0 ;
257-
258322 } else {
259323 TFileRangeDesc rangeDesc = createFileRangeDesc (curFileOffset , fileStatus , leftBytes , columnsFromPath );
260324 curLocations .getScanRange ().getExtScanRange ().getFileScanRange ().addToRanges (rangeDesc );
0 commit comments