Skip to content

Commit 794d012

Browse files
authored
feat(multi): uniformly distribute addresses for quick search (#1290)
* feat: uniformly distribute addresses for quick search * refactor: rename parameters * fix: incorrect case for dapp client query parameter * feat: add deterministic ordering for address quick search * feat: apply priority ordering for all quick search entities
1 parent b28d9ae commit 794d012

File tree

14 files changed

+298
-73
lines changed

14 files changed

+298
-73
lines changed

multichain-aggregator/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

multichain-aggregator/multichain-aggregator-logic/src/clients/dapp.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use api_client_framework::{
33
};
44
use reqwest::Method;
55
use serde::{Deserialize, Serialize};
6+
use serde_with::{formats::CommaSeparator, serde_as, StringWithSeparator};
67
use url::Url;
78

89
pub fn new_client(url: Url) -> Result<Client, Error> {
@@ -17,11 +18,14 @@ pub mod search_dapps {
1718
pub params: SearchDappsParams,
1819
}
1920

21+
#[serde_as]
2022
#[derive(Serialize, Clone, Debug, Default, PartialEq)]
23+
#[serde(rename_all = "camelCase")]
2124
pub struct SearchDappsParams {
2225
pub title: Option<String>,
2326
pub categories: Option<String>,
24-
pub chain_ids: Option<String>,
27+
#[serde_as(as = "StringWithSeparator::<CommaSeparator, i64>")]
28+
pub chain_ids: Vec<i64>,
2529
}
2630

2731
impl Endpoint for SearchDapps {

multichain-aggregator/multichain-aggregator-logic/src/clients/token_info.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use api_client_framework::{
44
};
55
use reqwest::Method;
66
use serde::{Deserialize, Serialize};
7+
use serde_with::{formats::CommaSeparator, serde_as, StringWithSeparator};
78
use url::Url;
89

910
pub fn new_client(url: Url) -> Result<Client, Error> {
@@ -41,11 +42,14 @@ pub mod search_token_infos {
4142
pub params: SearchTokenInfosParams,
4243
}
4344

45+
#[serde_as]
4446
#[serde_with::skip_serializing_none]
4547
#[derive(Serialize, Clone, Debug, Default, PartialEq)]
4648
pub struct SearchTokenInfosParams {
4749
pub query: String,
48-
pub chain_id: Option<ChainId>,
50+
#[serde_as(as = "StringWithSeparator::<CommaSeparator, ChainId>")]
51+
#[serde(skip_serializing_if = "Vec::is_empty")]
52+
pub chain_id: Vec<ChainId>,
4953
pub page_size: Option<u32>,
5054
pub page_token: Option<String>,
5155
}

multichain-aggregator/multichain-aggregator-logic/src/repository/addresses.rs

+123-23
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use entity::{
77
use regex::Regex;
88
use sea_orm::{
99
prelude::Expr,
10-
sea_query::{Alias, ColumnRef, CommonTableExpression, IntoIden, OnConflict, Query, WithClause},
10+
sea_query::{
11+
Alias, ColumnRef, CommonTableExpression, IntoIden, OnConflict, Query, WindowStatement,
12+
WithClause,
13+
},
1114
ActiveValue::NotSet,
1215
ColumnTrait, ConnectionTrait, DbErr, EntityTrait, FromQueryResult, IntoSimpleExpr, Iterable,
1316
Order, QuerySelect,
@@ -46,50 +49,147 @@ where
4649
Ok(())
4750
}
4851

49-
pub async fn list<C>(
50-
db: &C,
51-
address: Option<AddressAlloy>,
52-
query: Option<String>,
53-
chain_id: Option<ChainId>,
54-
token_types: Option<Vec<db_enum::TokenType>>,
55-
page_size: u64,
56-
page_token: Option<(AddressAlloy, ChainId)>,
57-
) -> Result<(Vec<Model>, Option<(AddressAlloy, ChainId)>), DbErr>
58-
where
59-
C: ConnectionTrait,
60-
{
52+
fn prepare_address_search_cte(
53+
contract_name_query: Option<String>,
54+
cte_name: impl IntoIden,
55+
) -> CommonTableExpression {
6156
// Materialize addresses CTE when searching by contract_name.
6257
// Otherwise, query planner chooses a suboptimal plan.
63-
let is_cte_materialized = query.is_some();
64-
let addresses_cte_iden = Alias::new("addresses").into_iden();
65-
let addresses_cte = CommonTableExpression::new()
58+
// If query is not provided, this CTE will be folded by the optimizer.
59+
let is_cte_materialized = contract_name_query.is_some();
60+
61+
CommonTableExpression::new()
6662
.query(
6763
QuerySelect::query(&mut Entity::find())
68-
.apply_if(query, |q, query| {
64+
.apply_if(contract_name_query, |q, query| {
6965
let ts_query = prepare_ts_query(&query);
7066
q.and_where(Expr::cust_with_expr(
7167
"to_tsvector('english', contract_name) @@ to_tsquery($1)",
7268
ts_query,
7369
));
7470
})
71+
// Apply a hard limit in case we materialize the CTE
72+
.apply_if(is_cte_materialized.then_some(10_000), |q, limit| {
73+
q.limit(limit);
74+
})
7575
.to_owned(),
7676
)
7777
.materialized(is_cte_materialized)
78-
.table_name(addresses_cte_iden.clone())
78+
.table_name(cte_name)
79+
.to_owned()
80+
}
81+
82+
pub async fn uniform_chain_search<C>(
83+
db: &C,
84+
contract_name_query: String,
85+
token_types: Option<Vec<db_enum::TokenType>>,
86+
chain_ids: Vec<ChainId>,
87+
) -> Result<Vec<Model>, DbErr>
88+
where
89+
C: ConnectionTrait,
90+
{
91+
if chain_ids.is_empty() {
92+
return Ok(vec![]);
93+
}
94+
95+
let ts_rank_ordering = Expr::cust_with_expr(
96+
"ts_rank(to_tsvector('english', contract_name), to_tsquery($1))",
97+
prepare_ts_query(&contract_name_query),
98+
);
99+
100+
let addresses_cte_iden = Alias::new("addresses").into_iden();
101+
let addresses_cte =
102+
prepare_address_search_cte(Some(contract_name_query), addresses_cte_iden.clone());
103+
104+
let row_number = Expr::custom_keyword(Alias::new("ROW_NUMBER()"));
105+
let ranked_addresses_iden = Alias::new("ranked_addresses").into_iden();
106+
let ranked_addresses_cte = CommonTableExpression::new()
107+
.query(
108+
Query::select()
109+
.column(ColumnRef::TableAsterisk(addresses_cte_iden.clone()))
110+
.expr_window_as(
111+
row_number,
112+
WindowStatement::partition_by(Column::ChainId)
113+
.order_by_expr(ts_rank_ordering, Order::Desc)
114+
.order_by(Column::Hash, Order::Asc)
115+
.to_owned(),
116+
Alias::new("rn"),
117+
)
118+
.from(addresses_cte_iden.clone())
119+
.and_where(Column::ChainId.is_in(chain_ids.clone()))
120+
.apply_if(token_types, |q, token_types| {
121+
if !token_types.is_empty() {
122+
q.and_where(Column::TokenType.is_in(token_types));
123+
} else {
124+
q.and_where(Column::TokenType.is_null());
125+
}
126+
})
127+
.to_owned(),
128+
)
129+
.table_name(ranked_addresses_iden.clone())
130+
.to_owned();
131+
132+
let limit = chain_ids.len() as u64;
133+
let base_select = Query::select()
134+
.column(ColumnRef::Asterisk)
135+
.from(ranked_addresses_iden)
136+
.and_where(Expr::col(Alias::new("rn")).eq(1))
137+
.order_by_expr(
138+
Expr::cust_with_exprs(
139+
"array_position($1, $2)",
140+
[chain_ids.into(), Expr::col(Column::ChainId).into()],
141+
),
142+
Order::Asc,
143+
)
144+
.limit(limit)
79145
.to_owned();
80146

147+
let query = WithClause::new()
148+
.cte(addresses_cte)
149+
.cte(ranked_addresses_cte)
150+
.to_owned()
151+
.query(base_select);
152+
153+
let addresses = Model::find_by_statement(db.get_database_backend().build(&query))
154+
.all(db)
155+
.await?;
156+
157+
Ok(addresses)
158+
}
159+
160+
pub async fn list<C>(
161+
db: &C,
162+
address: Option<AddressAlloy>,
163+
contract_name_query: Option<String>,
164+
chain_ids: Option<Vec<ChainId>>,
165+
token_types: Option<Vec<db_enum::TokenType>>,
166+
page_size: u64,
167+
page_token: Option<(AddressAlloy, ChainId)>,
168+
) -> Result<(Vec<Model>, Option<(AddressAlloy, ChainId)>), DbErr>
169+
where
170+
C: ConnectionTrait,
171+
{
172+
let addresses_cte_iden = Alias::new("addresses").into_iden();
173+
let addresses_cte = prepare_address_search_cte(contract_name_query, addresses_cte_iden.clone());
174+
81175
let base_select = Query::select()
82176
.column(ColumnRef::Asterisk)
83177
.from(addresses_cte_iden)
84-
.apply_if(chain_id, |q, chain_id| {
85-
q.and_where(Column::ChainId.eq(chain_id));
178+
.apply_if(chain_ids, |q, chain_ids| {
179+
if !chain_ids.is_empty() {
180+
q.and_where(Column::ChainId.is_in(chain_ids));
181+
}
182+
})
183+
.apply_if(token_types, |q, token_types| {
184+
if !token_types.is_empty() {
185+
q.and_where(Column::TokenType.is_in(token_types));
186+
} else {
187+
q.and_where(Column::TokenType.is_null());
188+
}
86189
})
87190
.apply_if(address, |q, address| {
88191
q.and_where(Column::Hash.eq(address.as_slice()));
89192
})
90-
.apply_if(token_types, |q, token_types| {
91-
q.and_where(Column::TokenType.is_in(token_types));
92-
})
93193
.apply_if(page_token, |q, page_token| {
94194
q.and_where(
95195
Expr::tuple([

multichain-aggregator/multichain-aggregator-logic/src/repository/block_ranges.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::types::{block_ranges::BlockRange, ChainId};
33
use entity::block_ranges::{ActiveModel, Column, Entity, Model};
44
use sea_orm::{
55
prelude::Expr, sea_query::OnConflict, ActiveValue::NotSet, ColumnTrait, ConnectionTrait, DbErr,
6-
EntityTrait, QueryFilter,
6+
EntityTrait, QueryFilter, QueryTrait,
77
};
88

99
pub async fn upsert_many<C>(db: &C, block_ranges: Vec<BlockRange>) -> Result<(), DbErr>
@@ -56,13 +56,21 @@ where
5656
pub async fn list_matching_block_ranges_paginated<C>(
5757
db: &C,
5858
block_number: u64,
59+
chain_ids: Option<Vec<ChainId>>,
5960
page_size: u64,
6061
page_token: Option<ChainId>,
6162
) -> Result<(Vec<Model>, Option<ChainId>), DbErr>
6263
where
6364
C: ConnectionTrait,
6465
{
6566
let mut c = Entity::find()
67+
.apply_if(chain_ids, |q, chain_ids| {
68+
if !chain_ids.is_empty() {
69+
q.filter(Column::ChainId.is_in(chain_ids))
70+
} else {
71+
q
72+
}
73+
})
6674
.filter(Column::MinBlockNumber.lte(block_number))
6775
.filter(Column::MaxBlockNumber.gte(block_number))
6876
.cursor_by(Column::ChainId);

multichain-aggregator/multichain-aggregator-logic/src/repository/hashes.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub async fn list<C>(
4040
db: &C,
4141
hash: BlockHash,
4242
hash_type: Option<db_enum::HashType>,
43-
chain_id: Option<ChainId>,
43+
chain_ids: Option<Vec<ChainId>>,
4444
page_size: u64,
4545
page_token: Option<ChainId>,
4646
) -> Result<(Vec<Model>, Option<ChainId>), DbErr>
@@ -52,8 +52,12 @@ where
5252
.apply_if(hash_type, |q, hash_type| {
5353
q.filter(Column::HashType.eq(hash_type))
5454
})
55-
.apply_if(chain_id, |q, chain_id| {
56-
q.filter(Column::ChainId.eq(chain_id))
55+
.apply_if(chain_ids, |q, chain_ids| {
56+
if !chain_ids.is_empty() {
57+
q.filter(Column::ChainId.is_in(chain_ids))
58+
} else {
59+
q
60+
}
5761
})
5862
.cursor_by(Column::ChainId);
5963

0 commit comments

Comments
 (0)