Skip to content

Commit 4ac8dda

Browse files
committed
feat: datanode gc worker handler
Signed-off-by: discord9 <[email protected]>
1 parent 4fcea0a commit 4ac8dda

File tree

8 files changed

+197
-9
lines changed

8 files changed

+197
-9
lines changed

src/datanode/src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,14 @@ pub enum Error {
315315
location: Location,
316316
},
317317

318+
#[snafu(display("Failed to run gc for mito engine"))]
319+
GcMitoEngine {
320+
region_id: RegionId,
321+
source: mito2::error::Error,
322+
#[snafu(implicit)]
323+
location: Location,
324+
},
325+
318326
#[snafu(display("Failed to list SST entries from storage"))]
319327
ListStorageSsts {
320328
#[snafu(implicit)]
@@ -458,7 +466,7 @@ impl ErrorExt for Error {
458466
StopRegionEngine { source, .. } => source.status_code(),
459467

460468
FindLogicalRegions { source, .. } => source.status_code(),
461-
BuildMitoEngine { source, .. } => source.status_code(),
469+
BuildMitoEngine { source, .. } | GcMitoEngine { source, .. } => source.status_code(),
462470
BuildMetricEngine { source, .. } => source.status_code(),
463471
ListStorageSsts { source, .. } => source.status_code(),
464472
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {

src/datanode/src/heartbeat/handler.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ use common_meta::instruction::{Instruction, InstructionReply};
2222
use common_telemetry::error;
2323
use futures::future::BoxFuture;
2424
use snafu::OptionExt;
25-
use store_api::storage::RegionId;
25+
use store_api::storage::{GcReport, RegionId};
2626

2727
mod close_region;
2828
mod downgrade_region;
2929
mod file_ref;
3030
mod flush_region;
31+
mod gc_worker;
3132
mod open_region;
3233
mod upgrade_region;
3334

@@ -41,6 +42,7 @@ pub struct RegionHeartbeatResponseHandler {
4142
catchup_tasks: TaskTracker<()>,
4243
downgrade_tasks: TaskTracker<()>,
4344
flush_tasks: TaskTracker<()>,
45+
gc_tasks: TaskTracker<GcReport>,
4446
}
4547

4648
/// Handler of the instruction.
@@ -53,6 +55,7 @@ pub struct HandlerContext {
5355
catchup_tasks: TaskTracker<()>,
5456
downgrade_tasks: TaskTracker<()>,
5557
flush_tasks: TaskTracker<()>,
58+
gc_tasks: TaskTracker<GcReport>,
5659
}
5760

5861
impl HandlerContext {
@@ -67,6 +70,7 @@ impl HandlerContext {
6770
catchup_tasks: TaskTracker::new(),
6871
downgrade_tasks: TaskTracker::new(),
6972
flush_tasks: TaskTracker::new(),
73+
gc_tasks: TaskTracker::new(),
7074
}
7175
}
7276
}
@@ -79,6 +83,7 @@ impl RegionHeartbeatResponseHandler {
7983
catchup_tasks: TaskTracker::new(),
8084
downgrade_tasks: TaskTracker::new(),
8185
flush_tasks: TaskTracker::new(),
86+
gc_tasks: TaskTracker::new(),
8287
}
8388
}
8489

@@ -106,7 +111,9 @@ impl RegionHeartbeatResponseHandler {
106111
Instruction::GetFileRefs(get_file_refs) => Ok(Box::new(move |handler_context| {
107112
Box::pin(handler_context.handle_get_file_refs_instruction(get_file_refs))
108113
})),
109-
Instruction::GcRegions(_) => InvalidHeartbeatResponseSnafu.fail(),
114+
Instruction::GcRegions(gc_regions) => Ok(Box::new(move |handler_context| {
115+
Box::pin(handler_context.handle_gc_regions_instruction(gc_regions))
116+
})),
110117
}
111118
}
112119
}
@@ -121,6 +128,8 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
121128
| Some((_, Instruction::DowngradeRegion { .. }))
122129
| Some((_, Instruction::UpgradeRegion { .. }))
123130
| Some((_, Instruction::FlushRegions { .. }))
131+
| Some((_, Instruction::GetFileRefs { .. }))
132+
| Some((_, Instruction::GcRegions { .. }))
124133
)
125134
}
126135

@@ -135,13 +144,15 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
135144
let catchup_tasks = self.catchup_tasks.clone();
136145
let downgrade_tasks = self.downgrade_tasks.clone();
137146
let flush_tasks = self.flush_tasks.clone();
147+
let gc_tasks = self.gc_tasks.clone();
138148
let handler = Self::build_handler(instruction)?;
139149
let _handle = common_runtime::spawn_global(async move {
140150
let reply = handler(HandlerContext {
141151
region_server,
142152
catchup_tasks,
143153
downgrade_tasks,
144154
flush_tasks,
155+
gc_tasks,
145156
})
146157
.await;
147158

src/datanode/src/heartbeat/handler/file_ref.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use common_meta::instruction::{GetFileRefs, GetFileRefsReply, InstructionReply};
16+
use common_telemetry::info;
1617
use store_api::storage::FileRefsManifest;
1718

1819
use crate::heartbeat::handler::HandlerContext;
@@ -23,7 +24,7 @@ impl HandlerContext {
2324
get_file_refs: GetFileRefs,
2425
) -> Option<InstructionReply> {
2526
let region_server = self.region_server;
26-
27+
info!("Getting mito Engine");
2728
// Get the MitoEngine
2829
let Some(mito_engine) = region_server.mito_engine() else {
2930
return Some(InstructionReply::GetFileRefs(GetFileRefsReply {
@@ -32,7 +33,10 @@ impl HandlerContext {
3233
error: Some("MitoEngine not found".to_string()),
3334
}));
3435
};
35-
36+
info!(
37+
"Mito Engine found, getting file refs for regions: {:?}",
38+
get_file_refs.region_ids
39+
);
3640
match mito_engine
3741
.get_snapshot_of_unmanifested_refs(get_file_refs.region_ids)
3842
.await
src/datanode/src/heartbeat/handler/gc_worker.rs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
16+
use common_telemetry::{info, warn};
17+
use mito2::gc::LocalGcWorker;
18+
use snafu::{OptionExt, ResultExt};
19+
use store_api::storage::{FileRefsManifest, RegionId};
20+
21+
use crate::error::{GcMitoEngineSnafu, RegionNotFoundSnafu, Result, UnexpectedSnafu};
22+
use crate::heartbeat::handler::HandlerContext;
23+
24+
impl HandlerContext {
25+
pub(crate) async fn handle_gc_regions_instruction(
26+
self,
27+
gc_regions: GcRegions,
28+
) -> Option<InstructionReply> {
29+
let region_ids = gc_regions.regions.clone();
30+
info!("Received gc regions instruction: {:?}", region_ids);
31+
32+
let mut table_id = None;
33+
let is_same_table = region_ids.windows(2).all(|w| {
34+
let t1 = w[0].table_id();
35+
let t2 = w[1].table_id();
36+
if table_id.is_none() {
37+
table_id = Some(t1);
38+
}
39+
t1 == t2
40+
});
41+
if !is_same_table {
42+
return Some(InstructionReply::GcRegions(GcRegionsReply {
43+
result: Err(format!(
44+
"Regions to GC should belong to the same table, found: {:?}",
45+
region_ids
46+
)),
47+
}));
48+
}
49+
50+
let (region_id, gc_worker) = match self
51+
.create_gc_worker(region_ids, &gc_regions.file_refs_manifest)
52+
.await
53+
{
54+
Ok(worker) => worker,
55+
Err(e) => {
56+
return Some(InstructionReply::GcRegions(GcRegionsReply {
57+
result: Err(format!("Failed to create GC worker: {}", e)),
58+
}));
59+
}
60+
};
61+
62+
let register_result = self
63+
.gc_tasks
64+
.try_register(
65+
region_id,
66+
Box::pin(async move {
67+
info!("Starting gc worker for region {}", region_id);
68+
let report = gc_worker
69+
.run()
70+
.await
71+
.context(GcMitoEngineSnafu { region_id })?;
72+
info!("Gc worker for region {} finished", region_id);
73+
Ok(report)
74+
}),
75+
)
76+
.await;
77+
if register_result.is_busy() {
78+
warn!("Another gc task is running for the region: {region_id}");
79+
}
80+
let mut watcher = register_result.into_watcher();
81+
let result = self.gc_tasks.wait_until_finish(&mut watcher).await;
82+
match result {
83+
Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
84+
result: Ok(report),
85+
})),
86+
Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply {
87+
result: Err(format!("{err:?}")),
88+
})),
89+
}
90+
}
91+
92+
async fn create_gc_worker(
93+
&self,
94+
mut region_ids: Vec<RegionId>,
95+
file_ref_manifest: &FileRefsManifest,
96+
) -> Result<(RegionId, LocalGcWorker)> {
97+
// always use the smallest region id on datanode as the target region id
98+
region_ids.sort_by_key(|r| r.region_number());
99+
let mito_engine = self.region_server.mito_engine().context(UnexpectedSnafu {
100+
violated: "MitoEngine not found".to_string(),
101+
})?;
102+
let region_id = *region_ids.first().context(UnexpectedSnafu {
103+
violated: "No region ids provided".to_string(),
104+
})?;
105+
106+
let mito_config = mito_engine.mito_config();
107+
let region = mito_engine
108+
.find_region(region_id)
109+
.context(RegionNotFoundSnafu { region_id })?;
110+
let access_layer = region.access_layer();
111+
112+
let cache_manager = mito_engine.cache_manager();
113+
114+
let gc_worker = LocalGcWorker::try_new(
115+
access_layer.clone(),
116+
Some(cache_manager),
117+
region_ids.into_iter().collect(),
118+
Default::default(),
119+
mito_config.clone().into(),
120+
file_ref_manifest.clone(),
121+
&mito_engine.gc_limiter(),
122+
)
123+
.await
124+
.context(GcMitoEngineSnafu { region_id })?;
125+
126+
Ok((region_id, gc_worker))
127+
}
128+
}

src/mito2/src/engine.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ use common_base::Plugins;
7878
use common_error::ext::BoxedError;
7979
use common_meta::key::SchemaMetadataManagerRef;
8080
use common_recordbatch::SendableRecordBatchStream;
81-
use common_telemetry::{info, tracing};
81+
use common_telemetry::tracing_subscriber::field::debug;
82+
use common_telemetry::{debug, info, tracing};
8283
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
8384
use futures::future::{join_all, try_join_all};
8485
use futures::stream::{self, Stream, StreamExt};
@@ -267,6 +268,8 @@ impl MitoEngine {
267268
) -> Result<FileRefsManifest> {
268269
let file_ref_mgr = self.file_ref_manager();
269270

271+
let region_ids = region_ids.into_iter().collect::<Vec<_>>();
272+
info!("Getting region refs for regions: {:?}", region_ids);
270273
// Convert region IDs to MitoRegionRef objects, error if any region doesn't exist
271274
let regions: Vec<MitoRegionRef> = region_ids
272275
.into_iter()
@@ -275,6 +278,7 @@ impl MitoEngine {
275278
.with_context(|| RegionNotFoundSnafu { region_id })
276279
})
277280
.collect::<Result<_>>()?;
281+
info!("Finding unmanifested file refs for regions: {:?}", regions);
278282

279283
file_ref_mgr
280284
.get_snapshot_of_unmanifested_refs(regions)
@@ -377,7 +381,7 @@ impl MitoEngine {
377381
self.find_region(id)
378382
}
379383

380-
pub(crate) fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
384+
/// Looks up the region identified by `region_id` through the engine's workers.
///
/// Returns whatever `get_region` on the workers yields for that id.
pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
    let workers = &self.inner.workers;
    workers.get_region(region_id)
}
383387

src/mito2/src/region.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,10 @@ impl MitoRegion {
555555
Ok(())
556556
}
557557

558+
pub fn access_layer(&self) -> AccessLayerRef {
559+
self.access_layer.clone()
560+
}
561+
558562
/// Returns the SST entries of the region.
559563
pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
560564
let table_dir = self.table_dir();

src/mito2/src/sst/file_ref.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::collections::{HashMap, HashSet};
1616
use std::sync::Arc;
1717

18-
use common_telemetry::debug;
18+
use common_telemetry::{debug, info};
1919
use dashmap::{DashMap, Entry};
2020
use store_api::storage::{FileRef, FileRefsManifest, RegionId};
2121

@@ -105,11 +105,13 @@ impl FileReferenceManager {
105105
ref_files.extend(files);
106106
}
107107
}
108+
info!("Unfiltered ref files: {:?}", ref_files);
108109

109110
let mut in_manifest_files = HashSet::new();
110111
let mut manifest_version = HashMap::new();
111112

112113
for r in &regions {
114+
info!("Getting manifest for region {}", r.region_id());
113115
let manifest = r.manifest_ctx.manifest().await;
114116
let files = manifest.files.keys().cloned().collect::<Vec<_>>();
115117
in_manifest_files.extend(files);
@@ -121,6 +123,10 @@ impl FileReferenceManager {
121123
.filter(|f| !in_manifest_files.contains(&f.file_id))
122124
.cloned()
123125
.collect::<HashSet<_>>();
126+
info!(
127+
"Ref files excluding in-manifest files: {:?}, manifest_version: {:?}",
128+
ref_files_excluding_in_manifest, manifest_version
129+
);
124130

125131
Ok(FileRefsManifest {
126132
file_refs: ref_files_excluding_in_manifest,

src/store-api/src/storage/file.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,37 @@ pub struct FileRefsManifest {
9191
pub manifest_version: HashMap<RegionId, ManifestVersion>,
9292
}
9393

94-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
94+
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
9595
pub struct GcReport {
9696
/// deleted files per region
9797
pub deleted_files: HashMap<RegionId, Vec<FileId>>,
9898
/// Regions that need retry in next gc round, usually because their tmp ref files are outdated
9999
pub need_retry_regions: HashSet<RegionId>,
100100
}
101101

102+
impl GcReport {
103+
pub fn new(
104+
deleted_files: HashMap<RegionId, Vec<FileId>>,
105+
need_retry_regions: HashSet<RegionId>,
106+
) -> Self {
107+
Self {
108+
deleted_files,
109+
need_retry_regions,
110+
}
111+
}
112+
113+
pub fn merge(&mut self, other: GcReport) {
114+
for (region, files) in other.deleted_files {
115+
self.deleted_files
116+
.entry(region)
117+
.or_default()
118+
.extend(files.into_iter());
119+
}
120+
self.need_retry_regions
121+
.extend(other.need_retry_regions.into_iter());
122+
}
123+
}
124+
102125
#[cfg(test)]
103126
mod tests {
104127

0 commit comments

Comments
 (0)