No predicate / filter pushdown when querying arrow streams #392

@jonathanswenson

Description

When an arrow stream is registered and queried, it does not use predicate / filter pushdown.

Repro code:
package com.acme;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.duckdb.DuckDBConnection;
import org.duckdb.DuckDBDriver;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;

public class DuckDBStreamIngestTest {

    private static byte[] createStream(BufferAllocator allocator) throws Exception {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        IntVector intVector = new IntVector("id", allocator);
        VarCharVector stringVector = new VarCharVector("value", allocator);

        try (
                VectorSchemaRoot vsr = new VectorSchemaRoot(List.of(intVector, stringVector));
                ArrowStreamWriter writer = new ArrowStreamWriter(vsr, null, outputStream)
        ) {
            vsr.setRowCount(5);
            for (int i = 0; i < 5; i++) {
                intVector.setSafe(i, i);
                stringVector.setSafe(i, ("v " + i).getBytes(StandardCharsets.UTF_8));
            }
            writer.writeBatch();
        }

        return outputStream.toByteArray();
    }

    public static void main(final String[] args) throws Exception {
        BufferAllocator allocator = new RootAllocator();
        byte[] bytes = createStream(allocator);

        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);

        ArrowStreamReader arrowReader = new ArrowStreamReader(inputStream, allocator);
        ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(allocator);
        Data.exportArrayStream(allocator, arrowReader, arrowArrayStream);
        DuckDBDriver driver = new DuckDBDriver();
        try (Connection connection = driver.connect("jdbc:duckdb:", new Properties())) {
            DuckDBConnection conn = connection.unwrap(DuckDBConnection.class);
            conn.registerArrowStream("arrow_table", arrowArrayStream);

            try (
                    Statement statement = connection.createStatement();
                    ResultSet resultSet = statement.executeQuery("explain select id from arrow_table where id < 3");
            ) {
                resultSet.next();
                System.out.println(resultSet.getString(2));
            }

        }
    }
}

This generates a query plan that uses ARROW_SCAN_DUMB and applies the projection and filter afterwards:

┌───────────────────────────┐
│         PROJECTION        │
│    ────────────────────   │
│             #0            │
│                           │
│          ~1 Rows          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│           FILTER          │
│    ────────────────────   │
│          (id < 3)         │
│                           │
│          ~1 Rows          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│      ARROW_SCAN_DUMB      │
│    ────────────────────   │
│         Function:         │
│      ARROW_SCAN_DUMB      │
│                           │
│          ~1 Rows          │
└───────────────────────────┘

If you run an equivalent Python script:

import pyarrow as pa
import duckdb
import io


def create_arrow_stream(table):
    buffer = io.BytesIO()

    with pa.ipc.new_stream(buffer, table.schema) as writer:
        writer.write(table)

    buffer.seek(0)
    return buffer


def main():
    data = {
        'id': [1, 2, 3, 4, 5],
        'value': ['one', 'two', 'three', 'four', 'five']
    }

    table = pa.table(data)

    stream_buffer1 = create_arrow_stream(table)

    with pa.ipc.open_stream(stream_buffer1) as stream1:
        duckdb.register("arrow_stream", stream1)

        sql = "SELECT id FROM arrow_stream where id < 3"

        explain_sql = "EXPLAIN " + sql
        print("Query Plan:")
    print(duckdb.sql(explain_sql).fetchall()[0][1])

if __name__ == "__main__":
    main()

it produces a query plan that pushes the projection and filter down into the scan:

Query Plan:
┌───────────────────────────┐
│        ARROW_SCAN         │
│    ────────────────────   │
│    Function: ARROW_SCAN   │
│      Projections: id      │
│       Filters: id<3       │
│                           │
│          ~1 Rows          │
└───────────────────────────┘

In the duckdb-python code, the table function is set to arrow_scan (falling back to arrow_scan_dumb only for certain data types) here.

In the JDBC driver, it appears to always use arrow_scan_dumb:

conn->TableFunction("arrow_scan_dumb", parameters)->CreateView(name, true, true);

I was hoping to avoid pulling the entire stream into memory when it isn't necessary, and to take advantage of predicate / filter pushdown where possible. As it stands, it seems the entire result has to be pulled into the system (via a CTAS) to use these features.
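
For reference, a minimal sketch of the CTAS workaround described above, reusing the arrow_table registration from the repro (the arrow_copy table name is only for illustration):

try (Statement statement = connection.createStatement()) {
    // Materialize the registered stream into a native DuckDB table (CTAS).
    // This pulls the entire stream into memory, which is what I'd like to avoid.
    statement.execute("CREATE TABLE arrow_copy AS SELECT * FROM arrow_table");

    // Filtered queries now run against the native table instead of ARROW_SCAN_DUMB.
    try (ResultSet resultSet = statement.executeQuery(
            "explain select id from arrow_copy where id < 3")) {
        resultSet.next();
        System.out.println(resultSet.getString(2));
    }
}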
