@@ -50,12 +50,15 @@
 import io.prestosql.spi.type.TypeManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.eclipse.jetty.util.URIUtil;

 import javax.inject.Inject;

 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -75,8 +78,10 @@
 import static io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMapping.toColumnHandles;
 import static io.prestosql.plugin.hive.HiveUtil.isPartitionFiltered;
 import static io.prestosql.plugin.hive.coercions.HiveCoercer.createCoercer;
+import static io.prestosql.plugin.hive.metastore.MetastoreUtil.META_PARTITION_COLUMNS;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS;

 public class HivePageSourceProvider
         implements ConnectorPageSourceProvider
@@ -170,6 +175,24 @@ private ConnectorPageSource createPageSourceInternal(ConnectorSession session,
         Configuration configuration = hdfsEnvironment.getConfiguration(
                 new HdfsEnvironment.HdfsContext(session, hiveSplit.getDatabase(), hiveSplit.getTable()), path);

+        Properties schema = hiveSplit.getSchema();
+        String columnNameDelimiter = schema.containsKey(serdeConstants.COLUMN_NAME_DELIMITER)
+                ? schema.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+        List<String> partitionColumnNames;
+        if (schema.containsKey(META_PARTITION_COLUMNS)) {
+            partitionColumnNames = Arrays.asList(schema.getProperty(META_PARTITION_COLUMNS).split(columnNameDelimiter));
+        }
+        else if (schema.containsKey(META_TABLE_COLUMNS)) {
+            partitionColumnNames = Arrays.asList(schema.getProperty(META_TABLE_COLUMNS).split(columnNameDelimiter));
+        }
+        else {
+            partitionColumnNames = new ArrayList<>();
+        }
+
+        List<String> tableColumns = hiveColumns.stream().map(HiveColumnHandle::getName).collect(toList());
+
+        List<String> missingColumns = tableColumns.stream().filter(column -> !partitionColumnNames.contains(column)).collect(toList());
+
         List<IndexMetadata> indexes = new ArrayList<>();
         if (indexCache != null && session.isHeuristicIndexFilterEnabled()) {
             indexes.addAll(this.indexCache.getIndices(session
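Net effect of the block above: missingColumns ends up holding the table columns that the split's stored schema does not contain, which matches the schema-evolution case where a file was written before new columns were added to the table. A minimal standalone sketch of the derivation, with the serde constants inlined ("column.name.delimiter" and "," mirror serdeConstants.COLUMN_NAME_DELIMITER and SerDeUtils.COMMA; the "partition_columns" key is a hypothetical stand-in for the value of MetastoreUtil.META_PARTITION_COLUMNS):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class MissingColumnsSketch
{
    private static final String COLUMN_NAME_DELIMITER = "column.name.delimiter";
    private static final String PARTITION_COLUMNS_KEY = "partition_columns"; // hypothetical stand-in
    private static final String TABLE_COLUMNS_KEY = "columns"; // value of META_TABLE_COLUMNS

    static List<String> missingColumns(Properties schema, List<String> tableColumns)
    {
        String delimiter = schema.getProperty(COLUMN_NAME_DELIMITER, ",");
        List<String> storedColumns;
        if (schema.containsKey(PARTITION_COLUMNS_KEY)) {
            storedColumns = Arrays.asList(schema.getProperty(PARTITION_COLUMNS_KEY).split(delimiter));
        }
        else if (schema.containsKey(TABLE_COLUMNS_KEY)) {
            storedColumns = Arrays.asList(schema.getProperty(TABLE_COLUMNS_KEY).split(delimiter));
        }
        else {
            storedColumns = new ArrayList<>();
        }
        // Any column the query expects but the split cannot supply must be prefilled.
        List<String> missing = new ArrayList<>(tableColumns);
        missing.removeAll(storedColumns);
        return missing;
    }

    public static void main(String[] args)
    {
        Properties schema = new Properties();
        schema.setProperty(TABLE_COLUMNS_KEY, "id,name"); // file written before "age" existed
        System.out.println(missingColumns(schema, Arrays.asList("id", "name", "age"))); // [age]
    }
}
```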
@@ -221,7 +244,7 @@ session, hiveSplit, assignUniqueIndicesToPartitionColumns(hiveColumns), typeMana
                     hiveTable.getDisjunctCompactEffectivePredicate(),
                     hiveSplit.getBucketConversion(),
                     hiveSplit.getBucketNumber(),
-                    hiveSplit.getLastModifiedTime());
+                    hiveSplit.getLastModifiedTime(), missingColumns);
         }

         Optional<ConnectorPageSource> pageSource = createHivePageSource(
@@ -249,7 +272,7 @@ session, hiveSplit, assignUniqueIndicesToPartitionColumns(hiveColumns), typeMana
                 splitMetadata,
                 hiveSplit.isCacheable(),
                 hiveSplit.getLastModifiedTime(),
-                hiveSplit.getCustomSplitInfo());
+                hiveSplit.getCustomSplitInfo(), missingColumns);
         if (pageSource.isPresent()) {
             return pageSource.get();
         }
@@ -291,6 +314,7 @@ private static List<HiveColumnHandle> assignUniqueIndicesToPartitionColumns(List
      * @param predicateColumns Map of all column handles being part of predicate
      * @param additionPredicates Predicates related to OR clause.
      * Remaining columns are same as for createHivePageSource.
+     * @param missingColumns column names present in the table schema but absent from this split's schema
      * @return
      */
     private static ConnectorPageSource createSelectivePageSource(
@@ -310,7 +334,7 @@ private static ConnectorPageSource createSelectivePageSource(
             Optional<List<TupleDomain<HiveColumnHandle>>> additionPredicates,
             Optional<HiveSplit.BucketConversion> bucketConversion,
             OptionalInt bucketNumber,
-            long dataSourceLastModifiedTime)
+            long dataSourceLastModifiedTime, List<String> missingColumns)
     {
         Set<HiveColumnHandle> interimColumns = ImmutableSet.<HiveColumnHandle>builder()
                 .addAll(predicateColumns.values())
@@ -325,7 +349,7 @@ private static ConnectorPageSource createSelectivePageSource(
                 split.getColumnCoercions(),
                 path,
                 bucketNumber,
-                true);
+                true, missingColumns);

         List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(
                 columnMappings);
@@ -411,7 +435,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
             SplitMetadata splitMetadata,
             boolean splitCacheable,
             long dataSourceLastModifiedTime,
-            Map<String, String> customSplitInfo)
+            Map<String, String> customSplitInfo, List<String> missingColumns)
     {
         List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
                 partitionKeys,
@@ -420,7 +444,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
                 columnCoercions,
                 path,
                 bucketNumber,
-                true);
+                true, missingColumns);
         List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(
                 columnMappings);

@@ -603,7 +627,7 @@ public ColumnMappingKind getKind()
         public String getPrefilledValue()
         {
             checkState(kind == ColumnMappingKind.PREFILLED);
-            return prefilledValue.get();
+            return prefilledValue.isPresent() ? prefilledValue.get() : HIVE_DEFAULT_PARTITION_VALUE;
         }

         public HiveColumnHandle getHiveColumnHandle()
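Because the new PREFILLED mappings are created with Optional.empty() as the value (see the buildColumnMappings hunk below), getPrefilledValue() can no longer call Optional.get() unconditionally. The ternary above is exactly Optional.orElse; a self-contained sketch of the equivalence, with the constant's value a hypothetical stand-in for whatever HIVE_DEFAULT_PARTITION_VALUE is defined as in this codebase:

```java
import java.util.Optional;

public class PrefilledValueSketch
{
    private static final String HIVE_DEFAULT_PARTITION_VALUE = "__HIVE_DEFAULT_PARTITION__"; // stand-in

    public static void main(String[] args)
    {
        Optional<String> prefilledValue = Optional.empty();
        // Equivalent to: prefilledValue.isPresent() ? prefilledValue.get() : HIVE_DEFAULT_PARTITION_VALUE
        System.out.println(prefilledValue.orElse(HIVE_DEFAULT_PARTITION_VALUE));
    }
}
```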
@@ -628,6 +652,7 @@ public Optional<HiveType> getCoercionFrom()
          * @param requiredInterimColumns columns that are needed for processing, but shouldn't be returned to engine (may overlap with columns)
          * @param columnCoercions map from hive column index to hive type
          * @param bucketNumber empty if table is not bucketed, a number within [0, # bucket in table) otherwise
+         * @param missingColumns column names present in the table schema but absent from this split's schema
          */
         public static List<ColumnMapping> buildColumnMappings(
                 List<HivePartitionKey> partitionKeys,
@@ -636,7 +661,7 @@ public static List<ColumnMapping> buildColumnMappings(
                 Map<Integer, HiveType> columnCoercions,
                 Path path,
                 OptionalInt bucketNumber,
-                boolean filterPushDown)
+                boolean filterPushDown, List<String> missingColumns)
         {
             Map<String, HivePartitionKey> partitionKeysByName = uniqueIndex(partitionKeys, HivePartitionKey::getName);
             int regularIndex = 0;
@@ -645,6 +670,11 @@
             for (HiveColumnHandle column : columns) {
                 Optional<HiveType> coercionFrom = Optional.ofNullable(columnCoercions.get(column.getHiveColumnIndex()));
                 if (column.getColumnType() == REGULAR) {
+                    if (missingColumns.contains(column.getColumnName())) {
+                        columnMappings.add(new ColumnMapping(ColumnMappingKind.PREFILLED, column, Optional.empty(),
+                                OptionalInt.empty(), coercionFrom));
+                        continue;
+                    }
                     checkArgument(regularColumnIndices.add(column.getHiveColumnIndex()), "duplicate hiveColumnIndex in columns list");

                     columnMappings.add(regular(column, regularIndex, coercionFrom));
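This loop change is where missingColumns takes effect: a REGULAR column whose name is missing from the split is mapped as PREFILLED with an empty value (so getPrefilledValue() later yields the default) and, via continue, consumes no reader channel, while the remaining columns keep their dense regularIndex numbering. A toy model of that control flow, using plain strings in place of the real ColumnMapping types:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MappingLoopSketch
{
    public static void main(String[] args)
    {
        List<String> columns = Arrays.asList("id", "name", "age");
        Set<String> missingColumns = new HashSet<>(Arrays.asList("age"));

        int regularIndex = 0;
        for (String column : columns) {
            if (missingColumns.contains(column)) {
                // Missing columns never reach the file reader; they are prefilled.
                System.out.println(column + " -> PREFILLED (default value)");
                continue;
            }
            // Columns present in the file keep their dense reader channel numbering.
            System.out.println(column + " -> REGULAR channel " + regularIndex++);
        }
    }
}
```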