diff --git a/Cargo.lock b/Cargo.lock index c2bad3d97129..4250632b5dec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3934,6 +3934,7 @@ dependencies = [ "mito2", "num_cpus", "object-store", + "partition", "prometheus", "prost 0.13.5", "query", @@ -5324,7 +5325,6 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=14b9dc40bdc8288742b0cefc7bb024303b7429ef#14b9dc40bdc8288742b0cefc7bb024303b7429ef" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", @@ -6922,7 +6922,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -7428,6 +7428,7 @@ dependencies = [ "local-ip-address", "once_cell", "parking_lot 0.12.4", + "partition", "prometheus", "prost 0.13.5", "rand 0.9.1", @@ -9859,7 +9860,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.14.0", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -9905,7 +9906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.106", @@ -14510,7 +14511,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ebafce51bae9..be7ecab8a5c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,8 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" 
futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" } +# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" } +greptime-proto = { path = "../greptime-proto" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 3ea7627f9c6e..582e12eb89fb 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -87,6 +87,7 @@ tokio-postgres-rustls = { version = "0.12", optional = true } tonic.workspace = true tracing.workspace = true typetag.workspace = true +uuid.workspace = true [dev-dependencies] chrono.workspace = true diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 265ede339e25..8cd67f44bd58 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -53,6 +53,7 @@ metric-engine.workspace = true mito2.workspace = true num_cpus.workspace = true object-store.workspace = true +partition.workspace = true prometheus.workspace = true prost.workspace = true query.workspace = true diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a2e6f674e2e7..da26ce018fea 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -19,6 +19,7 @@ use common_error::define_into_tonic_status; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use mito2::remap_manifest::Error as RemapManifestError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; use table::error::Error as TableError; @@ -396,6 +397,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to remap manifests: {}", source))] + RemapManifest { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + source: RemapManifestError, + }, + #[snafu(display("Not yet implemented: {what}"))] 
NotYetImplemented { what: String }, } @@ -469,6 +478,7 @@ impl ErrorExt for Error { ObjectStore { source, .. } => source.status_code(), BuildCacheStore { .. } => StatusCode::StorageUnavailable, + RemapManifest { .. } => StatusCode::Unexpected, } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 341ee9442c4c..d2df5a4793d3 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -24,7 +24,9 @@ use api::region::RegionResponse; use api::v1::meta::TopicStat; use api::v1::region::sync_request::ManifestInfo; use api::v1::region::{ - ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request, + ApplyStagedManifestRequest, ListMetadataRequest, PauseRequest, PublishRegionRuleRequest, + RegionResponse as RegionResponseV1, RemapManifestRequest, ResumeRequest, + StageRegionRuleRequest, SyncRequest, region_request, }; use api::v1::{ResponseHeader, Status}; use arrow_flight::{FlightData, Ticket}; @@ -84,6 +86,8 @@ use crate::error::{ use crate::event_listener::RegionServerEventListenerRef; use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder}; +const REMAP_STATS_EXTENSION_KEY: &str = "repartition.manifest.stats"; + #[derive(Clone)] pub struct RegionServer { inner: Arc, @@ -370,6 +374,24 @@ impl RegionServer { } } + /// Temporarily pauses compaction and snapshot related activities for the region. + /// + /// Currently a stub; real implementation will coordinate with region worker. + pub async fn pause_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> { + info!("pause_compaction_and_snapshot stub invoked for region {region_id}"); + let _ = region_id; + Ok(()) + } + + /// Resumes compaction and snapshot related activities for the region. + /// + /// Currently a stub; real implementation will coordinate with region worker. 
+ pub async fn resume_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> { + info!("resume_compaction_and_snapshot stub invoked for region {region_id}"); + let _ = region_id; + Ok(()) + } + /// Stop the region server. pub async fn stop(&self) -> Result<()> { self.inner.stop().await @@ -538,6 +560,124 @@ impl RegionServer { Ok(response) } + async fn handle_pause_region_request(&self, request: &PauseRequest) -> Result { + let region_id = RegionId::from_u64(request.region_id); + let tracing_context = TracingContext::from_current_span(); + let span = tracing_context.attach(info_span!( + "RegionServer::handle_pause_region_request", + region_id = region_id.to_string() + )); + + self.pause_compaction_and_snapshot(region_id) + .trace(span) + .await + .map(|_| RegionResponse::new(AffectedRows::default())) + } + + async fn handle_resume_region_request( + &self, + request: &ResumeRequest, + ) -> Result { + let region_id = RegionId::from_u64(request.region_id); + let tracing_context = TracingContext::from_current_span(); + let span = tracing_context.attach(info_span!( + "RegionServer::handle_resume_region_request", + region_id = region_id.to_string() + )); + + self.resume_compaction_and_snapshot(region_id) + .trace(span) + .await + .map(|_| RegionResponse::new(AffectedRows::default())) + } + + async fn handle_stage_region_rule_request( + &self, + request: &StageRegionRuleRequest, + ) -> Result { + let region_id = RegionId::from_u64(request.region_id); + info!( + "Stage region rule for region {region_id} with version {}", + request.rule_version + ); + match self + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader) + .await? 
+ { + SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => { + Ok(RegionResponse::new(AffectedRows::default())) + } + SetRegionRoleStateResponse::InvalidTransition(err) => { + Err(err).with_context(|_| HandleRegionRequestSnafu { region_id }) + } + } + } + + async fn handle_publish_region_rule_request( + &self, + request: &PublishRegionRuleRequest, + ) -> Result { + let region_id = RegionId::from_u64(request.region_id); + info!( + "Publish region rule for region {region_id} with version {}", + request.rule_version + ); + match self + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader) + .await? + { + SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => { + Ok(RegionResponse::new(AffectedRows::default())) + } + SetRegionRoleStateResponse::InvalidTransition(err) => { + Err(err).with_context(|_| HandleRegionRequestSnafu { region_id }) + } + } + } + + async fn handle_remap_manifest_request( + &self, + request: &RemapManifestRequest, + ) -> Result { + info!( + "received remap manifest request for table {} group {}", + request.table_id, request.group_id + ); + + let stats_json = serde_json::to_vec(&serde_json::json!({ + "files_per_region": HashMap::::new(), + "total_file_refs": 0u64, + "empty_regions": Vec::::new(), + "group_id": &request.group_id, + })) + .context(SerializeJsonSnafu)?; + + let mut extensions = HashMap::new(); + extensions.insert(REMAP_STATS_EXTENSION_KEY.to_string(), stats_json); + + Ok(RegionResponse { + affected_rows: 0, + extensions, + metadata: Vec::new(), + }) + } + + async fn handle_apply_staged_manifest_request( + &self, + request: &ApplyStagedManifestRequest, + ) -> Result { + info!( + "received manifest apply request for table {} group {} publish={} regions {:?}", + request.table_id, request.group_id, request.publish, request.region_ids + ); + + Ok(RegionResponse { + affected_rows: 0, + extensions: HashMap::new(), + metadata: Vec::new(), + }) + } + /// Sync region 
manifest and registers new opened logical regions. pub async fn sync_region( &self, @@ -569,6 +709,26 @@ impl RegionServerHandler for RegionServer { region_request::Body::Sync(sync_request) => { self.handle_sync_region_request(sync_request).await } + region_request::Body::Pause(pause_request) => { + self.handle_pause_region_request(pause_request).await + } + region_request::Body::Resume(resume_request) => { + self.handle_resume_region_request(resume_request).await + } + region_request::Body::StageRegionRule(stage_request) => { + self.handle_stage_region_rule_request(stage_request).await + } + region_request::Body::PublishRegionRule(publish_request) => { + self.handle_publish_region_rule_request(publish_request) + .await + } + region_request::Body::RemapManifest(remap_request) => { + self.handle_remap_manifest_request(remap_request).await + } + region_request::Body::ApplyStagedManifest(apply_request) => { + self.handle_apply_staged_manifest_request(apply_request) + .await + } region_request::Body::ListMetadata(list_metadata_request) => { self.handle_list_metadata_request(list_metadata_request) .await diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index bd2075501c98..87f03ffbd7dc 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -45,6 +45,7 @@ common-time.workspace = true common-version.workspace = true common-wal.workspace = true common-workload.workspace = true +partition.workspace = true dashmap.workspace = true datatypes.workspace = true deadpool = { workspace = true, optional = true } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 2f4756c2aec3..dfdcb79d6072 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -17,12 +17,14 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_meta::DatanodeId; +use common_procedure::ProcedureId; use common_runtime::JoinError; use snafu::{Location, Snafu}; use 
store_api::storage::RegionId; use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; +use uuid::Uuid; use crate::metasrv::SelectTarget; use crate::pubsub::Message; @@ -774,6 +776,129 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to create repartition subtasks"))] + RepartitionCreateSubtasks { + source: partition::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to serialize partition expression"))] + RepartitionSerializePartitionExpr { + source: partition::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Source partition expression '{}' does not match any existing region", + expr + ))] + RepartitionSourceExprMismatch { + expr: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Repartition group {} is missing a source region id", group_id))] + RepartitionMissingSourceRegionId { + group_id: Uuid, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Repartition group {} cannot find route for source region {}", + group_id, + region_id + ))] + RepartitionSourceRegionRouteMissing { + group_id: Uuid, + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Repartition group {} has no source regions after planning", group_id))] + RepartitionNoSourceRegions { + group_id: Uuid, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Repartition group {} target {} is missing an allocated region id", + group_id, + target_index + ))] + RepartitionMissingTargetRegionId { + group_id: Uuid, + target_index: usize, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Route for target region {} not found", region_id))] + RepartitionTargetRegionRouteMissing { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Repartition group {} is missing prepare context", group_id))] + 
RepartitionMissingPrepareContext { + group_id: Uuid, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Repartition group {} has no registered subprocedure", group_id))] + RepartitionSubprocedureUnknown { + group_id: Uuid, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Failed to fetch state for repartition group {} subprocedure {}", + group_id, + procedure_id + ))] + RepartitionSubprocedureStateFetch { + group_id: Uuid, + procedure_id: ProcedureId, + #[snafu(source)] + source: common_procedure::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Repartition group {} subprocedure {} state missing", + group_id, + procedure_id + ))] + RepartitionSubprocedureStateMissing { + group_id: Uuid, + procedure_id: ProcedureId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Repartition group {} subprocedure {} failed: {}", + group_id, + procedure_id, + reason + ))] + RepartitionSubprocedureFailed { + group_id: Uuid, + procedure_id: ProcedureId, + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unsupported operation {}", operation))] Unsupported { operation: String, @@ -997,6 +1122,11 @@ impl Error { matches!(self, Error::RetryLater { .. }) || matches!(self, Error::RetryLaterWithSource { .. }) } + + /// Returns `true` if the error requires cleaning poison records. + pub fn need_clean_poisons(&self) -> bool { + false + } } pub type Result = std::result::Result; @@ -1012,6 +1142,8 @@ impl ErrorExt for Error { | Error::TcpBind { .. } | Error::SerializeToJson { .. } | Error::DeserializeFromJson { .. } + | Error::RepartitionCreateSubtasks { .. } + | Error::RepartitionSerializePartitionExpr { .. } | Error::NoLeader { .. } | Error::LeaderLeaseExpired { .. } | Error::LeaderLeaseChanged { .. } @@ -1032,7 +1164,8 @@ impl ErrorExt for Error { | Error::FlowStateHandler { .. } | Error::BuildWalOptionsAllocator { .. } | Error::BuildPartitionClient { .. 
} - | Error::BuildKafkaClient { .. } => StatusCode::Internal, + | Error::BuildKafkaClient { .. } + | Error::RepartitionSubprocedureStateFetch { .. } => StatusCode::Internal, Error::DeleteRecords { .. } | Error::GetOffset { .. } @@ -1066,7 +1199,14 @@ impl ErrorExt for Error { | Error::TooManyPartitions { .. } | Error::TomlFormat { .. } | Error::HandlerNotFound { .. } - | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments, + | Error::LeaderPeerChanged { .. } + | Error::RepartitionSourceExprMismatch { .. } + | Error::RepartitionMissingSourceRegionId { .. } + | Error::RepartitionSourceRegionRouteMissing { .. } + | Error::RepartitionNoSourceRegions { .. } + | Error::RepartitionMissingTargetRegionId { .. } + | Error::RepartitionTargetRegionRouteMissing { .. } + | Error::RepartitionMissingPrepareContext { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } | Error::InvalidRegionKeyFromUtf8 { .. } @@ -1080,7 +1220,10 @@ impl ErrorExt for Error { | Error::RegionRouteNotFound { .. } | Error::MigrationAbort { .. } | Error::MigrationRunning { .. } - | Error::RegionMigrated { .. } => StatusCode::Unexpected, + | Error::RegionMigrated { .. } + | Error::RepartitionSubprocedureUnknown { .. } + | Error::RepartitionSubprocedureStateMissing { .. } + | Error::RepartitionSubprocedureFailed { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::SaveClusterInfo { source, .. } | Error::InvalidClusterInfoFormat { source, .. 
} diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index 88869d84827d..da1a1b00e7ab 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef; use snafu::ResultExt; pub mod region_migration; +pub mod repartition; #[cfg(any(test, feature = "testing"))] pub mod test_util; #[cfg(test)] diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs new file mode 100644 index 000000000000..ee4daa66e75b --- /dev/null +++ b/src/meta-srv/src/procedure/repartition.rs @@ -0,0 +1,506 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod context; +mod group; +mod plan; + +use std::collections::HashMap; + +use common_meta::ddl::DdlContext; +use common_meta::key::table_route::PhysicalTableRouteValue; +use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock}; +use common_procedure::error::{Error as ProcedureError, Result as ProcedureResult, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, ProcedureId, ProcedureWithId, Status, +}; +use common_telemetry::error; +use partition::expr::PartitionExpr; +use partition::subtask::{self, RepartitionSubtask}; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::TableId; +use strum::AsRefStr; +use uuid::Uuid; + +use self::context::{GroupManifestSummary, RepartitionContext}; +use self::group::RepartitionGroupProcedure; +use self::plan::{PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand}; +use crate::error::{self, Result}; + +/// Task payload passed from the DDL entry point. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RepartitionTask { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub table_id: TableId, + /// Partition expressions representing the source regions. + pub from_exprs: Vec, + /// Partition expressions representing the target regions. + pub into_exprs: Vec, +} + +/// Procedure that orchestrates the repartition flow. +pub struct RepartitionProcedure { + context: DdlContext, + group_context: RepartitionContext, + data: RepartitionData, +} + +impl RepartitionProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition"; + + /// Constructs a new procedure instance from a task payload. + pub fn new(task: RepartitionTask, context: DdlContext) -> Result { + let group_context = RepartitionContext::new(&context); + Ok(Self { + context, + group_context, + data: RepartitionData::new(task), + }) + } + + /// Builds the repartition plan if we have not done so yet. 
+ async fn on_prepare(&mut self) -> Result { + if self.data.plan.is_none() { + self.build_plan().await?; + } + + self.data.state = RepartitionState::AllocateResources; + Ok(Status::executing(true)) + } + + /// Allocates target regions and decides whether the procedure can proceed. + async fn on_allocate_resources(&mut self) -> Result { + if !self.data.resource_allocated { + let allocated = self.allocate_resources().await?; + if !allocated { + if let Some(plan) = &self.data.plan { + let failed_groups = plan.entries.iter().map(|entry| entry.group_id); + self.data.failed_groups.extend(failed_groups); + } + self.data.state = RepartitionState::Finalize; + return Ok(Status::executing(true)); + } + self.data.resource_allocated = true; + } + + self.data.state = RepartitionState::DispatchSubprocedures; + Ok(Status::executing(true)) + } + + /// Spawns group subprocedures for every pending plan entry. + async fn on_dispatch_subprocedures(&mut self) -> Result { + let plan = match self.data.plan.as_ref() { + Some(plan) => plan, + None => { + self.data.state = RepartitionState::Finalize; + return Ok(Status::executing(true)); + } + }; + + let entries_to_schedule: Vec = plan + .entries + .iter() + .filter(|entry| { + !self.data.succeeded_groups.contains(&entry.group_id) + && !self.data.failed_groups.contains(&entry.group_id) + }) + .cloned() + .collect(); + + if entries_to_schedule.is_empty() { + self.data.state = RepartitionState::Finalize; + return Ok(Status::executing(true)); + } + + let groups_to_schedule: Vec = entries_to_schedule + .iter() + .map(|entry| entry.group_id) + .collect(); + + let subprocedures = self.spawn_group_procedures( + plan.table_id, + plan.route_snapshot.clone(), + entries_to_schedule, + ); + self.data.pending_groups = groups_to_schedule; + self.data.state = RepartitionState::CollectSubprocedures; + + Ok(Status::suspended(subprocedures, true)) + } + + /// Records the list of subprocedures that finished and move to finalisation. 
+ async fn on_collect_subprocedures(&mut self, ctx: &ProcedureContext) -> Result { + let pending = std::mem::take(&mut self.data.pending_groups); + let mut first_error: Option = None; + let mut succeeded = Vec::new(); + + for group_id in pending { + let procedure_id = match self.data.group_subprocedures.remove(&group_id) { + Some(id) => id, + None => { + let err = error::RepartitionSubprocedureUnknownSnafu { group_id }.build(); + self.data.failed_groups.push(group_id); + if first_error.is_none() { + first_error = Some(err); + } + continue; + } + }; + + let state_opt = ctx.provider.procedure_state(procedure_id).await.context( + error::RepartitionSubprocedureStateFetchSnafu { + group_id, + procedure_id, + }, + )?; + + let state = match state_opt { + Some(state) => state, + None => { + let err = error::RepartitionSubprocedureStateMissingSnafu { + group_id, + procedure_id, + } + .build(); + self.data.failed_groups.push(group_id); + if first_error.is_none() { + first_error = Some(err); + } + continue; + } + }; + + if state.is_done() { + succeeded.push(group_id); + continue; + } + + let reason = state + .error() + .map(|err| err.to_string()) + .unwrap_or_else(|| format!("subprocedure state {}", state.as_str_name())); + let err = error::RepartitionSubprocedureFailedSnafu { + group_id, + procedure_id, + reason, + } + .build(); + self.data.failed_groups.push(group_id); + if first_error.is_none() { + first_error = Some(err); + } + } + + self.data.succeeded_groups.extend(succeeded); + self.data.state = RepartitionState::Finalize; + + if let Some(err) = first_error { + return Err(err); + } + + Ok(Status::executing(true)) + } + + /// Builds the summary that will be returned to the caller. 
+ async fn on_finalize(&mut self) -> Result { + self.deallocate_resources().await?; + + self.data.summary = Some(RepartitionSummary { + succeeded_groups: self.data.succeeded_groups.clone(), + failed_groups: self.data.failed_groups.clone(), + manifest_summaries: self.group_context.manifest_summaries(), + }); + self.group_context.clear_group_records(); + self.data.state = RepartitionState::Finished; + Ok(Status::done()) + } + + /// Constructs the repartition plan from the task specification. + async fn build_plan(&mut self) -> Result<()> { + let table_id = self.data.task.table_id; + let from_exprs = &self.data.task.from_exprs; + let into_exprs = &self.data.task.into_exprs; + + let (physical_table_id, physical_route) = self + .context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await + .context(error::TableMetadataManagerSnafu)?; + + let src_descriptors = Self::source_region_descriptors(from_exprs, &physical_route)?; + let subtasks = subtask::create_subtasks(from_exprs, into_exprs) + .context(error::RepartitionCreateSubtasksSnafu)?; + let entries = Self::build_plan_entries(subtasks, &src_descriptors, into_exprs); + + let demand = ResourceDemand::from_plan_entries(&entries); + let plan = RepartitionPlan::new(physical_table_id, entries, demand, physical_route.clone()); + self.data.plan = Some(plan); + + Ok(()) + } + + fn source_region_descriptors( + from_exprs: &[PartitionExpr], + physical_route: &PhysicalTableRouteValue, + ) -> Result> { + let existing_regions = physical_route + .region_routes + .iter() + .map(|route| (route.region.id, route.region.partition_expr())) + .collect::>(); + + let descriptors = from_exprs + .iter() + .map(|expr| { + let expr_json = expr + .as_json_str() + .context(error::RepartitionSerializePartitionExprSnafu)?; + + let matched_region_id = existing_regions + .iter() + .find_map(|(region_id, existing_expr)| { + (existing_expr == &expr_json).then_some(*region_id) + }) + .with_context(|| 
error::RepartitionSourceExprMismatchSnafu { + expr: expr_json, + })?; + + Ok(RegionDescriptor { + region_id: Some(matched_region_id), + partition_expr: expr.clone(), + }) + }) + .collect::>>()?; + + Ok(descriptors) + } + + fn build_plan_entries( + subtasks: Vec, + source_index: &[RegionDescriptor], + target_exprs: &[PartitionExpr], + ) -> Vec { + let plan_entries = subtasks + .into_iter() + .map(|subtask| { + let group_id = Uuid::new_v4(); + let sources = subtask + .from_expr_indices + .iter() + .map(|&idx| source_index[idx].clone()) + .collect::>(); + + let targets = subtask + .to_expr_indices + .iter() + .map(|&idx| RegionDescriptor { + region_id: None, // will be assigned later + partition_expr: target_exprs[idx].clone(), + }) + .collect::>(); + + PlanEntry::new(group_id, subtask, sources, targets) + }) + .collect::>(); + + plan_entries + } + + /// Allocates resources required by the plan. Returning `false` + /// indicates that the procedure should abort. + async fn allocate_resources(&mut self) -> Result { + todo!("allocate resources"); + } + + async fn deallocate_resources(&mut self) -> Result<()> { + if !self.data.resource_allocated { + return Ok(()); + } + self.data.resource_allocated = false; + + todo!("deallocate resources"); + } + + /// Builds the child procedure list for the provided plan groups. 
+ fn spawn_group_procedures( + &mut self, + table_id: TableId, + route_snapshot: PhysicalTableRouteValue, + entries: Vec, + ) -> Vec { + let mut id_map = HashMap::new(); + + let procedures = entries + .into_iter() + .map(|entry| { + let group_id = entry.group_id; + let group_procedure = RepartitionGroupProcedure::new( + entry, + table_id, + route_snapshot.clone(), + self.data.task.catalog_name.clone(), + self.data.task.schema_name.clone(), + self.group_context.clone(), + ); + let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure)); + id_map.insert(group_id, procedure.id); + procedure + }) + .collect::>(); + + self.data.group_subprocedures = id_map; + procedures + } + + /// Composes the set of locks required to safely mutate table metadata. + fn table_lock_key(&self) -> Vec { + let mut lock_key = Vec::with_capacity(3); + let catalog = self.data.task.catalog_name.as_str(); + let schema = self.data.task.schema_name.as_str(); + lock_key.push(CatalogLock::Read(catalog).into()); + lock_key.push(SchemaLock::read(catalog, schema).into()); + lock_key.push(TableLock::Write(self.data.task.table_id).into()); + + lock_key + } + + async fn trigger_group_rollbacks(&mut self) { + if self.data.rollback_triggered { + return; + } + + match self.group_context.rollback_registered_groups().await { + Ok(_) => { + self.data.rollback_triggered = true; + } + Err(err) => { + error!(err; "repartition: rollback of successful groups failed"); + self.data.rollback_triggered = true; + } + } + } +} + +#[async_trait::async_trait] +impl Procedure for RepartitionProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { + let state = self.data.state; + let status = match state { + RepartitionState::Prepare => self.on_prepare().await, + RepartitionState::AllocateResources => self.on_allocate_resources().await, + RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await, + 
RepartitionState::CollectSubprocedures => self.on_collect_subprocedures(ctx).await, + RepartitionState::Finalize => self.on_finalize().await, + RepartitionState::Finished => Ok(Status::done()), + }; + + match status { + Ok(status) => Ok(status), + Err(err) => { + self.trigger_group_rollbacks().await; + if let Err(dealloc_err) = self.deallocate_resources().await { + error!(dealloc_err; "repartition: deallocating resources after failure failed"); + } + Err(map_repartition_error(err)) + } + } + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + LockKey::new(self.table_lock_key()) + } +} + +/// Serialized data of the repartition procedure. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RepartitionData { + state: RepartitionState, + task: RepartitionTask, + #[serde(default)] + plan: Option, + #[serde(default)] + resource_allocated: bool, + #[serde(default)] + pending_groups: Vec, + #[serde(default)] + succeeded_groups: Vec, + #[serde(default)] + failed_groups: Vec, + #[serde(default)] + summary: Option, + #[serde(default)] + rollback_triggered: bool, + #[serde(default)] + group_subprocedures: HashMap, +} + +impl RepartitionData { + /// Initialise the procedure data for a fresh run. 
+ fn new(task: RepartitionTask) -> Self { + Self { + state: RepartitionState::Prepare, + task, + plan: None, + resource_allocated: false, + pending_groups: Vec::new(), + succeeded_groups: Vec::new(), + failed_groups: Vec::new(), + summary: None, + rollback_triggered: false, + group_subprocedures: HashMap::new(), + } + } +} + +pub(super) fn map_repartition_error(err: error::Error) -> ProcedureError { + match (err.is_retryable(), err.need_clean_poisons()) { + (true, true) => ProcedureError::retry_later_and_clean_poisons(err), + (true, false) => ProcedureError::retry_later(err), + (false, true) => ProcedureError::external_and_clean_poisons(err), + (false, false) => ProcedureError::external(err), + } +} + +/// High level states of the repartition procedure. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)] +enum RepartitionState { + Prepare, + AllocateResources, + DispatchSubprocedures, + CollectSubprocedures, + Finalize, + Finished, +} + +/// Information returned to the caller after the procedure finishes. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct RepartitionSummary { + succeeded_groups: Vec, + failed_groups: Vec, + #[serde(default)] + manifest_summaries: Vec, +} diff --git a/src/meta-srv/src/procedure/repartition/context.rs b/src/meta-srv/src/procedure/repartition/context.rs new file mode 100644 index 000000000000..179ce74d36db --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/context.rs @@ -0,0 +1,351 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use api::region::RegionResponse;
+use api::v1::region::{
+    ApplyStagedManifestRequest, PauseRequest, PublishRegionRuleRequest, RegionRequest,
+    RegionRequestHeader, RemapManifestRequest, ResumeRequest, StageRegionRuleRequest,
+    region_request,
+};
+use common_error::ext::BoxedError;
+use common_meta::ddl::DdlContext;
+use common_meta::key::TableMetadataManagerRef;
+use common_meta::node_manager::NodeManagerRef;
+use common_meta::peer::Peer;
+use common_telemetry::{error, info};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use snafu::ResultExt;
+use store_api::storage::RegionId;
+
+use crate::error::{self, Result};
+
+/// Extension key under which datanodes attach manifest remap statistics to a
+/// region response (see `note_remap_stats` in group.rs).
+pub const REMAP_MANIFEST_STATS_EXTENSION: &str = "repartition.manifest.stats";
+
+use super::group::{GroupRollbackRecord, RepartitionGroupProcedure};
+use crate::procedure::repartition::plan::PlanGroupId;
+
+/// Track the overall manifest stage for a repartition group.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub enum ManifestStatus {
+    #[default]
+    NotStarted,
+    Staged,
+    Published,
+    Discarded,
+    Skipped,
+    Failed,
+}
+
+/// Per-group status record that is collected by the top-level procedure.
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct GroupManifestSummary {
+    pub group_id: PlanGroupId,
+    pub status: ManifestStatus,
+    pub staged_region_count: u64,
+    // NOTE(review): type parameters reconstructed from usage in group.rs
+    // (stats carry a serde_json::Value, errors a display string) -- the
+    // angle brackets were stripped from this hunk in transit; verify.
+    pub stats: Option<Value>,
+    pub error: Option<String>,
+}
+
+/// Shared context that allows group procedures to interact with metadata and
+/// datanodes. It also aggregates per-group manifest summaries.
+#[derive(Clone)]
+pub struct RepartitionContext {
+    /// Metadata accessor used to read and commit table route changes.
+    pub table_metadata_manager: TableMetadataManagerRef,
+    /// Client factory for issuing region RPCs to datanodes.
+    pub node_manager: NodeManagerRef,
+    // NOTE(review): the inner type parameters below were stripped from this
+    // hunk in transit; reconstructed from the insert/drain call sites further
+    // down this impl -- verify against the original commit.
+    manifest_records: Arc<Mutex<HashMap<PlanGroupId, GroupManifestSummary>>>,
+    rollback_records: Arc<Mutex<HashMap<PlanGroupId, GroupRollbackRecord>>>,
+}
+
+impl RepartitionContext {
+    /// Build a context by cloning the shared handles out of the DDL context.
+    pub fn new(context: &DdlContext) -> Self {
+        Self {
+            table_metadata_manager: context.table_metadata_manager.clone(),
+            node_manager: context.node_manager.clone(),
+            manifest_records: Arc::new(Mutex::new(HashMap::new())),
+            rollback_records: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+
+    /// Send a pause request to the region leader so that local IO is quiesced.
+    pub async fn pause_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
+        info!(
+            "requesting pause to datanode {} for region {}",
+            peer.id, region_id
+        );
+        let datanode = self.node_manager.datanode(peer).await;
+        let request = RegionRequest {
+            header: Some(RegionRequestHeader::default()),
+            body: Some(region_request::Body::Pause(PauseRequest {
+                region_id: region_id.as_u64(),
+            })),
+        };
+        datanode
+            .handle(request)
+            .await
+            .map_err(BoxedError::new)
+            // Pause failures are treated as transient (node restart, network);
+            // surfaced as retry-later so the framework re-drives this step.
+            .context(error::RetryLaterWithSourceSnafu {
+                reason: format!(
+                    "failed to pause region {} on datanode {}",
+                    region_id, peer.id
+                ),
+            })?;
+        Ok(())
+    }
+
+    /// Resume a previously paused region.
+    pub async fn resume_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> {
+        info!(
+            "requesting resume to datanode {} for region {}",
+            peer.id, region_id
+        );
+        let datanode = self.node_manager.datanode(peer).await;
+        let request = RegionRequest {
+            header: Some(RegionRequestHeader::default()),
+            body: Some(region_request::Body::Resume(ResumeRequest {
+                region_id: region_id.as_u64(),
+                // NOTE(review): empty rule_version appears to mean "resume with
+                // whatever rule is currently active" -- confirm on the datanode side.
+                rule_version: String::new(),
+            })),
+        };
+        datanode
+            .handle(request)
+            .await
+            .map_err(BoxedError::new)
+            // Transient failure: surface as retry-later so the procedure
+            // framework re-drives this step instead of aborting.
+            .context(error::RetryLaterWithSourceSnafu {
+                reason: format!(
+                    "failed to resume region {} on datanode {}",
+                    region_id, peer.id
+                ),
+            })?;
+        Ok(())
+    }
+
+    /// Stage the provided rule version on the datanode.
+    ///
+    /// Staging makes the rule visible but not active; it becomes active only
+    /// after `publish_region_rule_on_datanode`.
+    pub async fn stage_region_rule_on_datanode(
+        &self,
+        peer: &Peer,
+        region_id: RegionId,
+        rule_version: &str,
+    ) -> Result<()> {
+        info!(
+            "requesting region rule staging to datanode {} for region {}",
+            peer.id, region_id
+        );
+        let datanode = self.node_manager.datanode(peer).await;
+        let request = RegionRequest {
+            header: Some(RegionRequestHeader::default()),
+            body: Some(region_request::Body::StageRegionRule(
+                StageRegionRuleRequest {
+                    region_id: region_id.as_u64(),
+                    rule_version: rule_version.to_string(),
+                },
+            )),
+        };
+        datanode
+            .handle(request)
+            .await
+            .map_err(BoxedError::new)
+            .context(error::RetryLaterWithSourceSnafu {
+                reason: format!(
+                    "failed to stage region rule for region {} on datanode {}",
+                    region_id, peer.id
+                ),
+            })?;
+        Ok(())
+    }
+
+    /// Publish the staged rule version to make it active.
+    pub async fn publish_region_rule_on_datanode(
+        &self,
+        peer: &Peer,
+        region_id: RegionId,
+        rule_version: &str,
+    ) -> Result<()> {
+        info!(
+            "requesting region rule publish to datanode {} for region {}",
+            peer.id, region_id
+        );
+        let datanode = self.node_manager.datanode(peer).await;
+        let request = RegionRequest {
+            header: Some(RegionRequestHeader::default()),
+            body: Some(region_request::Body::PublishRegionRule(
+                PublishRegionRuleRequest {
+                    region_id: region_id.as_u64(),
+                    rule_version: rule_version.to_string(),
+                },
+            )),
+        };
+        datanode
+            .handle(request)
+            .await
+            .map_err(BoxedError::new)
+            // Transient failure: surface as retry-later so the procedure
+            // framework re-drives this step instead of aborting.
+            .context(error::RetryLaterWithSourceSnafu {
+                reason: format!(
+                    "failed to publish region rule for region {} on datanode {}",
+                    region_id, peer.id
+                ),
+            })?;
+        Ok(())
+    }
+
+    /// Drop the staged rule version during rollback.
+    ///
+    /// NOTE(review): clearing is expressed as staging an empty rule_version;
+    /// confirm the datanode interprets "" as "clear the staged rule".
+    pub async fn clear_region_rule_stage_on_datanode(
+        &self,
+        peer: &Peer,
+        region_id: RegionId,
+    ) -> Result<()> {
+        info!(
+            "requesting region rule stage clear to datanode {} for region {}",
+            peer.id, region_id
+        );
+        let datanode = self.node_manager.datanode(peer).await;
+        let request = RegionRequest {
+            header: Some(RegionRequestHeader::default()),
+            body: Some(region_request::Body::StageRegionRule(
+                StageRegionRuleRequest {
+                    region_id: region_id.as_u64(),
+                    rule_version: String::new(),
+                },
+            )),
+        };
+        datanode
+            .handle(request)
+            .await
+            .map_err(BoxedError::new)
+            .context(error::RetryLaterWithSourceSnafu {
+                reason: format!(
+                    "failed to clear staged region rule for region {} on datanode {}",
+                    region_id, peer.id
+                ),
+            })?;
+        Ok(())
+    }
+
+    /// Instruct the datanode to remap manifests for this group.
+    pub async fn remap_manifests_on_datanode(
+        &self,
+        peer: &Peer,
+        manifest_request: RemapManifestRequest,
+    // NOTE(review): return type parameter reconstructed -- the original hunk
+    // lost its angle brackets in transit; `RegionResponse` matches the value
+    // returned from `datanode.handle(..)` below.
+    ) -> Result<RegionResponse> {
+        let table_id = manifest_request.table_id;
+        // Cloned before the request is moved into the RPC body so it can be
+        // used in logging and the error message.
+        let group_id = manifest_request.group_id.clone();
+        info!(
+            "requesting manifest remap to datanode {} for table {} in group {}",
+            peer.id, table_id, group_id
+        );
+        let datanode = self.node_manager.datanode(peer).await;
+        let region_request = RegionRequest {
+            header: Some(RegionRequestHeader::default()),
+            body: Some(region_request::Body::RemapManifest(manifest_request)),
+        };
+        let response = datanode
+            .handle(region_request)
+            .await
+            .map_err(BoxedError::new)
+            .context(error::RetryLaterWithSourceSnafu {
+                reason: format!(
+                    "failed to remap manifests for group {} on datanode {}",
+                    group_id, peer.id
+                ),
+            })?;
+        Ok(response)
+    }
+
+    /// Publish or discard staged manifests.
+    pub async fn apply_staged_manifests_on_datanode(
+        &self,
+        peer: &Peer,
+        manifest_request: ApplyStagedManifestRequest,
+    ) -> Result<RegionResponse> {
+        let publish = manifest_request.publish;
+        let table_id = manifest_request.table_id;
+        let group_id = manifest_request.group_id.clone();
+        info!(
+            "requesting manifest {} on datanode {} for table {} in group {}",
+            if publish { "publish" } else { "discard" },
+            peer.id,
+            table_id,
+            group_id
+        );
+        let datanode = self.node_manager.datanode(peer).await;
+        let region_request = RegionRequest {
+            header: Some(RegionRequestHeader::default()),
+            body: Some(region_request::Body::ApplyStagedManifest(manifest_request)),
+        };
+        let response = datanode
+            .handle(region_request)
+            .await
+            .map_err(BoxedError::new)
+            .context(error::RetryLaterWithSourceSnafu {
+                reason: format!(
+                    "failed to {} staged manifests for group {} on datanode {}",
+                    if publish { "publish" } else { "discard" },
+                    group_id,
+                    peer.id
+                ),
+            })?;
+        Ok(response)
+    }
+
+    /// Store the latest manifest summary for a group.
+ pub fn record_manifest_summary(&self, summary: GroupManifestSummary) { + let mut records = self.manifest_records.lock().unwrap(); + records.insert(summary.group_id, summary); + } + + pub fn register_group_success(&self, record: GroupRollbackRecord) { + let mut records = self.rollback_records.lock().unwrap(); + let group_id = record.group_id; + records.insert(group_id, record); + } + + pub async fn rollback_registered_groups(&self) -> Result<()> { + let records: Vec = { + let mut map = self.rollback_records.lock().unwrap(); + map.drain().map(|(_, record)| record).collect() + }; + + let mut first_err: Option = None; + for record in records { + let group_id = record.group_id; + if let Err(err) = + RepartitionGroupProcedure::execute_rollback(self.clone(), record).await + { + error!(err; "repartition: rollback of group {:?} failed", group_id); + if first_err.is_none() { + first_err = Some(err); + } + } + } + + if let Some(err) = first_err { + return Err(err); + } + + Ok(()) + } + + pub fn clear_group_records(&self) { + self.rollback_records.lock().unwrap().clear(); + } + + /// Collect all manifest summaries recorded so far. + pub fn manifest_summaries(&self) -> Vec { + let records = self.manifest_records.lock().unwrap(); + records.values().cloned().collect() + } +} diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs new file mode 100644 index 000000000000..e9cd13ac520b --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -0,0 +1,1406 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeSet, HashMap}; + +use api::region::RegionResponse; +use api::v1::region::{ + ApplyStagedManifestRequest, RemapManifestRequest, RemapManifestSource, RemapManifestTarget, +}; +use common_meta::key::DeserializedValueWithBytes; +use common_meta::key::datanode_table::{DatanodeTableKey, RegionInfo}; +use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; +use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock}; +use common_meta::peer::Peer; +use common_meta::rpc::router::{Region, RegionRoute}; +use common_procedure::error::{Result as ProcedureResult, ToJsonSnafu}; +use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +use common_telemetry::{debug, info}; +use serde::{Deserialize, Serialize}; +use serde_json::{self, Value}; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::storage::{RegionId, TableId}; +use strum::AsRefStr; + +use crate::error::{self, Result}; +use crate::procedure::repartition::context::{ + GroupManifestSummary, ManifestStatus, REMAP_MANIFEST_STATS_EXTENSION, RepartitionContext, +}; +use crate::procedure::repartition::plan::{PlanEntry, PlanGroupId, RegionDescriptor}; + +/// Logical states executed by the group procedure state machine. 
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)] +pub enum GroupState { + Prepare, + Freeze, + UpdateMetadata, + UpdateRegionRule, + UpdateManifests, + Confirm, + Cleanup, + Finished, +} + +/// Persisted snapshot of group execution, replayed by the procedure framework +/// when the workflow is resumed. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct GroupProcedureData { + table_id: TableId, + entry: PlanEntry, + state: GroupState, + route_snapshot: PhysicalTableRouteValue, + #[serde(skip)] + route_raw: Option>, + #[serde(skip)] + region_info: Option, + #[serde(default)] + prepare_result: Option, + #[serde(default)] + freeze_completed: bool, + #[serde(default)] + metadata_updated: bool, + #[serde(default)] + staged_regions: Vec, + #[serde(default)] + region_rule_updated: bool, + #[serde(default)] + region_rule_version: Option, + #[serde(default)] + manifests_generated: bool, + #[serde(default)] + manifest_stats: Option, + #[serde(default)] + catalog_name: String, + #[serde(default)] + schema_name: String, + #[serde(default)] + manifest_summary: GroupManifestSummary, + #[serde(default)] + route_committed: bool, + #[serde(default)] + rollback_registered: bool, +} + +impl GroupProcedureData { + /// Record the latest manifest status and return a cloned summary for + /// immediate use by the caller. + fn record_manifest_summary( + &mut self, + status: ManifestStatus, + staged_region_count: u64, + error: Option, + ) -> GroupManifestSummary { + self.manifest_summary.status = status; + self.manifest_summary.staged_region_count = staged_region_count; + self.manifest_summary.stats = self.manifest_stats.clone(); + self.manifest_summary.error = error; + self.manifests_generated = + matches!(status, ManifestStatus::Staged) && staged_region_count > 0; + self.manifest_summary.clone() + } + + /// Drop manifest tracking state – typically called around rollback paths. 
+    fn reset_manifest_state(&mut self) {
+        self.manifest_stats = None;
+        self.manifests_generated = false;
+    }
+
+    /// Decode manifest stats from the RPC response and update bookkeeping.
+    fn note_remap_stats(&mut self, response: &RegionResponse) {
+        if let Some(payload) = response.extensions.get(REMAP_MANIFEST_STATS_EXTENSION) {
+            // NOTE(review): the turbofish parameter was lost in transit; the
+            // surrounding code stores a serde_json::Value, so `Value` is the
+            // only consistent reconstruction -- verify.
+            match serde_json::from_slice::<Value>(payload) {
+                Ok(value) => {
+                    self.manifest_stats = Some(value.clone());
+                    let total_refs = value
+                        .get("total_file_refs")
+                        .and_then(Value::as_u64)
+                        .unwrap_or_default();
+                    info!(
+                        "repartition group {:?}: staged manifests for {} regions (total file refs {})",
+                        self.entry.group_id, response.affected_rows, total_refs
+                    );
+                    debug!(
+                        "repartition group {:?}: manifest remap detail {:?}",
+                        self.entry.group_id, value
+                    );
+                }
+                Err(err) => {
+                    debug!(
+                        error = ?err,
+                        "repartition group {:?}: failed to decode manifest remap stats",
+                        self.entry.group_id
+                    );
+                    // NOTE(review): resetting here also clears
+                    // `manifests_generated`, so staged manifests would not be
+                    // discarded during cleanup -- confirm this is intended.
+                    self.reset_manifest_state();
+                }
+            }
+        } else {
+            debug!(
+                "repartition group {:?}: manifest remap response missing stats extension",
+                self.entry.group_id
+            );
+            self.reset_manifest_state();
+        }
+    }
+
+    /// Update manifest bookkeeping after publishing or discarding staged files.
+    fn note_manifest_application(
+        &mut self,
+        publish: bool,
+        response: Option<&RegionResponse>,
+    ) -> GroupManifestSummary {
+        match response {
+            Some(resp) => {
+                self.note_remap_stats(resp);
+                let status = if publish {
+                    ManifestStatus::Published
+                } else {
+                    ManifestStatus::Discarded
+                };
+                let summary = self.record_manifest_summary(status, resp.affected_rows as u64, None);
+                if !publish {
+                    // Discarded manifests no longer need cleanup tracking.
+                    self.reset_manifest_state();
+                }
+                summary
+            }
+            None => {
+                // Nothing was staged: record a Skipped summary.
+                self.reset_manifest_state();
+                self.record_manifest_summary(ManifestStatus::Skipped, 0, None)
+            }
+        }
+    }
+}
+
+/// Snapshot of a committed group, retained so the top-level procedure can
+/// roll the group back if a later group fails.
+#[derive(Clone)]
+pub(crate) struct GroupRollbackRecord {
+    pub(crate) group_id: PlanGroupId,
+    data: GroupProcedureData,
+}
+
+impl GroupRollbackRecord {
+    /// Capture a rollback snapshot; returns `None` when no original route was
+    /// recorded (nothing to revert).
+    // NOTE(review): `Option<Self>` reconstructed -- angle brackets were
+    // stripped from this hunk in transit.
+    fn new(data: &GroupProcedureData) -> Option<Self> {
+        if data.route_raw.is_none() {
+            return None;
+        }
+
+        let mut snapshot = data.clone();
+        // The record is only taken after a successful commit, so replaying it
+        // must treat the route as committed.
+        snapshot.route_committed = true;
+        Some(Self {
+            group_id: data.entry.group_id,
+            data: snapshot,
+        })
+    }
+
+    fn into_inner(self) -> (PlanGroupId, GroupProcedureData) {
+        (self.group_id, self.data)
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct GroupPrepareResult {
+    // NOTE(review): element types reconstructed from the route lookups in
+    // `on_prepare` -- verify against the original commit.
+    source_routes: Vec<RegionRoute>,
+    target_routes: Vec<Option<RegionRoute>>,
+    central_region: RegionId,
+}
+
+/// Stateful executor that drives a single plan group through the repartition
+/// lifecycle. It is scheduled by the parent procedure and persisted/resumed by
+/// the procedure framework.
+pub struct RepartitionGroupProcedure {
+    context: RepartitionContext,
+    data: GroupProcedureData,
+}
+
+/// Lazy payload describing the table route delta that the confirm stage writes.
+#[allow(dead_code)]
+pub struct RouteMetadataPayload<'a> {
+    pub table_id: TableId,
+    // NOTE(review): value-type parameter reconstructed (stripped in transit);
+    // `table_route_storage().get_with_raw_bytes` yields a
+    // DeserializedValueWithBytes<TableRouteValue> -- verify.
+    pub original: &'a DeserializedValueWithBytes<TableRouteValue>,
+    pub new_routes: Vec<RegionRoute>,
+}
+
+impl RepartitionGroupProcedure {
+    pub const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
+
+    pub fn new(
+        entry: PlanEntry,
+        table_id: TableId,
+        route_snapshot: PhysicalTableRouteValue,
+        catalog_name: String,
+        schema_name: String,
+        context: RepartitionContext,
+    ) -> Self {
+        let group_id = entry.group_id;
+        Self {
+            context,
+            data: GroupProcedureData {
+                table_id,
+                entry,
+                state: GroupState::Prepare,
+                route_snapshot,
+                route_raw: None,
+                region_info: None,
+                prepare_result: None,
+                freeze_completed: false,
+                metadata_updated: false,
+                staged_regions: Vec::new(),
+                region_rule_updated: false,
+                region_rule_version: None,
+                manifests_generated: false,
+                manifest_stats: None,
+                catalog_name,
+                schema_name,
+                manifest_summary: GroupManifestSummary {
+                    group_id,
+                    ..Default::default()
+                },
+                route_committed: false,
+                rollback_registered: false,
+            },
+        }
+    }
+
+    /// Lazily build the route update that the confirm stage will write;
+    /// `None` until the metadata stage has run and an original snapshot exists.
+    #[allow(dead_code)]
+    pub fn route_metadata_payload(&self) -> Option<RouteMetadataPayload<'_>> {
+        if !self.data.metadata_updated {
+            return None;
+        }
+
+        let original = self.data.route_raw.as_ref()?;
+        Some(RouteMetadataPayload {
+            table_id: self.data.table_id,
+            original,
+            new_routes: self.data.route_snapshot.region_routes.clone(),
+        })
+    }
+
+    /// Update the shared context with the latest manifest summary.
+    fn update_manifest_summary(
+        &mut self,
+        status: ManifestStatus,
+        staged_region_count: u64,
+        error: Option<String>,
+    ) {
+        let summary = self
+            .data
+            .record_manifest_summary(status, staged_region_count, error);
+        self.context.record_manifest_summary(summary);
+    }
+
+    /// Standardised bookkeeping when a manifest-related RPC fails.
+    fn handle_manifest_failure(&mut self, status: ManifestStatus, error: String) {
+        self.data.reset_manifest_state();
+        self.update_manifest_summary(status, 0, Some(error));
+    }
+
+    /// Register a rollback record with the shared context exactly once per
+    /// group, so the top-level procedure can revert this group later.
+    fn register_success_record(&mut self) {
+        if self.data.rollback_registered {
+            return;
+        }
+
+        if let Some(record) = GroupRollbackRecord::new(&self.data) {
+            self.context.register_group_success(record);
+            self.data.rollback_registered = true;
+        }
+    }
+
+    /// Rebuild a group procedure from a rollback record and run its rollback.
+    /// Invoked by the top-level procedure when a sibling group fails.
+    pub(crate) async fn execute_rollback(
+        context: RepartitionContext,
+        record: GroupRollbackRecord,
+    ) -> Result<()> {
+        let (_, data) = record.into_inner();
+        let mut procedure = Self { context, data };
+        procedure.rollback_group(GroupState::Prepare).await
+    }
+
+    /// Roll the group back to an earlier state after a failure. The method
+    /// attempts to reverse any local state, resume regions, and reapply the
+    /// original table route if necessary.
+    async fn rollback_group(&mut self, reset_state: GroupState) -> Result<()> {
+        debug!(
+            "repartition group {:?}: rolling back to {:?}",
+            self.data.entry.group_id, reset_state
+        );
+
+        // Cleanup is only meaningful when prepare ran and some stage left
+        // visible side effects (paused regions, staging marks, manifests...).
+        let needs_cleanup = self.data.prepare_result.is_some()
+            && (self.data.freeze_completed
+                || self.data.metadata_updated
+                || self.data.region_rule_updated
+                || self.data.manifests_generated
+                || !self.data.staged_regions.is_empty());
+
+        if needs_cleanup {
+            self.cleanup_resources(false).await?;
+        }
+
+        if self.data.route_committed {
+            // The confirm stage already wrote the new route: restore original.
+            self.revert_table_route().await?;
+        } else {
+            self.refresh_route_snapshot().await?;
+        }
+
+        self.reset_in_memory_state();
+        self.data.state = reset_state;
+        Ok(())
+    }
+
+    /// Restore the original table route snapshot that was captured before this
+    /// group wrote a new route during the confirm phase.
+ async fn revert_table_route(&mut self) -> Result<()> { + let original = self + .data + .route_raw + .as_ref() + .context(error::UnexpectedSnafu { + violated: format!( + "group {} missing original route snapshot during rollback", + self.data.entry.group_id + ), + })?; + + let route_manager = self.context.table_metadata_manager.table_route_manager(); + let current = route_manager + .table_route_storage() + .get_with_raw_bytes(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)? + .context(error::UnexpectedSnafu { + violated: format!( + "group {} missing current route snapshot during rollback", + self.data.entry.group_id + ), + })?; + + let original_routes = original + .region_routes() + .context(error::TableMetadataManagerSnafu)? + .clone(); + let region_info = self + .data + .region_info + .clone() + .context(error::UnexpectedSnafu { + violated: format!( + "group {} missing region info for rollback", + self.data.entry.group_id + ), + })?; + + self.context + .table_metadata_manager + .update_table_route( + self.data.table_id, + region_info.clone(), + ¤t, + original_routes, + ®ion_info.region_options, + ®ion_info.region_wal_options, + ) + .await + .context(error::TableMetadataManagerSnafu)?; + + self.refresh_route_snapshot().await?; + self.data.route_committed = false; + Ok(()) + } + + /// Refresh the in-memory table route snapshot from storage. This should be + /// used after either commit or rollback paths run. + async fn refresh_route_snapshot(&mut self) -> Result<()> { + let table_route_manager = self.context.table_metadata_manager.table_route_manager(); + let (_, latest_route) = table_route_manager + .get_physical_table_route(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)?; + let raw_route = table_route_manager + .table_route_storage() + .get_with_raw_bytes(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)? 
+ .context(error::UnexpectedSnafu { + violated: format!( + "group {} failed to refresh route snapshot", + self.data.entry.group_id + ), + })?; + + self.data.route_snapshot = latest_route; + self.data.route_raw = Some(raw_route); + Ok(()) + } + + /// Reset volatile fields so the procedure can safely retry the workflow. + fn reset_in_memory_state(&mut self) { + self.data.prepare_result = None; + self.data.region_info = None; + self.data.freeze_completed = false; + self.data.metadata_updated = false; + self.data.region_rule_updated = false; + self.data.region_rule_version = None; + self.data.manifests_generated = false; + self.data.manifest_stats = None; + self.data.staged_regions.clear(); + self.data.route_committed = false; + } + + /// Drive the procedure state machine by executing the handler that + /// corresponds to the current state and returning the framework status. + pub async fn step(&mut self) -> Result { + match self.data.state { + GroupState::Prepare => { + self.on_prepare().await?; + self.data.state = GroupState::Freeze; + Ok(Status::executing(true)) + } + GroupState::Freeze => { + self.on_freeze().await?; + self.data.state = GroupState::UpdateMetadata; + Ok(Status::executing(true)) + } + GroupState::UpdateMetadata => { + self.on_update_metadata().await?; + self.data.state = GroupState::UpdateRegionRule; + Ok(Status::executing(true)) + } + GroupState::UpdateRegionRule => { + self.on_update_region_rule().await?; + self.data.state = GroupState::UpdateManifests; + Ok(Status::executing(true)) + } + GroupState::UpdateManifests => { + self.on_update_manifests().await?; + self.data.state = GroupState::Confirm; + Ok(Status::executing(true)) + } + GroupState::Confirm => { + self.on_confirm().await?; + self.data.state = GroupState::Cleanup; + Ok(Status::executing(true)) + } + GroupState::Cleanup => { + self.on_cleanup().await?; + self.register_success_record(); + self.data.state = GroupState::Finished; + Ok(Status::done()) + } + GroupState::Finished => 
Ok(Status::done()), + } + } + + /// Capture the latest metadata snapshot prior to any modifications. + async fn on_prepare(&mut self) -> Result<()> { + if self.data.prepare_result.is_some() { + return Ok(()); + } + + info!( + "repartition group {:?}: preparing metadata snapshot for table {}", + self.data.entry.group_id, self.data.table_id + ); + + let table_route_manager = self.context.table_metadata_manager.table_route_manager(); + let (_, latest_route) = table_route_manager + .get_physical_table_route(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)?; + + let raw_route = table_route_manager + .table_route_storage() + .get_with_raw_bytes(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)? + .context(error::UnexpectedSnafu { + violated: format!( + "table {} route missing raw snapshot during repartition", + self.data.table_id + ), + })?; + + self.data.route_snapshot = latest_route.clone(); + self.data.route_raw = Some(raw_route.clone()); + self.ensure_target_routes_present()?; + + let mut source_routes = Vec::with_capacity(self.data.entry.sources.len()); + for descriptor in &self.data.entry.sources { + let Some(region_id) = descriptor.region_id else { + return error::RepartitionMissingSourceRegionIdSnafu { + group_id: self.data.entry.group_id, + } + .fail(); + }; + + let Some(route) = latest_route + .region_routes + .iter() + .find(|route| route.region.id == region_id) + else { + return error::RepartitionSourceRegionRouteMissingSnafu { + group_id: self.data.entry.group_id, + region_id, + } + .fail(); + }; + source_routes.push(route.clone()); + } + + if source_routes.is_empty() { + return error::RepartitionNoSourceRegionsSnafu { + group_id: self.data.entry.group_id, + } + .fail(); + } + + let mut target_routes = Vec::with_capacity(self.data.entry.targets.len()); + for descriptor in &self.data.entry.targets { + if let Some(region_id) = descriptor.region_id { + if let Some(route) = latest_route + .region_routes + .iter() 
+ .find(|route| route.region.id == region_id) + { + target_routes.push(Some(route.clone())); + } else { + target_routes.push(None); + } + } else { + target_routes.push(None); + } + } + + let central_region = source_routes[0].region.id; + self.data.prepare_result = Some(GroupPrepareResult { + source_routes, + target_routes, + central_region, + }); + + debug!( + "repartition group {:?}: captured {} sources, {} targets", + self.data.entry.group_id, + self.data + .prepare_result + .as_ref() + .map(|r| r.source_routes.len()) + .unwrap_or(0), + self.data + .prepare_result + .as_ref() + .map(|r| r.target_routes.len()) + .unwrap_or(0) + ); + + Ok(()) + } + + /// Pause IO on the source/target regions in preparation for metadata work. + async fn on_freeze(&mut self) -> Result<()> { + if self.data.freeze_completed { + return Ok(()); + } + + info!( + "repartition group {:?}: entering freeze stage", + self.data.entry.group_id + ); + + let prepare_result = self.prepare_result()?; + + for route in &prepare_result.source_routes { + self.pause_region(route).await?; + } + + for route in prepare_result.target_routes.iter().flatten() { + self.pause_region(route).await?; + } + + self.data.freeze_completed = true; + debug!( + "repartition group {:?}: freeze stage completed", + self.data.entry.group_id + ); + Ok(()) + } + + /// Apply partition rule updates and mark the involved regions as staging in + /// the metadata layer. 
+    async fn on_update_metadata(&mut self) -> Result<()> {
+        // Idempotent re-entry after a resume.
+        if self.data.metadata_updated {
+            return Ok(());
+        }
+
+        self.ensure_targets_allocated()?;
+
+        info!(
+            "repartition group {:?}: applying metadata updates",
+            self.data.entry.group_id
+        );
+
+        self.apply_target_partition_rules()?;
+
+        let prepare_result = self.prepare_result()?;
+        let region_ids = self.collect_existing_region_ids(prepare_result);
+        if region_ids.is_empty() {
+            self.data.metadata_updated = true;
+            return Ok(());
+        }
+
+        // NOTE(review): element type reconstructed -- angle brackets were
+        // stripped from this hunk in transit; the set holds RegionIds.
+        let mut staged_set: BTreeSet<RegionId> = self.data.staged_regions.iter().copied().collect();
+        let route_manager = self.context.table_metadata_manager.table_route_manager();
+
+        for region_id in &region_ids {
+            // Only write the staging flag once per region across retries.
+            if staged_set.insert(*region_id) {
+                route_manager
+                    .set_region_staging_state(*region_id, true)
+                    .await
+                    .context(error::TableMetadataManagerSnafu)?;
+            }
+        }
+
+        Self::mark_regions_staging(&mut self.data.route_snapshot.region_routes, &region_ids);
+        self.data.staged_regions = staged_set.into_iter().collect();
+
+        self.data.metadata_updated = true;
+        debug!(
+            "repartition group {:?}: staged regions {:?}",
+            self.data.entry.group_id, self.data.staged_regions
+        );
+        Ok(())
+    }
+
+    /// Stage the next rule version on every region that participates in the
+    /// repartition group.
+    async fn on_update_region_rule(&mut self) -> Result<()> {
+        // Idempotent re-entry after a resume.
+        if self.data.region_rule_updated {
+            return Ok(());
+        }
+
+        ensure!(
+            self.data.metadata_updated,
+            error::UnexpectedSnafu {
+                violated: format!(
+                    "group {} region rule update before metadata stage",
+                    self.data.entry.group_id
+                ),
+            }
+        );
+
+        let prepare_result = self.prepare_result()?;
+        let region_ids = self.collect_existing_region_ids(prepare_result);
+
+        info!(
+            "repartition group {:?}: scheduling region rule update for {:?}",
+            self.data.entry.group_id, region_ids
+        );
+
+        // Persist the version on first use so retries stage the same version.
+        let rule_version = self
+            .data
+            .region_rule_version
+            .get_or_insert_with(|| format!("{}", self.data.entry.group_id))
+            .clone();
+
+        // Collect (leader, region) pairs first so the immutable borrow of
+        // `prepare_result` ends before the mutable RPC loop below.
+        let stage_targets: Vec<_> = {
+            let prepare_result = self.prepare_result()?;
+            prepare_result
+                .source_routes
+                .iter()
+                .chain(prepare_result.target_routes.iter().flatten())
+                .map(|route| (route.leader_peer.clone(), route.region.id))
+                .collect()
+        };
+
+        for (peer, region_id) in stage_targets {
+            if let Some(peer) = peer {
+                self.context
+                    .stage_region_rule_on_datanode(&peer, region_id, &rule_version)
+                    .await?;
+            } else {
+                debug!(
+                    "repartition group {:?}: skip region rule staging, region {} has no leader",
+                    self.data.entry.group_id, region_id
+                );
+            }
+        }
+
+        self.data.region_rule_updated = true;
+        Ok(())
+    }
+
+    /// Trigger manifest remapping on the datanode and record the resulting stats.
+    async fn on_update_manifests(&mut self) -> Result<()> {
+        // Idempotent re-entry: manifests already staged on a previous attempt.
+        if self.data.manifests_generated {
+            return Ok(());
+        }
+
+        ensure!(
+            self.data.region_rule_updated,
+            error::UnexpectedSnafu {
+                violated: format!(
+                    "group {} manifest stage invoked before region rule update",
+                    self.data.entry.group_id
+                ),
+            }
+        );
+
+        self.ensure_targets_allocated()?;
+
+        let prepare_result = self.prepare_result()?;
+        let leader_peer = self.leader_peer_for_central(prepare_result)?;
+
+        // Map global target-expression indices to positions within this
+        // group's `targets` list.
+        let mut target_index_lookup = HashMap::new();
+        for (position, &global_idx) in self.data.entry.subtask.to_expr_indices.iter().enumerate() {
+            target_index_lookup.insert(global_idx, position);
+        }
+
+        // Build, per source region, the list of target regions its data maps
+        // to by following the subtask transition map.
+        let mut sources = Vec::with_capacity(self.data.entry.sources.len());
+        for (source_position, descriptor) in self.data.entry.sources.iter().enumerate() {
+            let Some(region_id) = descriptor.region_id else {
+                continue;
+            };
+
+            let transitions = self
+                .data
+                .entry
+                .subtask
+                .transition_map
+                .get(source_position)
+                .context(error::UnexpectedSnafu {
+                    violated: format!(
+                        "group {} transition map missing entry {}",
+                        self.data.entry.group_id, source_position
+                    ),
+                })?;
+
+            let mut target_region_ids = Vec::with_capacity(transitions.len());
+            for global_target_idx in transitions {
+                let target_position =
+                    target_index_lookup
+                        .get(global_target_idx)
+                        .context(error::UnexpectedSnafu {
+                            violated: format!(
+                                "group {} transition references unknown target expr {}",
+                                self.data.entry.group_id, global_target_idx
+                            ),
+                        })?;
+
+                let target_descriptor = self.data.entry.targets.get(*target_position).context(
+                    error::UnexpectedSnafu {
+                        violated: format!(
+                            "group {} missing target descriptor at {}",
+                            self.data.entry.group_id, target_position
+                        ),
+                    },
+                )?;
+
+                let target_region_id =
+                    target_descriptor
+                        .region_id
+                        .context(error::UnexpectedSnafu {
+                            violated: format!(
+                                "group {} target {} missing allocated region id",
+                                self.data.entry.group_id, target_position
+                            ),
+                        })?;
+
+                target_region_ids.push(target_region_id.as_u64());
+            }
+
+            sources.push(RemapManifestSource {
+                region_id: region_id.as_u64(),
+                target_region_ids,
+            });
+        }
+
+        // Each target carries its partition expression serialized as JSON.
+        let mut targets = Vec::with_capacity(self.data.entry.targets.len());
+        for descriptor in &self.data.entry.targets {
+            let region_id = descriptor.region_id.context(error::UnexpectedSnafu {
+                violated: format!(
+                    "group {} missing target region allocation",
+                    self.data.entry.group_id
+                ),
+            })?;
+
+            targets.push(RemapManifestTarget {
+                region_id: region_id.as_u64(),
+                partition_expr: descriptor
+                    .partition_expr
+                    .as_json_str()
+                    .context(error::RepartitionSerializePartitionExprSnafu)?,
+            });
+        }
+
+        let request = RemapManifestRequest {
+            table_id: self.data.table_id as u64,
+            group_id: self.data.entry.group_id.to_string(),
+            sources,
+            targets,
+        };
+
+        info!(
+            "repartition group {:?}: scheduling manifest remap via {:?}",
+            self.data.entry.group_id, leader_peer.id
+        );
+
+        let response = match self
+            .context
+            .remap_manifests_on_datanode(&leader_peer, request)
+            .await
+        {
+            Ok(response) => response,
+            Err(err) => {
+                // Record the failure, unwind this group, then propagate.
+                let err_msg = err.to_string();
+                self.handle_manifest_failure(ManifestStatus::Failed, err_msg);
+                self.rollback_group(GroupState::Prepare).await?;
+                return Err(err);
+            }
+        };
+
+        self.data.note_remap_stats(&response);
+
+        if response.affected_rows > 0 {
+            self.update_manifest_summary(
+                ManifestStatus::Staged,
+                response.affected_rows as u64,
+                None,
+            );
+        } else {
+            // Nothing was staged; record Skipped so cleanup won't try to
+            // discard non-existent manifests.
+            self.data.reset_manifest_state();
+            self.update_manifest_summary(ManifestStatus::Skipped, 0, None);
+        }
+
+        Ok(())
+    }
+
+    /// Commit the table route change and publish staged manifests along with the
+    /// staged region rule version.
+ async fn on_confirm(&mut self) -> Result<()> { + info!( + "repartition group {:?}: confirming metadata update", + self.data.entry.group_id + ); + + self.ensure_targets_allocated()?; + + let region_info = self.ensure_region_info().await?.clone(); + let region_options = region_info.region_options.clone(); + let region_wal_options = region_info.region_wal_options.clone(); + + let payload = self + .route_metadata_payload() + .context(error::UnexpectedSnafu { + violated: format!( + "group {} metadata not prepared for confirmation", + self.data.entry.group_id + ), + })?; + let RouteMetadataPayload { + table_id, + original, + new_routes, + } = payload; + + match self + .context + .table_metadata_manager + .update_table_route( + table_id, + region_info, + original, + new_routes, + ®ion_options, + ®ion_wal_options, + ) + .await + .context(error::TableMetadataManagerSnafu) + { + Ok(()) => {} + Err(err) => { + self.rollback_group(GroupState::Prepare).await?; + return Err(err); + } + } + self.data.route_committed = true; + + info!( + "repartition group {:?}: table route updated", + self.data.entry.group_id + ); + + match self.apply_staged_manifests(true).await { + Ok(Some(response)) => { + let summary = self.data.note_manifest_application(true, Some(&response)); + self.context.record_manifest_summary(summary); + } + Ok(None) => { + debug!( + "repartition group {:?}: no staged manifests to publish", + self.data.entry.group_id + ); + let summary = self.data.note_manifest_application(true, None); + self.context.record_manifest_summary(summary); + } + Err(err) => { + let err_msg = err.to_string(); + self.handle_manifest_failure(ManifestStatus::Failed, err_msg); + self.rollback_group(GroupState::Prepare).await?; + return Err(err); + } + } + + self.refresh_route_snapshot().await?; + + if self.data.region_rule_updated { + if let Some(rule_version) = self.data.region_rule_version.clone() { + let publish_targets: Vec<_> = { + let prepare_result = self.prepare_result()?; + 
prepare_result + .source_routes + .iter() + .chain(prepare_result.target_routes.iter().flatten()) + .filter_map(|route| { + route + .leader_peer + .clone() + .map(|peer| (peer, route.region.id)) + }) + .collect() + }; + + for (peer, region_id) in publish_targets { + if let Err(err) = self + .context + .publish_region_rule_on_datanode(&peer, region_id, &rule_version) + .await + { + let err_msg = err.to_string(); + self.handle_manifest_failure(ManifestStatus::Failed, err_msg); + self.rollback_group(GroupState::Prepare).await?; + return Err(err); + } + } + } + + self.data.region_rule_updated = false; + } + self.data.route_committed = false; + Ok(()) + } + + /// Resume region IO, clear staging marks, and optionally publish region rules. + async fn on_cleanup(&mut self) -> Result<()> { + self.cleanup_resources(true).await + } + + /// Shared cleanup implementation used for the success path as well as the + /// rollback handler. + async fn cleanup_resources(&mut self, publish_rules: bool) -> Result<()> { + let leader_targets: Vec<_> = { + let prepare_result = self.prepare_result()?; + + for route in &prepare_result.source_routes { + self.resume_region(route).await?; + } + for route in prepare_result.target_routes.iter().flatten() { + self.resume_region(route).await?; + } + + prepare_result + .source_routes + .iter() + .chain(prepare_result.target_routes.iter().flatten()) + .filter_map(|route| { + route + .leader_peer + .clone() + .map(|peer| (peer, route.region.id)) + }) + .collect() + }; + + if self.data.manifests_generated { + match self.apply_staged_manifests(false).await { + Ok(Some(response)) => { + let summary = self.data.note_manifest_application(false, Some(&response)); + self.context.record_manifest_summary(summary); + } + Ok(None) => { + debug!( + "repartition group {:?}: no staged manifests to discard", + self.data.entry.group_id + ); + let summary = self.data.note_manifest_application(false, None); + self.context.record_manifest_summary(summary); + } + 
Err(err) => { + let err_msg = err.to_string(); + self.handle_manifest_failure(ManifestStatus::Failed, err_msg); + return Err(err); + } + } + } + + if !self.data.staged_regions.is_empty() { + let route_manager = self.context.table_metadata_manager.table_route_manager(); + + for region_id in &self.data.staged_regions { + route_manager + .set_region_staging_state(*region_id, false) + .await + .context(error::TableMetadataManagerSnafu)?; + } + + Self::clear_regions_staging( + &mut self.data.route_snapshot.region_routes, + &self.data.staged_regions, + ); + self.data.staged_regions.clear(); + self.data.metadata_updated = false; + } + + if self.data.region_rule_updated { + if let Some(rule_version) = self.data.region_rule_version.clone() { + if publish_rules { + for (peer, region_id) in &leader_targets { + self.context + .publish_region_rule_on_datanode(peer, *region_id, &rule_version) + .await?; + } + } else { + for (peer, region_id) in &leader_targets { + self.context + .clear_region_rule_stage_on_datanode(peer, *region_id) + .await?; + } + } + } + self.data.region_rule_updated = false; + self.data.region_rule_version = None; + } + + self.data.freeze_completed = false; + self.data.route_committed = false; + + if publish_rules { + info!( + "repartition group {:?}: cleanup finished", + self.data.entry.group_id + ); + } else { + info!( + "repartition group {:?}: rollback cleanup finished", + self.data.entry.group_id + ); + } + Ok(()) + } + + /// Borrow the cached prepare result or surface a friendly error if it has + /// not been initialised yet. + fn prepare_result(&self) -> Result<&GroupPrepareResult> { + self.data.prepare_result.as_ref().ok_or_else(|| { + error::RepartitionMissingPrepareContextSnafu { + group_id: self.data.entry.group_id, + } + .build() + }) + } + + /// Resolve the leader peer for the central region – a prerequisite for the + /// datanode-side RPCs we send throughout the workflow. 
+ fn leader_peer_for_central(&self, prepare_result: &GroupPrepareResult) -> Result { + let central_region = prepare_result.central_region; + let leader_peer = prepare_result + .source_routes + .iter() + .find(|route| route.region.id == central_region) + .and_then(|route| route.leader_peer.clone()) + .or_else(|| { + self.data + .route_snapshot + .region_routes + .iter() + .find(|route| route.region.id == central_region) + .and_then(|route| route.leader_peer.clone()) + }); + + let Some(peer) = leader_peer else { + return error::RetryLaterSnafu { + reason: format!( + "group {} missing leader for central region {}", + self.data.entry.group_id, central_region + ), + } + .fail(); + }; + + Ok(peer) + } + + /// Ask the datanode to either publish or discard staged manifests for all + /// regions tracked as staging. + async fn apply_staged_manifests(&self, publish: bool) -> Result> { + if self.data.staged_regions.is_empty() { + return Ok(None); + } + + let prepare_result = self.prepare_result()?; + let leader_peer = self.leader_peer_for_central(prepare_result)?; + + let request = ApplyStagedManifestRequest { + table_id: self.data.table_id as u64, + group_id: self.data.entry.group_id.to_string(), + region_ids: self + .data + .staged_regions + .iter() + .map(|region_id| region_id.as_u64()) + .collect(), + publish, + }; + + let response = self + .context + .apply_staged_manifests_on_datanode(&leader_peer, request) + .await?; + + Ok(Some(response)) + } + + /// Pause the target region if it currently has a leader peer. + async fn pause_region(&self, _route: &RegionRoute) -> Result<()> { + if let Some(peer) = &_route.leader_peer { + self.context + .pause_region_on_datanode(peer, _route.region.id) + .await?; + } else { + debug!( + "repartition group {:?}: skip pause, region {} has no leader peer", + self.data.entry.group_id, _route.region.id + ); + } + Ok(()) + } + + /// Resume the target region if it currently has a leader peer. 
+ async fn resume_region(&self, _route: &RegionRoute) -> Result<()> { + if let Some(peer) = &_route.leader_peer { + self.context + .resume_region_on_datanode(peer, _route.region.id) + .await?; + } else { + debug!( + "repartition group {:?}: skip resume, region {} has no leader peer", + self.data.entry.group_id, _route.region.id + ); + } + Ok(()) + } + + fn mark_regions_staging(routes: &mut [RegionRoute], region_ids: &[RegionId]) { + for region_id in region_ids.iter().copied() { + if let Some(route) = routes.iter_mut().find(|route| route.region.id == region_id) { + route.set_leader_staging(); + } + } + } + + fn clear_regions_staging(routes: &mut [RegionRoute], region_ids: &[RegionId]) { + for region_id in region_ids.iter().copied() { + if let Some(route) = routes.iter_mut().find(|route| route.region.id == region_id) { + route.clear_leader_staging(); + } + } + } + + fn collect_existing_region_ids(&self, prepare_result: &GroupPrepareResult) -> Vec { + let mut set = BTreeSet::new(); + for route in &prepare_result.source_routes { + set.insert(route.region.id); + } + for route in prepare_result.target_routes.iter().flatten() { + set.insert(route.region.id); + } + set.into_iter().collect() + } + + fn apply_target_partition_rules(&mut self) -> Result<()> { + for target in &self.data.entry.targets { + if let Some(region_id) = target.region_id { + Self::apply_partition_rule( + &mut self.data.route_snapshot.region_routes, + region_id, + target, + )?; + } + } + Ok(()) + } + + async fn ensure_region_info(&mut self) -> Result<&RegionInfo> { + if self.data.region_info.is_none() { + let central_region = self.prepare_result()?.central_region; + let leader_peer_id = self + .data + .route_snapshot + .region_routes + .iter() + .find(|route| route.region.id == central_region) + .and_then(|route| route.leader_peer.as_ref()) + .map(|peer| peer.id) + .context(error::UnexpectedSnafu { + violated: format!( + "group {} has no leader peer for region {}", + self.data.entry.group_id, 
central_region + ), + })?; + + let datanode_table_value = self + .context + .table_metadata_manager + .datanode_table_manager() + .get(&DatanodeTableKey { + datanode_id: leader_peer_id, + table_id: self.data.table_id, + }) + .await + .context(error::TableMetadataManagerSnafu)?; + + let Some(datanode_table_value) = datanode_table_value else { + return error::RetryLaterSnafu { + reason: format!( + "group {} waiting for datanode {} metadata to propagate", + self.data.entry.group_id, leader_peer_id + ), + } + .fail(); + }; + + self.data.region_info = Some(datanode_table_value.region_info.clone()); + } + + Ok(self.data.region_info.as_ref().unwrap()) + } + + fn ensure_target_routes_present(&mut self) -> Result<()> { + let leader_candidate = self + .data + .entry + .sources + .iter() + .filter_map(|descriptor| descriptor.region_id) + .find_map(|region_id| { + self.data + .route_snapshot + .region_routes + .iter() + .find(|route| route.region.id == region_id) + .and_then(|route| route.leader_peer.clone()) + }); + + for descriptor in &self.data.entry.targets { + let Some(region_id) = descriptor.region_id else { + continue; + }; + + if self + .data + .route_snapshot + .region_routes + .iter() + .any(|route| route.region.id == region_id) + { + continue; + } + + let partition_expr = descriptor + .partition_expr + .as_json_str() + .context(error::RepartitionSerializePartitionExprSnafu)?; + + let region = Region { + id: region_id, + partition_expr, + ..Default::default() + }; + + self.data.route_snapshot.region_routes.push(RegionRoute { + region, + leader_peer: leader_candidate.clone(), + ..Default::default() + }); + } + Ok(()) + } + + fn ensure_targets_allocated(&self) -> Result<()> { + if let Some((idx, _)) = self + .data + .entry + .targets + .iter() + .enumerate() + .find(|(_, descriptor)| descriptor.region_id.is_none()) + { + return error::RepartitionMissingTargetRegionIdSnafu { + group_id: self.data.entry.group_id, + target_index: idx, + } + .fail(); + } + + Ok(()) + } + + 
fn apply_partition_rule( + routes: &mut [RegionRoute], + region_id: RegionId, + descriptor: &RegionDescriptor, + ) -> Result<()> { + let Some(route) = routes.iter_mut().find(|route| route.region.id == region_id) else { + return error::RepartitionTargetRegionRouteMissingSnafu { region_id }.fail(); + }; + + route.region.partition_expr = descriptor + .partition_expr + .as_json_str() + .context(error::RepartitionSerializePartitionExprSnafu)?; + Ok(()) + } +} + +#[async_trait::async_trait] +impl Procedure for RepartitionGroupProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let status = self.step().await.map_err(super::map_repartition_error)?; + Ok(status) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + LockKey::new(vec![ + CatalogLock::Read(self.data.catalog_name.as_str()).into(), + SchemaLock::read( + self.data.catalog_name.as_str(), + self.data.schema_name.as_str(), + ) + .into(), + TableLock::Write(self.data.table_id).into(), + ]) + } +} diff --git a/src/meta-srv/src/procedure/repartition/plan.rs b/src/meta-srv/src/procedure/repartition/plan.rs new file mode 100644 index 000000000000..141ab6ea1cfe --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/plan.rs @@ -0,0 +1,95 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_meta::key::table_route::PhysicalTableRouteValue; +use partition::expr::PartitionExpr; +use partition::subtask::RepartitionSubtask; +use serde::{Deserialize, Serialize}; +use store_api::storage::{RegionId, TableId}; +use uuid::Uuid; + +/// Identifier of a plan group. +pub type PlanGroupId = Uuid; + +/// Logical description of the repartition plan. +/// +/// The plan is persisted by the procedure framework so it must remain +/// serializable/deserializable across versions. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct RepartitionPlan { + pub table_id: TableId, + pub entries: Vec, + pub resource_demand: ResourceDemand, + pub route_snapshot: PhysicalTableRouteValue, +} + +impl RepartitionPlan { + pub fn new( + table_id: TableId, + entries: Vec, + resource_demand: ResourceDemand, + route_snapshot: PhysicalTableRouteValue, + ) -> Self { + Self { + table_id, + entries, + resource_demand, + route_snapshot, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct PlanEntry { + pub group_id: PlanGroupId, + pub subtask: RepartitionSubtask, + pub sources: Vec, + pub targets: Vec, +} + +impl PlanEntry { + /// Construct a plan entry consisting of the connected component returned by + /// the planner. + pub fn new( + group_id: PlanGroupId, + subtask: RepartitionSubtask, + sources: Vec, + targets: Vec, + ) -> Self { + Self { + group_id, + subtask, + sources, + targets, + } + } +} + +/// Metadata describing a region involved in the plan. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RegionDescriptor { + pub region_id: Option, + pub partition_expr: PartitionExpr, +} + +/// Auxiliary information about resources required to execute the plan. 
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ResourceDemand {}

impl ResourceDemand {
    /// Derive the resource demand from the plan entries.
    pub fn from_plan_entries(_entries: &[PlanEntry]) -> Self {
        // placeholder — no resource accounting is implemented yet
        Self {}
    }
}
diff --git a/src/partition/src/subtask.rs b/src/partition/src/subtask.rs
index e74e9872f5f9..4d9e22821e59 100644
--- a/src/partition/src/subtask.rs
+++ b/src/partition/src/subtask.rs
@@ -14,12 +14,14 @@
 use std::collections::VecDeque;
 
+use serde::{Deserialize, Serialize};
+
 use crate::error::Result;
 use crate::expr::PartitionExpr;
 use crate::overlap::associate_from_to;
 
 /// Indices are into the original input arrays (array of [`PartitionExpr`]). A connected component.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct RepartitionSubtask {
     pub from_expr_indices: Vec<usize>,
     pub to_expr_indices: Vec<usize>,
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index 344d35eaa07d..4268e501c9fd 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -179,6 +179,30 @@ impl RegionRequest {
                 reason: "ListMetadata request should be handled separately by RegionServer",
             }
             .fail(),
+            region_request::Body::Pause(_) => UnexpectedSnafu {
+                reason: "Pause request should be handled separately by RegionServer",
+            }
+            .fail(),
+            region_request::Body::Resume(_) => UnexpectedSnafu {
+                reason: "Resume request should be handled separately by RegionServer",
+            }
+            .fail(),
+            region_request::Body::StageRegionRule(_) => UnexpectedSnafu {
+                reason: "StageRegionRule request should be handled separately by RegionServer",
+            }
+            .fail(),
+            region_request::Body::PublishRegionRule(_) => UnexpectedSnafu {
+                reason: "PublishRegionRule request should be handled separately by RegionServer",
+            }
+            .fail(),
+            region_request::Body::RemapManifest(_) => UnexpectedSnafu {
+                reason: "RemapManifest request should be handled separately by RegionServer",
+            }
+            .fail(),
+ region_request::Body::ApplyStagedManifest(_) => UnexpectedSnafu { + reason: "ApplyStagedManifest request should be handled separately by RegionServer", + } + .fail(), } }