11"""
22Shard analysis and rebalancing logic for CrateDB
33"""
4- import datetime
4+
5+ import enum
56import logging
67import math
78from collections import defaultdict
9+ from datetime import datetime
810from time import sleep
911from typing import Any , Dict , List , Optional , Set , Tuple , Union
10- from unittest import result
1112
1213from rich import box
1314from rich .console import Console
1415from rich .panel import Panel
1516from rich .table import Table
16- from rich .live import Live
1717
1818from cratedb_toolkit .admin .xmover .model import (
1919 ActiveShardActivity ,
@@ -841,93 +841,112 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.
841841 }
842842
843843
class ShardHeatSortByChoice(enum.Enum):
    """Available orderings for the shard heat report."""

    heat = enum.auto()
    table = enum.auto()
    node = enum.auto()


class ShardHeatReporter:
    """Report which shards (and nodes) receive the most write activity.

    "Heat" is derived from how far each shard's maximum sequence number has
    advanced compared to a reference snapshot of the shards.
    """

    def __init__(self, analyzer: ShardAnalyzer):
        """Create a reporter on top of an existing analyzer.

        Args:
            analyzer: Source of shard information; its ``shards`` attribute is
                read whenever the reporter takes a snapshot.
        """
        self.analyzer = analyzer
        # All state starts out empty (not merely annotated), so the attributes
        # are safe to read before the first refresh.
        self.reference_shards: dict[str, ShardInfo] = {}
        self.latest_shards: list[ShardInfo] = []
        self.seq_deltas: dict[str, int] = {}
        self.size_deltas: dict[str, float] = {}

        self.table_filter: str | None = None
        self.sort_by: ShardHeatSortByChoice = ShardHeatSortByChoice.heat
854860
855- def monitor_shards (self , table_filter : str | None , interval_in_seconds : int = 5 , repeat : int = 10 , n_shards : int = 40 , sort_by : str = 'heat' ):
861+ def report (
862+ self ,
863+ table_filter : str | None ,
864+ interval_in_seconds : int ,
865+ watch : bool ,
866+ n_shards : int ,
867+ sort_by : ShardHeatSortByChoice ,
868+ ):
856869 self .table_filter = table_filter
857870 self .sort_by = sort_by
858871
859872 self .reference_shards = {self ._get_shard_compound_id (shard ): shard for shard in self .analyzer .shards }
873+ start_time = datetime .now ()
860874 self .refresh_data ()
861875
862- console .print (Panel .fit (f "[bold blue]The { n_shards } Hottest Shards [/bold blue]" ))
876+ console .print (Panel .fit ("[bold blue]Shard heat analyzer [/bold blue]" ))
863877
864- iterations = 0
865878 while True :
866879 sleep (interval_in_seconds )
867880 self .refresh_data ()
868- shards_table = self .generate_shards_table (self ._get_top_shards (self .latest_shards , n_shards ), self .seq_deltas )
881+ shards_table = self .generate_shards_table (
882+ self ._get_top_shards (self .latest_shards , n_shards ),
883+ self .seq_deltas ,
884+ (datetime .now () - start_time ).total_seconds (),
885+ )
869886 console .print (shards_table )
870887 nodes_table = self .generate_nodes_table (self ._get_nodes_heat_info (self .reference_shards , self .seq_deltas ))
871888 console .print (nodes_table )
872889
873- iterations += 1
874- if 0 < repeat <= iterations :
890+ if not watch :
875891 break
876892
877- def generate_nodes_table (self , heat_nodes_info : dict [str , int ]):
893+ @staticmethod
894+ def generate_nodes_table (heat_nodes_info : dict [str , int ]):
895+ console .print ()
878896 table = Table (title = "Shard heat by node" , box = box .ROUNDED )
879897 table .add_column ("Node name" , style = "cyan" )
880898 table .add_column ("Heat" , style = "magenta" )
881899
882- sorted_items = sorted (heat_nodes_info .items (), key = lambda kv : (kv [1 ], kv [0 ]), reverse = True )
900+ sorted_items = sorted (heat_nodes_info .items (), key = lambda kv : (kv [1 ], kv [0 ]), reverse = True )
883901
884902 for k , v in sorted_items :
885903 table .add_row (k , str (v ))
886904
887905 return table
888906
889- def generate_shards_table (self , sorted_shards : list [ShardInfo ], deltas : dict [str , int ]):
890- t = self .display_shards_table_header ()
891- self .display_shards_table_rows (t , sorted_shards , deltas )
907+ def generate_shards_table (self , sorted_shards : list [ShardInfo ], deltas : dict [str , int ], elapsed_time_s : float ):
908+ t = self ._display_shards_table_header ()
909+ self ._display_shards_table_rows (t , sorted_shards , deltas , elapsed_time_s )
892910 return t
893911
894- # Cluster summary table
895- def display_shards_table_header (self ):
896- shards_table = Table (title = "Hot shards" , box = box .ROUNDED )
912+ def _display_shards_table_header (self ):
913+ shards_table = Table (title = f"Shards sorted by { self .sort_by .name } " , box = box .ROUNDED )
897914 shards_table .add_column ("Schema" , style = "cyan" )
898915 shards_table .add_column ("Table" , style = "cyan" )
899- shards_table .add_column ("ID" , style = "cyan" )
916+ shards_table .add_column ("Partition" , style = "cyan" )
917+ shards_table .add_column ("Shard ID" , style = "cyan" )
900918 shards_table .add_column ("Node" , style = "cyan" )
901919 shards_table .add_column ("Primary" , style = "cyan" )
902920 shards_table .add_column ("Size" , style = "magenta" )
903921 shards_table .add_column ("Size Delta" , style = "magenta" )
904922 shards_table .add_column ("Seq Delta" , style = "magenta" )
923+ shards_table .add_column ("ops/second" , style = "magenta" )
905924 return shards_table
906925
907- def display_shards_table_rows ( self , shards_table : Table , sorted_shards : list [ ShardInfo ], deltas : dict [ str , int ]):
908- shards_table . rows . clear ()
909-
926+ def _display_shards_table_rows (
927+ self , shards_table : Table , sorted_shards : list [ ShardInfo ], deltas : dict [ str , int ], elapsed_time_s : float
928+ ):
910929 for shard in sorted_shards :
911930 shard_compound_id = self ._get_shard_compound_id (shard )
912931 seq_delta = deltas .get (shard_compound_id , 0 )
913- if seq_delta != 0 :
914- shards_table . add_row (
915- shard .schema_name ,
916- shard .table_name ,
917- str (shard .shard_id ),
918- shard .node_name ,
919- str (shard .is_primary ),
920- format_size (shard .size_gb ),
921- format_size (self . size_deltas [ shard_compound_id ] ),
922- str (seq_delta )
923- )
924- console . print ( shards_table )
932+ shards_table . add_row (
933+ shard . schema_name ,
934+ shard .table_name ,
935+ shard .partition_id ,
936+ str (shard .shard_id ),
937+ shard .node_name ,
938+ str (shard .is_primary ),
939+ format_size (shard .size_gb ),
940+ format_size (seq_delta ),
941+ str (seq_delta ),
942+ str ( seq_delta / elapsed_time_s ),
943+ )
925944
926945 def _get_shard_compound_id (self , shard : ShardInfo ) -> str :
927- if self .sort_by == ' node' :
928- return f"{ shard .node_name } -{ shard .table_name } -{ shard .shard_id } "
946+ if self .sort_by == ShardHeatSortByChoice . node :
947+ return f"{ shard .node_name } -{ shard .table_name } -{ shard .shard_id } - { shard . partition_id } "
929948 else :
930- return f"{ shard .table_name } -{ shard .shard_id } -{ shard .node_name } "
949+ return f"{ shard .table_name } -{ shard .shard_id } -{ shard .node_name } - { shard . partition_id } "
931950
932951 def calculate_heat_deltas (self , reference_shards : dict [str , ShardInfo ], updated_shards : list [ShardInfo ]):
933952 seq_result : dict [str , int ] = {}
@@ -945,7 +964,7 @@ def calculate_heat_deltas(self, reference_shards: dict[str, ShardInfo], updated_
945964 reference = reference_shards [shard_compound_id ].seq_stats_max_seq_no
946965
947966 if refreshed_number < reference :
948- refreshed_number += 2 ** 63 - 1
967+ refreshed_number += 2 ** 63 - 1
949968
950969 seq_result [shard_compound_id ] = refreshed_number - reference
951970 size_result [shard_compound_id ] = shard .size_gb - reference_shards [shard_compound_id ].size_gb
@@ -955,29 +974,33 @@ def calculate_heat_deltas(self, reference_shards: dict[str, ShardInfo], updated_
955974
956975 def refresh_data (self ):
957976 self .analyzer ._refresh_data ()
958- updated_shards : list [ShardInfo ] = [s for s in self .analyzer .shards if not self .table_filter or self .table_filter == s .table_name ]
977+ updated_shards : list [ShardInfo ] = [
978+ s for s in self .analyzer .shards if not self .table_filter or self .table_filter == s .table_name
979+ ]
959980 self .calculate_heat_deltas (self .reference_shards , updated_shards )
960- if self .sort_by == 'heat' :
961- self .latest_shards = sorted (updated_shards , key = lambda s : self .seq_deltas [self ._get_shard_compound_id (s )],
962- reverse = True )
981+ if self .sort_by == ShardHeatSortByChoice .heat :
982+ self .latest_shards = sorted (
983+ updated_shards , key = lambda s : self .seq_deltas [self ._get_shard_compound_id (s )], reverse = True
984+ )
963985 else :
964986 self .latest_shards = sorted (updated_shards , key = lambda s : self ._get_shard_compound_id (s ))
965987
966-
967988 def _get_top_shards (self , sorted_shards : list [ShardInfo ], n_shards : int ) -> list [ShardInfo ]:
968- if n_shards < 1 :
989+ if n_shards > 0 :
969990 return sorted_shards [:n_shards ]
970991 else :
971992 return sorted_shards
972993
973994 def _get_nodes_heat_info (self , shards : dict [str , ShardInfo ], seq_deltas : dict [str , int ]) -> dict [str , int ]:
974995 nodes : dict [str , int ] = {}
975996 for k , v in seq_deltas .items ():
976- node_name = shards .get (k ).node_name
977- if node_name not in nodes :
978- nodes [node_name ] = v
979- else :
980- nodes [node_name ] += v
997+ shard = shards .get (k )
998+ if shard :
999+ node_name = shard .node_name
1000+ if node_name not in nodes :
1001+ nodes [node_name ] = v
1002+ else :
1003+ nodes [node_name ] += v
9811004 return nodes
9821005
9831006
0 commit comments