Skip to content

Commit 023c2bb

Browse files
committed
Periodically reset window if not configured to broadcast stats
1 parent f99b95c commit 023c2bb

File tree

3 files changed

+446
-0
lines changed

3 files changed

+446
-0
lines changed

src/modules/ReplayModule.cpp

Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
#include "ReplayModule.h"
2+
#include "memGet.h"
3+
#include "modules/RoutingStatsModule.h"
4+
5+
ReplayModule *replayModule{};
6+
7+
/**
8+
* Constructor
9+
*/
10+
ReplayModule::ReplayModule() : SinglePortModule("Replay", meshtastic_PortNum_REPLAY_APP), NotifiedWorkerThread("Replay")
11+
{
12+
notify((uint32_t)ReplayNotification::INIT, true);
13+
}
14+
15+
/**
16+
* Adopt a packet into the replay buffer
17+
*/
18+
ReplayPacket *ReplayModule::adoptPacket(const meshtastic_MeshPacket *p, bool from_tx)
19+
{
20+
ReplayPacket *r = learnPacket(PACKET_HASH(p->from, p->id), getPriority(p));
21+
if (from_tx)
22+
r->last_tx_millis = millis();
23+
else
24+
r->last_rx_millis = millis();
25+
26+
if (!r->cache) {
27+
r->cache = packetCache.cache(p, false);
28+
if (r->cache) {
29+
cache_packets++;
30+
cache_bytes += sizeof(*r->cache) + r->cache->payload_len;
31+
dirty.set(r->idx);
32+
dirty_fast.set(r->idx);
33+
routingStats->logEvent(RoutingEvent::REPLAY_CACHE_PACKETS, NULL, cache_packets);
34+
routingStats->logEvent(RoutingEvent::REPLAY_CACHE_BYTES, NULL, cache_bytes);
35+
pruneCache();
36+
LOG_DEBUG("Replay: Added packet to cache (packets=%lu bytes=%lu hash=0x%04x from=0x%08x id=0x%08x)", cache_packets,
37+
cache_bytes, r->hash, p->from, p->id);
38+
}
39+
}
40+
r->missed = false;
41+
42+
return r;
43+
}
44+
45+
/**
46+
* Get the replay priority for a packet
47+
*/
48+
ReplayPriority ReplayModule::getPriority(const meshtastic_MeshPacket *p)
49+
{
50+
ReplayPriority prio = meshtastic_Config_ReplayConfig_ReplayPriority_NORMAL;
51+
if (!p)
52+
return prio; // This isn't a packet
53+
54+
if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
55+
56+
// Adjustments for decrypted packets
57+
if (p->decoded.portnum >= 64)
58+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_LOW; // Non-core apps are assumed low priority by default
59+
// App-specific adjustments
60+
switch (p->decoded.portnum) {
61+
case meshtastic_PortNum_UNKNOWN_APP: // Opaque app traffic, we don't understand it
62+
case meshtastic_PortNum_POSITION_APP: // Position updates are frequent and automatic
63+
case meshtastic_PortNum_AUDIO_APP: // Large payload, very inefficient, demote it to bottom of the heap
64+
case meshtastic_PortNum_IP_TUNNEL_APP: // Potentially large source of traffic, it's abusive, demote it
65+
case meshtastic_PortNum_STORE_FORWARD_APP: // Potentially large source of traffic, demote it
66+
case meshtastic_PortNum_RANGE_TEST_APP: // These are zero-hop anyway
67+
case meshtastic_PortNum_TELEMETRY_APP: // Frequent and automatic, demote it
68+
case meshtastic_PortNum_NEIGHBORINFO_APP: // Automatic and repeated, reliability via redundancy rather than replay
69+
case meshtastic_PortNum_RETICULUM_TUNNEL_APP: // Potentially large source of foreign traffic, demote it
70+
case meshtastic_PortNum_CAYENNE_APP: // LoRaWAN sensor data
71+
case _meshtastic_PortNum_MAX: // This isn't a real app, somebody probably screwed up, so deprioritise it
72+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_BACKGROUND;
73+
break;
74+
case meshtastic_PortNum_NODEINFO_APP: // Nodeinfo is repeated often and likely cached, so give way to other stuff:
75+
case meshtastic_PortNum_SERIAL_APP: // Chatty, not as bad as IP
76+
case meshtastic_PortNum_TRACEROUTE_APP: // Meshsense gets abusive with this, demote despite potentially
77+
// human-initiated
78+
case meshtastic_PortNum_ATAK_PLUGIN: // Can get very chatty, degrade gracefully by being more eager to drop this
79+
case meshtastic_PortNum_ATAK_FORWARDER: // Can get very chatty, degrade gracefully by being more eager to drop this
80+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_LOW;
81+
break;
82+
case meshtastic_PortNum_REMOTE_HARDWARE_APP: // Some user likely cares about this, especially if it's for HW control
83+
case meshtastic_PortNum_ROUTING_APP: // Routing packets matter, but are secondary to priority user traffic
84+
case meshtastic_PortNum_DETECTION_SENSOR_APP: // Event-triggered, it matters but not as much as human-initiated stuff
85+
case meshtastic_PortNum_REPLY_APP: // Likely human-initiated, but it's just a ping
86+
case meshtastic_PortNum_PAXCOUNTER_APP: // Event-triggered, it matters but not as much as human-initiated stuff
87+
case meshtastic_PortNum_REPLAY_APP: // Anything from the replay module that's important is sent zero-hop
88+
case meshtastic_PortNum_POWERSTRESS_APP: // Seems like a temporary thing that a human will care about
89+
case meshtastic_PortNum_PRIVATE_APP: // Something bespoke, leave it alone
90+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_NORMAL;
91+
break;
92+
case meshtastic_PortNum_ADMIN_APP: // Remote admin is critical
93+
case meshtastic_PortNum_TEXT_MESSAGE_APP: // Text messages are important
94+
case meshtastic_PortNum_TEXT_MESSAGE_COMPRESSED_APP:
95+
case meshtastic_PortNum_WAYPOINT_APP: // Explicitly user-initiated and often important for a group to reliably RX
96+
case meshtastic_PortNum_ALERT_APP:
97+
case meshtastic_PortNum_KEY_VERIFICATION_APP: // If this breaks, DMs won't work properly
98+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_HIGH;
99+
break;
100+
default:
101+
break; // Don't adjust priority for apps not explicitly handled above
102+
}
103+
104+
/**
105+
* Apps not specifically given a priority:
106+
* ZPS_APP - I don't know enough about it to have an opinion
107+
* SIMULATOR_APP - I don't know enough about it to have an opinion
108+
* MAP_REPORT_APP - I don't know enough about it to have an opinion
109+
*
110+
*/
111+
112+
if (prio > meshtastic_Config_ReplayConfig_ReplayPriority_BACKGROUND && p->to != NODENUM_BROADCAST &&
113+
(p->decoded.want_response || p->want_ack))
114+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_HIGH; // Unicast messages that want a response or ack are
115+
// important unless from a backgrounded app
116+
} else {
117+
// Adjustments for packets we can't decrypt
118+
if (p->to != NODENUM_BROADCAST) {
119+
if (p->want_ack)
120+
// ACK-wanted unicast packets are clearly more important
121+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_HIGH;
122+
else
123+
// Assume that unicast stuff has a human who wants it to arrive
124+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_NORMAL;
125+
} else {
126+
if (p->want_ack)
127+
// ACK-wanted broadcasts likely matter, but are untargeted
128+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_NORMAL;
129+
else
130+
// Broadcast and no ACK is fire-and-forget
131+
prio = meshtastic_Config_ReplayConfig_ReplayPriority_LOW;
132+
}
133+
}
134+
135+
return prio;
136+
}
137+
138+
/**
139+
* Learn about a packet if we don't already have an entry for it, and return the entry
140+
*/
141+
ReplayPacket *ReplayModule::learnPacket(PacketHash h, meshtastic_Config_ReplayConfig_ReplayPriority prio)
142+
{
143+
ReplayPacket *r = htFind(h);
144+
if (r)
145+
return r;
146+
147+
off_t idx = next_idx++ & REPLAY_ENTRY_MASK;
148+
recycleSlot(idx);
149+
r = &buffer[idx];
150+
r->hash = h;
151+
r->idx = idx;
152+
r->missed = true;
153+
r->priority = prio;
154+
htInsert(r);
155+
156+
return r;
157+
}
158+
159+
/**
160+
* Update the priority of a packet we have learned about
161+
*/
162+
void ReplayModule::updatePriority(const meshtastic_MeshPacket *p)
163+
{
164+
ReplayPacket *r = htFind(p);
165+
if (r)
166+
r->priority = getPriority(p);
167+
}
168+
169+
/**
170+
* Get a list of dirty entries pending advertisement
171+
*/
172+
size_t ReplayModule::getDirty(ReplayPacket *buf, size_t buf_max, bool fast, bool clear)
173+
{
174+
ReplayPacket *pos = buf;
175+
for (off_t i = meshtastic_Config_ReplayConfig_ReplayPriority_HIGH; i >= config.replay.min_advert_priority; i--) {
176+
for (off_t j = next_idx + REPLAY_ENTRY_MASK; j >= next_idx && static_cast<size_t>(pos - buf) < buf_max; j--) {
177+
uint8_t idx = j & REPLAY_ENTRY_MASK;
178+
if (fast) {
179+
if (dirty_fast.test(idx)) {
180+
*pos++ = buffer[idx];
181+
if (clear)
182+
dirty_fast.reset(idx);
183+
}
184+
} else {
185+
if (dirty.test(idx)) {
186+
*pos++ = buffer[idx];
187+
if (clear) {
188+
dirty.reset(idx);
189+
dirty_fast.reset(idx);
190+
}
191+
}
192+
}
193+
}
194+
}
195+
return pos - buf;
196+
}
197+
198+
/**
199+
* Find a packet in the hash table
200+
*/
201+
ReplayPacket *ReplayModule::htFind(PacketHash hash)
202+
{
203+
off_t bucket = REPLAY_BUCKET(hash);
204+
for (ReplayPacket *r = ht_buckets[bucket]; r; r = r->next) {
205+
if (r->hash == hash)
206+
return r;
207+
}
208+
return NULL;
209+
}
210+
211+
/**
212+
* Insert a packet into the hash table
213+
*/
214+
void ReplayModule::htInsert(ReplayPacket *r)
215+
{
216+
off_t bucket = REPLAY_BUCKET(r->hash);
217+
r->next = ht_buckets[bucket];
218+
ht_buckets[bucket] = r;
219+
}
220+
221+
/**
222+
* Remove a packet from the hash table
223+
*/
224+
void ReplayModule::htRemove(ReplayPacket *r)
225+
{
226+
off_t bucket = REPLAY_BUCKET(r->hash);
227+
for (ReplayPacket **target = &ht_buckets[bucket]; *target; target = &(*target)->next) {
228+
if (*target == r) {
229+
*target = r->next;
230+
r->next = NULL;
231+
return;
232+
}
233+
}
234+
}
235+
236+
/**
237+
* Handle thread notifications
238+
*/
239+
void ReplayModule::onNotify(uint32_t notification)
240+
{
241+
switch ((ReplayNotification)notification) {
242+
case ReplayNotification::INIT:
243+
// Nothing to do yet
244+
break;
245+
case ReplayNotification::TICK:
246+
// Periodic tasks
247+
break;
248+
default:
249+
LOG_WARN("ReplayModule: unknown notification %u", notification);
250+
break;
251+
}
252+
}
253+
254+
/**
255+
* Prune the packet cache if necessary
256+
*/
257+
void ReplayModule::pruneCache()
258+
{
259+
if (cache_bytes <= REPLAY_CACHE_RESERVE)
260+
return; // Cache is already smaller than the reserve
261+
size_t heap_free_bytes = memGet.getFreeHeap();
262+
if (heap_free_bytes >= REPLAY_PRUNE_HEAP_BYTES)
263+
return; // We have enough free heap already
264+
size_t heap_total_bytes = memGet.getHeapSize();
265+
float heap_free_pct = (float)heap_free_bytes / (float)heap_total_bytes;
266+
if (heap_free_pct >= REPLAY_PRUNE_HEAP_PCT)
267+
return; // We have enough free heap already
268+
size_t target_bytes = REPLAY_PRUNE_TARGET;
269+
if (cache_bytes - REPLAY_CACHE_RESERVE < target_bytes)
270+
target_bytes = cache_bytes - REPLAY_CACHE_RESERVE;
271+
unsigned int pruned_packets = 0;
272+
unsigned long pruned_bytes = 0;
273+
for (off_t i = next_idx; i <= next_idx + REPLAY_ENTRY_MASK && pruned_bytes < target_bytes; i++) {
274+
uint8_t idx = i & REPLAY_ENTRY_MASK;
275+
ReplayPacket *entry = &buffer[idx];
276+
if (entry->cache) {
277+
pruned_bytes += sizeof(*entry->cache) + entry->cache->payload_len;
278+
cache_bytes -= sizeof(*entry->cache) + entry->cache->payload_len;
279+
cache_packets--;
280+
packetCache.release(entry->cache);
281+
entry->cache = NULL;
282+
dirty.reset(idx);
283+
dirty_fast.reset(idx);
284+
pruned_packets++;
285+
if (pruned_bytes >= target_bytes)
286+
break;
287+
}
288+
}
289+
routingStats->logEvent(RoutingEvent::REPLAY_CACHE_PACKETS, NULL, cache_packets);
290+
routingStats->logEvent(RoutingEvent::REPLAY_CACHE_BYTES, NULL, cache_bytes);
291+
}
292+
293+
/**
294+
* Recycle a slot in the replay buffer
295+
*/
296+
void ReplayModule::recycleSlot(uint8_t idx)
297+
{
298+
idx &= REPLAY_ENTRY_MASK;
299+
ReplayPacket *entry = &buffer[idx];
300+
dirty.reset(idx);
301+
dirty_fast.reset(idx);
302+
if (entry->cache) {
303+
cache_packets--;
304+
cache_bytes -= sizeof(*entry->cache) + entry->cache->payload_len;
305+
packetCache.release(entry->cache);
306+
}
307+
*entry = {};
308+
}

0 commit comments

Comments
 (0)