-
-
Notifications
You must be signed in to change notification settings - Fork 33
/
LeakSearch.py
159 lines (128 loc) · 5.65 KB
/
LeakSearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/python3
#============================#
# LeakSearch by @JoelGMSec #
# https://darkbyte.net #
#============================#
import os
import json
import urllib3
import requests
import argparse
from tabulate import tabulate
from neotermcolor import colored
from requests import ConnectionError
urllib3.disable_warnings()
# ASCII-art title printed in blue when the script starts.
# NOTE(review): the art's column alignment looks collapsed in this copy —
# confirm the spacing against the upstream file before relying on it.
banner = r"""
_ _ ____ _
| | ___ __ _| | __/ ___| ___ __ _ _ __ ___| |__
| | / _ \/ _` | |/ /\___ \ / _ \/ _` | '__/ __| '_ \
| |__| __/ (_| | < ___) | __/ (_| | | | (__| | | |
|_____\___|\__,_|_|\_\|____/ \___|\__,_|_| \___|_| |_|"""
# Author credit line printed in green right after the title banner.
banner2 = r"""
------------------- by @JoelGMSec -------------------
"""
def find_leaks_proxynova(email, proxy, number):
    """Query the ProxyNova COMB API for leaked records matching a keyword.

    Args:
        email: Search term sent as the ``query`` parameter of the request.
        proxy: Optional proxy URL used for both HTTP and HTTPS traffic; a
            falsy value means a direct connection.
        number: Maximum number of lines to return; ``None`` returns every
            line present in the response.

    Returns:
        A list with at most ``number`` entries taken from the ``lines``
        field of the JSON response, or an empty list when the API answers
        with a status code other than 200.
    """
    url = "https://api.proxynova.com/comb"
    headers = {'User-Agent': 'curl'}

    # The context manager releases the session's pooled connections on exit.
    with requests.session() as session:
        if proxy:
            session.proxies = {'http': proxy, 'https': proxy}

        # Passing the keyword via ``params`` lets requests percent-encode it,
        # so characters such as '&', '#' or '+' cannot corrupt the query.
        # NOTE(review): verify=False disables TLS certificate validation;
        # kept as in the original (presumably for intercepting proxies).
        response = session.get(
            url,
            params={'query': email},
            headers=headers,
            verify=False,
        )

        if response.status_code != 200:
            print(colored(f"[!] Failed to fetch results from ProxyNova. Status code: {response.status_code}\n", "red"))
            return []

        data = json.loads(response.text)

    total_results = data.get("count", 0)
    print(colored(f"[*] Found {total_results} different records in database", "magenta"), end='')
    # ``or []`` guards against a response whose "lines" field is null.
    return (data.get("lines") or [])[:number]
def find_leaks_local_db(database, keyword, number):
    """Search a local leak database file for lines containing a keyword.

    Files whose name ends in ``.json`` are parsed as JSON and their ``lines``
    list (the same layout this tool writes when saving results) is filtered.
    Any other file is scanned line by line as plain text. Matching is a
    case-insensitive substring test in both cases.

    Args:
        database: Path to the local database file.
        keyword: Text to look for in each line.
        number: Maximum number of matches to return; ``None`` means no limit.

    Returns:
        The list of matching lines (whitespace-stripped for text files),
        truncated to ``number`` entries when a limit is given.

    Raises:
        SystemExit: With status -1 when the file does not exist, the JSON
            database cannot be parsed, or the user interrupts the scan.
    """
    if not os.path.exists(database):
        print(colored(f"[!] Local database file not found: {database}\n", "red"))
        raise SystemExit(-1)

    needle = keyword.lower()

    if database.endswith('.json'):
        with open(database, 'r') as json_file:
            try:
                data = json.load(json_file)
            except json.JSONDecodeError:
                print(colored("[!] Error: Failed to parse local database as JSON\n", "red"))
                raise SystemExit(-1) from None

        # Accept the {"lines": [...]} layout as well as a bare JSON list.
        lines = data.get("lines") if isinstance(data, dict) else data
        if not isinstance(lines, list):
            lines = []
        results = [
            line for line in lines
            if isinstance(line, str) and needle in line.lower()
        ]
        return results[:number] if number is not None else results

    results = []
    try:
        # errors='replace' keeps the scan going past undecodable bytes
        # instead of silently stopping at the first one.
        with open(database, 'r', errors='replace') as file:
            for line_count, raw_line in enumerate(file, start=1):
                line = raw_line.strip()
                if needle in line.lower():
                    results.append(line)

                print(colored(f"\r[*] Reading {line_count} lines in database..", "magenta"), end='', flush=True)

                # Stop early once enough matches have been collected.
                if number is not None and len(results) >= number:
                    break
    except KeyboardInterrupt:
        print(colored("\n[!] Exiting..\n", "red"))
        raise SystemExit(-1)

    return results[:number] if number is not None else results
def main(database, keyword, output=None, proxy=None, number=20):
    """Run one leak search and hand any matches to the result printer.

    Args:
        database: ``"proxynova"`` (any letter case) to query the online
            service; any other value is treated as a local file path.
        keyword: Search term; surrounding whitespace is stripped before use.
        output: Optional path forwarded to ``print_results``.
        proxy: Optional proxy URL, used only for the online lookup.
        number: Maximum number of results requested from the backend.
    """
    print(colored(f"[>] Searching for {keyword} leaks in {database}..", "yellow"))

    # Pick the backend: remote API or local file scan.
    use_remote = database.lower() == "proxynova"
    if use_remote:
        leaks = find_leaks_proxynova(keyword.strip(), proxy, number)
    else:
        leaks = find_leaks_local_db(database.strip(), keyword.strip(), number)

    if leaks:
        print_results(leaks, output, number)
        return

    print(colored(f"\n[!] No leaks found in {database}!\n", "red"))
def print_results(results, output, number):
    """Display the matched records as a table or save them to a file.

    Each record is split at its first colon into a username/domain column
    and a password column, so passwords that themselves contain colons are
    kept intact. Records without any colon are left out of the table.

    Args:
        results: List of raw record strings.
        output: Destination path, or ``None`` to print to the terminal. A
            path ending in ``.json`` receives ``{"lines": results}``; any
            other path receives the rendered text table.
        number: Unused here; kept so existing callers keep working.
    """
    print(colored(f"\n[-] Selecting the first {len(results)} results..", "blue"))
    headers = ["Username@Domain", "Password"]

    table_data = []
    for line in results:
        # Split only at the first colon: everything after it is the password.
        username_domain, separator, password = line.partition(":")
        if separator:
            table_data.append([username_domain, password])

    if output is None:
        print(colored("[+] Done!\n", "green"))
        print(tabulate(table_data, headers, showindex="never"))
        print()
        return

    if output.endswith('.json'):
        with open(output, 'w') as json_file:
            json.dump({"lines": results}, json_file, indent=2)
    else:
        with open(output, 'w') as txt_file:
            txt_file.write(tabulate(table_data, headers, showindex="never"))

    # Report success only after the file has been closed and flushed.
    print(colored(f"[+] Data saved successfully in {output}!\n", "green"))
if __name__ == '__main__':
    # Command-line entry point: show the banners, parse the options, run the
    # search, and exit with a non-zero status whenever the search fails.
    print(colored(banner, "blue"))
    print(colored(banner2, "green"))

    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--database", default="ProxyNova", help="Database used for the search (ProxyNova or LocalFile)")
    parser.add_argument("-k", "--keyword", help="Keyword (user/domain/pass) to search for leaks in the DB")
    parser.add_argument("-n", "--number", type=int, default=20, help="Number of results to show (default is 20)")
    parser.add_argument("-o", "--output", help="Save the results as json or txt into a file")
    parser.add_argument("-p", "--proxy", help="Set HTTP/S proxy (like http://localhost:8080)")
    args = parser.parse_args()

    # A keyword is mandatory; without one, show the usage text and stop.
    if not args.keyword:
        parser.print_help()
        raise SystemExit(-1)

    try:
        main(args.database, args.keyword, args.output, args.proxy, args.number)
    except ConnectionError:
        print(colored("[!] Can't connect to service! Check your internet connection!\n", "red"))
        # Signal the failure to the calling shell instead of exiting with 0.
        raise SystemExit(-1)
    except KeyboardInterrupt:
        print(colored("\n[!] Exiting..\n", "red"))
        raise SystemExit(-1)
    except Exception as e:
        print(colored(f"\n[!] Error: {e}\n", "red"))
        # Unexpected errors are failures too, so exit non-zero here as well.
        raise SystemExit(-1)