Skip to content

Commit cb86145

Browse files
committed
first commit
1 parent 4559cad commit cb86145

File tree

1 file changed

+120
-0
lines changed

1 file changed

+120
-0
lines changed
+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# Purple Air AQI Display
2+
# for Metro M4 Airlift with RGB Matrix Shield
3+
# or Matrix Portal
4+
# and 64 x 32 RGB LED Matrix
5+
6+
import time
7+
import board
8+
import terminalio
9+
from adafruit_matrixportal.matrixportal import MatrixPortal
10+
11+
def aqi_transform(val):
12+
aqi = pm_to_aqi(val) # derive Air Quality Index from Particulate Matter 2.5 value
13+
return "AQI: %d" % aqi
14+
15+
def message_transform(val): # picks message based on thresholds
16+
index = aqi_to_list_index(pm_to_aqi(val))
17+
messages = (
18+
"Hazardous",
19+
"Very Unhealthy",
20+
"Unhealthy",
21+
"Unhealthy for Sensitive Groups",
22+
"Moderate",
23+
"Good",
24+
)
25+
26+
if index is not None:
27+
return messages[index]
28+
return "Unknown"
29+
30+
SENSOR_ID = 3085 # Poughkeepsie # 30183 LA outdoor / 37823 oregon / 21441 NYC
31+
SENSOR_REFRESH_PERIOD = 30 # seconds
32+
DATA_SOURCE = "https://www.purpleair.com/json?show=" + str(SENSOR_ID)
33+
SCROLL_DELAY = 0.02
34+
DATA_LOCATION = ["results", 0, "PM2_5Value"] # navigate the JSON response
35+
36+
# --- Display setup ---
37+
matrixportal = MatrixPortal(
38+
status_neopixel=board.NEOPIXEL,
39+
debug=True,
40+
url=DATA_SOURCE,
41+
json_path=(DATA_LOCATION, DATA_LOCATION),
42+
)
43+
44+
# Create a static label to show AQI
45+
matrixportal.add_text(
46+
text_font=terminalio.FONT,
47+
text_position=(8, 7),
48+
text_transform=aqi_transform,
49+
)
50+
51+
# Create a scrolling label to show level message
52+
matrixportal.add_text(
53+
text_font=terminalio.FONT,
54+
text_position=(0, 23),
55+
scrolling=True,
56+
text_transform=message_transform,
57+
)
58+
# pylint: disable=too-many-return-statements
59+
def aqi_to_list_index(aqi):
60+
aqi_groups = (301, 201, 151, 101, 51, 0)
61+
for index, group in enumerate(aqi_groups):
62+
if aqi >= group:
63+
return index
64+
return None
65+
66+
def calculate_aqi(Cp, Ih, Il, BPh, BPl): # wikipedia.org/wiki/Air_quality_index#Computing_the_AQI
67+
return round(((Ih - Il)/(BPh - BPl)) * (Cp - BPl) + Il)
68+
69+
def pm_to_aqi(pm):
70+
pm = float(pm)
71+
if pm < 0:
72+
return pm
73+
if pm > 1000:
74+
return 1000
75+
if pm > 350.5:
76+
return calculate_aqi(pm, 500, 401, 500, 350.5)
77+
elif pm > 250.5:
78+
return calculate_aqi(pm, 400, 301, 350.4, 250.5)
79+
elif pm > 150.5:
80+
return calculate_aqi(pm, 300, 201, 250.4, 150.5)
81+
elif pm > 55.5:
82+
return calculate_aqi(pm, 200, 151, 150.4, 55.5)
83+
elif pm > 35.5:
84+
return calculate_aqi(pm, 150, 101, 55.4, 35.5)
85+
elif pm > 12.1:
86+
return calculate_aqi(pm, 100, 51, 35.4, 12.1)
87+
elif pm >= 0:
88+
return calculate_aqi(pm, 50, 0, 12, 0)
89+
else:
90+
return None
91+
92+
def get_color(aqi):
93+
index = aqi_to_list_index(aqi)
94+
colors = (
95+
(115, 20, 37),
96+
(140, 26, 75),
97+
(234, 51, 36),
98+
(239, 133, 51),
99+
(255, 255, 85),
100+
(104, 225, 67),
101+
)
102+
if index is not None:
103+
return colors[index]
104+
return (150, 150, 150)
105+
106+
sensor_refresh = None
107+
while True:
108+
# only query the weather every 10 minutes (and on first run)
109+
if (not sensor_refresh) or (time.monotonic() - sensor_refresh) > SENSOR_REFRESH_PERIOD:
110+
try:
111+
value = matrixportal.fetch()
112+
print("Response is", value)
113+
matrixportal.set_text_color(get_color(pm_to_aqi(value[0])))
114+
sensor_refresh = time.monotonic()
115+
except RuntimeError as e:
116+
print("Some error occured, retrying! -", e)
117+
continue
118+
119+
# Scroll it
120+
matrixportal.scroll_text(SCROLL_DELAY)

0 commit comments

Comments
 (0)