|
| 1 | +# Dynamic filtering |
| 2 | + |
| 3 | +Dynamic filtering optimizations significantly improve the performance of queries |
| 4 | +with selective joins by avoiding reads of data that the join condition would filter out. |
| 5 | + |
| 6 | +Consider the following query which captures a common pattern of a fact table `store_sales` |
| 7 | +joined with a filtered dimension table `date_dim`: |
| 8 | + |
| 9 | +> SELECT count(\*) |
| 10 | +> FROM store_sales |
| 11 | +> JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk |
| 12 | +> WHERE d_following_holiday='Y' AND d_year = 2000; |
| 13 | + |
| 14 | +Without dynamic filtering, Trino pushes predicates for the dimension table to the |
| 15 | +table scan on `date_dim`, and it scans all the data in the fact table since there |
| 16 | +are no filters on `store_sales` in the query. The join operator ends up throwing away |
| 17 | +most of the probe-side rows because the join criteria are highly selective. |
| 18 | + |
| 19 | +When dynamic filtering is enabled, Trino collects candidate values for the join condition |
| 20 | +from the processed dimension table on the right side of the join. In the case of broadcast joins, |
| 21 | +the runtime predicates generated from this collection are pushed into the local table scan |
| 22 | +on the left side of the join running on the same worker. |
| 23 | + |
| 24 | +Additionally, these runtime predicates are communicated to the coordinator over the network |
| 25 | +so that dynamic filtering can also be performed on the coordinator during enumeration of |
| 26 | +table scan splits. |
| 27 | + |
| 28 | +For example, in the case of the Hive connector, dynamic filters are used |
| 29 | +to skip loading of partitions which don't match the join criteria. |
| 30 | +This is known as **dynamic partition pruning**. |
| 31 | + |
| 32 | +After completing the collection of dynamic filters, the coordinator also distributes them |
| 33 | +to worker nodes over the network for partitioned joins. This allows push down of dynamic |
| 34 | +filters from partitioned joins into the table scans on the left side of that join. |
| 35 | + |
| 36 | +The results of dynamic filtering optimization can include the following benefits: |
| 37 | + |
| 38 | +- improved overall query performance |
| 39 | +- reduced network traffic between Trino and the data source |
| 40 | +- reduced load on the remote data source |
| 41 | + |
| 42 | +Dynamic filtering is enabled by default. It can be disabled by setting either the |
| 43 | +`enable-dynamic-filtering` configuration property, or the session property |
| 44 | +`enable_dynamic_filtering` to `false`. |
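| | + |
| | +As a minimal sketch, assuming you only want to turn dynamic filtering off for the |
| | +current session, for example to compare plans or runtimes with and without it, |
| | +the session property named above can be set and later reset like this: |
| | + |
| | +```sql |
| | +-- Disable dynamic filtering for the current session only |
| | +SET SESSION enable_dynamic_filtering = false; |
| | + |
| | +-- Return the session property to its default value |
| | +RESET SESSION enable_dynamic_filtering; |
| | +``` |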
| 45 | + |
| 46 | +Support for push down of dynamic filters is specific to each connector, |
| 47 | +and the relevant underlying database or storage system. The documentation for |
| 48 | +specific connectors with support for dynamic filtering includes further details, |
| 49 | +for example the {ref}`Hive connector <hive-dynamic-filtering>` |
| 50 | +or the {ref}`Memory connector <memory-dynamic-filtering>`. |
| 51 | + |
| 52 | +## Analysis and confirmation |
| 53 | + |
| 54 | +Dynamic filtering depends on a number of factors: |
| 55 | + |
| 56 | +- Planner support for dynamic filtering for a given join operation in Trino. |
| 57 | + Currently inner and right joins with `=`, `<`, `<=`, `>`, `>=` or |
| 58 | + `IS NOT DISTINCT FROM` join conditions, and |
| 59 | + semi-joins with `IN` conditions are supported. |
| 60 | +- Connector support for utilizing dynamic filters pushed into the table scan at runtime. |
| 61 | + For example, the Hive connector can push dynamic filters into ORC and Parquet readers |
| 62 | + to perform stripe or row-group pruning. |
| 63 | +- Connector support for utilizing dynamic filters at the split enumeration stage. |
| 64 | +- Size of the right (build) side of the join. |
| 65 | + |
| 66 | +You can inspect the {doc}`EXPLAIN plan </sql/explain>` of the query |
| 67 | +to check whether the planner adds dynamic filters to a specific query's plan. |
| 68 | +For example, the explain plan for the above query can be obtained by running |
| 69 | +the following statement: |
| 70 | + |
| 71 | +``` |
| 72 | +EXPLAIN |
| 73 | +SELECT count(*) |
| 74 | +FROM store_sales |
| 75 | +JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk |
| 76 | +WHERE d_following_holiday='Y' AND d_year = 2000; |
| 77 | +``` |
| 78 | + |
| 79 | +The explain plan for this query shows `dynamicFilterAssignments` in the |
| 80 | +`InnerJoin` node with dynamic filter `df_370` collected from build symbol `d_date_sk`. |
| 81 | +You can also see the `dynamicFilter` predicate as part of the Hive `ScanFilterProject` |
| 82 | +operator where `df_370` is associated with probe symbol `ss_sold_date_sk`. |
| 83 | +This shows that the planner successfully pushed dynamic filters |
| 84 | +down to the connector in the query plan. |
| 85 | + |
| 86 | +```text |
| 87 | +... |
| 88 | + |
| 89 | +Fragment 1 [SOURCE] |
| 90 | + Output layout: [count_3] |
| 91 | + Output partitioning: SINGLE [] |
| 92 | + Aggregate(PARTIAL) |
| 93 | + │ Layout: [count_3:bigint] |
| 94 | + │ count_3 := count(*) |
| 95 | + └─ InnerJoin[("ss_sold_date_sk" = "d_date_sk")][$hashvalue, $hashvalue_4] |
| 96 | + │ Layout: [] |
| 97 | + │ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} |
| 98 | + │ Distribution: REPLICATED |
| 99 | + │ dynamicFilterAssignments = {d_date_sk -> #df_370} |
| 100 | + ├─ ScanFilterProject[table = hive:default:store_sales, grouped = false, filterPredicate = true, dynamicFilters = {"ss_sold_date_sk" = #df_370}] |
| 101 | + │ Layout: [ss_sold_date_sk:bigint, $hashvalue:bigint] |
| 102 | + │ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} |
| 103 | + │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("ss_sold_date_sk"), 0)) |
| 104 | + │ ss_sold_date_sk := ss_sold_date_sk:bigint:REGULAR |
| 105 | + └─ LocalExchange[HASH][$hashvalue_4] ("d_date_sk") |
| 106 | + │ Layout: [d_date_sk:bigint, $hashvalue_4:bigint] |
| 107 | + │ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} |
| 108 | + └─ RemoteSource[2] |
| 109 | + Layout: [d_date_sk:bigint, $hashvalue_5:bigint] |
| 110 | + |
| 111 | +Fragment 2 [SOURCE] |
| 112 | + Output layout: [d_date_sk, $hashvalue_6] |
| 113 | + Output partitioning: BROADCAST [] |
| 114 | + ScanFilterProject[table = hive:default:date_dim, grouped = false, filterPredicate = (("d_following_holiday" = CAST('Y' AS char(1))) AND ("d_year" = 2000))] |
| 115 | + Layout: [d_date_sk:bigint, $hashvalue_6:bigint] |
| 116 | + Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} |
| 117 | + $hashvalue_6 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("d_date_sk"), 0)) |
| 118 | + d_following_holiday := d_following_holiday:char(1):REGULAR |
| 119 | + d_date_sk := d_date_sk:bigint:REGULAR |
| 120 | + d_year := d_year:int:REGULAR |
| 121 | +``` |
| 122 | + |
| 123 | +During execution of a query with dynamic filters, Trino populates statistics |
| 124 | +about dynamic filters in the QueryInfo JSON available through the |
| 125 | +{doc}`/admin/web-interface`. |
| 126 | +In the `queryStats` section, statistics about dynamic filters collected |
| 127 | +by the coordinator can be found in the `dynamicFiltersStats` structure. |
| 128 | + |
| 129 | +```text |
| 130 | +"dynamicFiltersStats" : { |
| 131 | + "dynamicFilterDomainStats" : [ { |
| 132 | + "dynamicFilterId" : "df_370", |
| 133 | + "simplifiedDomain" : "[ SortedRangeSet[type=bigint, ranges=3, {[2451546], ..., [2451905]}] ]", |
| 134 | + "collectionDuration" : "2.34s" |
| 135 | + } ], |
| 136 | + "lazyDynamicFilters" : 1, |
| 137 | + "replicatedDynamicFilters" : 1, |
| 138 | + "totalDynamicFilters" : 1, |
| 139 | + "dynamicFiltersCompleted" : 1 |
| 140 | +} |
| 141 | +``` |
| 142 | + |
| 143 | +Push down of dynamic filters into a table scan on the worker nodes can be |
| 144 | +verified by looking at the operator statistics for that table scan. |
| 145 | +`dynamicFilterSplitsProcessed` records the number of splits |
| 146 | +processed after a dynamic filter is pushed down to the table scan. |
| 147 | + |
| 148 | +```text |
| 149 | +"operatorType" : "ScanFilterAndProjectOperator", |
| 150 | +"totalDrivers" : 1, |
| 151 | +"addInputCalls" : 762, |
| 152 | +"addInputWall" : "0.00ns", |
| 153 | +"addInputCpu" : "0.00ns", |
| 154 | +"physicalInputDataSize" : "0B", |
| 155 | +"physicalInputPositions" : 28800991, |
| 156 | +"inputPositions" : 28800991, |
| 157 | +"dynamicFilterSplitsProcessed" : 1, |
| 158 | +``` |
| 159 | + |
| 160 | +Dynamic filters are reported as a part of the |
| 161 | +{doc}`EXPLAIN ANALYZE plan </sql/explain-analyze>` in the statistics for |
| 162 | +`ScanFilterProject` nodes. |
| 163 | + |
| 164 | +```text |
| 165 | +... |
| 166 | + |
| 167 | + └─ InnerJoin[("ss_sold_date_sk" = "d_date_sk")][$hashvalue, $hashvalue_4] |
| 168 | + │ Layout: [] |
| 169 | + │ Estimates: {rows: 11859 (0B), cpu: 8.84M, memory: 3.19kB, network: 3.19kB} |
| 170 | + │ CPU: 78.00ms (30.00%), Scheduled: 295.00ms (47.05%), Output: 296 rows (0B) |
| 171 | + │ Left (probe) Input avg.: 120527.00 rows, Input std.dev.: 0.00% |
| 172 | + │ Right (build) Input avg.: 0.19 rows, Input std.dev.: 208.17% |
| 173 | + │ Distribution: REPLICATED |
| 174 | + │ dynamicFilterAssignments = {d_date_sk -> #df_370} |
| 175 | + ├─ ScanFilterProject[table = hive:default:store_sales, grouped = false, filterPredicate = true, dynamicFilters = {"ss_sold_date_sk" = #df_370}] |
| 176 | + │ Layout: [ss_sold_date_sk:bigint, $hashvalue:bigint] |
| 177 | + │ Estimates: {rows: 120527 (2.03MB), cpu: 1017.64k, memory: 0B, network: 0B}/{rows: 120527 (2.03MB), cpu: 1.99M, memory: 0B, network: 0B}/{rows: 120527 (2.03MB), cpu: 4.02M, memory: 0B, network: 0B} |
| 178 | + │ CPU: 49.00ms (18.85%), Scheduled: 123.00ms (19.62%), Output: 120527 rows (2.07MB) |
| 179 | + │ Input avg.: 120527.00 rows, Input std.dev.: 0.00% |
| 180 | + │ $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("ss_sold_date_sk"), 0)) |
| 181 | + │ ss_sold_date_sk := ss_sold_date_sk:bigint:REGULAR |
| 182 | + │ Input: 120527 rows (1.03MB), Filtered: 0.00% |
| 183 | + │ Dynamic filters: |
| 184 | + │ - df_370, [ SortedRangeSet[type=bigint, ranges=3, {[2451546], ..., [2451905]}] ], collection time=2.34s |
| 185 | + | |
| 186 | +... |
| 187 | +``` |
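| | + |
| | +Output of this kind comes from running the query under `EXPLAIN ANALYZE`. |
| | +A sketch, assuming the same `store_sales` and `date_dim` tables as in the |
| | +earlier example; the filter IDs and timings in your output will differ: |
| | + |
| | +```sql |
| | +-- Executes the query and reports runtime statistics, including dynamic filters |
| | +EXPLAIN ANALYZE |
| | +SELECT count(*) |
| | +FROM store_sales |
| | +JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk |
| | +WHERE d_following_holiday = 'Y' AND d_year = 2000; |
| | +``` |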
| 188 | + |
| 189 | +## Dynamic filter collection thresholds |
| 190 | + |
| 191 | +In order for dynamic filtering to work, the smaller dimension table |
| 192 | +needs to be chosen as a join’s build side. The cost-based optimizer can automatically |
| 193 | +do this using table statistics provided by connectors. Therefore, it is recommended |
| 194 | +to keep {doc}`table statistics </optimizer/statistics>` up to date and rely on the |
| 195 | +CBO to correctly choose the smaller table on the build side of the join. |
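| | + |
| | +As a hedged illustration, assuming the connector in use supports the `ANALYZE` |
| | +statement (the Hive connector does), statistics for the example tables could be |
| | +refreshed and inspected as follows: |
| | + |
| | +```sql |
| | +-- Collect table and column statistics for both sides of the join |
| | +ANALYZE store_sales; |
| | +ANALYZE date_dim; |
| | + |
| | +-- Inspect the statistics that the cost-based optimizer can use |
| | +SHOW STATS FOR date_dim; |
| | +``` |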
| 196 | + |
| 197 | +Collection of values of the join key columns from the build side for |
| 198 | +dynamic filtering may incur additional CPU overhead during query execution. |
| 199 | +Therefore, to limit the overhead of collecting dynamic filters |
| 200 | +to the cases where the join operator is likely to be selective, |
| 201 | +Trino defines thresholds on the size of dynamic filters collected from build side tasks. |
| 202 | +Collection of dynamic filters for joins with large build sides can be enabled |
| 203 | +using the `enable-large-dynamic-filters` configuration property or the |
| 204 | +`enable_large_dynamic_filters` session property. |
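| | + |
| | +A minimal sketch of enabling large dynamic filters for a single session, |
| | +assuming the cluster-wide default is left unchanged: |
| | + |
| | +```sql |
| | +-- Allow dynamic filter collection for joins with large build sides |
| | +SET SESSION enable_large_dynamic_filters = true; |
| | +``` |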
| 205 | + |
| 206 | +When large dynamic filters are enabled, limits on the size of dynamic filters can |
| 207 | +be configured using the configuration properties |
| 208 | +`dynamic-filtering.large.max-distinct-values-per-driver`, |
| 209 | +`dynamic-filtering.large.max-size-per-driver`, |
| 210 | +`dynamic-filtering.large.range-row-limit-per-driver`, |
| 211 | +`dynamic-filtering.large-partitioned.max-distinct-values-per-driver`, |
| 212 | +`dynamic-filtering.large-partitioned.max-size-per-driver` and |
| 213 | +`dynamic-filtering.large-partitioned.range-row-limit-per-driver`. |
| 214 | + |
| 215 | +Similarly, limits for dynamic filters when `enable-large-dynamic-filters` |
| 216 | +is not enabled can be configured using configuration properties like |
| 217 | +`dynamic-filtering.small.max-distinct-values-per-driver`, |
| 218 | +`dynamic-filtering.small.max-size-per-driver`, |
| 219 | +`dynamic-filtering.small.range-row-limit-per-driver`, |
| 220 | +`dynamic-filtering.small-partitioned.max-distinct-values-per-driver`, |
| 221 | +`dynamic-filtering.small-partitioned.max-size-per-driver` and |
| 222 | +`dynamic-filtering.small-partitioned.range-row-limit-per-driver`. |
| 223 | + |
| 224 | +The `dynamic-filtering.large.*` and `dynamic-filtering.small.*` limits are applied |
| 225 | +when dynamic filters are collected before the build side is partitioned on join |
| 226 | +keys (when a broadcast join is chosen or when fault-tolerant execution is enabled). The |
| 227 | +`dynamic-filtering.large-partitioned.*` and `dynamic-filtering.small-partitioned.*` |
| 228 | +limits are applied when dynamic filters are collected after the build side is partitioned |
| 229 | +on join keys (when a partitioned join is chosen and fault-tolerant execution is disabled). |
| 230 | + |
| 231 | +The properties based on `max-distinct-values-per-driver` and `max-size-per-driver` |
| 232 | +define thresholds for the size up to which dynamic filters are collected in a |
| 233 | +distinct values data structure. When the build side exceeds these thresholds, |
| 234 | +Trino switches to collecting min and max values per column to reduce overhead. |
| 235 | +This min-max filter has much lower granularity than the distinct values filter. |
| 236 | +However, it may still be beneficial in filtering some data from the probe side, |
| 237 | +especially when a range of values is selected from the build side of the join. |
| 238 | +The limits for min-max filters collection are defined by the properties |
| 239 | +based on `range-row-limit-per-driver`. |
| 240 | + |
| 241 | +## Dimension tables layout |
| 242 | + |
| 243 | +Dynamic filtering works best for dimension tables where |
| 244 | +table keys are correlated with the values of other columns. |
| 245 | + |
| 246 | +For example, a date dimension key column should be correlated with a date column, |
| 247 | +so the table keys monotonically increase with date values. |
| 248 | +An address dimension key can be composed of other columns such as |
| 249 | +`COUNTRY-STATE-ZIP-ADDRESS_ID` with an example value of `US-NY-10001-1234`. |
| 250 | +This usage allows dynamic filtering to succeed even with a large number |
| 251 | +of selected rows from the dimension table. |
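| | + |
| | +To illustrate, the following sketch assumes a `date_dim` table with a `d_date` |
| | +column, which is not part of the earlier example, and keys that increase with |
| | +the date. Under that assumption, a date range selects a contiguous range of |
| | +`d_date_sk` values, so even a min-max dynamic filter can prune the probe side: |
| | + |
| | +```sql |
| | +-- d_date is an assumed column; the selected keys form one contiguous range |
| | +SELECT count(*) |
| | +FROM store_sales |
| | +JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk |
| | +WHERE d_date BETWEEN DATE '2000-01-01' AND DATE '2000-03-31'; |
| | +``` |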
| 252 | + |
| 253 | +## Limitations |
| 254 | + |
| 255 | +- Min-max dynamic filter collection is not supported for `DOUBLE`, `REAL`, and unorderable data types. |
| 256 | +- Dynamic filtering is not supported for `DOUBLE` and `REAL` data types when using the `IS NOT DISTINCT FROM` predicate. |
| 257 | +- Dynamic filtering is supported when the join key contains a cast from the build key type to the |
| 258 | + probe key type. Dynamic filtering is also supported in limited scenarios when there is an implicit |
| 259 | + cast from the probe key type to the build key type. For example, dynamic filtering is supported when |
| 260 | + the build side key is of `DOUBLE` type and the probe side key is of `REAL` or `INTEGER` type. |