4
4
import os
5
5
from typing import List , NamedTuple
6
6
from tqdm import tqdm
7
+ import concurrent .futures
8
+ import chardet
7
9
8
10
9
11
class ScanResult (NamedTuple ):
@@ -27,8 +29,7 @@ def _load_checks(self):
27
29
checks .append (check_class ())
28
30
return checks
29
31
30
- def scan (self , path : str , limit : int = 40000 ) -> List [ScanResult ]:
31
- results = []
32
+ def scan (self , path : str , limit : int = 1000000000 , num_threads : int = 48 ) -> List [ScanResult ]:
32
33
files_to_scan = []
33
34
34
35
# Collect all files to scan
@@ -47,24 +48,42 @@ def scan(self, path: str, limit: int = 40000) -> List[ScanResult]:
47
48
# Limit the number of files to scan
48
49
files_to_scan = files_to_scan [:limit ]
49
50
50
- # Scan files with progress bar
51
- for file_path in tqdm (files_to_scan , desc = "Scanning files" , unit = "file" ):
52
- results .extend (self ._scan_file (file_path ))
51
+ # Scan files in parallel with progress bar
52
+ with concurrent .futures .ThreadPoolExecutor (max_workers = num_threads ) as executor :
53
+ futures = [executor .submit (self ._scan_file , file_path ) for file_path in files_to_scan ]
54
+ results = []
55
+ for future in tqdm (concurrent .futures .as_completed (futures ), total = len (files_to_scan ), desc = "Scanning files" , unit = " file" ):
56
+ results .extend (future .result ())
53
57
54
58
return results
55
59
56
60
def _scan_file(self, file_path: str) -> List[ScanResult]:
    """Run every loaded check against a single file.

    The file is read once as raw bytes, decoded using the encoding
    guessed by ``chardet`` and then handed to each check in
    ``self.checks``.

    Args:
        file_path: Path of the file to scan.

    Returns:
        One ``ScanResult`` per finding reported by the checks. If the file
        cannot be read, or a check raises, the error is printed and the
        findings gathered so far (possibly none) are returned, so one bad
        file does not abort the whole scan.
    """
    results = []
    try:
        # Read the bytes exactly once; detection and decoding both work on
        # this in-memory buffer instead of reopening the file.
        with open(file_path, 'rb') as f:
            raw_data = f.read()

        # chardet may report no encoding at all (None), e.g. for empty or
        # binary input. Default to UTF-8 explicitly rather than depending on
        # the platform's locale encoding.
        detected_encoding = chardet.detect(raw_data)['encoding'] or 'utf-8'

        try:
            content = raw_data.decode(detected_encoding)
        except (UnicodeDecodeError, LookupError):
            # Wrong guess, or a codec name Python does not know: latin-1
            # maps every byte value, so this decode cannot fail.
            content = raw_data.decode('latin-1')

        # Text-mode open() used to translate line endings; keep handing the
        # checks '\n'-only content so reported lines stay consistent.
        content = content.replace('\r\n', '\n').replace('\r', '\n')

        for check in self.checks:
            for result in check.run(content):
                results.append(ScanResult(
                    file_path=file_path,
                    line_number=result.line_number,
                    title=check.title,
                    message=result.line_content,
                    severity=check.severity,
                ))
    except Exception as e:
        # Deliberate best-effort boundary: report the failure and carry on
        # with whatever was collected for this file.
        print(f"Error scanning file {file_path}: {e}")
    return results
0 commit comments