@@ -1151,80 +1151,19 @@ impl EventCacheStore for SqliteEventCacheStore {
11511151
11521152 let event_id = event_id. to_owned ( ) ;
11531153 let filters = filters. map ( ToOwned :: to_owned) ;
1154- let this = self . clone ( ) ;
1154+ let store = self . clone ( ) ;
11551155
11561156 self . acquire ( )
11571157 . await ?
11581158 . with_transaction ( move |txn| -> Result < _ > {
1159- let get_rows = |row : & rusqlite:: Row < ' _ > | {
1160- Ok ( (
1161- row. get :: < _ , Vec < u8 > > ( 0 ) ?,
1162- row. get :: < _ , Option < u64 > > ( 1 ) ?,
1163- row. get :: < _ , Option < usize > > ( 2 ) ?,
1164- ) )
1165- } ;
1166-
1167- // Collect related events.
1168- let collect_results = |transaction| {
1169- let mut related = Vec :: new ( ) ;
1170-
1171- for result in transaction {
1172- let ( event_blob, chunk_id, index) : ( Vec < u8 > , Option < u64 > , _ ) = result?;
1173-
1174- let event: Event = serde_json:: from_slice ( & this. decode_value ( & event_blob) ?) ?;
1175-
1176- // Only build the position if both the chunk_id and position were present; in
1177- // theory, they should either be present at the same time, or not at all.
1178- let pos = chunk_id. zip ( index) . map ( |( chunk_id, index) | {
1179- Position :: new ( ChunkIdentifier :: new ( chunk_id) , index)
1180- } ) ;
1181-
1182- related. push ( ( event, pos) ) ;
1183- }
1184-
1185- Ok ( related)
1186- } ;
1187-
1188- let related = if let Some ( filters) = compute_filters_string ( filters. as_deref ( ) ) {
1189- let question_marks = repeat_vars ( filters. len ( ) ) ;
1190- let query = format ! (
1191- "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1192- FROM events
1193- LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1194- WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1195- ) ;
1196-
1197- let filters: Vec < _ > = filters. iter ( ) . map ( |f| f. to_sql ( ) . unwrap ( ) ) . collect ( ) ;
1198- let parameters = params_from_iter (
1199- [
1200- hashed_linked_chunk_id. to_sql ( ) . expect ( "We should be able to convert a hashed linked chunk ID to a SQLite value" ) ,
1201- event_id. as_str ( ) . to_sql ( ) . expect ( "We should be able to convert an event ID to a SQLite value" ) ,
1202- hashed_room_id. to_sql ( ) . expect ( "We should be able to convert a room ID to a SQLite value" )
1203- ]
1204- . into_iter ( )
1205- . chain ( filters)
1206- ) ;
1207-
1208- let mut transaction = txn. prepare ( & query) ?;
1209- let transaction = transaction. query_map ( parameters, get_rows) ?;
1210-
1211- collect_results ( transaction)
1212-
1213- } else {
1214- let query =
1215- "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1216- FROM events
1217- LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1218- WHERE relates_to = ? AND room_id = ?" ;
1219- let parameters = ( hashed_linked_chunk_id, event_id. as_str ( ) , hashed_room_id) ;
1220-
1221- let mut transaction = txn. prepare ( query) ?;
1222- let transaction = transaction. query_map ( parameters, get_rows) ?;
1223-
1224- collect_results ( transaction)
1225- } ;
1226-
1227- related
1159+ find_event_relations_transaction (
1160+ store,
1161+ hashed_room_id,
1162+ hashed_linked_chunk_id,
1163+ event_id,
1164+ filters,
1165+ txn,
1166+ )
12281167 } )
12291168 . await
12301169 }
@@ -1598,6 +1537,91 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
15981537 }
15991538}
16001539
1540+ fn find_event_relations_transaction (
1541+ store : SqliteEventCacheStore ,
1542+ hashed_room_id : Key ,
1543+ hashed_linked_chunk_id : Key ,
1544+ event_id : OwnedEventId ,
1545+ filters : Option < Vec < RelationType > > ,
1546+ txn : & Transaction < ' _ > ,
1547+ ) -> Result < Vec < ( Event , Option < Position > ) > > {
1548+ let get_rows = |row : & rusqlite:: Row < ' _ > | {
1549+ Ok ( (
1550+ row. get :: < _ , Vec < u8 > > ( 0 ) ?,
1551+ row. get :: < _ , Option < u64 > > ( 1 ) ?,
1552+ row. get :: < _ , Option < usize > > ( 2 ) ?,
1553+ ) )
1554+ } ;
1555+
1556+ // Collect related events.
1557+ let collect_results = |transaction| {
1558+ let mut related = Vec :: new ( ) ;
1559+
1560+ for result in transaction {
1561+ let ( event_blob, chunk_id, index) : ( Vec < u8 > , Option < u64 > , _ ) = result?;
1562+
1563+ let event: Event = serde_json:: from_slice ( & store. decode_value ( & event_blob) ?) ?;
1564+
1565+ // Only build the position if both the chunk_id and position were present; in
1566+ // theory, they should either be present at the same time, or not at all.
1567+ let pos = chunk_id
1568+ . zip ( index)
1569+ . map ( |( chunk_id, index) | Position :: new ( ChunkIdentifier :: new ( chunk_id) , index) ) ;
1570+
1571+ related. push ( ( event, pos) ) ;
1572+ }
1573+
1574+ Ok ( related)
1575+ } ;
1576+
1577+ let related = if let Some ( filters) = compute_filters_string ( filters. as_deref ( ) ) {
1578+ let question_marks = repeat_vars ( filters. len ( ) ) ;
1579+ let query = format ! (
1580+ "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1581+ FROM events
1582+ LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1583+ WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1584+ ) ;
1585+
1586+ let filters: Vec < _ > = filters. iter ( ) . map ( |f| f. to_sql ( ) . unwrap ( ) ) . collect ( ) ;
1587+ let parameters = params_from_iter (
1588+ [
1589+ hashed_linked_chunk_id. to_sql ( ) . expect (
1590+ "We should be able to convert a hashed linked chunk ID to a SQLite value" ,
1591+ ) ,
1592+ event_id
1593+ . as_str ( )
1594+ . to_sql ( )
1595+ . expect ( "We should be able to convert an event ID to a SQLite value" ) ,
1596+ hashed_room_id
1597+ . to_sql ( )
1598+ . expect ( "We should be able to convert a room ID to a SQLite value" ) ,
1599+ ]
1600+ . into_iter ( )
1601+ . chain ( filters) ,
1602+ ) ;
1603+
1604+ let mut transaction = txn. prepare ( & query) ?;
1605+ let transaction = transaction. query_map ( parameters, get_rows) ?;
1606+
1607+ collect_results ( transaction)
1608+ } else {
1609+ let query =
1610+ "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1611+ FROM events
1612+ LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1613+ WHERE relates_to = ? AND room_id = ?" ;
1614+ let parameters = ( hashed_linked_chunk_id, event_id. as_str ( ) , hashed_room_id) ;
1615+
1616+ let mut transaction = txn. prepare ( query) ?;
1617+ let transaction = transaction. query_map ( parameters, get_rows) ?;
1618+
1619+ collect_results ( transaction)
1620+ } ;
1621+
1622+ related
1623+ }
1624+
16011625/// Like `deadpool::managed::Object::with_transaction`, but starts the
16021626/// transaction in immediate (write) mode from the beginning, precluding errors
16031627/// of the kind SQLITE_BUSY from happening, for transactions that may involve
0 commit comments