Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions src/common/meta/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ use store_api::region_engine::{RegionRole, RegionStatistic};
use store_api::storage::RegionId;
use table::metadata::TableId;

use crate::error;
use crate::error::Result;
use crate::error::{self, DeserializeFromJsonSnafu, Result};
use crate::heartbeat::utils::get_datanode_workloads;

const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";
Expand Down Expand Up @@ -66,10 +65,12 @@ pub struct Stat {
pub node_epoch: u64,
/// The datanode workloads.
pub datanode_workloads: DatanodeWorkloads,
/// The GC statistics of the datanode.
pub gc_stat: Option<GcStat>,
}

/// The statistics of a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionStat {
/// The region_id.
pub id: RegionId,
Expand Down Expand Up @@ -126,7 +127,7 @@ pub trait TopicStatsReporter: Send + Sync {
fn reportable_topics(&mut self) -> Vec<TopicStat>;
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum RegionManifestInfo {
Mito {
manifest_version: u64,
Expand Down Expand Up @@ -222,18 +223,27 @@ impl TryFrom<&HeartbeatRequest> for Stat {
node_epoch,
node_workloads,
topic_stats,
extensions,
..
} = value;

match (header, peer) {
(Some(_header), Some(peer)) => {
(Some(header), Some(peer)) => {
let region_stats = region_stats
.iter()
.map(RegionStat::from)
.collect::<Vec<_>>();
let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();

let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());

let gc_stat = GcStat::from_extensions(extensions).map_err(|err| {
common_telemetry::error!(
"Failed to deserialize GcStat from extensions: {}",
err
);
header.clone()
})?;
Ok(Self {
timestamp_millis: time_util::current_time_millis(),
// datanode id
Expand All @@ -247,6 +257,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
topic_stats,
node_epoch: *node_epoch,
datanode_workloads,
gc_stat,
})
}
(header, _) => Err(header.clone()),
Expand Down Expand Up @@ -319,6 +330,43 @@ impl From<&api::v1::meta::TopicStat> for TopicStat {
}
}

/// GC (garbage collection) statistics of a datanode, carried in the
/// heartbeat extension map under [`GcStat::GC_STAT_KEY`].
//
// `PartialEq, Eq` added for consistency with `RegionStat` and
// `RegionManifestInfo` in this file; `Copy` is free for two `u32` fields.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct GcStat {
    /// Number of GC tasks currently running on the datanode.
    pub running_gc_tasks: u32,
    /// The maximum number of concurrent GC tasks the datanode can handle.
    pub gc_concurrency: u32,
}

impl GcStat {
pub const GC_STAT_KEY: &str = "__gc_stat";

pub fn new(running_gc_tasks: u32, gc_concurrency: u32) -> Self {
Self {
running_gc_tasks,
gc_concurrency,
}
}

pub fn into_extensions(&self, extensions: &mut std::collections::HashMap<String, Vec<u8>>) {
let bytes = serde_json::to_vec(self).unwrap_or_default();
extensions.insert(Self::GC_STAT_KEY.to_string(), bytes);
}

pub fn from_extensions(
extensions: &std::collections::HashMap<String, Vec<u8>>,
) -> Result<Option<Self>> {
extensions
.get(Self::GC_STAT_KEY)
.map(|bytes| {
serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
input: String::from_utf8_lossy(bytes).to_string(),
})
})
.transpose()
}
}

/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`.
Expand Down
122 changes: 121 additions & 1 deletion src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter};
use std::time::Duration;

use serde::{Deserialize, Deserializer, Serialize};
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
use table::table_name::TableName;
Expand Down Expand Up @@ -413,6 +413,88 @@ where
})
}

/// Instruction to get file references for specified regions.
///
/// Carried by `Instruction::GetFileRefs`; the datanode answers with a
/// `GetFileRefsReply`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefs {
    /// List of region IDs to get file references for.
    pub region_ids: Vec<RegionId>,
}

impl Display for GetFileRefs {
    /// Formats as `GetFileRefs(region_ids=[...])`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "GetFileRefs(region_ids={:?})",
            self.region_ids
        ))
    }
}

/// Instruction to trigger garbage collection for a set of regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegions {
    /// The region IDs to perform GC on.
    pub regions: Vec<RegionId>,
    /// The file references manifest containing temporary file references.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether to perform a full file listing to find orphan files.
    pub full_file_listing: bool,
}

impl Display for GcRegions {
    /// Formats a compact one-line summary for logs.
    ///
    /// Fix: the label previously read `GcRegion(...)`, which did not match the
    /// type name `GcRegions`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "GcRegions(regions={:?}, file_refs_count={}, full_file_listing={})",
            self.regions,
            self.file_refs_manifest.file_refs.len(),
            self.full_file_listing
        )
    }
}

/// Reply for GetFileRefs instruction.
// NOTE(review): `success` + `error` encodes the same information a
// `Result<_, String>` would (as `GcRegionsReply` does) — consider unifying;
// confirm no wire-format compatibility constraint first.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefsReply {
    /// The file references manifest.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether the operation was successful.
    pub success: bool,
    /// Error message if any.
    pub error: Option<String>,
}

impl Display for GetFileRefsReply {
    /// Formats as `GetFileRefsReply(success=.., file_refs_count=.., error=..)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let refs_count = self.file_refs_manifest.file_refs.len();
        write!(
            f,
            "GetFileRefsReply(success={}, file_refs_count={refs_count}, error={:?})",
            self.success, self.error
        )
    }
}

/// Reply for GC instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegionsReply {
    /// The GC outcome: a `GcReport` on success, or an error message.
    pub result: Result<GcReport, String>,
}

impl Display for GcRegionsReply {
    /// Formats a compact one-line summary for logs.
    ///
    /// Fix: the label previously read `GcReply(...)`, which did not match the
    /// type name `GcRegionsReply`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Summarize the report by counts only; full contents would be noisy.
        let result = match &self.result {
            Ok(report) => format!(
                "GcReport(deleted_files_count={}, need_retry_regions_count={})",
                report.deleted_files.len(),
                report.need_retry_regions.len()
            ),
            Err(err) => format!("Err({})", err),
        };
        write!(f, "GcRegionsReply(result={})", result)
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens regions.
Expand All @@ -429,6 +511,10 @@ pub enum Instruction {
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegions(FlushRegions),
/// Gets file references for regions.
GetFileRefs(GetFileRefs),
/// Triggers garbage collection for a region.
GcRegions(GcRegions),
}

/// The reply of [UpgradeRegion].
Expand Down Expand Up @@ -462,6 +548,8 @@ pub enum InstructionReply {
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
FlushRegions(FlushRegionReply),
GetFileRefs(GetFileRefsReply),
GcRegions(GcRegionsReply),
}

impl Display for InstructionReply {
Expand All @@ -474,6 +562,8 @@ impl Display for InstructionReply {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
}
}
}
Expand All @@ -497,6 +587,10 @@ impl InstructionReply {

#[cfg(test)]
mod tests {
use std::collections::HashSet;

use store_api::storage::FileId;

use super::*;

#[test]
Expand Down Expand Up @@ -756,4 +850,30 @@ mod tests {
_ => panic!("Expected FlushRegions instruction"),
}
}

#[test]
fn test_serialize_get_file_refs_instruction_reply() {
    // Build a manifest covering two regions of the same table, each holding
    // one file reference and a manifest version (10 and 20 respectively).
    let regions = [RegionId::new(1024, 1), RegionId::new(1024, 2)];
    let mut manifest = FileRefsManifest::default();
    for (idx, region) in regions.iter().enumerate() {
        manifest
            .file_refs
            .insert(*region, HashSet::from([FileId::random()]));
        manifest
            .manifest_version
            .insert(*region, (idx as u64 + 1) * 10);
    }

    let reply = InstructionReply::GetFileRefs(GetFileRefsReply {
        file_refs_manifest: manifest,
        success: true,
        error: None,
    });

    // A JSON round-trip must preserve the reply exactly.
    let json = serde_json::to_string(&reply).unwrap();
    let round_tripped: InstructionReply = serde_json::from_str(&json).unwrap();
    assert_eq!(reply, round_tripped);
}
}
25 changes: 21 additions & 4 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,21 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to run gc for region {}", region_id))]
GcMitoEngine {
region_id: RegionId,
source: mito2::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid arguments for GC: {}", msg))]
InvalidGcArgs {
msg: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to list SST entries from storage"))]
ListStorageSsts {
#[snafu(implicit)]
Expand Down Expand Up @@ -438,9 +453,11 @@ impl ErrorExt for Error {

AsyncTaskExecute { source, .. } => source.status_code(),

CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => {
StatusCode::Internal
}
CreateDir { .. }
| RemoveDir { .. }
| ShutdownInstance { .. }
| DataFusion { .. }
| InvalidGcArgs { .. } => StatusCode::Internal,

RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,
Expand All @@ -458,7 +475,7 @@ impl ErrorExt for Error {
StopRegionEngine { source, .. } => source.status_code(),

FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } | GcMitoEngine { source, .. } => source.status_code(),
BuildMetricEngine { source, .. } => source.status_code(),
ListStorageSsts { source, .. } => source.status_code(),
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
Expand Down
21 changes: 19 additions & 2 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ use common_workload::DatanodeWorkloadType;
use meta_client::MetaClientRef;
use meta_client::client::{HeartbeatSender, MetaClient};
use servers::addrs;
use snafu::ResultExt;
use snafu::{OptionExt as _, ResultExt};
use tokio::sync::{Notify, mpsc};
use tokio::time::Instant;

use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;
Expand Down Expand Up @@ -239,12 +239,18 @@ impl HeartbeatTask {
let mut last_sent = Instant::now();
let cpus = self.resource_spec.cpus as u32;
let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
let gc_limiter = self
.region_server
.mito_engine()
.context(RegionEngineNotFoundSnafu { name: "mito" })?
.gc_limiter();

common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

let build_info = common_version::build_info();

let heartbeat_request = HeartbeatRequest {
peer: self_peer,
node_epoch,
Expand Down Expand Up @@ -275,8 +281,13 @@ impl HeartbeatTask {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let mut extensions = heartbeat_request.extensions.clone();
let gc_stat = gc_limiter.gc_stat();
gc_stat.into_extensions(&mut extensions);

let req = HeartbeatRequest {
mailbox_message: Some(message),
extensions,
..heartbeat_request.clone()
};
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
Expand All @@ -297,10 +308,16 @@ impl HeartbeatTask {
let topic_stats = region_server_clone.topic_stats();
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;

let mut extensions = heartbeat_request.extensions.clone();
let gc_stat = gc_limiter.gc_stat();
gc_stat.into_extensions(&mut extensions);

let req = HeartbeatRequest {
region_stats,
topic_stats,
duration_since_epoch,
extensions,
..heartbeat_request.clone()
};
sleep.as_mut().reset(now + Duration::from_millis(interval));
Expand Down
Loading
Loading