Skip to content

Commit d087799

Browse files
committed
feat: scaffold repartition procedure with plan/resource stubs
Signed-off-by: Zhenchi <[email protected]>
1 parent aa84642 commit d087799

File tree

6 files changed

+527
-1
lines changed

6 files changed

+527
-1
lines changed

src/common/meta/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ tokio-postgres-rustls = { version = "0.12", optional = true }
8787
tonic.workspace = true
8888
tracing.workspace = true
8989
typetag.workspace = true
90+
uuid.workspace = true
9091

9192
[dev-dependencies]
9293
chrono.workspace = true

src/common/meta/src/ddl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub mod drop_table;
4444
pub mod drop_view;
4545
pub mod flow_meta;
4646
pub mod table_meta;
47+
pub mod repartition;
4748
#[cfg(any(test, feature = "testing"))]
4849
pub mod test_util;
4950
#[cfg(test)]
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
pub mod plan;
16+
pub mod procedure;
17+
18+
pub use plan::{
19+
PartitionChange, PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand,
20+
};
21+
pub use procedure::{RepartitionProcedure, RepartitionTask};
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use serde::{Deserialize, Serialize};
16+
use store_api::storage::{RegionId, TableId};
17+
use uuid::Uuid;
18+
19+
/// Identifier of a plan group.
20+
/// This is an alias of [`Uuid`]; [`PlanGroup`] stores one in its `group_id` field.
pub type PlanGroupId = Uuid;
21+
22+
/// Logical description of the repartition plan.
23+
///
24+
/// The plan is persisted by the procedure framework so it must remain
25+
/// serializable/deserializable across versions.
26+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27+
pub struct RepartitionPlan {
28+
/// Identifier of the physical table to repartition.
29+
pub table_id: TableId,
30+
/// Deterministic hash of the generated plan. Used for idempotence checks.
31+
pub plan_hash: String,
32+
/// Plan groups to execute. Each group is independent from others.
33+
pub groups: Vec<PlanGroup>,
34+
/// Aggregate resource expectation.
35+
pub resource_demand: ResourceDemand,
36+
}
37+
38+
impl RepartitionPlan {
39+
/// Creates an empty plan. Primarily used in tests and early skeleton code.
40+
pub fn empty(table_id: TableId) -> Self {
41+
Self {
42+
table_id,
43+
plan_hash: String::new(),
44+
groups: Vec::new(),
45+
resource_demand: ResourceDemand::default(),
46+
}
47+
}
48+
49+
/// Returns `true` if the plan does not contain any work.
50+
pub fn is_trivial(&self) -> bool {
51+
self.groups.is_empty()
52+
}
53+
}
54+
55+
/// A group of repartition operations that can be executed as a sub-procedure.
56+
///
57+
/// Groups are designed to be independent so that failure in one group does not
58+
/// propagate to others.
59+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
60+
pub struct PlanGroup {
61+
/// Stable identifier of the group.
62+
pub group_id: PlanGroupId,
63+
/// Regions that provide the data source.
64+
pub source_regions: Vec<RegionId>,
65+
/// Target regions that should exist after this group finishes.
66+
pub target_regions: Vec<RegionId>,
67+
/// Ordered list of logical changes required by this group.
68+
pub changes: Vec<PartitionChange>,
69+
/// Estimated resource demand contributed by this group.
70+
pub resource_hint: ResourceDemand,
71+
}
72+
73+
impl PlanGroup {
74+
/// Convenience constructor for skeleton code and tests.
75+
pub fn new(group_id: PlanGroupId) -> Self {
76+
Self {
77+
group_id,
78+
source_regions: Vec::new(),
79+
target_regions: Vec::new(),
80+
changes: Vec::new(),
81+
resource_hint: ResourceDemand::default(),
82+
}
83+
}
84+
}
85+
86+
/// Diff between the old and the new partition rules.
87+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
88+
pub struct PartitionRuleDiff {
89+
/// Ordered list of changes to transform the old rule into the new rule.
90+
pub changes: Vec<PartitionChange>,
91+
}
92+
93+
impl PartitionRuleDiff {
94+
/// Returns `true` if there is no change between two rules.
95+
pub fn is_empty(&self) -> bool {
96+
self.changes.is_empty()
97+
}
98+
}
99+
100+
/// Primitive repartition changes recognised by the planner.
101+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
102+
pub enum PartitionChange {
103+
/// Split one region into multiple target regions.
104+
Split {
105+
from: RegionId,
106+
to: Vec<RegionId>,
107+
},
108+
/// Merge multiple regions into one target region.
109+
Merge {
110+
from: Vec<RegionId>,
111+
to: RegionId,
112+
},
113+
/// No-op placeholder for future operations (e.g. rule rewrite).
114+
Unsupported,
115+
}
116+
117+
impl PartitionChange {
118+
/// Returns the regions referenced by this change.
119+
pub fn referenced_regions(&self) -> Vec<RegionId> {
120+
match self {
121+
PartitionChange::Split { from, to } => {
122+
let mut regions = Vec::with_capacity(1 + to.len());
123+
regions.push(*from);
124+
regions.extend(to.iter().copied());
125+
regions
126+
}
127+
PartitionChange::Merge { from, to } => {
128+
let mut regions = Vec::with_capacity(from.len() + 1);
129+
regions.extend(from.iter().copied());
130+
regions.push(*to);
131+
regions
132+
}
133+
PartitionChange::Unsupported => Vec::new(),
134+
}
135+
}
136+
}
137+
138+
/// Resource estimation for executing a plan or a plan group.
139+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
140+
pub struct ResourceDemand {
141+
/// Number of brand-new regions that must be allocated before execution.
142+
pub new_regions: u32,
143+
/// Rough estimate of data volume to rewrite (in bytes).
144+
pub estimated_bytes: u64,
145+
}

0 commit comments

Comments
 (0)