@@ -18,10 +18,11 @@ use std::time::Duration;
1818use arrow_schema:: SchemaRef as ArrowSchemaRef ;
1919use common_catalog:: consts:: INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID ;
2020use common_error:: ext:: BoxedError ;
21- use common_meta:: cluster:: NodeInfo ;
21+ use common_meta:: cluster:: { DatanodeStatus , NodeInfo , NodeStatus } ;
2222use common_recordbatch:: adapter:: RecordBatchStreamAdapter ;
2323use common_recordbatch:: { RecordBatch , SendableRecordBatchStream } ;
2424use common_time:: timestamp:: Timestamp ;
25+ use common_workload:: DatanodeWorkloadType ;
2526use datafusion:: execution:: TaskContext ;
2627use datafusion:: physical_plan:: SendableRecordBatchStream as DfSendableRecordBatchStream ;
2728use datafusion:: physical_plan:: stream:: RecordBatchStreamAdapter as DfRecordBatchStreamAdapter ;
@@ -32,7 +33,9 @@ use datatypes::timestamp::TimestampMillisecond;
3233use datatypes:: value:: Value ;
3334use datatypes:: vectors:: {
3435 Int64VectorBuilder , StringVectorBuilder , TimestampMillisecondVectorBuilder ,
36+ UInt32VectorBuilder , UInt64VectorBuilder ,
3537} ;
38+ use serde:: Serialize ;
3639use snafu:: ResultExt ;
3740use store_api:: storage:: { ScanRequest , TableId } ;
3841
@@ -41,14 +44,20 @@ use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
4144use crate :: system_schema:: information_schema:: { CLUSTER_INFO , InformationTable , Predicates } ;
4245use crate :: system_schema:: utils;
4346
47+ const PEER_TYPE_FRONTEND : & str = "FRONTEND" ;
48+ const PEER_TYPE_METASRV : & str = "METASRV" ;
49+
4450const PEER_ID : & str = "peer_id" ;
4551const PEER_TYPE : & str = "peer_type" ;
4652const PEER_ADDR : & str = "peer_addr" ;
53+ const CPUS : & str = "cpus" ;
54+ const MEMORY_BYTES : & str = "memory_bytes" ;
4755const VERSION : & str = "version" ;
4856const GIT_COMMIT : & str = "git_commit" ;
4957const START_TIME : & str = "start_time" ;
5058const UPTIME : & str = "uptime" ;
5159const ACTIVE_TIME : & str = "active_time" ;
60+ const NODE_STATUS : & str = "node_status" ;
5261
5362const INIT_CAPACITY : usize = 42 ;
5463
@@ -57,11 +66,14 @@ const INIT_CAPACITY: usize = 42;
5766/// - `peer_id`: the peer server id.
5867/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
5968/// - `peer_addr`: the peer gRPC address.
69+ /// - `cpus`: the number of CPUs of the peer.
70+ /// - `memory_bytes`: the memory bytes of the peer.
6071/// - `version`: the build package version of the peer.
6172/// - `git_commit`: the build git commit hash of the peer.
6273/// - `start_time`: the starting time of the peer.
6374/// - `uptime`: the uptime of the peer.
6475/// - `active_time`: the time since the last activity of the peer.
76+ /// - `node_status`: the status info of the peer.
6577///
6678#[ derive( Debug ) ]
6779pub ( super ) struct InformationSchemaClusterInfo {
@@ -82,6 +94,8 @@ impl InformationSchemaClusterInfo {
8294 ColumnSchema :: new( PEER_ID , ConcreteDataType :: int64_datatype( ) , false ) ,
8395 ColumnSchema :: new( PEER_TYPE , ConcreteDataType :: string_datatype( ) , false ) ,
8496 ColumnSchema :: new( PEER_ADDR , ConcreteDataType :: string_datatype( ) , true ) ,
97+ ColumnSchema :: new( CPUS , ConcreteDataType :: uint32_datatype( ) , false ) ,
98+ ColumnSchema :: new( MEMORY_BYTES , ConcreteDataType :: uint64_datatype( ) , false ) ,
8599 ColumnSchema :: new( VERSION , ConcreteDataType :: string_datatype( ) , false ) ,
86100 ColumnSchema :: new( GIT_COMMIT , ConcreteDataType :: string_datatype( ) , false ) ,
87101 ColumnSchema :: new(
@@ -91,6 +105,7 @@ impl InformationSchemaClusterInfo {
91105 ) ,
92106 ColumnSchema :: new( UPTIME , ConcreteDataType :: string_datatype( ) , true ) ,
93107 ColumnSchema :: new( ACTIVE_TIME , ConcreteDataType :: string_datatype( ) , true ) ,
108+ ColumnSchema :: new( NODE_STATUS , ConcreteDataType :: string_datatype( ) , true ) ,
94109 ] ) )
95110 }
96111
@@ -140,11 +155,14 @@ struct InformationSchemaClusterInfoBuilder {
140155 peer_ids : Int64VectorBuilder ,
141156 peer_types : StringVectorBuilder ,
142157 peer_addrs : StringVectorBuilder ,
158+ cpus : UInt32VectorBuilder ,
159+ memory_bytes : UInt64VectorBuilder ,
143160 versions : StringVectorBuilder ,
144161 git_commits : StringVectorBuilder ,
145162 start_times : TimestampMillisecondVectorBuilder ,
146163 uptimes : StringVectorBuilder ,
147164 active_times : StringVectorBuilder ,
165+ node_status : StringVectorBuilder ,
148166}
149167
150168impl InformationSchemaClusterInfoBuilder {
@@ -155,11 +173,14 @@ impl InformationSchemaClusterInfoBuilder {
155173 peer_ids : Int64VectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
156174 peer_types : StringVectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
157175 peer_addrs : StringVectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
176+ cpus : UInt32VectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
177+ memory_bytes : UInt64VectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
158178 versions : StringVectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
159179 git_commits : StringVectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
160180 start_times : TimestampMillisecondVectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
161181 uptimes : StringVectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
162182 active_times : StringVectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
183+ node_status : StringVectorBuilder :: with_capacity ( INIT_CAPACITY ) ,
163184 }
164185 }
165186
@@ -176,9 +197,10 @@ impl InformationSchemaClusterInfoBuilder {
176197
177198 fn add_node_info ( & mut self , predicates : & Predicates , node_info : NodeInfo ) {
178199 let peer_type = node_info. status . role_name ( ) ;
200+ let peer_id = peer_id ( peer_type, node_info. peer . id ) ;
179201
180202 let row = [
181- ( PEER_ID , & Value :: from ( node_info . peer . id ) ) ,
203+ ( PEER_ID , & Value :: from ( peer_id ) ) ,
182204 ( PEER_TYPE , & Value :: from ( peer_type) ) ,
183205 ( PEER_ADDR , & Value :: from ( node_info. peer . addr . as_str ( ) ) ) ,
184206 ( VERSION , & Value :: from ( node_info. version . as_str ( ) ) ) ,
@@ -189,13 +211,7 @@ impl InformationSchemaClusterInfoBuilder {
189211 return ;
190212 }
191213
192- if peer_type == "FRONTEND" || peer_type == "METASRV" {
193- // Always set peer_id to be -1 for frontends and metasrvs
194- self . peer_ids . push ( Some ( -1 ) ) ;
195- } else {
196- self . peer_ids . push ( Some ( node_info. peer . id as i64 ) ) ;
197- }
198-
214+ self . peer_ids . push ( Some ( peer_id) ) ;
199215 self . peer_types . push ( Some ( peer_type) ) ;
200216 self . peer_addrs . push ( Some ( & node_info. peer . addr ) ) ;
201217 self . versions . push ( Some ( & node_info. version ) ) ;
@@ -212,6 +228,8 @@ impl InformationSchemaClusterInfoBuilder {
212228 self . start_times . push ( None ) ;
213229 self . uptimes . push ( None ) ;
214230 }
231+ self . cpus . push ( Some ( node_info. cpus ) ) ;
232+ self . memory_bytes . push ( Some ( node_info. memory_bytes ) ) ;
215233
216234 if node_info. last_activity_ts > 0 {
217235 self . active_times . push ( Some (
@@ -220,6 +238,8 @@ impl InformationSchemaClusterInfoBuilder {
220238 } else {
221239 self . active_times . push ( None ) ;
222240 }
241+ self . node_status
242+ . push ( format_node_status ( & node_info) . as_deref ( ) ) ;
223243 }
224244
225245 fn format_duration_since ( ts : u64 ) -> String {
@@ -233,11 +253,14 @@ impl InformationSchemaClusterInfoBuilder {
233253 Arc :: new( self . peer_ids. finish( ) ) ,
234254 Arc :: new( self . peer_types. finish( ) ) ,
235255 Arc :: new( self . peer_addrs. finish( ) ) ,
256+ Arc :: new( self . cpus. finish( ) ) ,
257+ Arc :: new( self . memory_bytes. finish( ) ) ,
236258 Arc :: new( self . versions. finish( ) ) ,
237259 Arc :: new( self . git_commits. finish( ) ) ,
238260 Arc :: new( self . start_times. finish( ) ) ,
239261 Arc :: new( self . uptimes. finish( ) ) ,
240262 Arc :: new( self . active_times. finish( ) ) ,
263+ Arc :: new( self . node_status. finish( ) ) ,
241264 ] ;
242265 RecordBatch :: new ( self . schema . clone ( ) , columns) . context ( CreateRecordBatchSnafu )
243266 }
@@ -263,3 +286,56 @@ impl DfPartitionStream for InformationSchemaClusterInfo {
263286 ) )
264287 }
265288}
289+
290+ fn peer_id ( peer_type : & str , peer_id : u64 ) -> i64 {
291+ if peer_type == PEER_TYPE_FRONTEND || peer_type == PEER_TYPE_METASRV {
292+ -1
293+ } else {
294+ peer_id as i64
295+ }
296+ }
297+
298+ #[ derive( Serialize ) ]
299+ struct DisplayMetasrvStatus {
300+ is_leader : bool ,
301+ }
302+
303+ #[ derive( Serialize ) ]
304+ struct DisplayDatanodeStatus {
305+ workloads : Vec < DatanodeWorkloadType > ,
306+ leader_regions : usize ,
307+ follower_regions : usize ,
308+ }
309+
310+ impl From < & DatanodeStatus > for DisplayDatanodeStatus {
311+ fn from ( status : & DatanodeStatus ) -> Self {
312+ Self {
313+ workloads : status
314+ . workloads
315+ . types
316+ . iter ( )
317+ . flat_map ( |w| DatanodeWorkloadType :: from_i32 ( * w) )
318+ . collect ( ) ,
319+ leader_regions : status. leader_regions ,
320+ follower_regions : status. follower_regions ,
321+ }
322+ }
323+ }
324+
325+ fn format_node_status ( node_info : & NodeInfo ) -> Option < String > {
326+ match & node_info. status {
327+ NodeStatus :: Datanode ( datanode_status) => {
328+ serde_json:: to_string ( & DisplayDatanodeStatus :: from ( datanode_status) ) . ok ( )
329+ }
330+ NodeStatus :: Frontend ( _) => None ,
331+ NodeStatus :: Flownode ( _) => None ,
332+ NodeStatus :: Metasrv ( metasrv_status) => {
333+ if metasrv_status. is_leader {
334+ serde_json:: to_string ( & DisplayMetasrvStatus { is_leader : true } ) . ok ( )
335+ } else {
336+ None
337+ }
338+ }
339+ NodeStatus :: Standalone => None ,
340+ }
341+ }
0 commit comments