
Commit 9b24967

Merge pull request #313 from noahisaksen/add-limit-pushdown
Fix #233 Add limit pushdown to attached queries
2 parents 9e9e30c + 1d2c57b

4 files changed: +115 additions, -6 deletions

src/include/postgres_scanner.hpp

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ struct PostgresBindData : public FunctionData {
     string schema_name;
     string table_name;
     string sql;
+    string limit;
     idx_t pages_approx = 0;

     vector<PostgresType> postgres_types;

src/postgres_scanner.cpp

Lines changed: 7 additions & 6 deletions
@@ -251,14 +251,15 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
     }
     if (bind_data->table_name.empty()) {
         D_ASSERT(!bind_data->sql.empty());
-        lstate.sql =
-            StringUtil::Format(R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s) TO STDOUT (FORMAT "binary");)",
-                               col_names, bind_data->sql, filter);
+        lstate.sql = StringUtil::Format(
+            R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s%s) TO STDOUT (FORMAT "binary");)",
+            col_names, bind_data->sql, filter, bind_data->limit);

     } else {
-        lstate.sql = StringUtil::Format(R"(COPY (SELECT %s FROM %s.%s %s) TO STDOUT (FORMAT "binary");)", col_names,
-                                        KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
-                                        KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter);
+        lstate.sql = StringUtil::Format(
+            R"(COPY (SELECT %s FROM %s.%s %s%s) TO STDOUT (FORMAT "binary");)",
+            col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
+            KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit);
     }
     lstate.exec = false;
     lstate.done = false;
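
To make the extra %s substitution concrete, here is a minimal sketch of the COPY query the scanner could now send for a plain table scan with a pushed-down limit. It assumes the large_tbl table and column i from the test below, an empty filter, and the default public schema; the exact quoting of col_names is up to the scanner, so treat it as illustrative rather than the literal output:

-- illustrative only: FROM s.large_tbl LIMIT 5 OFFSET 5, with
-- bind_data->limit = " LIMIT 5 OFFSET 5" and an empty filter string
COPY (SELECT "i" FROM "public"."large_tbl"  LIMIT 5 OFFSET 5) TO STDOUT (FORMAT "binary");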

src/storage/postgres_optimizer.cpp

Lines changed: 66 additions & 0 deletions
@@ -3,15 +3,79 @@
 #include "storage/postgres_transaction.hpp"
 #include "storage/postgres_optimizer.hpp"
 #include "duckdb/planner/operator/logical_get.hpp"
+#include "duckdb/planner/operator/logical_limit.hpp"
 #include "storage/postgres_catalog.hpp"
 #include "postgres_scanner.hpp"

+
 namespace duckdb {

 struct PostgresOperators {
     reference_map_t<PostgresCatalog, vector<reference<LogicalGet>>> scans;
 };

+static void OptimizePostgresScanLimitPushdown(unique_ptr<LogicalOperator> &op) {
+    if (op->type == LogicalOperatorType::LOGICAL_LIMIT) {
+        auto &limit = op->Cast<LogicalLimit>();
+        reference<LogicalOperator> child = *op->children[0];
+
+        while (child.get().type == LogicalOperatorType::LOGICAL_PROJECTION) {
+            child = *child.get().children[0];
+        }
+
+        if (child.get().type != LogicalOperatorType::LOGICAL_GET) {
+            OptimizePostgresScanLimitPushdown(op->children[0]);
+            return;
+        }
+
+        auto &get = child.get().Cast<LogicalGet>();
+        if (!PostgresCatalog::IsPostgresScan(get.function.name)) {
+            OptimizePostgresScanLimitPushdown(op->children[0]);
+            return;
+        }
+
+        switch (limit.limit_val.Type()) {
+        case LimitNodeType::CONSTANT_VALUE:
+        case LimitNodeType::UNSET:
+            break;
+        default:
+            // not a constant or unset limit
+            OptimizePostgresScanLimitPushdown(op->children[0]);
+            return;
+        }
+        switch (limit.offset_val.Type()) {
+        case LimitNodeType::CONSTANT_VALUE:
+        case LimitNodeType::UNSET:
+            break;
+        default:
+            // not a constant or unset offset
+            OptimizePostgresScanLimitPushdown(op->children[0]);
+            return;
+        }
+
+        auto &bind_data = get.bind_data->Cast<PostgresBindData>();
+
+        string generated_limit_clause = "";
+        if (limit.limit_val.Type() != LimitNodeType::UNSET) {
+            generated_limit_clause += " LIMIT " + to_string(limit.limit_val.GetConstantValue());
+        }
+        if (limit.offset_val.Type() != LimitNodeType::UNSET) {
+            generated_limit_clause += " OFFSET " + to_string(limit.offset_val.GetConstantValue());
+        }
+
+        if (!generated_limit_clause.empty()) {
+            bind_data.limit = generated_limit_clause;
+
+            op = std::move(op->children[0]);
+            return;
+        }
+    }
+
+    for (auto &child : op->children) {
+        OptimizePostgresScanLimitPushdown(child);
+    }
+}
+
 void GatherPostgresScans(LogicalOperator &op, PostgresOperators &result) {
     if (op.type == LogicalOperatorType::LOGICAL_GET) {
         auto &get = op.Cast<LogicalGet>();
@@ -35,6 +99,8 @@ void GatherPostgresScans(LogicalOperator &op, PostgresOperators &result) {
 }

 void PostgresOptimizer::Optimize(OptimizerExtensionInput &input, unique_ptr<LogicalOperator> &plan) {
+    // look at query plan and check if we can find LIMIT/OFFSET to pushdown
+    OptimizePostgresScanLimitPushdown(plan);
     // look at the query plan and check if we can enable streaming query scans
     PostgresOperators operators;
     GatherPostgresScans(*plan, operators);

test/sql/storage/limit.test

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+# name: test/sql/storage/limit.test
+# description: Test limit on an attached table
+# group: [storage]
+
+require postgres_scanner
+
+require-env POSTGRES_TEST_DATABASE_AVAILABLE
+
+statement ok
+ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES)
+
+statement ok
+CREATE OR REPLACE TABLE s.large_tbl AS FROM range(100000) t(i)
+
+query I
+FROM s.large_tbl LIMIT 5
+----
+0
+1
+2
+3
+4
+
+query I
+FROM s.large_tbl LIMIT 5 OFFSET 5
+----
+5
+6
+7
+8
+9
+
+statement ok
+set explain_output='optimized_only'
+
+# limit is now not in DuckDB plan, but sent down to Postgres, this checks that
+
+query II
+EXPLAIN FROM s.large_tbl LIMIT 5;
+----
+logical_opt	<!REGEX>:.*LIMIT.*
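
Outside the sqllogictest harness, roughly the same check can be done interactively; this is only an illustration, and the exact EXPLAIN rendering depends on the DuckDB version:

SET explain_output = 'optimized_only';
EXPLAIN SELECT i FROM s.large_tbl LIMIT 5;
-- expected: no LIMIT operator in the optimized logical plan, since the
-- clause is now appended to the COPY query sent to Postgres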
