Skip to content

Commit 6bfa276

Browse files
committed
refactor: use region ids
Signed-off-by: discord9 <[email protected]>
1 parent b7e1590 commit 6bfa276

File tree

3 files changed

+66
-43
lines changed

3 files changed

+66
-43
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/mito2/src/engine.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ use store_api::region_engine::{
9898
};
9999
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
100100
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
101-
use store_api::storage::{FileRefsManifest, RegionId, ScanRequest, SequenceNumber, TableId};
101+
use store_api::storage::{FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
102102
use tokio::sync::{Semaphore, oneshot};
103103

104104
use crate::cache::{CacheManagerRef, CacheStrategy};
@@ -260,7 +260,20 @@ impl MitoEngine {
260260
&self,
261261
region_ids: impl IntoIterator<Item = RegionId>,
262262
) -> Result<FileRefsManifest> {
263-
todo!()
263+
let file_ref_mgr = self.file_ref_manager();
264+
265+
// Convert region IDs to MitoRegionRef objects, error if any region doesn't exist
266+
let regions: Vec<MitoRegionRef> = region_ids
267+
.into_iter()
268+
.map(|region_id| {
269+
self.find_region(region_id)
270+
.with_context(|| RegionNotFoundSnafu { region_id })
271+
})
272+
.collect::<Result<_>>()?;
273+
274+
file_ref_mgr
275+
.get_snapshot_of_unmanifested_refs(regions)
276+
.await
264277
}
265278

266279
/// Returns true if the specific region exists.

src/mito2/src/sst/file_ref.rs

Lines changed: 50 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@ use std::sync::Arc;
1717

1818
use common_telemetry::debug;
1919
use dashmap::{DashMap, Entry};
20-
use serde::{Deserialize, Serialize};
21-
use store_api::ManifestVersion;
22-
use store_api::storage::{FileId, FileRef, FileRefsManifest, RegionId, TableId};
20+
use store_api::storage::{FileRef, FileRefsManifest, RegionId};
2321

2422
use crate::error::Result;
2523
use crate::metrics::GC_REF_FILE_CNT;
26-
use crate::region::{MitoRegionRef, RegionMapRef};
24+
use crate::region::MitoRegionRef;
2725
use crate::sst::file::FileMeta;
2826

29-
/// File references for a table.
30-
/// It contains all files referenced by the table.
27+
/// File references for a region.
28+
/// It contains all files referenced by the region.
3129
#[derive(Debug, Clone, Default)]
32-
pub struct TableFileRefs {
30+
pub struct RegionFileRefs {
3331
/// (FileRef, Ref Count) meaning how many FileHandleInner is opened for this file.
3432
pub files: HashMap<FileRef, usize>,
3533
}
@@ -43,7 +41,7 @@ pub struct FileReferenceManager {
4341
/// Datanode id. used to determine tmp ref file name.
4442
node_id: Option<u64>,
4543
/// TODO(discord9): use no hash hasher since table id is sequential.
46-
files_per_table: DashMap<TableId, TableFileRefs>,
44+
files_per_region: DashMap<RegionId, RegionFileRefs>,
4745
}
4846

4947
pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
@@ -52,12 +50,12 @@ impl FileReferenceManager {
5250
pub fn new(node_id: Option<u64>) -> Self {
5351
Self {
5452
node_id,
55-
files_per_table: Default::default(),
53+
files_per_region: Default::default(),
5654
}
5755
}
5856

59-
fn ref_file_set(&self, table_id: TableId) -> Option<HashSet<FileRef>> {
60-
let file_refs = if let Some(file_refs) = self.files_per_table.get(&table_id) {
57+
fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
58+
let file_refs = if let Some(file_refs) = self.files_per_region.get(&region_id) {
6159
file_refs.clone()
6260
} else {
6361
// table_id not found.
@@ -73,8 +71,8 @@ impl FileReferenceManager {
7371
let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
7472

7573
debug!(
76-
"Get file refs for table {}, node {:?}, {} files",
77-
table_id,
74+
"Get file refs for region {}, node {:?}, {} files",
75+
region_id,
7876
self.node_id,
7977
ref_file_set.len(),
8078
);
@@ -101,13 +99,9 @@ impl FileReferenceManager {
10199
regions: impl IntoIterator<Item = MitoRegionRef>,
102100
) -> Result<FileRefsManifest> {
103101
let regions: Vec<MitoRegionRef> = regions.into_iter().collect();
104-
let all_table_ids = regions
105-
.iter()
106-
.map(|r| r.region_id().table_id())
107-
.collect::<HashSet<_>>();
108102
let mut ref_files = HashSet::new();
109-
for table_id in all_table_ids {
110-
if let Some(files) = self.ref_file_set(table_id) {
103+
for region_id in regions.iter().map(|r| r.region_id()) {
104+
if let Some(files) = self.ref_file_set(region_id) {
111105
ref_files.extend(files);
112106
}
113107
}
@@ -138,12 +132,12 @@ impl FileReferenceManager {
138132
/// Also records the access layer for the table if not exists.
139133
/// The access layer will be used to upload ref file to object storage.
140134
pub fn add_file(&self, file_meta: &FileMeta) {
141-
let table_id = file_meta.region_id.table_id();
135+
let region_id = file_meta.region_id;
142136
let mut is_new = false;
143137
{
144138
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
145-
self.files_per_table
146-
.entry(table_id)
139+
self.files_per_region
140+
.entry(region_id)
147141
.and_modify(|refs| {
148142
refs.files
149143
.entry(file_ref.clone())
@@ -153,7 +147,7 @@ impl FileReferenceManager {
153147
1
154148
});
155149
})
156-
.or_insert_with(|| TableFileRefs {
150+
.or_insert_with(|| RegionFileRefs {
157151
files: HashMap::from_iter([(file_ref, 1)]),
158152
});
159153
}
@@ -165,14 +159,14 @@ impl FileReferenceManager {
165159
/// Removes a file reference.
166160
/// If the reference count reaches zero, the file reference will be removed from the manager.
167161
pub fn remove_file(&self, file_meta: &FileMeta) {
168-
let table_id = file_meta.region_id.table_id();
162+
let region_id = file_meta.region_id;
169163
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
170164

171165
let mut remove_table_entry = false;
172166
let mut remove_file_ref = false;
173167
let mut file_cnt = 0;
174168

175-
let table_ref = self.files_per_table.entry(table_id).and_modify(|refs| {
169+
let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
176170
let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
177171
if *count > 0 {
178172
*count -= 1;
@@ -194,7 +188,7 @@ impl FileReferenceManager {
194188
}
195189
});
196190

197-
if let Entry::Occupied(o) = table_ref
191+
if let Entry::Occupied(o) = region_ref
198192
&& remove_table_entry
199193
{
200194
o.remove_entry();
@@ -214,7 +208,7 @@ mod tests {
214208
use std::num::NonZeroU64;
215209

216210
use smallvec::SmallVec;
217-
use store_api::storage::RegionId;
211+
use store_api::storage::{FileId, RegionId};
218212

219213
use super::*;
220214
use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};
@@ -244,54 +238,69 @@ mod tests {
244238
file_ref_mgr.add_file(&file_meta);
245239

246240
assert_eq!(
247-
file_ref_mgr.files_per_table.get(&0).unwrap().files,
241+
file_ref_mgr
242+
.files_per_region
243+
.get(&file_meta.region_id)
244+
.unwrap()
245+
.files,
248246
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
249247
);
250248

251249
file_ref_mgr.add_file(&file_meta);
252250

253-
let expected_table_ref_manifest =
251+
let expected_region_ref_manifest =
254252
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
255253

256254
assert_eq!(
257-
file_ref_mgr.ref_file_set(0).unwrap(),
258-
expected_table_ref_manifest
255+
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
256+
expected_region_ref_manifest
259257
);
260258

261259
assert_eq!(
262-
file_ref_mgr.files_per_table.get(&0).unwrap().files,
260+
file_ref_mgr
261+
.files_per_region
262+
.get(&file_meta.region_id)
263+
.unwrap()
264+
.files,
263265
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
264266
);
265267

266268
assert_eq!(
267-
file_ref_mgr.ref_file_set(0).unwrap(),
268-
expected_table_ref_manifest
269+
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
270+
expected_region_ref_manifest
269271
);
270272

271273
file_ref_mgr.remove_file(&file_meta);
272274

273275
assert_eq!(
274-
file_ref_mgr.files_per_table.get(&0).unwrap().files,
276+
file_ref_mgr
277+
.files_per_region
278+
.get(&file_meta.region_id)
279+
.unwrap()
280+
.files,
275281
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
276282
);
277283

278284
assert_eq!(
279-
file_ref_mgr.ref_file_set(0).unwrap(),
280-
expected_table_ref_manifest
285+
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
286+
expected_region_ref_manifest
281287
);
282288

283289
file_ref_mgr.remove_file(&file_meta);
284290

285291
assert!(
286-
file_ref_mgr.files_per_table.get(&0).is_none(),
292+
file_ref_mgr
293+
.files_per_region
294+
.get(&file_meta.region_id)
295+
.is_none(),
287296
"{:?}",
288-
file_ref_mgr.files_per_table
297+
file_ref_mgr.files_per_region
289298
);
290299

291300
assert!(
292-
file_ref_mgr.ref_file_set(0).is_none(),
301+
file_ref_mgr.ref_file_set(file_meta.region_id).is_none(),
293302
"{:?}",
294-
file_ref_mgr.files_per_table
303+
file_ref_mgr.files_per_region
295304
);
296305
}
297306
}

0 commit comments

Comments
 (0)