Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/common/meta/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub enum RegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
file_removal_rate: u64,
},
Metric {
data_manifest_version: u64,
Expand Down Expand Up @@ -271,9 +272,11 @@ impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
store_api::region_engine::RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
file_removal_rate,
} => RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
file_removal_rate,
},
store_api::region_engine::RegionManifestInfo::Metric {
data_manifest_version,
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/tests/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn alter_request_handler(_peer: Peer, request: RegionRequest) -> Result<RegionRe
let region_id = RegionId::from(req.region_id);
response.extensions.insert(
MANIFEST_INFO_EXTENSION_KEY.to_string(),
RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::mito(1, 1))])
RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::mito(1, 1, 0))])
.unwrap(),
);
response.extensions.insert(
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl Display for GetFileRefs {
/// Instruction to trigger garbage collection for a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegions {
/// The region ID to perform GC on.
/// The region ID to perform GC on, only regions that are currently on the given datanode can be garbage collected, regions not on the datanode will report errors.
pub regions: Vec<RegionId>,
/// The file references manifest containing temporary file references.
pub file_refs_manifest: FileRefsManifest,
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/region_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl LeaderRegionManifestInfo {
RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
file_removal_rate: _,
} => LeaderRegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
Expand Down
1 change: 0 additions & 1 deletion src/datanode/src/heartbeat/handler/file_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl InstructionHandler for GetFileRefsHandler {
error: Some("MitoEngine not found".to_string()),
}));
};

match mito_engine
.get_snapshot_of_unmanifested_refs(get_file_refs.region_ids)
.await
Expand Down
74 changes: 51 additions & 23 deletions src/datanode/src/heartbeat/handler/gc_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
use common_telemetry::{debug, warn};
use mito2::gc::LocalGcWorker;
use snafu::{OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{FileRefsManifest, RegionId};

use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
Expand All @@ -35,20 +35,6 @@ impl InstructionHandler for GcRegionsHandler {
let region_ids = gc_regions.regions.clone();
debug!("Received gc regions instruction: {:?}", region_ids);

let is_same_table = region_ids.windows(2).all(|w| {
let t1 = w[0].table_id();
let t2 = w[1].table_id();
t1 == t2
});
if !is_same_table {
return Some(InstructionReply::GcRegions(GcRegionsReply {
result: Err(format!(
"Regions to GC should belong to the same table, found: {:?}",
region_ids
)),
}));
}

let (region_id, gc_worker) = match self
.create_gc_worker(
ctx,
Expand Down Expand Up @@ -103,6 +89,8 @@ impl InstructionHandler for GcRegionsHandler {
}

impl GcRegionsHandler {
/// Create a GC worker for the given region IDs.
/// Return the first region ID(after sort by given region id) and the GC worker.
async fn create_gc_worker(
&self,
ctx: &HandlerContext,
Expand All @@ -112,22 +100,51 @@ impl GcRegionsHandler {
) -> Result<(RegionId, LocalGcWorker)> {
// always use the smallest region id on datanode as the target region id
region_ids.sort_by_key(|r| r.region_number());

ensure!(
region_ids.windows(2).all(|w| {
let t1 = w[0].table_id();
let t2 = w[1].table_id();
t1 == t2
}),
InvalidGcArgsSnafu {
msg: format!(
"Regions to GC should belong to the same table, found: {:?}",
region_ids
),
}
);

let mito_engine = ctx
.region_server
.mito_engine()
.with_context(|| UnexpectedSnafu {
violated: "MitoEngine not found".to_string(),
})?;
let region_id = *region_ids.first().with_context(|| UnexpectedSnafu {
violated: "No region ids provided".to_string(),

let region_id = *region_ids.first().with_context(|| InvalidGcArgsSnafu {
msg: "No region ids provided".to_string(),
})?;

let mito_config = mito_engine.mito_config();
// also need to ensure all regions are on this datanode
ensure!(
region_ids
.iter()
.all(|rid| mito_engine.find_region(*rid).is_some()),
InvalidGcArgsSnafu {
msg: format!(
"Some regions are not on current datanode:{:?}",
region_ids
.iter()
.filter(|rid| mito_engine.find_region(**rid).is_none())
.collect::<Vec<_>>()
),
}
);

// Find the access layer from one of the regions that exists on this datanode
let access_layer = region_ids
.iter()
.find_map(|rid| mito_engine.find_region(*rid))
let access_layer = mito_engine
.find_region(region_id)
.with_context(|| InvalidGcArgsSnafu {
msg: format!(
"None of the regions is on current datanode:{:?}",
Expand All @@ -136,14 +153,25 @@ impl GcRegionsHandler {
})?
.access_layer();

let mito_regions = region_ids
.iter()
.map(|rid| {
mito_engine
.find_region(*rid)
.map(|r| (*rid, r))
.with_context(|| InvalidGcArgsSnafu {
msg: format!("Region {} not found on datanode", rid),
})
})
.collect::<Result<_>>()?;

let cache_manager = mito_engine.cache_manager();

let gc_worker = LocalGcWorker::try_new(
access_layer.clone(),
Some(cache_manager),
region_ids.into_iter().collect(),
mito_regions,
Default::default(),
mito_config.clone().into(),
file_ref_manifest.clone(),
&mito_engine.gc_limiter(),
full_file_listing,
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ impl RegionServer {

let manifest_info = match manifest_info {
ManifestInfo::MitoManifestInfo(info) => {
RegionManifestInfo::mito(info.data_manifest_version, 0)
RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
}
ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
info.data_manifest_version,
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ hyper-util = { workspace = true, features = ["tokio"] }
itertools.workspace = true
lazy_static.workspace = true
once_cell.workspace = true
ordered-float.workspace = true
parking_lot.workspace = true
prometheus.workspace = true
prost.workspace = true
Expand Down
8 changes: 6 additions & 2 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -994,8 +994,12 @@ pub enum Error {
impl Error {
/// Returns `true` if the error is retryable.
pub fn is_retryable(&self) -> bool {
matches!(self, Error::RetryLater { .. })
|| matches!(self, Error::RetryLaterWithSource { .. })
matches!(
self,
Error::RetryLater { .. }
| Error::RetryLaterWithSource { .. }
| Error::MailboxTimeout { .. }
)
}
}

Expand Down
23 changes: 23 additions & 0 deletions src/meta-srv/src/gc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod candidate;
mod handler;
mod mailbox;
mod options;
mod scheduler;
mod tracker;

pub(crate) use options::GcSchedulerOptions;
pub(crate) use scheduler::{GcScheduler, GcTickerRef};
123 changes: 123 additions & 0 deletions src/meta-srv/src/gc/candidate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Instant;

use common_meta::datanode::{RegionManifestInfo, RegionStat};
use common_telemetry::{debug, info};
use ordered_float::OrderedFloat;
use store_api::storage::RegionId;
use table::metadata::TableId;

use crate::error::Result;
use crate::gc::scheduler::GcScheduler;

/// Represents a region candidate for GC with its priority score.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct GcCandidate {
pub(crate) region_id: RegionId,
pub(crate) score: OrderedFloat<f64>,
pub(crate) region_stat: RegionStat,
}

impl GcCandidate {
fn new(region_id: RegionId, score: f64, region_stat: RegionStat) -> Self {
Self {
region_id,
score: OrderedFloat(score),
region_stat,
}
}

#[allow(unused)]
fn score_f64(&self) -> f64 {
self.score.into_inner()
}
}

impl GcScheduler {
/// Calculate GC priority score for a region based on various metrics.
fn calculate_gc_score(&self, region_stat: &RegionStat) -> f64 {
let sst_count_score = region_stat.sst_num as f64 * self.config.sst_count_weight;

let file_removal_rate_score = match &region_stat.region_manifest {
RegionManifestInfo::Mito {
file_removal_rate, ..
} => *file_removal_rate as f64 * self.config.file_removal_rate_weight,
// Metric engine doesn't have file_removal_rate, also this should be unreachable since metrics engine doesn't support gc
RegionManifestInfo::Metric { .. } => 0.0,
};

sst_count_score + file_removal_rate_score
}

/// Filter and score regions that are candidates for GC, grouped by table.
pub(crate) async fn select_gc_candidates(
&self,
table_to_region_stats: &HashMap<TableId, Vec<RegionStat>>,
) -> Result<HashMap<TableId, Vec<GcCandidate>>> {
let mut table_candidates: HashMap<TableId, Vec<GcCandidate>> = HashMap::new();
let now = Instant::now();

for (table_id, region_stats) in table_to_region_stats {
let mut candidates = Vec::new();

for region_stat in region_stats {
// Skip regions that are too small
if region_stat.approximate_bytes < self.config.min_region_size_threshold {
continue;
}

// Skip regions that are in cooldown period
if let Some(gc_info) = self.region_gc_tracker.lock().await.get(&region_stat.id)
&& now.duration_since(gc_info.last_gc_time) < self.config.gc_cooldown_period
{
debug!("Skipping region {} due to cooldown", region_stat.id);
continue;
}

let score = self.calculate_gc_score(region_stat);

// Only consider regions with a meaningful score
if score > 0.0 {
candidates.push(GcCandidate::new(region_stat.id, score, region_stat.clone()));
}
}

// Sort candidates by score in descending order and take top N
candidates.sort_by(|a, b| b.score.cmp(&a.score));
let top_candidates: Vec<GcCandidate> = candidates
.into_iter()
.take(self.config.regions_per_table_threshold)
.collect();

if !top_candidates.is_empty() {
info!(
"Selected {} GC candidates for table {} (top {} out of all qualified)",
top_candidates.len(),
table_id,
self.config.regions_per_table_threshold
);
table_candidates.insert(*table_id, top_candidates);
}
}

info!(
"Selected GC candidates for {} tables",
table_candidates.len()
);
Ok(table_candidates)
}
}
Loading
Loading