Skip to content

Commit 4728de8

Browse files
committed
feat: Add scan plan compilation and fix a couple of bugs here and there
1 parent eee9da6 commit 4728de8

File tree

13 files changed

+175
-21
lines changed

13 files changed

+175
-21
lines changed

glint/src/main/java/co/clflushopt/glint/query/compiler/QueryCompiler.java

+128-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@
22

33
import java.io.StringReader;
44
import java.lang.reflect.Method;
5+
import java.util.List;
6+
import java.util.stream.Collectors;
57

68
import org.codehaus.commons.compiler.ISimpleCompiler;
79
import org.codehaus.janino.CompilerFactory;
810

11+
import co.clflushopt.glint.query.logical.plan.LogicalPlan;
12+
import co.clflushopt.glint.query.logical.plan.Scan;
13+
914
/**
1015
* Core Query Compiler using Janino for runtime code generation. This class
1116
* provides the fundamental mechanism for generating and compiling executable
@@ -64,13 +69,134 @@ public long execute() {
6469
long totalRecords = 0;
6570
6671
for (RecordBatch batch : dataSource.scan(List.of())) {
67-
totalRecords += batch.getRowSize();
68-
System.out.println("Batch size: " + batch.getRowSize());
72+
totalRecords += batch.getRowCount();
73+
System.out.println("Batch size: " + batch.getRowCount());
6974
}
7075
7176
return totalRecords;
7277
}
7378
}
7479
""", filename);
7580
}
81+
82+
/**
83+
* Compiles and executes a logical plan.
84+
*
85+
* @param logicalPlan The root of the logical plan to compile
86+
* @return The result of executing the compiled plan
87+
* @throws Exception If compilation or execution fails
88+
*/
89+
public Object compile(LogicalPlan logicalPlan) throws Exception {
90+
// Generate source code for the entire logical plan
91+
String sourceCode = generateSourceCode(logicalPlan);
92+
93+
// Create a new simple compiler
94+
ISimpleCompiler compiler = compilerFactory.newSimpleCompiler();
95+
96+
// Set the source version to Java 21
97+
compiler.setSourceVersion(21);
98+
compiler.setTargetVersion(21);
99+
100+
// Cook (compile) the source code
101+
compiler.cook(new StringReader(sourceCode));
102+
103+
// Load the compiled class
104+
Class<?> compiledClass = compiler.getClassLoader()
105+
.loadClass("co.clflushopt.glint.generated.LogicalPlanExecutor");
106+
107+
// Create an instance
108+
Object instance = compiledClass.getDeclaredConstructor().newInstance();
109+
110+
// Find and invoke the execute method
111+
Method executeMethod = compiledClass.getMethod("execute");
112+
return executeMethod.invoke(instance);
113+
}
114+
115+
/**
116+
* Generates source code for a given logical plan.
117+
*
118+
* @param logicalPlan The logical plan to generate code for
119+
* @return Generated Java source code as a string
120+
*/
121+
private String generateSourceCode(LogicalPlan logicalPlan) {
122+
return String.format(
123+
"""
124+
package co.clflushopt.glint.generated;
125+
126+
import co.clflushopt.glint.query.logical.plan.LogicalPlan;
127+
import co.clflushopt.glint.query.logical.plan.Scan;
128+
import co.clflushopt.glint.datasource.DataSource;
129+
import co.clflushopt.glint.types.RecordBatch;
130+
import co.clflushopt.glint.types.Schema;
131+
import java.util.List;
132+
133+
public class LogicalPlanExecutor {
134+
public long execute() {
135+
return executePlan(%s);
136+
}
137+
138+
private long executePlan(LogicalPlan plan) {
139+
// Handle different logical plan types
140+
if (plan instanceof Scan) {
141+
Scan scan = (Scan) plan;
142+
return executeScan(scan);
143+
}
144+
145+
// TODO: Add support for other logical plan types
146+
throw new UnsupportedOperationException("Unsupported logical plan type: " + plan.getClass().getSimpleName());
147+
}
148+
149+
private long executeScan(Scan scan) {
150+
DataSource dataSource = scan.getDataSource();
151+
List<String> projections = scan.getProjections();
152+
153+
long totalRecords = 0;
154+
for (RecordBatch batch : dataSource.scan(projections)) {
155+
totalRecords += batch.getRowCount();
156+
System.out.println("Scan batch size: " + batch.getRowCount() +
157+
" Path: " + scan.getPath());
158+
}
159+
160+
return totalRecords;
161+
}
162+
}
163+
""",
164+
generatePlanArgument(logicalPlan));
165+
}
166+
167+
/**
168+
* Generates a string representation of the logical plan to be used as a method
169+
* argument.
170+
*
171+
* @param plan The logical plan to convert
172+
* @return A string that can be used to reconstruct the plan in the generated
173+
* code
174+
*/
175+
private String generatePlanArgument(LogicalPlan plan) {
176+
if (plan instanceof Scan scan) {
177+
return String.format("new Scan(\"%s\", new %s(\"%s\"), %s)", scan.getPath(),
178+
scan.getDataSource().getClass().getName(), scan.getPath(),
179+
formatProjections(scan.getProjections()));
180+
}
181+
182+
// TODO: Add support for other logical plan types
183+
throw new UnsupportedOperationException(
184+
"Unsupported logical plan type: " + plan.getClass().getSimpleName());
185+
}
186+
187+
/**
188+
* Formats the list of projections as a Java code string.
189+
*
190+
* @param projections List of projection column names
191+
* @return A Java code representation of the projections list
192+
*/
193+
private String formatProjections(List<String> projections) {
194+
if (projections == null || projections.isEmpty()) {
195+
return "List.of()";
196+
}
197+
198+
return "List.of("
199+
+ projections.stream().map(p -> "\"" + p + "\"").collect(Collectors.joining(", "))
200+
+ ")";
201+
}
76202
}

glint/src/main/java/co/clflushopt/glint/query/logical/plan/Scan.java

-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ public List<LogicalPlan> getChildren() {
4848

4949
private Schema infer() {
5050
var schema = this.dataSource.getSchema();
51-
assert schema != null;
52-
assert schema.getFields().size() > 0;
5351

5452
if (projections.isEmpty()) {
5553
return schema;

glint/src/main/java/co/clflushopt/glint/query/physical/expr/LiteralDoubleExpr.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public String toString() {
1919

2020
@Override
2121
public ColumnVector eval(RecordBatch input) {
22-
return new LiteralValueVector(ArrowTypes.DoubleType, value, input.getRowSize());
22+
return new LiteralValueVector(ArrowTypes.DoubleType, value, input.getRowCount());
2323
}
2424

2525
}

glint/src/main/java/co/clflushopt/glint/query/physical/expr/LiteralIntExpr.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public String toString() {
1919

2020
@Override
2121
public ColumnVector eval(RecordBatch input) {
22-
return new LiteralValueVector(ArrowTypes.Int32Type, value, input.getRowSize());
22+
return new LiteralValueVector(ArrowTypes.Int32Type, value, input.getRowCount());
2323
}
2424

2525
}

glint/src/main/java/co/clflushopt/glint/query/physical/expr/LiteralLongExpr.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public String toString() {
1919

2020
@Override
2121
public ColumnVector eval(RecordBatch input) {
22-
return new LiteralValueVector(ArrowTypes.Int64Type, value, input.getRowSize());
22+
return new LiteralValueVector(ArrowTypes.Int64Type, value, input.getRowCount());
2323
}
2424

2525
}

glint/src/main/java/co/clflushopt/glint/query/physical/expr/LiteralStringExpr.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public String toString() {
1919

2020
@Override
2121
public ColumnVector eval(RecordBatch input) {
22-
return new LiteralValueVector(ArrowTypes.StringType, value.getBytes(), input.getRowSize());
22+
return new LiteralValueVector(ArrowTypes.StringType, value.getBytes(), input.getRowCount());
2323
}
2424

2525
}

glint/src/main/java/co/clflushopt/glint/query/physical/expr/PhysicalCastExpr.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public String toString() {
3636
@Override
3737
public ColumnVector eval(RecordBatch input) {
3838
ColumnVector value = expression.eval(input);
39-
FieldVector fieldVector = FieldVectorFactory.create(type, input.getRowSize());
39+
FieldVector fieldVector = FieldVectorFactory.create(type, input.getRowCount());
4040
ArrowVectorBuilder builder = new ArrowVectorBuilder(fieldVector);
4141

4242
if (type.equals(ArrowTypes.Int8Type)) {

glint/src/main/java/co/clflushopt/glint/query/physical/plan/HashAggregateOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public Iterator<RecordBatch> execute() {
6969
.map(expr -> expr.getInputExpr().eval(batch)).collect(Collectors.toList());
7070

7171
// Process each row in the batch
72-
for (int rowIndex = 0; rowIndex < batch.getRowSize(); rowIndex++) {
72+
for (int rowIndex = 0; rowIndex < batch.getRowCount(); rowIndex++) {
7373
// Create final variable for lambda.
7474
final int currentRow = rowIndex;
7575
// Create key for hash map

glint/src/main/java/co/clflushopt/glint/types/RecordBatch.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public ColumnVector getField(int i) {
3636
*
3737
* @return `int`.
3838
*/
39-
public int getRowSize() {
39+
public int getRowCount() {
4040
return fields.getFirst().getSize();
4141
}
4242

@@ -45,7 +45,7 @@ public int getRowSize() {
4545
*
4646
* @return `int`.
4747
*/
48-
public int getColumnSize() {
48+
public int getColumnCount() {
4949
return fields.size();
5050
}
5151

@@ -64,10 +64,10 @@ public Schema getSchema() {
6464
*/
6565
public String toCsv() {
6666
StringBuilder sb = new StringBuilder();
67-
for (int i = 0; i < getRowSize(); i++) {
68-
for (int j = 0; j < getColumnSize(); j++) {
67+
for (int i = 0; i < getRowCount(); i++) {
68+
for (int j = 0; j < getColumnCount(); j++) {
6969
sb.append(getField(j).getValue(i));
70-
if (j < getColumnSize() - 1) {
70+
if (j < getColumnCount() - 1) {
7171
sb.append(",");
7272
}
7373
}

glint/src/test/java/co/clflushopt/glint/datasource/CsvDataSourceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public void canProcessSimpleCSvFileWithHeaderAndSchema() {
133133
// First batch should contain first two rows
134134
assertTrue("Should have first batch", batchIterator.hasNext());
135135
RecordBatch batch1 = batchIterator.next();
136-
assertEquals("First batch should have 2 rows", 2, batch1.getRowSize());
136+
assertEquals("First batch should have 2 rows", 2, batch1.getRowCount());
137137

138138
// Verify first row
139139
assertEquals(1L, getLongValue(batch1, "id", 0));
@@ -154,7 +154,7 @@ public void canProcessSimpleCSvFileWithHeaderAndSchema() {
154154
// Second batch should contain remaining two rows
155155
assertTrue("Should have second batch", batchIterator.hasNext());
156156
RecordBatch batch2 = batchIterator.next();
157-
assertEquals("Second batch should have 2 rows", 2, batch2.getRowSize());
157+
assertEquals("Second batch should have 2 rows", 2, batch2.getRowCount());
158158

159159
// Verify third row
160160
assertEquals(3L, getLongValue(batch2, "id", 0));
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,47 @@
11
package co.clflushopt.glint.query.compiler;
22

3+
import static org.junit.Assert.fail;
4+
35
import java.nio.file.Path;
6+
import java.util.Collections;
47

58
import org.junit.Test;
69

10+
import co.clflushopt.glint.datasource.ParquetDataSource;
11+
import co.clflushopt.glint.query.logical.plan.Scan;
12+
713
public class QueryCompilerTest {
814
@Test
9-
public void canCompileBasicScanOperator() throws Exception {
15+
public void canCompileInlinedParquetScan() throws Exception {
1016
Path path = Path.of("../datasets/yellow_tripdata_2019-01.parquet");
1117
String filename = path.toAbsolutePath().toString();
1218
QueryCompiler c = new QueryCompiler();
1319
String scanCompiledQuery = c.generateParquetScanSourceCode(filename);
1420
c.compileAndRun(scanCompiledQuery);
1521
}
1622

23+
@Test
24+
public void canCompileScanPlan() throws Exception {
25+
try {
26+
Path path = Path.of("../datasets/yellow_tripdata_2019-01.parquet");
27+
String filename = path.toAbsolutePath().toString();
28+
// Create a sample schema
29+
// Create a Parquet data source
30+
ParquetDataSource dataSource = new ParquetDataSource(filename);
31+
32+
// Create a Scan logical plan
33+
Scan scanPlan = new Scan(filename, dataSource, Collections.emptyList());
34+
35+
// Instantiate the logical plan compiler
36+
QueryCompiler compiler = new QueryCompiler();
37+
38+
// Compile and execute the logical plan
39+
Object result = compiler.compile(scanPlan);
40+
System.out.println("Total Records Read: " + result);
41+
42+
} catch (Exception e) {
43+
fail(e.getMessage() + "\n" + e.getStackTrace());
44+
}
45+
}
46+
1747
}

glint/src/test/java/co/clflushopt/glint/query/physical/FilterOperatorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public void testFilter() {
3737
// Verify results
3838
assertTrue(result.hasNext());
3939
RecordBatch batch = result.next();
40-
assertEquals(1, batch.getRowSize());
40+
assertEquals(1, batch.getRowCount());
4141
assertEquals("Charlie", batch.getField(1).getValue(0));
4242
assertEquals(35, Integer.parseInt((String) batch.getField(2).getValue(0)));
4343
}

glint/src/test/java/co/clflushopt/glint/query/physical/ScanOperatorTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void testScanAllColumns() {
3434
assertTrue(result.hasNext());
3535
RecordBatch batch = result.next();
3636
assertEquals(3, batch.getSchema().getFields().size());
37-
assertEquals(3, batch.getRowSize());
37+
assertEquals(3, batch.getRowCount());
3838
assertEquals("Alice", batch.getField(1).getValue(0));
3939
assertFalse(result.hasNext());
4040
}
@@ -58,7 +58,7 @@ public void testScanWithProjection() {
5858
assertEquals(3, batch.getSchema().getFields().size());
5959
assertEquals("id", batch.getSchema().getFields().get(0).name());
6060
assertEquals("name", batch.getSchema().getFields().get(1).name());
61-
assertEquals(3, batch.getRowSize());
61+
assertEquals(3, batch.getRowCount());
6262
assertEquals(Integer.valueOf(1), batch.getField(0).getValue(0));
6363
assertEquals(25, batch.getField(2).getValue(0));
6464
assertFalse(result.hasNext());

0 commit comments

Comments
 (0)