55and provide recommendations for optimization.
66"""
77
8+ import logging
89import statistics
910from dataclasses import dataclass
1011from typing import Any , Dict , List , Optional , Tuple
1516
1617from cratedb_toolkit .admin .xmover .util .database import CrateDBClient
1718
19+ logger = logging .getLogger (__name__ )
20+
1821
1922def format_storage_size (size_gb : float ) -> str :
2023 """Format storage size with appropriate units and spacing"""
@@ -134,7 +137,7 @@ def get_table_distribution_detailed(self, table_identifier: str) -> Optional[Tab
134137 AND s.routing_state = 'STARTED'
135138 GROUP BY s.schema_name, s.table_name, s.node['name']
136139 ORDER BY s.node['name'] \
137- """
140+ """ # noqa: E501
138141
139142 result = self .client .execute_query (query , [schema_name , table_name ])
140143 rows = result .get ("rows" , [])
@@ -190,7 +193,8 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
190193 rprint (f"• Total Shards: { total_shards } ({ total_primary_shards } primary + { total_replica_shards } replica)" )
191194 rprint (f"• Total Documents: { total_documents :,} " )
192195 rprint (
193- f"• Node Coverage: { len (table_nodes )} /{ len (cluster_nodes )} nodes ({ len (table_nodes ) / len (cluster_nodes ) * 100 :.0f} %)"
196+ f"• Node Coverage: { len (table_nodes )} /{ len (cluster_nodes )} nodes "
197+ f"({ len (table_nodes ) / len (cluster_nodes ) * 100 :.0f} %)"
194198 )
195199
196200 if missing_nodes :
@@ -261,7 +265,8 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
261265 # Storage distribution analysis
262266 if storage_cv > 0.4 :
263267 rprint (
264- f"• [red]⚠ Storage Imbalance:[/red] Range { format_storage_size (min_storage )} -{ format_storage_size (max_storage )} per node (CV: { storage_cv :.2f} )"
268+ f"• [red]⚠ Storage Imbalance:[/red] Range "
269+ f"{ format_storage_size (min_storage )} -{ format_storage_size (max_storage )} per node (CV: { storage_cv :.2f} )"
265270 )
266271 else :
267272 rprint (f"• [green]✓ Storage Balance:[/green] Well distributed (CV: { storage_cv :.2f} )" )
@@ -306,11 +311,13 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
306311 for zone in sorted (zone_distribution .keys ()):
307312 zone_data = zone_distribution [zone ]
308313 rprint (
309- f"• { zone } : { zone_data ['nodes' ]} nodes, { zone_data ['shards' ]} shards, { format_storage_size (zone_data ['size' ])} "
314+ f"• { zone } : { zone_data ['nodes' ]} nodes, "
315+ f"{ zone_data ['shards' ]} shards, { format_storage_size (zone_data ['size' ])} "
310316 )
311317
312318 except Exception :
313- pass # Zone info not available
319+ # Zone info not available
320+ logger .exception ("Zone info not available" )
314321
315322 # Health Summary
316323 rprint ("\n [bold]💊 Health Summary[/bold]" )
@@ -375,7 +382,7 @@ def get_largest_tables_distribution(self, top_n: int = 10) -> List[TableDistribu
375382 WHERE s.routing_state = 'STARTED'
376383 GROUP BY s.schema_name, s.table_name, s.node['name']
377384 ORDER BY s.schema_name, s.table_name, s.node['name'] \
378- """
385+ """ # noqa: E501
379386
380387 result = self .client .execute_query (query , [top_n ])
381388
@@ -534,7 +541,8 @@ def detect_storage_imbalance(self, table: TableDistribution) -> Optional[Distrib
534541
535542 if overloaded_node and underloaded_node :
536543 recommendations .append (
537- f"Rebalance storage from { overloaded_node } ({ format_storage_size (max_size )} ) to { underloaded_node } ({ format_storage_size (min_size )} )"
544+ f"Rebalance storage from { overloaded_node } ({ format_storage_size (max_size )} ) "
545+ f"to { underloaded_node } ({ format_storage_size (min_size )} )"
538546 )
539547
540548 return DistributionAnomaly (
@@ -643,7 +651,7 @@ def detect_document_imbalance(self, table: TableDistribution) -> Optional[Distri
643651 recommendations = recommendations ,
644652 )
645653
646- def analyze_distribution (self , top_tables : int = 10 ) -> List [DistributionAnomaly ]:
654+ def analyze_distribution (self , top_tables : int = 10 ) -> Tuple [ List [DistributionAnomaly ], int ]:
647655 """Analyze shard distribution and return ranked anomalies"""
648656
649657 # Get table distributions
@@ -672,12 +680,13 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
672680
673681 if not anomalies :
674682 rprint (
675- f"[green]✓ No significant shard distribution anomalies detected in top { tables_analyzed } tables![/green]"
683+ f"[green]✓ No significant shard distribution anomalies "
684+ f"detected in top { tables_analyzed } tables![/green]"
676685 )
677686 return
678687
679688 # Show analysis scope
680- unique_tables = set ( anomaly .table .full_table_name for anomaly in anomalies )
689+ unique_tables = { anomaly .table .full_table_name for anomaly in anomalies }
681690 rprint (
682691 f"[blue]📋 Analyzed { tables_analyzed } largest tables, found issues in { len (unique_tables )} tables[/blue]"
683692 )
@@ -731,7 +740,8 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
731740 overloaded = [node for node , count in counts .items () if count == max_count ]
732741 underloaded = [node for node , count in counts .items () if count == min_count ]
733742 rprint (
734- f" [red]⚠ Issue:[/red] { overloaded [0 ]} has { max_count } shards while { underloaded [0 ]} has only { min_count } shards"
743+ f" [red]⚠ Issue:[/red] { overloaded [0 ]} has { max_count } shards "
744+ f"while { underloaded [0 ]} has only { min_count } shards"
735745 )
736746
737747 elif anomaly .anomaly_type == "Storage Imbalance" :
@@ -742,19 +752,20 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
742752 overloaded = [node for node , size in sizes .items () if size == max_size ][0 ]
743753 underloaded = [node for node , size in sizes .items () if size == min_size ][0 ]
744754 rprint (
745- f" [red]⚠ Issue:[/red] Storage ranges from { format_storage_size (min_size )} ({ underloaded } ) to { format_storage_size (max_size )} ({ overloaded } ) - { max_size / min_size :.1f} x difference"
755+ f" [red]⚠ Issue:[/red] Storage ranges from { format_storage_size (min_size )} ({ underloaded } ) " # noqa: E501
756+ f"to { format_storage_size (max_size )} ({ overloaded } ) - { max_size / min_size :.1f} x difference"
746757 )
747758
748759 elif anomaly .anomaly_type == "Node Coverage Issue" :
749760 if "nodes_without_shards" in anomaly .details :
750761 missing_nodes = anomaly .details ["nodes_without_shards" ]
751762 coverage_ratio = anomaly .details ["coverage_ratio" ]
752763 rprint (
753- f" [red]⚠ Issue:[/red] Table missing from { len (missing_nodes )} nodes ({ coverage_ratio :.0%} cluster coverage)"
754- )
755- rprint (
756- f" [dim] Missing from: { ', ' .join (missing_nodes [:3 ])} { '...' if len (missing_nodes ) > 3 else '' } [/dim]"
764+ f" [red]⚠ Issue:[/red] Table missing from { len (missing_nodes )} nodes "
765+ f"({ coverage_ratio :.0%} cluster coverage)"
757766 )
767+ ellipsis = "..." if len (missing_nodes ) > 3 else ""
768+ rprint (f" [dim] Missing from: { ', ' .join (missing_nodes [:3 ])} { ellipsis } [/dim]" )
758769
759770 elif anomaly .anomaly_type == "Document Imbalance" :
760771 if "document_counts" in anomaly .details :
@@ -763,7 +774,8 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
763774 max_docs = max (doc_counts .values ())
764775 ratio = max_docs / min_docs if min_docs > 0 else float ("inf" )
765776 rprint (
766- f" [red]⚠ Issue:[/red] Document counts range from { min_docs :,} to { max_docs :,} ({ ratio :.1f} x difference)"
777+ f" [red]⚠ Issue:[/red] Document counts range "
778+ f"from { min_docs :,} to { max_docs :,} ({ ratio :.1f} x difference)"
767779 )
768780
769781 # Show recommendations
@@ -772,7 +784,7 @@ def format_distribution_report(self, anomalies: List[DistributionAnomaly], table
772784 rprint (f" • { rec } " )
773785
774786 # Summary statistics
775- unique_tables = set ( anomaly .table .full_table_name for anomaly in anomalies )
787+ unique_tables = { anomaly .table .full_table_name for anomaly in anomalies }
776788 rprint ("\n [dim]📊 Analysis Summary:[/dim]" )
777789 rprint (f"[dim]• Tables analyzed: { tables_analyzed } [/dim]" )
778790 rprint (f"[dim]• Tables with issues: { len (unique_tables )} [/dim]" )
0 commit comments