-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathAirPlot.py
351 lines (258 loc) · 9.08 KB
/
AirPlot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
from time import perf_counter
import ctypes
import numpy as np
import pyqtgraph as pg
##Use Reference IMU?
useRefImu = False
#Class to store sensor data
class SensorData:
def __init__(self):
self.x = 0
self.y = 0
self.z = 0
# Create instances of SensorData class
ref_gyro = SensorData()
ref_accel = SensorData()
ref_mag = SensorData()
ref_euler = SensorData()
if useRefImu:
#### Configure Reference IMU ####
import threading
import serial
# Open serial port on COM# at a baud rate of 115200
ser = serial.Serial('COM3', 115200)
def read_serial_data():
while True:
#read line from serial port
line = ser.readline().decode('ascii' , errors='replace')
#split based on comma
line = line.split(',')
if line[0] == '$PCHRA':
ref_euler.x = line[2] #roll
ref_euler.y = line[3] #pitch
ref_euler.z = line[4] #yaw
else:
#Determine which sensor is sending data
match line[1]:
case '0':
ref_gyro.x = line[3]
ref_gyro.y = line[4]
ref_gyro.z = line[5]
#print(gyro.x, gyro.y, gyro.z)
case '1':
ref_accel.x = line[3]
ref_accel.y = line[4]
ref_accel.z = line[5]
#print(accel.x, accel.y, accel.z)
case '2':
ref_mag.x = line[3]
ref_mag.y = line[4]
ref_mag.z = line[5]
#print(mag.x, mag.y, mag.z)
# Create and start the thread
serial_thread = threading.Thread(target=read_serial_data)
serial_thread.start()
#### Configure DLL ####
# Load the DLL
AirAPI = ctypes.cdll.LoadLibrary("./DevDLL/AirAPI_Windows.dll")
# Set the return type of the GetQuaternion function to a float pointer
AirAPI.GetRawGyro.restype = ctypes.POINTER(ctypes.c_float)
AirAPI.GetRawAccel.restype = ctypes.POINTER(ctypes.c_float)
AirAPI.GetEuler.restype = ctypes.POINTER(ctypes.c_float)
AirAPI.GetRawMag.restype = ctypes.POINTER(ctypes.c_float)
AirAPI.GetRejectionCounters.restype = ctypes.POINTER(ctypes.c_int64)
#set return type of GetAirTimestamp to uint64
AirAPI.GetAirTimestamp.restype = ctypes.c_uint64
# Call StartConnection
print("Attempting to connect to AirAPI_Driver...")
connection = AirAPI.StartConnection()
def getRawGyro():
# Call GetRawGyro function from the DLL
gyroPtr = AirAPI.GetRawGyro()
rawGyro = [gyroPtr[i] for i in range(3)]
return rawGyro
def getRawAccel():
# Call GetRawAccel function from the DLL
AccelPtr = AirAPI.GetRawAccel()
rawAccel = [AccelPtr[i] for i in range(3)]
return rawAccel
def getRawMag():
# Call GetRawMag function from the DLL
magPtr = AirAPI.GetRawMag()
rawMag = [magPtr[i] for i in range(3)]
return rawMag
def getEuler():
# Call GetEuler function from the DLL
eulerPtr = AirAPI.GetEuler()
euler = [eulerPtr[i] for i in range(3)]
return euler
def getRejectionCounters():
# Call GetRejectionCounters function from the DLL
rejectionCountersPtr = AirAPI.GetRejectionCounters()
rejectionCounters = [rejectionCountersPtr[i] for i in range(2)]
return rejectionCounters
#### Configure Data sources ####
rawGyroReading = [0,0,0]
rawAccelReading = [0,0,0]
rawMagReading = [0,0,0]
eulerReading = [0,0,0]
rejectionCounters = [0,0]
##Air Data source
def pollAirData():
global rawGyroReading
global rawAccelReading
global eulerReading
global rejectionCounters
rawGyroReading = getRawGyro()
rawAccelReading = getRawAccel()
eulerReading = getEuler()
rawMagReading = getRawMag()
rejectionCounters = getRejectionCounters()
return
## Air data will be drawn grey
def getGyroX():
return rawGyroReading[0]
## Reference data will be drawn red
def getRefGyroX():
return ref_gyro.x
def getGyroY():
return rawGyroReading[1]
def getRefGyroY():
return ref_gyro.y
def getGyroZ():
return rawGyroReading[2]
def getRefGyroZ():
return ref_gyro.z
def getAccelX():
return rawAccelReading[0]
def getRefAccelX():
return ref_accel.x
def getAccelY():
return rawAccelReading[1]
def getRefAccelY():
return ref_accel.y
def getAccelZ():
return rawAccelReading[2]
def getRefAccelZ():
return ref_accel.z
def getMagX():
return rawMagReading[0]
def getRefMagX():
return ref_mag.x
def getMagY():
return rawMagReading[1]
def getRefMagY():
return ref_mag.y
def getMagZ():
return rawMagReading[2]
def getRefMagZ():
return ref_mag.z
def getEulerX():
return eulerReading[0]
def getRefEulerX():
return ref_euler.x
def getEulerY():
return eulerReading[1]
def getRefEulerY():
return ref_euler.y
def getEulerZ():
return eulerReading[2]
def getRefEulerZ():
return ref_euler.z
def getTimestamp():
# Call GetAirTimestamp function from the DLL and convert nanoseconds to seconds
ts = AirAPI.GetAirTimestamp() / 1000000000
return ts
def getAccelRejectCounter():
return rejectionCounters[0]
def getMagRejectCounter():
return rejectionCounters[1]
def create_plot(window, title,source, ref_source=None,XRange=None,YRange=None, row=None, colspan=2):
plot = window.addPlot(title=title, colspan=colspan, row=row)
plot.setLabel('bottom', 'Time', 's')
if(XRange != None):
plot.setXRange(XRange[0], XRange[1])
else:
plot.setXRange(-10, 0)
plot.addLegend()
if(YRange != None):
plot.setYRange(YRange[0], YRange[1])
data = np.empty((chunkSize, 2))
data[:, 0] = np.linspace(-10, 0, chunkSize)
data[:, 1] = np.nan
ref_data = np.empty((chunkSize, 2)) if ref_source else None
if ref_data is not None:
ref_data[:, 0] = np.linspace(-10, 0, chunkSize)
ref_data[:, 1] = np.nan
ptr = 0
main_curve = plot.plot(name="Air")
ref_curve = plot.plot(name="Ref", pen='r') if ref_source else None
def update():
nonlocal data, ref_data, ptr, main_curve, ref_curve
now = perf_counter()
data[:-1] = data[1:]
data[-1] = [now - startTime, source()]
if ref_data is not None:
ref_data[:-1] = ref_data[1:]
ref_data[-1] = [now - startTime, ref_source()]
plot.setXRange(now - startTime - 5, now - startTime)
main_curve.setData(x=data[:, 0], y=data[:, 1])
if ref_curve:
ref_curve.setData(x=ref_data[:, 0], y=ref_data[:, 1])
ptr += 1
return update
win = pg.GraphicsLayoutWidget(show=True, size=(1600, 800))
win.setWindowTitle('AirPlot')
chunkSize = 100
maxChunks = 10
startTime = perf_counter()
# Set X range to 10 seconds
gyroXRange = [-10,0]
accelXRange = [-10,0]
magXRange = [-10,0]
eularXRange = [-10,0]
# Set Y range
gyroYRange = [-120,120] #Min Max Degrees per second
accelYRange = [-2,2] #Min Max G's
magYRange = [-5,5] #Min Max Gauss
eulerYRange = [-180,180] #Min Max Degrees
updateGyroX = create_plot(win, "Gyro X", getGyroX, getRefGyroX, XRange=gyroXRange, YRange=gyroYRange)
updateAccelX = create_plot(win, "Accel X", getAccelX, getRefAccelX, XRange=accelXRange, YRange=accelYRange)
updateMagX = create_plot(win, "Mag X", getMagX,getRefMagX, XRange=magXRange, YRange=magYRange)
updateEulerX = create_plot(win, "Euler X", getEulerX, getRefEulerX, XRange=eularXRange, YRange=eulerYRange)
win.nextRow()
updateGyroY = create_plot(win, "Gyro Y", getGyroY, getRefGyroY, XRange=gyroXRange, YRange=gyroYRange)
updateAccelY = create_plot(win, "Accel Y", getAccelY, getRefAccelY, XRange=accelXRange, YRange=accelYRange)
updateMagY = create_plot(win, "Mag Y", getMagY,getRefMagY, XRange=magXRange, YRange=magYRange)
updateEulerY = create_plot(win, "Euler Y", getEulerY, getRefEulerY, XRange=eularXRange, YRange=eulerYRange)
win.nextRow()
updateGyroZ = create_plot(win, "Gyro Z", getGyroZ, getRefGyroZ, XRange=gyroXRange, YRange=gyroYRange)
updateAccelZ = create_plot(win, "Accel Z", getAccelZ, getRefAccelZ, XRange=accelXRange, YRange=accelYRange)
updateMagZ = create_plot(win, "Mag Z", getMagZ, getRefMagZ,XRange=magXRange, YRange=magYRange)
updateEulerZ = create_plot(win, "Euler Z", getEulerZ, getRefEulerZ, XRange=eularXRange, YRange=eulerYRange)
win.nextRow()
updateTimestamp = create_plot(win, "Timestamp", getTimestamp, XRange=[-10,0], YRange=None, colspan=2)
updateAccelRejectCounter = create_plot(win, "Accel Reject Counter", getAccelRejectCounter, XRange=[-10,0], YRange=None, colspan=2)
updateMagRejectCounter = create_plot(win, "Mag Reject Counter", getMagRejectCounter, XRange=[-10,0], YRange=None, colspan=2)
def update():
pollAirData()
updateGyroX()
updateAccelX()
updateMagX()
updateEulerX()
updateGyroY()
updateAccelY()
updateMagY()
updateEulerY()
updateGyroZ()
updateAccelZ()
updateMagZ()
updateEulerZ()
updateTimestamp()
updateAccelRejectCounter()
updateMagRejectCounter()
timer = pg.QtCore.QTimer()
timer.timeout.connect(update)
timer.start(50)
if __name__ == '__main__':
pg.exec()