1
1
package co .clflushopt .glint ;
2
2
3
3
import java .io .FileNotFoundException ;
4
+ import java .util .Arrays ;
4
5
import java .util .Iterator ;
6
+ import java .util .List ;
7
+ import java .util .Optional ;
5
8
6
9
import org .apache .arrow .vector .types .pojo .ArrowType ;
7
10
11
+ import co .clflushopt .glint .core .CsvReaderOptions ;
8
12
import co .clflushopt .glint .core .ExecutionContext ;
9
13
import co .clflushopt .glint .dataframe .DataFrame ;
10
14
import co .clflushopt .glint .query .logical .expr .AggregateExpr ;
13
17
import co .clflushopt .glint .query .logical .expr .LogicalExpr ;
14
18
import co .clflushopt .glint .query .logical .plan .LogicalPlan ;
15
19
import co .clflushopt .glint .query .optimizer .QueryOptimizer ;
20
+ import co .clflushopt .glint .types .ArrowTypes ;
21
+ import co .clflushopt .glint .types .Field ;
16
22
import co .clflushopt .glint .types .RecordBatch ;
23
+ import co .clflushopt .glint .types .Schema ;
17
24
18
25
/**
19
26
* Hello world!
@@ -35,9 +42,32 @@ public static void nycTripsBenchmark(String[] args) throws FileNotFoundException
35
42
36
43
long startTime = System .currentTimeMillis ();
37
44
try {
38
-
45
+ // Define the schema for NYC Taxi dataset
46
+ Schema schema = new Schema (Arrays .asList (new Field ("VendorID" , ArrowTypes .Int32Type ),
47
+ new Field ("tpep_pickup_datetime" , ArrowTypes .StringType ), // Could be Timestamp
48
+ new Field ("tpep_dropoff_datetime" , ArrowTypes .StringType ), // Could be Timestamp
49
+ new Field ("passenger_count" , ArrowTypes .Int32Type ),
50
+ new Field ("trip_distance" , ArrowTypes .DoubleType ),
51
+ new Field ("pickup_longitude" , ArrowTypes .DoubleType ),
52
+ new Field ("pickup_latitude" , ArrowTypes .DoubleType ),
53
+ new Field ("RatecodeID" , ArrowTypes .Int32Type ),
54
+ new Field ("store_and_fwd_flag" , ArrowTypes .StringType ),
55
+ new Field ("dropoff_longitude" , ArrowTypes .DoubleType ),
56
+ new Field ("dropoff_latitude" , ArrowTypes .DoubleType ),
57
+ new Field ("payment_type" , ArrowTypes .Int32Type ),
58
+ new Field ("fare_amount" , ArrowTypes .DoubleType ),
59
+ new Field ("extra" , ArrowTypes .DoubleType ),
60
+ new Field ("mta_tax" , ArrowTypes .DoubleType ),
61
+ new Field ("tip_amount" , ArrowTypes .DoubleType ),
62
+ new Field ("tolls_amount" , ArrowTypes .DoubleType ),
63
+ new Field ("improvement_surcharge" , ArrowTypes .DoubleType ),
64
+ new Field ("total_amount" , ArrowTypes .DoubleType )));
39
65
// Create DataFrame and apply transformations
40
- DataFrame df = ctx .readParquet ("./datasets/yellow_tripdata_2019-01.parquet" , null );
66
+ DataFrame df = ctx
67
+ .readCsv ("./datasets/yellow_tripdata_example.csv" , Optional .of (schema ),
68
+ CsvReaderOptions .builder ().delimiter (',' ).hasHeader (true ).build ())
69
+ .aggregate (List .of (col ("passenger_count" )),
70
+ List .of (max (cast (col ("fare_amount" ), ArrowTypes .FloatType ))));
41
71
42
72
System .out .println ("Logical Plan:\t " + LogicalPlan .format (df .getLogicalPlan ()));
43
73
System .out .println ("Schema:\t " + df .getSchema ());
0 commit comments