1
1
import argparse
2
2
import base64
3
3
import sys
4
+ import re
4
5
6
+ from Crypto .Cipher import AES
5
7
from cryptography .hazmat .primitives .ciphers import Cipher , algorithms , modes
6
8
from cryptography .hazmat .backends import default_backend
7
9
import json
8
10
import socket
9
11
10
12
11
13
GENERIC_KEY = "a3K8Bx%2r8Y7#xDh"
14
+ ENCRYPTION_TYPE = 'ECB'
15
+ GENERIC_GCM_KEY = "{yxAHAY_Lm6pbC/<"
16
+ GCM_IV = b'\x54 \x40 \x78 \x44 \x49 \x67 \x5a \x51 \x6c \x5e \x63 \x13 '
17
+ GCM_ADD = b'qualcomm-test'
12
18
13
19
14
20
class ScanResult :
15
21
ip = ''
16
22
port = 0
17
23
id = ''
18
24
name = '<unknown>'
25
+ encryption_type = 'ECB'
19
26
20
- def __init__ (self , ip , port , id , name = '' ):
27
+ def __init__ (self , ip , port , id , name = '' , encryption_type = 'ECB' ):
21
28
self .ip = ip
22
29
self .port = port
23
30
self .id = id
24
31
self .name = name
32
+ self .encryption_type = encryption_type
25
33
26
34
27
35
def send_data (ip , port , data ):
@@ -38,7 +46,13 @@ def send_data(ip, port, data):
38
46
39
47
40
48
def create_request (tcid , pack_encrypted , i = 0 ):
41
- return '{"cid":"app","i":' + str (i ) + ',"t":"pack","uid":0,"tcid":"' + tcid + '","pack":"' + pack_encrypted + '"}'
49
+ request = '{"cid":"app","i":' + str (i ) + ',"t":"pack","uid":0,"tcid":"' + tcid + '",'
50
+ if (isinstance (pack_encrypted , dict )):
51
+ request += '"tag":"' + pack_encrypted ["tag" ] + '","pack":"' + pack_encrypted ["pack" ] + '"}'
52
+ else :
53
+ request += '"pack":"' + pack_encrypted + '"}'
54
+
55
+ return request
42
56
43
57
44
58
def create_status_request_pack (tcid ):
@@ -80,6 +94,40 @@ def encrypt_generic(pack):
80
94
return encrypt (pack , GENERIC_KEY )
81
95
82
96
97
+ def create_GCM_cipher (key ):
98
+ cipher = AES .new (bytes (key , 'utf-8' ), AES .MODE_GCM , nonce = GCM_IV )
99
+ cipher .update (GCM_ADD )
100
+ return cipher
101
+
102
+
103
+ def decrypt_GCM (pack_encoded , tag_encoded , key ):
104
+ cipher = create_GCM_cipher (key )
105
+ base64decodedPack = base64 .b64decode (pack_encoded )
106
+ base64decodedTag = base64 .b64decode (tag_encoded )
107
+ decryptedPack = cipher .decrypt_and_verify (base64decodedPack , base64decodedTag )
108
+ decodedPack = decryptedPack .replace (b'\xff ' , b'' ).decode ('utf-8' )
109
+ return decodedPack
110
+
111
+
112
+ def decrypt_GCM_generic (pack_encoded , tag_encoded ):
113
+ return decrypt_GCM (pack_encoded , tag_encoded , GENERIC_GCM_KEY )
114
+
115
+
116
+ def encrypt_GCM (pack , key ):
117
+ encrypted_data , tag = create_GCM_cipher (key ).encrypt_and_digest (pack .encode ("utf-8" ))
118
+ encrypted_pack = base64 .b64encode (encrypted_data ).decode ('utf-8' )
119
+ tag = base64 .b64encode (tag ).decode ('utf-8' )
120
+ data = {
121
+ "pack" : encrypted_pack ,
122
+ "tag" : tag
123
+ }
124
+ return data
125
+
126
+
127
+ def encrypt_GCM_generic (pack ):
128
+ return encrypt_GCM (pack , GENERIC_GCM_KEY )
129
+
130
+
83
131
def search_devices ():
84
132
print ('Searching for devices using broadcast address: %s' % args .broadcast )
85
133
@@ -106,12 +154,29 @@ def search_devices():
106
154
print (f'search_devices: data={ data } , raw_json={ raw_json } ' )
107
155
108
156
resp = json .loads (raw_json )
109
- pack = json .loads (decrypt_generic (resp ['pack' ]))
157
+
158
+ encryption_type = 'ECB'
159
+ if 'tag' in resp :
160
+ print ('Setting the encryption to GCM because tag property is present in the responce' )
161
+ encryption_type = 'GCM'
162
+
163
+ if encryption_type == 'GCM' :
164
+ decrypted_pack = decrypt_GCM_generic (resp ['pack' ], resp ['tag' ])
165
+ else :
166
+ decrypted_pack = decrypt_generic (resp ['pack' ])
167
+
168
+ pack = json .loads (decrypted_pack )
110
169
111
170
cid = pack ['cid' ] if 'cid' in pack and len (pack ['cid' ]) > 0 else \
112
171
resp ['cid' ] if 'cid' in resp else '<unknown-cid>'
113
172
114
- results .append (ScanResult (address [0 ], address [1 ], cid , pack ['name' ] if 'name' in pack else '<unknown>' ))
173
+ if encryption_type != 'GCM' and 'ver' in pack :
174
+ ver = re .search (r'(?<=V)[0-9]+(?<=.)' , pack ['ver' ])
175
+ if int (ver .group (0 )) >= 2 :
176
+ print ('Set GCM encryption because version in search responce is 2 or later' )
177
+ encryption_type = 'GCM' ;
178
+
179
+ results .append (ScanResult (address [0 ], address [1 ], cid , pack ['name' ] if 'name' in pack else '<unknown>' , encryption_type ))
115
180
116
181
if args .verbose :
117
182
print (f'search_devices: pack={ pack } ' )
@@ -126,19 +191,32 @@ def search_devices():
126
191
127
192
128
193
def bind_device (search_result ):
129
- print ('Binding device: %s (%s, ID: %s)' % (search_result .ip , search_result .name , search_result .id ))
194
+ print ('Binding device: %s (%s, ID: %s, encryption: %s )' % (search_result .ip , search_result .name , search_result .id , search_result . encryption_type ))
130
195
131
196
pack = '{"mac":"%s","t":"bind","uid":0}' % search_result .id
132
- pack_encrypted = encrypt_generic (pack )
197
+ if search_result .encryption_type == 'GCM' :
198
+ pack_encrypted = encrypt_GCM_generic (pack )
199
+ else :
200
+ pack_encrypted = encrypt_generic (pack )
133
201
134
202
request = create_request (search_result .id , pack_encrypted , 1 )
135
- result = send_data (search_result .ip , 7000 , bytes (request , encoding = 'utf-8' ))
203
+ try :
204
+ result = send_data (search_result .ip , 7000 , bytes (request , encoding = 'utf-8' ))
205
+ except socket .timeout :
206
+ print ('Device %s is not responding on bind request' % search_result .ip )
207
+ if search_result .encryption_type != 'GCM' :
208
+ search_result .encryption_type = 'GCM'
209
+ bind_device (search_result )
210
+
211
+ return
136
212
137
213
response = json .loads (result )
138
214
if response ["t" ] == "pack" :
139
- pack = response ["pack" ]
140
215
141
- pack_decrypted = decrypt_generic (pack )
216
+ if search_result .encryption_type == 'GCM' :
217
+ pack_decrypted = decrypt_GCM_generic (response ["pack" ], response ["tag" ])
218
+ else :
219
+ pack_decrypted = decrypt_generic (response ["pack" ])
142
220
143
221
bind_resp = json .loads (pack_decrypted )
144
222
@@ -156,10 +234,13 @@ def get_param():
156
234
cols = ',' .join (f'"{ i } "' for i in args .params )
157
235
158
236
pack = f'{{"cols":[{ cols } ],"mac":"{ args .id } ","t":"status"}}'
159
- pack_encrypted = encrypt (pack , args .key )
160
-
161
- request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \
162
- % (pack_encrypted , args .id )
237
+ request = '{"cid":"app","i":0,"t":"pack","uid":0,"tcid":"%s",' % args .id
238
+ if ENCRYPTION_TYPE == 'GCM' :
239
+ data_encrypted = encrypt_GCM (pack , args .key )
240
+ request += '"tag":"%s","pack":"%s"}' % (data_encrypted ["tag" ], data_encrypted ["pack" ])
241
+ else :
242
+ pack_encrypted = encrypt (pack , args .key )
243
+ request += '"pack":"%s"}' % pack_encrypted
163
244
164
245
result = send_data (args .client , 7000 , bytes (request , encoding = 'utf-8' ))
165
246
@@ -171,7 +252,12 @@ def get_param():
171
252
if response ["t" ] == "pack" :
172
253
pack = response ["pack" ]
173
254
174
- pack_decrypted = decrypt (pack , args .key )
255
+ if ENCRYPTION_TYPE == 'GCM' :
256
+ tag = response ["tag" ]
257
+ pack_decrypted = decrypt_GCM (pack , tag , args .key )
258
+ else :
259
+ pack_decrypted = decrypt (pack , args .key )
260
+
175
261
pack_json = json .loads (pack_decrypted )
176
262
177
263
if args .verbose :
@@ -196,10 +282,15 @@ def set_param():
196
282
197
283
pack = f'{{"opt":[{ opts } ],"p":[{ ps } ],"t":"cmd"}}'
198
284
print (pack )
199
- pack_encrypted = encrypt (pack , args .key )
200
285
201
- request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \
202
- % (pack_encrypted , args .id )
286
+ request = '{"cid":"app","i":0,"t":"pack","tcid":"%s","uid":0,' % args .id
287
+
288
+ if ENCRYPTION_TYPE == 'GCM' :
289
+ data_encrypted = encrypt_GCM (pack , args .key )
290
+ request += '"tag":"%s","pack":"%s"}' % (data_encrypted ["tag" ], data_encrypted ["pack" ])
291
+ else :
292
+ pack_encrypted = encrypt (pack , args .key )
293
+ request += '"pack":"%s"}' % pack_encrypted
203
294
204
295
result = send_data (args .client , 7000 , bytes (request , encoding = 'utf-8' ))
205
296
@@ -211,7 +302,12 @@ def set_param():
211
302
if response ["t" ] == "pack" :
212
303
pack = response ["pack" ]
213
304
214
- pack_decrypted = decrypt (pack , args .key )
305
+ if ENCRYPTION_TYPE == 'GCM' :
306
+ tag = response ["tag" ]
307
+ pack_decrypted = decrypt_GCM (pack , tag , args .key )
308
+ else :
309
+ pack_decrypted = decrypt (pack , args .key )
310
+
215
311
pack_json = json .loads (pack_decrypted )
216
312
217
313
if args .verbose :
@@ -228,15 +324,21 @@ def set_param():
228
324
parser .add_argument ('command' , help = 'You can use the following commands: search, get, set' )
229
325
parser .add_argument ('-c' , '--client' , help = 'IP address of the client device' )
230
326
parser .add_argument ('-b' , '--broadcast' , help = 'Broadcast IP address of the network the devices connecting to' )
231
- parser .add_argument ('-i' , '--id' , help = 'Unique ID of the device' )
327
+ parser .add_argument ('-i' , '--id' , help = 'Unique ID of the device (mac address) ' )
232
328
parser .add_argument ('-k' , '--key' , help = 'Unique encryption key of the device' )
329
+ parser .add_argument ('-e' , '--encryption' , help = 'Set the encryption type AES128 used: ECB(default), GCM' )
233
330
parser .add_argument ('--verbose' , help = 'Enable verbose logging' , action = 'store_true' )
234
331
if sys .platform == 'linux' :
235
332
parser .add_argument ('--socket-interface' , help = 'Bind the socket to a specific network interface' )
236
333
parser .add_argument ('params' , nargs = '*' , default = None , type = str )
237
334
238
335
args = parser .parse_args ()
239
336
337
+ if args .encryption is None :
338
+ ENCRYPTION_TYPE = 'ECB'
339
+ else :
340
+ ENCRYPTION_TYPE = args .encryption
341
+
240
342
command = args .command .lower ()
241
343
if command == 'search' :
242
344
if args .broadcast is None :
0 commit comments