1
1
#!/usr/bin/env python3
2
2
3
3
# Usage:
4
- # pip3 install bluepy
4
+ # pip3 install bleak asyncio
5
5
# python3 get_beacon_key.py <MAC> <PRODUCT_ID>
6
6
#
7
7
# List of PRODUCT_ID:
14
14
# Example:
15
15
# python3 get_beacon_key.py AB:CD:EF:12:34:56 950
16
16
17
+ import asyncio
17
18
import random
18
19
import re
19
20
import sys
20
21
21
- from bluepy .btle import UUID , DefaultDelegate , Peripheral
22
+ from bleak import BleakClient
23
+ from bleak .uuids import normalize_uuid_16
22
24
23
25
MAC_PATTERN = r"^[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}$"
24
26
25
27
UUID_SERVICE = "fe95"
26
28
27
- HANDLE_AUTH = 3
28
- HANDLE_FIRMWARE_VERSION = 10
29
- HANDLE_AUTH_INIT = 19
30
- HANDLE_BEACON_KEY = 25
29
+ # The characteristics of the 'fe95' service have unique uuid values and thus can be addressed via their uuid
30
+ # this can be checked by using the service explorer from https://github.com/hbldh/bleak/blob/master/examples/service_explorer.py
31
+ HANDLE_AUTH = normalize_uuid_16 (0x0001 )
32
+ HANDLE_FIRMWARE_VERSION = normalize_uuid_16 (0x0004 )
33
+ HANDLE_AUTH_INIT = normalize_uuid_16 (0x0010 )
34
+ HANDLE_BEACON_KEY = normalize_uuid_16 (0x0014 )
31
35
32
36
MI_KEY1 = bytes ([0x90 , 0xCA , 0x85 , 0xDE ])
33
37
MI_KEY2 = bytes ([0x92 , 0xAB , 0x54 , 0xFA ])
@@ -94,7 +98,7 @@ def generateRandomToken() -> bytes:
94
98
return token
95
99
96
100
97
- def get_beacon_key (mac , product_id ):
101
+ async def get_beacon_key (mac , product_id ):
98
102
reversed_mac = reverseMac (mac )
99
103
token = generateRandomToken ()
100
104
@@ -103,29 +107,57 @@ def get_beacon_key(mac, product_id):
103
107
104
108
# Connect
105
109
print ("Connection in progress..." )
106
- peripheral = Peripheral (deviceAddr = mac )
107
- print ("Successful connection!" )
108
-
109
- # Auth (More information: https://github.com/archaron/docs/blob/master/BLE/ylkg08y.md)
110
- print ("Authentication in progress..." )
111
- auth_service = peripheral .getServiceByUUID (UUID_SERVICE )
112
- auth_descriptors = auth_service .getDescriptors ()
113
- peripheral .writeCharacteristic (HANDLE_AUTH_INIT , MI_KEY1 , "true" )
114
- auth_descriptors [1 ].write (SUBSCRIBE_TRUE , "true" )
115
- peripheral .writeCharacteristic (HANDLE_AUTH , cipher (mixA (reversed_mac , product_id ), token ), "true" )
116
- peripheral .waitForNotifications (10.0 )
117
- peripheral .writeCharacteristic (3 , cipher (token , MI_KEY2 ), "true" )
118
- print ("Successful authentication!" )
119
-
120
- # Read
121
- beacon_key = cipher (token , peripheral .readCharacteristic (HANDLE_BEACON_KEY )).hex ()
122
- firmware_version = cipher (token , peripheral .readCharacteristic (HANDLE_FIRMWARE_VERSION )).decode ()
123
-
124
- print (f"beaconKey: '{ beacon_key } '" )
125
- print (f"firmware_version: '{ firmware_version } '" )
126
-
127
-
128
- def main (argv ):
110
+ client = BleakClient (mac )
111
+ try :
112
+ await client .connect ()
113
+ print ("Successful connection!" )
114
+
115
+ # An asyncio future object is needed for callback handling
116
+ future = asyncio .get_event_loop ().create_future ()
117
+
118
+ # Auth (More information: https://github.com/archaron/docs/blob/master/BLE/ylkg08y.md)
119
+ print ("Authentication in progress..." )
120
+
121
+ # Send 0x90, 0xCA, 0x85, 0xDE bytes to authInitCharacteristic.
122
+ await client .write_gatt_char (HANDLE_AUTH_INIT , MI_KEY1 , True )
123
+ # Subscribe authCharacteristic.
124
+ # (a lambda callback is used to set the futures result on the notification event)
125
+ await client .start_notify (HANDLE_AUTH , lambda _ , data : future .set_result (data ))
126
+ # Send cipher(mixA(reversedMac, productID), token) to authCharacteristic.
127
+ await client .write_gatt_char (HANDLE_AUTH , cipher (mixA (reversed_mac , product_id ), token ), True )
128
+ # Now you'll get a notification on authCharacteristic. You must wait for this notification before proceeding to next step
129
+ await asyncio .wait_for (future , 10.0 )
130
+
131
+ # The notification data can be ignored or used to check an integrity, this is optional
132
+ print (f"notifyData: '{ future .result ().hex ()} '" )
133
+ # If you want to perform a check, compare cipher(mixB(reversedMac, productID), cipher(mixA(reversedMac, productID), res))
134
+ # where res is received payload ...
135
+ print (f"cipheredRes: '{ cipher (mixB (reversed_mac , product_id ), cipher (mixA (reversed_mac , product_id ), future .result ())).hex ()} '" )
136
+ # ... with your token, they must equal.
137
+ print (f"randomToken: '{ token .hex ()} '" )
138
+
139
+ # Send 0x92, 0xAB, 0x54, 0xFA to authCharacteristic.
140
+ await client .write_gatt_char (HANDLE_AUTH , cipher (token , MI_KEY2 ), True )
141
+ print ("Successful authentication!" )
142
+
143
+ # Read
144
+ beacon_key = cipher (token , await client .read_gatt_char (HANDLE_BEACON_KEY )).hex ()
145
+ # Read from verCharacteristics. You can ignore the response data, you just have to perform a read to complete authentication process.
146
+ # If the data is used, it will show the firmware version
147
+ firmware_version = cipher (token , await client .read_gatt_char (HANDLE_FIRMWARE_VERSION )).decode ()
148
+
149
+ print (f"beaconKey: '{ beacon_key } '" )
150
+ print (f"firmware_version: '{ firmware_version } '" )
151
+
152
+ print ("Disconnection in progress..." )
153
+ except Exception as e :
154
+ print (e )
155
+ finally :
156
+ await client .disconnect ()
157
+ print ("Disconnected!" )
158
+
159
+
160
+ async def main (argv ):
129
161
# ARGS
130
162
if len (argv ) <= 2 :
131
163
print ("usage: get_beacon_key.py <MAC> <PRODUCT_ID>\n " )
@@ -152,8 +184,8 @@ def main(argv):
152
184
return
153
185
154
186
# BEACON_KEY
155
- get_beacon_key (mac , product_id )
187
+ await get_beacon_key (mac , product_id )
156
188
157
189
158
190
if __name__ == '__main__' :
159
- main (sys .argv )
191
+ asyncio . run ( main (sys .argv ) )
0 commit comments