11"""
22Shard analysis and rebalancing logic for CrateDB
33"""
4- import datetime
4+
5+ import enum
56import logging
67import math
78from collections import defaultdict
9+ from datetime import datetime
810from time import sleep
911from typing import Any , Dict , List , Optional , Set , Tuple , Union
10- from unittest import result
1112
1213from rich import box
1314from rich .console import Console
1415from rich .panel import Panel
1516from rich .table import Table
16- from rich .live import Live
1717
1818from cratedb_toolkit .admin .xmover .model import (
1919 DistributionStats ,
@@ -839,93 +839,112 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.
839839 }
840840
841841
842- class ShardMonitor :
@enum.unique
class ShardHeatSortByChoice(enum.Enum):
    """Sort orders available for the shard heat report.

    ``heat`` orders shards by their sequence-number delta, largest first.
    ``table`` and ``node`` order shards by their compound id, which leads
    with the table name or the node name respectively.
    """

    # Explicit values; identical to what ``enum.auto()`` assigns (1, 2, 3).
    heat = 1
    table = 2
    node = 3
846+
847+
848+ class ShardHeatReporter :
def __init__(self, analyzer: ShardAnalyzer):
    """Bind the reporter to an analyzer and start with empty measurement state.

    :param analyzer: Provider of shard information; its ``shards`` are read
        when the baseline is taken and on every data refresh.
    """
    self.analyzer = analyzer

    # Report options; both are overwritten at the start of ``report``.
    self.sort_by: ShardHeatSortByChoice = ShardHeatSortByChoice.heat
    self.table_filter: str | None = None

    # Measurement state. The dicts are keyed by the compound shard id
    # produced by ``_get_shard_compound_id``.
    self.seq_deltas: dict[str, int] = dict()
    self.size_deltas: dict[str, float] = dict()
    self.reference_shards: dict[str, ShardInfo] = dict()
    self.latest_shards: list[ShardInfo] = list()
852858
def report(
    self,
    table_filter: str | None,
    interval_in_seconds: int,
    watch: bool,
    n_shards: int,
    sort_by: ShardHeatSortByChoice,
):
    """Measure shard activity against a baseline and print heat tables.

    The analyzer's current shards are stored as the baseline. After each
    ``interval_in_seconds`` pause the data is refreshed and two tables are
    printed: the shards (limited to ``n_shards``) and the heat per node.
    Deltas and the elapsed time are always relative to the baseline taken
    at the start of this call, not to the previous iteration.

    :param table_filter: When set, only shards of this table are reported.
    :param interval_in_seconds: Pause before each measurement.
    :param watch: Keep measuring and printing until interrupted when true;
        print a single measurement otherwise.
    :param n_shards: Maximum number of shards listed; a value of 0 or less
        lists all of them.
    :param sort_by: Ordering of the shard table.
    """
    # Local import: this module only imports ``sleep`` from ``time``.
    # A monotonic clock keeps the elapsed time (the ops/second divisor)
    # immune to wall-clock adjustments, which could otherwise shrink it
    # or even make it negative.
    from time import monotonic

    self.table_filter = table_filter
    # Must be assigned before the baseline is built: the layout of the
    # compound id depends on the sort order.
    self.sort_by = sort_by

    self.reference_shards = {self._get_shard_compound_id(shard): shard for shard in self.analyzer.shards}
    start_time = monotonic()
    self.refresh_data()

    console.print(Panel.fit("[bold blue]Shard heat analyzer[/bold blue]"))

    while True:
        sleep(interval_in_seconds)
        self.refresh_data()
        shards_table = self.generate_shards_table(
            self._get_top_shards(self.latest_shards, n_shards),
            self.seq_deltas,
            monotonic() - start_time,
        )
        console.print(shards_table)
        nodes_table = self.generate_nodes_table(self._get_nodes_heat_info(self.reference_shards, self.seq_deltas))
        console.print(nodes_table)

        if not watch:
            break
874890
@staticmethod
def generate_nodes_table(heat_nodes_info: dict[str, int]):
    """Build the "Shard heat by node" table, hottest node first.

    An empty line is printed to the console before the table is built.

    :param heat_nodes_info: Accumulated heat per node name.
    :return: The populated table; printing it is left to the caller.
    """
    console.print()

    nodes_table = Table(title="Shard heat by node", box=box.ROUNDED)
    for heading, colour in (("Node name", "cyan"), ("Heat", "magenta")):
        nodes_table.add_column(heading, style=colour)

    # Descending by heat; nodes with equal heat are ordered by name,
    # also descending.
    ranked = list(heat_nodes_info.items())
    ranked.sort(key=lambda entry: (entry[1], entry[0]), reverse=True)

    for node_name, heat in ranked:
        nodes_table.add_row(node_name, str(heat))

    return nodes_table
886904
def generate_shards_table(self, sorted_shards: list[ShardInfo], deltas: dict[str, int], elapsed_time_s: float):
    """Build the shard table: header first, then one row per shard.

    :param sorted_shards: Shards in the order they should be listed.
    :param deltas: Sequence-number delta per compound shard id.
    :param elapsed_time_s: Seconds elapsed since the baseline, used for
        the ops/second column.
    :return: The populated table; printing it is left to the caller.
    """
    shards_table = self._display_shards_table_header()
    self._display_shards_table_rows(
        shards_table=shards_table,
        sorted_shards=sorted_shards,
        deltas=deltas,
        elapsed_time_s=elapsed_time_s,
    )
    return shards_table
891909
def _display_shards_table_header(self):
    """Create the empty shard table, titled after the active sort order.

    :return: A table with all columns defined and no rows yet.
    """
    # (heading, style) pairs in display order: identifying columns in
    # cyan, measurements in magenta.
    column_specs = (
        ("Schema", "cyan"),
        ("Table", "cyan"),
        ("Partition", "cyan"),
        ("Shard ID", "cyan"),
        ("Node", "cyan"),
        ("Primary", "cyan"),
        ("Size", "magenta"),
        ("Size Delta", "magenta"),
        ("Seq Delta", "magenta"),
        ("ops/second", "magenta"),
    )

    header = Table(title=f"Shards sorted by {self.sort_by.name}", box=box.ROUNDED)
    for heading, colour in column_specs:
        header.add_column(heading, style=colour)
    return header
904923
def _display_shards_table_rows(
    self, shards_table: Table, sorted_shards: list[ShardInfo], deltas: dict[str, int], elapsed_time_s: float
):
    """Append one row per shard to ``shards_table``.

    :param shards_table: Table to append to; its columns must already be
        defined in the order used below.
    :param sorted_shards: Shards in the order they should be listed.
    :param deltas: Sequence-number delta per compound shard id; shards
        without an entry are shown with a delta of 0.
    :param elapsed_time_s: Seconds elapsed since the baseline; divisor for
        the ops/second column.
    """
    for shard in sorted_shards:
        shard_compound_id = self._get_shard_compound_id(shard)
        seq_delta = deltas.get(shard_compound_id, 0)
        # The "Size Delta" column shows the change in shard size, not the
        # sequence-number delta. Missing entries are treated like missing
        # sequence deltas: no change.
        size_delta = self.size_deltas.get(shard_compound_id, 0.0)
        # Guard against a zero (or negative) elapsed time so the rate
        # never raises ZeroDivisionError.
        ops_per_second = seq_delta / elapsed_time_s if elapsed_time_s > 0 else 0.0
        shards_table.add_row(
            shard.schema_name,
            shard.table_name,
            shard.partition_id,
            str(shard.shard_id),
            shard.node_name,
            str(shard.is_primary),
            format_size(shard.size_gb),
            format_size(size_delta),
            str(seq_delta),
            str(ops_per_second),
        )
923942
def _get_shard_compound_id(self, shard: ShardInfo) -> str:
    """Return the key that identifies ``shard`` in the delta dictionaries.

    The key joins table name, shard id, node name and partition id with
    dashes. When sorting by node the node name comes first, otherwise the
    table name does, so sorting by the key sorts by node or by table.
    """
    if self.sort_by != ShardHeatSortByChoice.node:
        return f"{shard.table_name}-{shard.shard_id}-{shard.node_name}-{shard.partition_id}"
    return f"{shard.node_name}-{shard.table_name}-{shard.shard_id}-{shard.partition_id}"
929948
930949 def calculate_heat_deltas (self , reference_shards : dict [str , ShardInfo ], updated_shards : list [ShardInfo ]):
931950 seq_result : dict [str , int ] = {}
@@ -943,7 +962,7 @@ def calculate_heat_deltas(self, reference_shards: dict[str, ShardInfo], updated_
943962 reference = reference_shards [shard_compound_id ].seq_stats_max_seq_no
944963
945964 if refreshed_number < reference :
946- refreshed_number += 2 ** 63 - 1
965+ refreshed_number += 2 ** 63 - 1
947966
948967 seq_result [shard_compound_id ] = refreshed_number - reference
949968 size_result [shard_compound_id ] = shard .size_gb - reference_shards [shard_compound_id ].size_gb
@@ -953,29 +972,33 @@ def calculate_heat_deltas(self, reference_shards: dict[str, ShardInfo], updated_
953972
def refresh_data(self):
    """Reload shard data, recompute the deltas and re-sort the shard list.

    The analyzer is asked to refresh, its shards are narrowed to
    ``self.table_filter`` (when set), the heat deltas are recalculated
    against ``self.reference_shards`` and ``self.latest_shards`` is
    rebuilt in the order selected by ``self.sort_by``.
    """
    self.analyzer._refresh_data()
    updated_shards: list[ShardInfo] = [
        s for s in self.analyzer.shards if not self.table_filter or self.table_filter == s.table_name
    ]
    # NOTE(review): presumably this call updates self.seq_deltas and
    # self.size_deltas; confirm against calculate_heat_deltas.
    self.calculate_heat_deltas(self.reference_shards, updated_shards)
    if self.sort_by == ShardHeatSortByChoice.heat:
        # A shard without a recorded delta sorts as "no activity" instead
        # of raising KeyError, matching how the row renderer treats
        # missing deltas.
        self.latest_shards = sorted(
            updated_shards,
            key=lambda s: self.seq_deltas.get(self._get_shard_compound_id(s), 0),
            reverse=True,
        )
    else:
        # The compound id leads with the table or node name, so sorting by
        # it yields the table/node ordering.
        self.latest_shards = sorted(updated_shards, key=self._get_shard_compound_id)
963985
964-
def _get_top_shards(self, sorted_shards: list[ShardInfo], n_shards: int) -> list[ShardInfo]:
    """Return the first ``n_shards`` entries of ``sorted_shards``.

    A limit of 0 or less means "no limit": the list is returned as is.
    """
    return sorted_shards[:n_shards] if n_shards > 0 else sorted_shards
970991
def _get_nodes_heat_info(self, shards: dict[str, ShardInfo], seq_deltas: dict[str, int]) -> dict[str, int]:
    """Sum the sequence-number deltas per node.

    :param shards: Shards by compound id; used to look up the node name.
    :param seq_deltas: Sequence-number delta per compound id.
    :return: Total delta per node name. Deltas whose compound id has no
        matching shard are left out.
    """
    heat_by_node: dict[str, int] = {}
    for compound_id, delta in seq_deltas.items():
        shard = shards.get(compound_id)
        if not shard:
            continue
        heat_by_node[shard.node_name] = heat_by_node.get(shard.node_name, 0) + delta
    return heat_by_node
9801003
9811004
0 commit comments