@@ -24,7 +24,9 @@ use api::region::RegionResponse;
2424use api:: v1:: meta:: TopicStat ;
2525use api:: v1:: region:: sync_request:: ManifestInfo ;
2626use api:: v1:: region:: {
27- ListMetadataRequest , RegionResponse as RegionResponseV1 , SyncRequest , region_request,
27+ ApplyStagedManifestRequest , ListMetadataRequest , PauseRequest , PublishRegionRuleRequest ,
28+ RegionResponse as RegionResponseV1 , RemapManifestRequest , ResumeRequest ,
29+ StageRegionRuleRequest , SyncRequest , region_request,
2830} ;
2931use api:: v1:: { ResponseHeader , Status } ;
3032use arrow_flight:: { FlightData , Ticket } ;
@@ -84,6 +86,8 @@ use crate::error::{
8486use crate :: event_listener:: RegionServerEventListenerRef ;
8587use crate :: region_server:: catalog:: { NameAwareCatalogList , NameAwareDataSourceInjectorBuilder } ;
8688
89+ const REMAP_STATS_EXTENSION_KEY : & str = "repartition.manifest.stats" ;
90+
8791#[ derive( Clone ) ]
8892pub struct RegionServer {
8993 inner : Arc < RegionServerInner > ,
@@ -370,6 +374,24 @@ impl RegionServer {
370374 }
371375 }
372376
377+ /// Temporarily pauses compaction and snapshot related activities for the region.
378+ ///
379+ /// Currently a stub; real implementation will coordinate with region worker.
380+ pub async fn pause_compaction_and_snapshot ( & self , region_id : RegionId ) -> Result < ( ) > {
381+ info ! ( "pause_compaction_and_snapshot stub invoked for region {region_id}" ) ;
382+ let _ = region_id;
383+ Ok ( ( ) )
384+ }
385+
386+ /// Resumes compaction and snapshot related activities for the region.
387+ ///
388+ /// Currently a stub; real implementation will coordinate with region worker.
389+ pub async fn resume_compaction_and_snapshot ( & self , region_id : RegionId ) -> Result < ( ) > {
390+ info ! ( "resume_compaction_and_snapshot stub invoked for region {region_id}" ) ;
391+ let _ = region_id;
392+ Ok ( ( ) )
393+ }
394+
373395 /// Stop the region server.
374396 pub async fn stop ( & self ) -> Result < ( ) > {
375397 self . inner . stop ( ) . await
@@ -538,6 +560,124 @@ impl RegionServer {
538560 Ok ( response)
539561 }
540562
563+ async fn handle_pause_region_request ( & self , request : & PauseRequest ) -> Result < RegionResponse > {
564+ let region_id = RegionId :: from_u64 ( request. region_id ) ;
565+ let tracing_context = TracingContext :: from_current_span ( ) ;
566+ let span = tracing_context. attach ( info_span ! (
567+ "RegionServer::handle_pause_region_request" ,
568+ region_id = region_id. to_string( )
569+ ) ) ;
570+
571+ self . pause_compaction_and_snapshot ( region_id)
572+ . trace ( span)
573+ . await
574+ . map ( |_| RegionResponse :: new ( AffectedRows :: default ( ) ) )
575+ }
576+
577+ async fn handle_resume_region_request (
578+ & self ,
579+ request : & ResumeRequest ,
580+ ) -> Result < RegionResponse > {
581+ let region_id = RegionId :: from_u64 ( request. region_id ) ;
582+ let tracing_context = TracingContext :: from_current_span ( ) ;
583+ let span = tracing_context. attach ( info_span ! (
584+ "RegionServer::handle_resume_region_request" ,
585+ region_id = region_id. to_string( )
586+ ) ) ;
587+
588+ self . resume_compaction_and_snapshot ( region_id)
589+ . trace ( span)
590+ . await
591+ . map ( |_| RegionResponse :: new ( AffectedRows :: default ( ) ) )
592+ }
593+
594+ async fn handle_stage_region_rule_request (
595+ & self ,
596+ request : & StageRegionRuleRequest ,
597+ ) -> Result < RegionResponse > {
598+ let region_id = RegionId :: from_u64 ( request. region_id ) ;
599+ info ! (
600+ "Stage region rule for region {region_id} with version {}" ,
601+ request. rule_version
602+ ) ;
603+ match self
604+ . set_region_role_state_gracefully ( region_id, SettableRegionRoleState :: StagingLeader )
605+ . await ?
606+ {
607+ SetRegionRoleStateResponse :: Success ( _) | SetRegionRoleStateResponse :: NotFound => {
608+ Ok ( RegionResponse :: new ( AffectedRows :: default ( ) ) )
609+ }
610+ SetRegionRoleStateResponse :: InvalidTransition ( err) => {
611+ Err ( err) . with_context ( |_| HandleRegionRequestSnafu { region_id } )
612+ }
613+ }
614+ }
615+
616+ async fn handle_publish_region_rule_request (
617+ & self ,
618+ request : & PublishRegionRuleRequest ,
619+ ) -> Result < RegionResponse > {
620+ let region_id = RegionId :: from_u64 ( request. region_id ) ;
621+ info ! (
622+ "Publish region rule for region {region_id} with version {}" ,
623+ request. rule_version
624+ ) ;
625+ match self
626+ . set_region_role_state_gracefully ( region_id, SettableRegionRoleState :: Leader )
627+ . await ?
628+ {
629+ SetRegionRoleStateResponse :: Success ( _) | SetRegionRoleStateResponse :: NotFound => {
630+ Ok ( RegionResponse :: new ( AffectedRows :: default ( ) ) )
631+ }
632+ SetRegionRoleStateResponse :: InvalidTransition ( err) => {
633+ Err ( err) . with_context ( |_| HandleRegionRequestSnafu { region_id } )
634+ }
635+ }
636+ }
637+
638+ async fn handle_remap_manifest_request (
639+ & self ,
640+ request : & RemapManifestRequest ,
641+ ) -> Result < RegionResponse > {
642+ info ! (
643+ "received remap manifest request for table {} group {}" ,
644+ request. table_id, request. group_id
645+ ) ;
646+
647+ let stats_json = serde_json:: to_vec ( & serde_json:: json!( {
648+ "files_per_region" : HashMap :: <u64 , usize >:: new( ) ,
649+ "total_file_refs" : 0u64 ,
650+ "empty_regions" : Vec :: <u64 >:: new( ) ,
651+ "group_id" : & request. group_id,
652+ } ) )
653+ . context ( SerializeJsonSnafu ) ?;
654+
655+ let mut extensions = HashMap :: new ( ) ;
656+ extensions. insert ( REMAP_STATS_EXTENSION_KEY . to_string ( ) , stats_json) ;
657+
658+ Ok ( RegionResponse {
659+ affected_rows : 0 ,
660+ extensions,
661+ metadata : Vec :: new ( ) ,
662+ } )
663+ }
664+
665+ async fn handle_apply_staged_manifest_request (
666+ & self ,
667+ request : & ApplyStagedManifestRequest ,
668+ ) -> Result < RegionResponse > {
669+ info ! (
670+ "received manifest apply request for table {} group {} publish={} regions {:?}" ,
671+ request. table_id, request. group_id, request. publish, request. region_ids
672+ ) ;
673+
674+ Ok ( RegionResponse {
675+ affected_rows : 0 ,
676+ extensions : HashMap :: new ( ) ,
677+ metadata : Vec :: new ( ) ,
678+ } )
679+ }
680+
541681 /// Sync region manifest and registers new opened logical regions.
542682 pub async fn sync_region (
543683 & self ,
@@ -569,6 +709,26 @@ impl RegionServerHandler for RegionServer {
569709 region_request:: Body :: Sync ( sync_request) => {
570710 self . handle_sync_region_request ( sync_request) . await
571711 }
712+ region_request:: Body :: Pause ( pause_request) => {
713+ self . handle_pause_region_request ( pause_request) . await
714+ }
715+ region_request:: Body :: Resume ( resume_request) => {
716+ self . handle_resume_region_request ( resume_request) . await
717+ }
718+ region_request:: Body :: StageRegionRule ( stage_request) => {
719+ self . handle_stage_region_rule_request ( stage_request) . await
720+ }
721+ region_request:: Body :: PublishRegionRule ( publish_request) => {
722+ self . handle_publish_region_rule_request ( publish_request)
723+ . await
724+ }
725+ region_request:: Body :: RemapManifest ( remap_request) => {
726+ self . handle_remap_manifest_request ( remap_request) . await
727+ }
728+ region_request:: Body :: ApplyStagedManifest ( apply_request) => {
729+ self . handle_apply_staged_manifest_request ( apply_request)
730+ . await
731+ }
572732 region_request:: Body :: ListMetadata ( list_metadata_request) => {
573733 self . handle_list_metadata_request ( list_metadata_request)
574734 . await
0 commit comments