-
-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathadsbsniffer.py
88 lines (79 loc) · 3.66 KB
/
adsbsniffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import logging
import os
import subprocess
import json
import time
from datetime import datetime
import pwnagotchi.plugins as plugins
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
class ADSBSniffer(plugins.Plugin):
__author__ = '4li3nMaJ1k'
__version__ = '0.1.0'
__license__ = 'GPL3'
__description__ = 'A plugin that captures ADS-B data from aircraft using RTL-SDR and logs it.'
def __init__(self):
self.options = {
'timer': 60, # Time interval in seconds for checking for new aircraft
'aircraft_file': '/root/handshakes/adsb_aircraft.json', # File to store detected aircraft information
'adsb_x_coord': 160,
'adsb_y_coord': 80
}
self.last_scan_time = 0
self.data = {}
def on_loaded(self):
logging.info("[ADSB] ADSBSniffer plugin loaded.")
if not os.path.exists(os.path.dirname(self.options['aircraft_file'])):
os.makedirs(os.path.dirname(self.options['aircraft_file']))
if not os.path.exists(self.options['aircraft_file']):
with open(self.options['aircraft_file'], 'w') as f:
json.dump({}, f)
with open(self.options['aircraft_file'], 'r') as f:
self.data = json.load(f)
def on_ui_setup(self, ui):
ui.add_element('ADSB', LabeledValue(color=BLACK,
label='ADSB',
value=" ",
position=(self.options["adsb_x_coord"],
self.options["adsb_y_coord"]),
label_font=fonts.Small,
text_font=fonts.Small))
def on_ui_update(self, ui):
current_time = time.time()
if current_time - self.last_scan_time >= self.options['timer']:
self.last_scan_time = current_time
result = self.scan()
ui.set('ADSB', result)
def scan(self):
logging.info("[ADSB] Scanning for ADS-B signals...")
cmd = "timeout 10s rtl_adsb"
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
aircrafts = self.parse_output(output.decode('utf-8'))
return f"{len(aircrafts)} aircrafts detected"
except subprocess.CalledProcessError as e:
if e.returncode == 124: # Graceful handling of the timeout exit status
logging.info("[ADSB] Successfully completed ADS-B scan.")
output = e.output.decode('utf-8')
aircrafts = self.parse_output(output)
return f"{len(aircrafts)} aircrafts detected"
else:
logging.error("[ADSB] Error running rtl_adsb: %s, output: %s", e, e.output.decode('utf-8'))
return "Scan error"
def parse_output(self, raw_data):
aircrafts = []
for line in raw_data.split('\n'):
if line.strip():
aircraft_data = line.split(',')
if len(aircraft_data) >= 2:
hex_id, signal = aircraft_data[0], aircraft_data[1]
aircrafts.append({'hex': hex_id, 'signal_strength': signal})
self.data[hex_id] = {'last_seen': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'signal_strength': signal}
with open(self.options['aircraft_file'], 'w') as f:
json.dump(self.data, f)
return aircrafts
def on_unload(self, ui):
with ui._lock:
ui.remove_element('ADSB')