-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathMQTTClient.cpp
1842 lines (1621 loc) · 79.8 KB
/
MQTTClient.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// We need our implementation
#include "Network/Clients/MQTT.hpp"
#if MQTTUseAuth == 1
/** Flags used to track reentrancy in the recursive AUTH scheme */
enum AuthReentrancy
{
    FromConnect = 0x80000000u, // Bit 31 only
    AuthMask    = 0x7FFFFFFFu, // Every bit except bit 31
};
#endif
#if MQTTOnlyBSDSocket != 1
// We need socket declaration
#include "include/Network/Socket.hpp"
// We need SSL socket declaration too
#include "include/Network/SSLSocket.hpp"
// We need FastLock too
#include "include/Threading/Lock.hpp"
#if MQTTDumpCommunication == 1
// We need hexDump
#include "include/Utils/Dump.hpp"
#endif
#endif
#if MQTTUseTLS == 1
// We need MBedTLS code
// #include <mbedtls/certs.h>
#include <mbedtls/ctr_drbg.h>
#include <mbedtls/entropy.h>
#include <mbedtls/error.h>
#include <mbedtls/net_sockets.h>
#include <mbedtls/platform.h>
#include <mbedtls/ssl.h>
#endif
// We need StackHeapBuffer to avoid stressing the heap allocator when it's not required
#include "include/Platform/StackHeapBuffer.hpp"
// This is the maximum allocation that'll be performed on the stack before it's being replaced by heap allocation
// This also means that the stack size for the thread using such function must be larger than this value
#define StackSizeAllocationLimit CONFIG_ESP_EMQTT5_STACK_SIZE
namespace Network { namespace Client {
#pragma pack(push, 1)
/** A packet ID table stored at the start of a single heap allocation, followed by the receive buffer.
    This allows a single allocation for the whole lifetime of the client.
    Packet IDs are 16 bits, but each slot is 32 bits wide because:
    1. Bit 16 stores the communication direction (1 is for broker to client, 0 for client to broker), since packet ID allocation is independent of direction
    2. Bit 31 stores the QoS level (0 is for QoS1, 1 for QoS2)
    3. Bit 30 stores the publish cycle step (0 for non ACKed QoS2, 1 for PUBREC or PUBREL depending on direction)
    A slot whose low 17 bits are zero is considered free, so 0 must never be used as a packet ID.
    @warning The destructor frees the allocation and no copy operation is defined, so copying an instance is not safe. */
struct Buffers
{
    /** The receive buffer, located right after the ID slots in the allocation */
    uint8 * recvBuffer() { return buffer + end() * sizeof(uint32); }
    const uint8 * recvBuffer() const { return buffer + end() * sizeof(uint32); }

    /** Find the slot holding the given ID (direction bit included, QoS and step bits ignored).
        @return The slot index, or end() if no slot matches */
    uint8 findID(uint32 ID)
    {
        for (uint8 i = 0; i < end(); i++)
            if ((packetsID()[i] & 0x1FFFF) == ID)
                return i;
        return maxID;
    }
    /** Overwrite the slot currently matching \p clear with \p set.
        With the default clear == 0 this claims a free slot.
        @return false if no slot matches \p clear */
    bool clearSetID(uint32 set, uint32 clear = 0)
    {
        uint8 i = findID(clear);
        if (i == end()) return false;
        packetsID()[i] = set;
        return true;
    }
    /** True for a used slot whose direction bit (bit 16) is clear */
    static inline bool isSending(uint32 ID) { return (ID & 0x10000) == 0 && ID; }
    /** True when the QoS bit (bit 31) is clear */
    static inline bool isQoS1(uint32 ID) { return (ID & 0x80000000) == 0; }
    /** True when the publish cycle step bit (bit 30) is set */
    static inline bool isQoS2Step2(uint32 ID) { return (ID & 0x40000000) != 0; }
    /** Claim a free slot for a QoS1 packet ID. @return false if the table is full */
    inline bool storeQoS1ID(uint32 ID) { return clearSetID((uint32)ID, 0); }
    /** Claim a free slot for a QoS2 packet ID. @return false if the table is full */
    inline bool storeQoS2ID(uint32 ID) { return clearSetID((uint32)ID | 0x80000000, 0); }
    /** Mark the QoS2 entry for this ID as having reached its second step. @return false if the ID is unknown */
    inline bool avanceQoS2(uint32 ID) { uint8 p = findID(ID); if (p == maxID) return false; packetsID()[p] |= 0x40000000; return true; }
    /** Free the slot holding this ID. @return false if the ID is unknown */
    inline bool releaseID(uint32 ID) { return clearSetID(0, (uint32)ID); }
    /** Number of ID slots (also the "not found" value returned by findID) */
    inline uint8 end() const { return maxID; }
    /** Number of packets the table was sized for (3 slots per packet) */
    inline uint8 packetsCount() const { return maxID / 3; }
    /** Free every ID slot.
        maxID already counts slots, so only maxID words are cleared; clearing more would wipe
        the beginning of the receive buffer and could run past the allocation. */
    inline void reset() { memset(packetsID(), 0, maxID * sizeof(uint32)); }
    /** Raw content of slot \p i (no bounds check) */
    inline uint32 packetID(uint8 i) const { return packetsID()[i]; }
    /** Count the used slots whose direction bit is clear */
    inline uint8 countSentID() const {
        uint8 count = 0;
        for (uint8 i = 0; i < end(); i++)
            if (isSending(packetsID()[i]))
                count++;
        return count;
    }
    /** Allocate the ID table and the receive buffer in one zeroed block.
        @param size     The receive buffer size in bytes
        @param maxID    The number of un-ACKed packets to track (3 slots each). Values above 85 are
                        clamped so the slot count still fits the 8 bits index used by this table.
        @note The allocation result is not checked here */
    Buffers(uint32 size, uint32 maxID) : size(size), buffer((uint8*)::calloc(size + slotCount(maxID) * sizeof(uint32), 1)), maxID(slotCount(maxID)) {}
    ~Buffers() { ::free(buffer); buffer = 0; size = 0; maxID = 0; }
    /** The receive buffer size in bytes */
    uint32 size;
private:
    /** Convert a packet count to a slot count that fits in a uint8 without wrapping */
    static uint8 slotCount(uint32 packets) { return (uint8)(packets > 85 ? 255 : packets * 3); }
    uint32 * packetsID() { return (uint32*)buffer; }
    const uint32 * packetsID() const { return (const uint32*)buffer; }
    uint8 * buffer;
    uint8 maxID;
};
#pragma pack(pop)
/** Build a mask of type U with only bit number \p value set.
    The shift is performed on a U operand (not on an int literal) so it does not overflow
    when U is wider than int or when the top bit of a 32 bits mask is requested. */
template <typename T, typename U> constexpr inline U bitU(T value) { return (U)((U)1 << (U)value); }
/** Build a 16 bits mask with only bit number \p value set */
template <typename T> constexpr inline uint16 bit(T value)
{
    return static_cast<uint16>(1 << static_cast<uint16>(value));
}
namespace State
{
    /** The current connection state */
    enum MQTT
    {
        Unknown         = 0,
        Connecting      = 1,
        Authenticating  = 2,
        Running         = 3,
        Subscribing     = 4,
        Unsubscribing   = 5,
        Pinging         = 6,
        Disconnecting   = 7,
        Disconnected    = 8,
        Count           = 9,
    };

    /** All the packet types taking part in a publish cycle */
    static constexpr uint16 publishMask =
          bit(Protocol::MQTT::V5::ControlPacketType::PUBLISH)
        | bit(Protocol::MQTT::V5::ControlPacketType::PUBACK)
        | bit(Protocol::MQTT::V5::ControlPacketType::PUBREC)
        | bit(Protocol::MQTT::V5::ControlPacketType::PUBREL)
        | bit(Protocol::MQTT::V5::ControlPacketType::PUBCOMP);

    /** The packet types upon which a packet ID is released */
    static constexpr uint16 releaseIDMask =
          bit(Protocol::MQTT::V5::ControlPacketType::PUBACK)
        | bit(Protocol::MQTT::V5::ControlPacketType::PUBCOMP);

    /** The packet types upon which a saved packet buffer is released */
    static constexpr uint16 releaseBufferMask =
          bit(Protocol::MQTT::V5::ControlPacketType::PUBACK)
        | bit(Protocol::MQTT::V5::ControlPacketType::PUBREC);

    /** The packet types accepted in each state, indexed by the MQTT enumeration above (one entry per state, Count excluded) */
    static constexpr uint16 expectedPacketMask[] =
    {
        // Unknown
        0,
        // Connecting
        bit(Protocol::MQTT::V5::ControlPacketType::CONNACK) | bit(Protocol::MQTT::V5::ControlPacketType::AUTH),
        // Authenticating
        bit(Protocol::MQTT::V5::ControlPacketType::CONNACK) | bit(Protocol::MQTT::V5::ControlPacketType::AUTH),
        // Running
        publishMask,
        // Subscribing
        bit(Protocol::MQTT::V5::ControlPacketType::SUBACK) | publishMask,
        // Unsubscribing
        bit(Protocol::MQTT::V5::ControlPacketType::UNSUBACK) | publishMask,
        // Pinging
        bit(Protocol::MQTT::V5::ControlPacketType::PINGRESP) | publishMask,
        // Disconnecting
        bit(Protocol::MQTT::V5::ControlPacketType::DISCONNECT),
        // Disconnected
        0
    };
}
static void dumpBufferAsPacket(const char * prompt, const uint8* buffer, uint32 length);
#if MQTTQoSSupportLevel == 1
/** Metadata describing one packet saved in the ring buffer */
struct PacketBookmark
{
    /** The packet identifier */
    uint16 ID;
    /** The packet size in bytes */
    uint32 size;
    /** The packet position in the buffer */
    uint32 pos;

    /** Overwrite all three fields at once */
    inline void set(uint16 ID, uint32 size, uint32 pos)
    {
        this->ID   = ID;
        this->size = size;
        this->pos  = pos;
    }

    /** Build a bookmark; every field defaults to zero */
    PacketBookmark(uint16 ID = 0, uint32 size = 0, uint32 pos = 0)
        : ID(ID), size(size), pos(pos)
    {
    }
};
/** The ring buffer implementation behind RingBufferStorage.
    Packets are stored back to back in a byte ring, and a small array of bookmarks remembers
    the ID, size and position of each of them.
    All positions are wrapped with "& sm1", so the buffer size is expected to be a power of 2.
    A bookmark whose ID is 0 is a free bookmark (save() looks for ID 0 to find a free one). */
struct RingBufferStorage::Impl
{
/** Read and write position in the ring buffer */
uint32 r, w;
/** Buffer size minus 1 in bytes (used as the wrapping mask) */
const uint32 sm1;
/** The buffer to write packets into */
uint8 * buffer;
/** The metadata about the packets */
PacketBookmark * packets;
/** Maximum number of packets in the metadata array */
uint8 packetsCount;
/** Find the packet with the given ID
    @return the bookmark index, or packetsCount if no bookmark has this ID */
uint8 findID(uint32 ID)
{
for (uint8 i = 0; i < packetsCount; i++)
if (packets[i].ID == ID)
return i;
return packetsCount;
}
/** Get the consumed size in the buffer */
inline uint32 getSize() const { return w >= r ? w - r : (sm1 - r + w + 1); }
/** Get the available size in the buffer (at most sm1 bytes, that is, one byte less than the buffer size) */
inline uint32 freeSize() const { return sm1 - getSize(); }
/** Add a packet to this buffer (no allocation is done at this time)
    @return false if the packet does not fit or if there is no free bookmark left */
bool save(const uint16 packetID, const uint8 * packet, uint32 size)
{
// Check we can fit the packet
if (size > sm1 || freeSize() < size) return false;
// Check if we have a free space for storing the packet's information
uint8 i = findID(0);
if (i == packetsCount) return false;
// part1 is what fits before the end of the buffer, part2 is what wraps to its beginning
const uint32 part1 = min(size, sm1 - w + 1);
const uint32 part2 = size - part1;
memcpy((buffer + w), packet, part1);
memcpy((buffer), packet + part1, part2);
packets[i].set(packetID, size, w);
w = (w + size) & sm1;
return true;
}
/** Get a packet from the buffer.
Since this is used to resend a packet over a TCP (thus streaming) socket,
we can skip one copy to rebuild a contiguous packet by simply returning
the two parts in the ring buffer and let the application send them successively.
From the receiving side, they'll be received as one contiguous packet.
@return false if no packet with this ID is stored */
bool load(const uint16 packetID, const uint8 *& packetHead, uint32 & sizeHead, const uint8 *& packetTail, uint32 & sizeTail)
{
// Look for the packet
uint8 i = findID(packetID);
if (i == packetsCount) return false;
// Check if the packet is split
packetHead = buffer + packets[i].pos;
sizeHead = min(packets[i].size, (sm1 - packets[i].pos + 1));
sizeTail = packets[i].size - sizeHead;
// The tail (if any) always starts at the beginning of the buffer
packetTail = buffer;
return true;
}
/** Remove a packet from the buffer
    @return false if no packet with this ID is stored */
bool release(const uint16 packetID)
{
uint8 i = findID(packetID);
if (i == packetsCount) return false;
PacketBookmark & packet = packets[i];
// Here, we have 2 cases. Either the packet is on the read position of the ring buffer
// and in that case, we just need to advance the read position,
// or it's in the middle of the ring buffer and we need to move all the data around to remove it
// Let's deal with the former case first
uint32 pos = packet.pos, size = packet.size, end = (packet.pos + packet.size) & sm1;
// The bookmark is freed right away, only the local copies above are used from now on
packet.set(0, 0, 0);
if (pos == r)
{
r = (r + size) & sm1;
return true;
}
// Another optimization step is when the write position is at the end of this packet
// We can just revert the storage of the packet directly
if (end == w)
{
w = pos;
return true;
}
// Ok, now we have to move the data around here
// First let's move memory to remove that packet.
// We'll fix the packet's position later on
// We are in this case here:
// bbbbccw r p eaaaaaaaaa with p/e (pos/end) and a / b / c the next packet
// | | | |
// [-------------------------]
// After move, it should look like:
// ccw r aaaaaaaaabbbb with p/e (pos/end) and a / b / c the next packet
// | | | |
// [-------------------------]
// We see there are 3 sections: a is the data between e and the buffer end
// b is the data whose size is equal to buffer end - p - sizeof(a) (the part that was moved from
// the beginning of the buffer to the end of the buffer)
// and c is the part that was moved from the end of the packet to the beginning of the buffer
// We have to perform move a to p, move b to buffer.end - a.size and move c to buffer begin
// It's hard to think without unwrapping the buffer, so let's imagine we are doing so (we'll rewrap after discussion)
// Let's set w' = w + (sm1 + 1), e' = e + (sm1 + 1)
// It'll lead to this diagram:
// bbbbccw r p eaaaaaaaaa BBBBCCW with p/e (pos/end) and a / b / c the next packet
// | | | | |
// [-------------------------:-------------------------]
// Or
// eaacccccw r p EAACCCCCW
// | | | | | |
// [-------------------------:-------------------------]
// Or
// r p eaaaaa w
// | | | | |
// [-------------------------:-------------------------]
// In that case, we are doing a single memory move operation here, but we simply wrap the position
// W and E are the unwrapped write and end positions, so W - E is the number of bytes stored after the removed packet
uint32 s = sm1 + 1, W = w < pos ? w + s : w, E = end < pos ? end + s : end;
for (uint32 u = 0; u < W - E; u++)
buffer[(u + pos) & sm1] = buffer[(u + pos + size) & sm1];
// Adjust the new write position
w = (W - size) & sm1;
// We can split the packets in 2 cases: before or after the packet to remove.
// We'll iterate each packet and decide if we need to move it (it's after the packet to remove)
// This isn't the most efficient algorithm, but since the number of packets to store is small
// there's no point in optimizing it further
// Here, "end" tracks the old position of the next packet to fix and "pos" its new position
bool continueSearching = true;
while (continueSearching)
{
continueSearching = false;
for (uint8 j = 0; j < packetsCount; j++)
{
PacketBookmark & iter = packets[j];
if (iter.pos == end)
{
end = (iter.pos + iter.size) & sm1;
iter.pos = pos;
pos = (iter.pos + iter.size) & sm1;
continueSearching = true;
break;
}
}
}
return true;
}
#if 0
/** This is only used in the test code to ensure it's working as expected */
bool selfCheck() const
{
for (uint8 j = 0; j < packetsCount; j++)
{
const PacketBookmark & i = packets[j];
if (!i.ID) continue;
const uint32 end = (i.pos + i.size) & sm1;
if (end == w) continue;
// Find if any packet starts with the next slot
bool found = false;
for (uint8 k = 0; k < packetsCount; k++)
{
if (packets[k].pos == end)
{
// Found one
found = true;
break;
}
}
if (!found)
{
fprintf(stderr, "Error while checking packet %u (%u,s:%u), no next packet found and not tail position\n", i.ID, i.pos, i.size);
return false;
}
}
fprintf(stdout, "RB: r(%u) w(%u)\n", r, w);
return true;
}
#endif
/** Build the implementation over memory provided by the caller.
    @param size         The buffer size in bytes (sm1 is set to size - 1)
    @param buffer       The storage for the packets' data
    @param packetsCount The number of entries in the bookmark array
    @param packets      The bookmark array */
Impl(size_t size, uint8 * buffer, uint8 packetsCount, PacketBookmark * packets) : r(0), w(0), sm1(size - 1), buffer(buffer), packets(packets), packetsCount(packetsCount) {}
};
/** Helper function to perform a single allocation for all data so it avoids stressing the allocator.
    The block is laid out as: the Impl object, then the ring buffer bytes, then the bookmark array.
    @return The implementation built in the allocated block, or nullptr if the allocation failed */
static RingBufferStorage::Impl * allocImpl(const uint32 bufferSize, const uint32 maxPacketCount)
{
    const size_t bookmarksOffset = sizeof(RingBufferStorage::Impl) + bufferSize;
    uint8 * p = (uint8*)::calloc(1, bookmarksOffset + maxPacketCount * sizeof(PacketBookmark));
    // Never construct the object over a null pointer
    if (!p) return nullptr;
    return new (p) RingBufferStorage::Impl(bufferSize, p + sizeof(RingBufferStorage::Impl), maxPacketCount, (PacketBookmark*)(p + bookmarksOffset));
}
/** Save a packet in the ring buffer. Fails if the storage could not be allocated or the packet does not fit */
bool RingBufferStorage::savePacketBuffer(const uint16 packetID, const uint8 * buffer, const uint32 size)
{
    return impl && impl->save(packetID, buffer, size);
}
/** Remove a packet from the ring buffer. Fails if the storage could not be allocated or the ID is unknown */
bool RingBufferStorage::releasePacketBuffer(const uint16 packetID)
{
    return impl && impl->release(packetID);
}
/** Get the (up to) two contiguous parts of a saved packet. Fails if the storage could not be allocated or the ID is unknown */
bool RingBufferStorage::loadPacketBuffer(const uint16 packetID, const uint8 *& bufferHead, uint32 & sizeHead, const uint8 *& bufferTail, uint32 & sizeTail)
{
    return impl && impl->load(packetID, bufferHead, sizeHead, bufferTail, sizeTail);
}
/** The ring buffer storage size. Must be a power of 2 */
RingBufferStorage::RingBufferStorage(const size_t bufferSize, const size_t maxPacketCount) : impl(allocImpl(bufferSize, maxPacketCount)) {}
/** Release the single block obtained in allocImpl (free0 is a project helper, presumably accepting a null pointer like free does - TODO confirm) */
RingBufferStorage::~RingBufferStorage() { ::free0(impl); }
#endif
/** Common base interface that's common to all implementation using CRTP to avoid code duplication */
template <typename Child>
struct ImplBase
{
typedef MQTTv5::ErrorType ErrorType;
/** The DER encoded certificate (if provided) */
const Protocol::MQTT::Common::DynamicBinDataView * brokerCert;
/** This client unique identifier */
Protocol::MQTT::Common::DynamicString clientID;
/** The message received callback to use */
MessageReceived * cb;
/** The last communication time in second */
uint32 lastCommunication;
/** The publish current default identifier allocator */
uint16 publishCurrentId;
/** The keep alive delay in seconds */
uint16 keepAlive;
#if MQTTUseUnsubscribe == 1
/** The last unsubscribe id */
uint16 unsubscribeId;
/** The last unsubscribe error code */
MQTTv5::ErrorType::Type lastUnsubscribeError;
#endif
#if MQTTQoSSupportLevel == 1
/** The storage interface */
PacketStorage * storage;
#endif
/** The reading state. Because data on a TCP stream is
a stream, we have to remember what state we are currently following while parsing data */
enum RecvState
{
Ready = 0,
GotType,
GotLength,
GotCompletePacket,
} recvState;
/** The maximum packet size the server is willing to accept */
uint32 maxPacketSize;
/** The available data in the buffer */
uint32 available;
/** The receiving buffer */
Buffers buffers;
/** The receiving VBInt size for the packet header */
uint8 packetExpectedVBSize;
/** The current MQTT state in the state machine */
State::MQTT state;
/** Allocate the next packet identifier.
    @return A non-zero identifier, incrementing at each call and wrapping from 65535 back to 1 */
uint16 allocatePacketID()
{
    // The 16 bits counter wraps to 0, but 0 is not a valid MQTT packet identifier
    // and it is also the marker for a free slot in the ID and packet tables, so skip it
    if (++publishCurrentId == 0) publishCurrentId = 1;
    return publishCurrentId;
}
/** Build the common client state.
    @param clientID     The client identifier
    @param callback     The callback interface; it's queried for the maximum packet size and the maximum un-ACKed packets count
    @param brokerCert   The DER encoded broker certificate (if provided)
    @param storage      The packet storage to use; when null (and QoS support is enabled) a RingBufferStorage is created */
ImplBase(const char * clientID, MessageReceived * callback, const Protocol::MQTT::Common::DynamicBinDataView * brokerCert, PacketStorage * storage)
    : brokerCert(brokerCert)
    , clientID(clientID)
    , cb(callback)
    , lastCommunication(0)
    , publishCurrentId(0)
    , keepAlive(300)
#if MQTTUseUnsubscribe == 1
    , unsubscribeId(0)
    , lastUnsubscribeError(ErrorType::WaitingForResult)
#endif
#if MQTTQoSSupportLevel == 1
    , storage(storage)
#endif
    , recvState(Ready)
    , maxPacketSize(65535)
    , available(0)
    , buffers(max(callback->maxPacketSize(), (uint32)8UL), min(callback->maxUnACKedPackets(), (uint32)127UL))
    , packetExpectedVBSize(Protocol::MQTT::Common::VBInt(max(callback->maxPacketSize(), (uint32)8UL)).getSize())
    , state(State::Unknown)
{
#if MQTTQoSSupportLevel == 1
    // No storage provided: create the default one, sized from the receive buffer and the packets count
    if (storage == nullptr)
        this->storage = new RingBufferStorage(buffers.size, buffers.packetsCount() * 2);
#else
    (void)storage; // Prevent variable unused warning
#endif
}
#if MQTTQoSSupportLevel == 1
/** Release the packet storage (delete0 is a project helper, presumably deleting the object and zeroing the pointer - TODO confirm).
    NOTE(review): the constructor keeps a storage passed by the caller as is, so this appears to take ownership of it too. */
~ImplBase() { delete0(storage); }
#endif
bool shouldPing()
{
return (((uint32)time(NULL) - lastCommunication) >= keepAlive);
}
/** Change the current state of the MQTT state machine */
void setConnectionState(State::MQTT connState)
{
    this->state = connState;
}
bool hasValidLength() const
{
Protocol::MQTT::Common::VBInt l;
return l.readFrom(buffers.recvBuffer() + 1, available - 1) != Protocol::MQTT::Common::BadData;
}
/** Receive a control packet from the socket in the given time.
    This can be called again after a timeout: the bytes received so far are kept and the reception resumes.
    @param lowLatency   When true (and MQTTLowLatency is enabled), return a timeout at once if nothing is readable
    @retval positive The number of bytes received (a complete packet is in the receive buffer)
    @retval 0 Protocol error, you should close the socket
    @retval -1 Socket error
    @retval -2 Timeout */
int receiveControlPacket(const bool lowLatency = false)
{
    if (!that()->socket) return -1;
    // Depending on the current state, we need to fetch as many bytes as possible within the given timeoutMs
    // This is a complex problem here because we want both to optimize for
    // - latency (returns as fast as possible when we've received a complete packet)
    // - blocking time (don't return immediately if the data is currently in transfer, need to wait for it to arrive)
    // - minimal syscalls (don't call recv byte per byte as the overhead will be significant)
    // - network queue (don't fetch more byte than necessary for getting a single control packet)
    // - streaming usage (this can be called while a control packet was being received and we timed out)
    // So the algorithm used here depends on the current receiving state
    // If we haven't received packet length yet, we have to fetch the header very carefully
    // Else, we can enter a more general receiving loop until we have all bytes from the control packet
    int ret = 0;
    Protocol::MQTT::Common::VBInt len;
#if MQTTLowLatency == 1
    // In low latency mode, return as early as possible
    // (the socket belongs to the child implementation, so it must be reached via that())
    if (lowLatency && !that()->socket->select(true, false, 0)) return -2;
#else
    (void)lowLatency;
#endif
    // We want to keep track of complete timeout time over multiple operations
    auto timeout = that()->getTimeout();
    switch (recvState)
    {
    case Ready:
    case GotType:
    {   // Here, make sure we only fetch the length first
        // The minimal size is 2 bytes for PINGRESP, DISCONNECT and AUTH.
        // Because of this, we can't really outsmart the system everytime
        // When resuming after a timeout we might already have those 2 bytes (or more),
        // in that case "2 - available" must not be computed since it would wrap around
        if (available < 2)
        {
            ret = that()->recv((char*)&buffers.recvBuffer()[available], 2 - available, timeout);
            if (ret > 0) available += ret;
            // Deal with timeout first
            if (timeout == 0) return -2;
            // Deal with socket errors here
            if (ret < 0 || available < 2) return -1;
        }
        // More length bytes are only needed when the first one has its continuation bit set.
        // In that case, the remaining length is at least 128 bytes, so fetching the largest
        // expected header can't consume bytes from the next packet.
        // Without the continuation bit, the length is already complete.
        const int querySize = (int)(packetExpectedVBSize + 1) - (int)available;
        if ((buffers.recvBuffer()[1] & 0x80) && querySize > 0)
        {
            ret = that()->recv((char*)&buffers.recvBuffer()[available], querySize, timeout);
            if (ret > 0) available += ret;
            // Deal with timeout first
            if (timeout == 0) return -2;
            // Deal with socket errors here
            if (ret < 0) return ret;
        }
        recvState = GotLength;
        break;
    }
    default: break;
    }
    // Here we should either have a valid control packet header
    uint32 r = len.readFrom(&buffers.recvBuffer()[1], available - 1);
    if (r == Protocol::MQTT::Common::BadData)
        return 0; // Close the socket here, the given data are wrong or not the right protocol
    if (r == Protocol::MQTT::Common::NotEnoughData)
    {
        if (available >= (uint32)(packetExpectedVBSize + 1))
        {   // The server sends us a packet that's larger than the expected maximum size,
            // In MQTTv5 it's a protocol error, so let's disconnect
            return 0;
        }
        // We haven't received enough data in the given timeout to make progress, let's report a timeout
        recvState = GotType;
        return -2;
    }
    uint32 remainingLength = len;
    uint32 totalPacketSize = remainingLength + 1 + len.getSize();
    // Never write past the receive buffer: a packet larger than what we can store is a protocol error.
    // Having more bytes than the packet contains means the stream parsing is broken, bail out too.
    if (totalPacketSize > buffers.size || available > totalPacketSize)
        return 0;
    ret = totalPacketSize == available ? 0 : that()->recv((char*)&buffers.recvBuffer()[available], (totalPacketSize - available), timeout);
    if (ret > 0) available += ret;
    if (timeout == 0) return -2;
    if (ret < 0) return ret;
    // Ok, let's check if we have received the complete packet
    if (available == totalPacketSize)
    {
        recvState = GotCompletePacket;
#if MQTTDumpCommunication == 1
        dumpBufferAsPacket("< Received packet", buffers.recvBuffer(), available);
#endif
        lastCommunication = (uint32)time(NULL);
        return (int)available;
    }
    // No yet, but we probably timed-out.
    return -2;
}
/** Get the type of the last received packet.
    @return The type found in the first byte of the receive buffer, or RESERVED if no complete packet is available */
Protocol::MQTT::V5::ControlPacketType getLastPacketType() const
{
    if (recvState == GotCompletePacket)
    {
        Protocol::MQTT::V5::FixedHeader header;
        header.raw = buffers.recvBuffer()[0];
        return (Protocol::MQTT::V5::ControlPacketType)(uint8)header.type;
    }
    return Protocol::MQTT::V5::RESERVED;
}
/** Extract a control packet of the given type from the receive buffer, receiving it first if needed.
    @param type     The expected packet type
    @param packet   The packet to unserialize into
    @retval positive The value returned by the packet's readFrom method
    @retval 0, -1   The value returned by receiveControlPacket upon error
    @retval -2      No complete packet was received
    @retval -3      The received packet is not of the expected type
    @retval -4      The packet could not be parsed */
int extractControlPacket(const Protocol::MQTT::V5::ControlPacketType type, Protocol::MQTT::Common::Serializable & packet)
{
    if (recvState != GotCompletePacket)
    {
        const int received = receiveControlPacket();
        if (received <= 0) return received;
        if (recvState != GotCompletePacket) return -2;
    }

    // Only the expected packet type is accepted
    if (getLastPacketType() != type) return -3;

    // Let's unserialize it
    const uint32 parsed = packet.readFrom(buffers.recvBuffer(), buffers.size);
    if (Protocol::MQTT::Common::isError(parsed)) return -4;

    // This packet is consumed, get ready for the next one
    resetPacketReceivingState();
    return (int)parsed;
}
/** Forget any (partially) received packet and get ready to receive a new one */
void resetPacketReceivingState()
{
    available = 0;
    recvState = Ready;
}
/** CRTP accessor: get this object as the Child implementation */
inline Child * that() { return static_cast<Child*>(this); }
/** CRTP accessor: get this object as the Child implementation (const version) */
inline const Child * that() const { return static_cast<const Child*>(this); }
/** Drop the socket, notify the callback that the connection is lost and return to the Unknown state.
    @param code     The reason code reported to the callback */
void close(const Protocol::MQTT::V5::ReasonCodes code = Protocol::MQTT::V5::ReasonCodes::UnspecifiedError)
{
    delete0(that()->socket);
    cb->connectionLost(code);
    setConnectionState(State::Unknown);
}
/** Tell whether the child implementation currently holds a socket */
bool isOpen()
{
    return !(that()->socket == nullptr);
}
/** Send a complete packet through the child implementation.
    @return -1 if there's no socket, else the value returned by the child's sendImpl */
int send(const char * buffer, const uint32 length)
{
    if (that()->socket)
    {
        // Prevent mixing sending packet on the wire here, only one thread can send a complete packet at once.
#if MQTTDumpCommunication == 1
        dumpBufferAsPacket("> Sending packet", (const uint8*)buffer, length);
#endif
        return that()->sendImpl(buffer, length);
    }
    return -1;
}
/** Send a serialized packet and optionally wait for a control packet in return.
    @param buffer       The serialized packet
    @param packetSize   The serialized packet size in bytes
    @param withAnswer   If true, a control packet is received after sending
    @return Success, NetworkError if sending or receiving failed, TimedOut if no complete packet was received in time */
ErrorType sendAndReceive(const void * buffer, const uint32 packetSize, bool withAnswer)
{
    // Make sure we are on a clean receiving state
    resetPacketReceivingState();

    if (send((const char*)buffer, packetSize) != packetSize)
        return ErrorType::NetworkError;

    if (!withAnswer) return ErrorType::Success;

    // Next, wait for the server's answer (or an error)
    int receivedPacketSize = receiveControlPacket();
    if (receivedPacketSize > 0) return ErrorType::Success;

    // A protocol error requires closing the connection
    if (receivedPacketSize == 0) close();
    return receivedPacketSize == -2 ? ErrorType::TimedOut : ErrorType::NetworkError;
}
/** Serialize a packet, register it for QoS tracking if it's a publish packet, then send it (and optionally receive the answer).
    @param packet       The packet to serialize and send
    @param withAnswer   If true, wait for a control packet after sending
    @param isPublish    Must only be true when the packet is a PublishPacket. In that case, if its QoS is not 0,
                        the packet is saved in the storage (when enabled) and its ID is remembered
    @return UnknownError if serializing failed, StorageError if the packet or its ID can not be stored,
            else the result of sendAndReceive */
ErrorType prepareSAR(Protocol::MQTT::V5::ControlPacketSerializable & packet, bool withAnswer = true, bool isPublish = false)
{
    // Ok, setting are done, let's build this packet now
    uint32 packetSize = packet.computePacketSize();
    DeclareStackHeapBuffer(buffer, packetSize, StackSizeAllocationLimit);
    if (packet.copyInto(buffer) != packetSize)
        return ErrorType::UnknownError;
#if MQTTQoSSupportLevel == -1
    (void)isPublish; // Prevent variable unused warning
#else
    // Check for saving publish packet if required
    if (isPublish)
    {
        // Only two fields are read here, so use the packet in place instead of copying it
        Protocol::MQTT::V5::PublishPacket & publish = (Protocol::MQTT::V5::PublishPacket&)packet;
        uint8 QoS = publish.header.getQoS();
        if (QoS > 0) {
            uint16 packetID = publish.fixedVariableHeader.packetID;
  #if MQTTQoSSupportLevel == 1
            // Save packet
            if (!storage->savePacketBuffer(packetID, buffer, packetSize))
                return ErrorType::StorageError;
  #endif
            // Save packet ID too
            if ((QoS == 1 && !buffers.storeQoS1ID(packetID)) || (QoS == 2 && !buffers.storeQoS2ID(packetID)))
            {
  #if MQTTQoSSupportLevel == 1
                // The packet won't be sent, so don't leave it in the storage since nothing would release it
                storage->releasePacketBuffer(packetID);
  #endif
                return ErrorType::StorageError;
            }
        }
    }
#endif
#if MQTTDumpCommunication == 1
    // String out;
    // packet.dump(out, 2);
    // printf("Prepared:\n%s\n", (const char*)out);
#endif
    return sendAndReceive(buffer, packetSize, withAnswer);
}
/** Send a request and receive packets until the one answering the current state is received.
    Publish-cycle packets received in between are processed by dealWithNoise.
    @return Success when the expected packet is available in the receive buffer, an error otherwise */
ErrorType requestOneLoop(Protocol::MQTT::V5::ControlPacketSerializable & packet)
{
    ErrorType ret = prepareSAR(packet, true);
    if (ret) return ret;

    // As long as the received packet was only noise, receive a new packet and process it
    for (ret = dealWithNoise(); ret == ErrorType::TranscientPacket; ret = dealWithNoise())
    {
        const int receivedPacketSize = receiveControlPacket();
        if (receivedPacketSize > 0) continue;

        // A protocol error requires closing the connection
        if (receivedPacketSize == 0) close();
        return receivedPacketSize == -2 ? ErrorType::TimedOut : ErrorType::NetworkError;
    }

    // Exit the special state if any the reply packet is the one expected
    const bool inRequestState = state > State::Running && state < State::Disconnecting;
    if (ret == ErrorType::Success && inRequestState && (bit(getLastPacketType()) & State::publishMask) == 0)
        state = State::Running;
    return ret;
}
/** Deal with answer packet noise here.
This is called after receiving a control packet.
The mask is used to filter the allowed packet types we expect to see.
Typically, depending on the client state, the mask will be either:
* At CONNECT stage: AUTH / CONNACK / DISCONNECT
* At AUTH stage: AUTH / CONNACK / DISCONNECT
* After connected stage: PUBLISH / PUBACK / PUBREC / PUBREL / PUBCOMP / DISCONNECT
* At PINGREQ state: PINGRESP / PUBLISH / PUBACK / PUBREC / PUBREL / PUBCOMP / DISCONNECT
* At DISCONNECT state: DISCONNECT
* At SUBSCRIBE state: SUBACK / PUBLISH / PUBACK / PUBREC / PUBREL / PUBCOMP / DISCONNECT
* At UNSUBSCRIBE state: UNSUBACK / PUBLISH / PUBACK / PUBREC / PUBREL / PUBCOMP / DISCONNECT
If a packet insn't in the mask, it's a protocol error.
Notice that for running states (not the connect/auth/disconnect), you have to deal with
spurious PUBLISH packets.
Dealing with those packet can be as simple as storing the work to perform later on, and that's what this
method does.
@return ErrorType::Success upon success (a packet for the current state is received)
ErrorType::TranscientPacket upon receiving a OOB publish-like packet (or QoS stuff). In that case, it's a good idea to restart the receive loop and recall the method
any other error upon this error
*/
ErrorType dealWithNoise()
{
Protocol::MQTT::V5::ControlPacketType type = getLastPacketType();
uint16 typeMask = bit(type);
if (type == Protocol::MQTT::V5::DISCONNECT)
{ // Disconnect is a special packet that can happens at any state
Protocol::MQTT::V5::RODisconnectPacket packet;
Protocol::MQTT::V5::ReasonCodes reason = Protocol::MQTT::V5::NormalDisconnection;
int ret = extractControlPacket(type, packet);
if (ret > 0) reason = packet.fixedVariableHeader.reason();
close(reason);
return ErrorType::NotConnected; // No work to perform upon server sending disconnect
}
// Check for unexpected packet
if ((State::expectedPacketMask[state] & typeMask) == 0)
return ErrorType::NetworkError;
// Handle publish packets if needed
if (typeMask & State::publishMask)
{
uint16 packetID = 0;
Protocol::MQTT::V5::ControlPacketType next = Protocol::MQTT::V5::RESERVED;
if (type == Protocol::MQTT::V5::PUBLISH)
{
Protocol::MQTT::V5::ROPublishPacket packet;
int ret = extractControlPacket(type, packet);
if (ret == 0) { close(); return ErrorType::NotConnected; }
if (ret < 0) return ErrorType::NetworkError;
// Call the user as soon as possible to limit latency
// Notice that the user might be PUBLISH'ing here
cb->messageReceived(packet.fixedVariableHeader.topicName, Protocol::MQTT::Common::DynamicBinDataView(packet.payload.size, packet.payload.data), packet.fixedVariableHeader.packetID, packet.props);
// Save the ID if QoS
uint8 QoS = packet.header.getQoS();
if (QoS == 0)
{
// Done with this packet
resetPacketReceivingState();
return ErrorType::TranscientPacket;
}
#if MQTTQoSSupportLevel == -1
return ErrorType::NetworkError;
#else
packetID = packet.fixedVariableHeader.packetID;
bool store = (QoS == 1) ? buffers.storeQoS1ID(packetID | 0x10000) : buffers.storeQoS2ID(packetID | 0x10000);
if (!store) return ErrorType::StorageError;
next = (QoS == 1) ? Protocol::MQTT::V5::ControlPacketType::PUBACK : Protocol::MQTT::V5::ControlPacketType::PUBREC;
#endif
} else
{
#if MQTTQoSSupportLevel == -1
return ErrorType::NetworkError;
#else
Protocol::MQTT::V5::PublishReplyPacket reply(type);
int ret = extractControlPacket(type, reply);
if (ret <= 0) return ErrorType::NetworkError;
packetID = reply.fixedVariableHeader.packetID;
#if MQTTQoSSupportLevel == 1
if (typeMask & State::releaseBufferMask)
{
if (!storage->releasePacketBuffer(packetID)) // They always come from us
return ErrorType::StorageError;
}
#endif
if (typeMask & State::releaseIDMask)
{
if (!buffers.releaseID(packetID)) // They always come from us
return ErrorType::StorageError;
} else
{ // We need to reply to the broker (for example: PUBREL or PUBREC)
next = Protocol::MQTT::Common::Helper::getNextPacketType(type);
}
#endif
}
#if MQTTQoSSupportLevel == -1
#else
if (next != Protocol::MQTT::V5::RESERVED)
{ // We need to reply to the broker
// Send the answer
Protocol::MQTT::V5::PublishReplyPacket answer(next);
answer.fixedVariableHeader.packetID = packetID;
next = Protocol::MQTT::Common::Helper::getNextPacketType(next);
if (ErrorType err = prepareSAR(answer, false))
return err;
// Check we need to advance the QoS2 processing now
if (type == Protocol::MQTT::V5::PUBREC && !buffers.avanceQoS2(packetID))
return ErrorType::StorageError;
// Or remove the ID if there's no next packet to send (ACK or REL)
else if (next == Protocol::MQTT::V5::RESERVED && !buffers.releaseID(packetID | 0x10000))
return ErrorType::StorageError;
}
resetPacketReceivingState();
return ErrorType::TranscientPacket;
#endif
}
// Ok, done
return ErrorType::Success;
}
#if MQTTUseAuth == 1
/** Process an incoming AUTH control packet.
    The packet is extracted with extractControlPacket(), its properties are scanned for the
    authentication method and authentication data, and both are forwarded to the user callback
    together with the packet's reason code and its property set.
    @return ErrorType::Success when cb->authReceived() accepts the packet,
            ErrorType::NetworkError when it rejects it or when the packet could not be extracted */
ErrorType handleAuth()
{
    Protocol::MQTT::V5::ROAuthPacket packet;
    const int extracted = extractControlPacket(Protocol::MQTT::V5::AUTH, packet);
    if (extracted <= 0)
        return ErrorType::NetworkError;

    // Views that stay empty (length == 0) until the matching property is met
    DynamicStringView authMethod;
    DynamicBinDataView authData;
    Protocol::MQTT::V5::VisitorVariant visitor;
    for (;;)
    {
        // Fetch the next property first, and only then test if both views were already filled
        if (!packet.props.getProperty(visitor))
            break;
        if (authMethod.length != 0 && authData.length != 0)
            break;

        switch (visitor.propertyType())
        {
        case Protocol::MQTT::V5::AuthenticationMethod:
            authMethod = *visitor.as< DynamicStringView >();
            break;
        case Protocol::MQTT::V5::AuthenticationData:
            authData = *visitor.as< DynamicBinDataView >();
            break;
        default: // Any other property is of no interest here
            break;
        }
    }

    // Let the user decide what to do with the authentication exchange
    if (cb->authReceived(packet.fixedVariableHeader.reason(), authMethod, authData, packet.props))
        return MQTTv5::ErrorType::Success;
    return MQTTv5::ErrorType::NetworkError;
}
#endif
/** Process the CONNACK control packet received in answer to our CONNECT.
    The packet is extracted with extractControlPacket(). On success, the reason code is checked,
    the properties we care about are stored (maximum packet size, assigned client ID, server keep alive
    and, when MQTTUseAuth == 1, the authentication method and data), the state is switched to
    State::Running and, when MQTTQoSSupportLevel == 1, the packets still marked as being sent are resent.
    @return ErrorType::Success when the connection was accepted and all resending (if any) succeeded,
            the broker's reason code (as MQTTv5::ReasonCodes) when it is not zero (and, when MQTTUseAuth == 1,
            is neither NotAuthorized nor BadAuthenticationMethod),
            ErrorType::NetworkError after cb->authReceived() was called for NotAuthorized or BadAuthenticationMethod,
            ErrorType::StorageError when storage->loadPacketBuffer() fails while resending,
            the error returned by prepareSAR(), sendAndReceive() or dealWithNoise() when one of them fails,
            Protocol::MQTT::V5::ProtocolError when the packet could not be extracted (ret <= 0) */
ErrorType handleConnACK()
{
// Parse the ConnACK packet
Protocol::MQTT::V5::ROConnACKPacket packet;
int ret = extractControlPacket(Protocol::MQTT::V5::CONNACK, packet);
if (ret > 0)
{
// We are only interested in the result of the connection
if (packet.fixedVariableHeader.acknowledgeFlag & 1)
{ // Session is present on the server. For now, we don't care, do we? (intentionally empty)
}
// Any non-zero reason code is a failure. When authentication is compiled in, the two authentication related
// codes are not returned here: they are handled after the properties are parsed (see below)
if (packet.fixedVariableHeader.reasonCode != 0
#if MQTTUseAuth == 1
&& packet.fixedVariableHeader.reasonCode != Protocol::MQTT::V5::NotAuthorized
&& packet.fixedVariableHeader.reasonCode != Protocol::MQTT::V5::BadAuthenticationMethod
#endif
)
{
// The connection failed, report the broker's reason code to the caller
return (MQTTv5::ReasonCodes)packet.fixedVariableHeader.reasonCode;
}
// Now, we are going to parse the other properties
#if MQTTUseAuth == 1
// Filled from the AuthenticationMethod / AuthenticationData properties if they are present
DynamicStringView authMethod;
DynamicBinDataView authData;
#endif
Protocol::MQTT::V5::VisitorVariant visitor;
while (packet.props.getProperty(visitor))
{
switch (visitor.propertyType())
{
case Protocol::MQTT::V5::PacketSizeMax:
{
// Remember the maximum packet size announced by the broker
auto pod = visitor.as< Protocol::MQTT::V5::LittleEndianPODVisitor<uint32> >();
maxPacketSize = pod->getValue();
break;
}
case Protocol::MQTT::V5::AssignedClientID:
{
// The broker assigned a client ID, keep our own copy of it
auto view = visitor.as< Protocol::MQTT::V5::DynamicStringView >();
clientID.from(view->data, view->length); // This allocates memory for holding the copy
break;
}
case Protocol::MQTT::V5::ServerKeepAlive:
{
auto pod = visitor.as< Protocol::MQTT::V5::LittleEndianPODVisitor<uint16> >();
// (v + v/2) / 2 == 0.75 * v, computed with integer arithmetic only
keepAlive = (pod->getValue() + (pod->getValue()>>1)) >> 1; // Use 0.75 of the value given by the server
break;
}
#if MQTTUseAuth == 1
case Protocol::MQTT::V5::AuthenticationMethod:
{
auto view = visitor.as<DynamicStringView>();
authMethod = *view;
} break;
case Protocol::MQTT::V5::AuthenticationData:
{
auto data = visitor.as<DynamicBinDataView>();
authData = *data;
} break;
#endif
// Actually, we don't care about other properties. Maybe we should?
default: break;
}
}
#if MQTTUseAuth == 1
if (packet.fixedVariableHeader.reasonCode == Protocol::MQTT::V5::NotAuthorized
|| packet.fixedVariableHeader.reasonCode == Protocol::MQTT::V5::BadAuthenticationMethod)
{ // Let the user be aware of the required authentication properties so the next connect can contain them
// The callback's return value is ignored here: the error below is returned in any case
cb->authReceived((ReasonCodes)packet.fixedVariableHeader.reasonCode, authMethod, authData, packet.props);
return ErrorType::NetworkError; // Force close the connection as per 4.12.0-1
}
#endif
// Ok, the connection was accepted (and authentication cleared).
state = State::Running;
#if MQTTQoSSupportLevel == 1
// Check if we need to resend some unACK'ed packets
uint8 resend = buffers.countSentID();
if (resend)
{
// Loop over the buffers and resend them
uint8 i = 0;
while (true)
{
uint32 packetID = buffers.packetID(i);
if (buffers.isSending(packetID))
{
if (buffers.isQoS2Step2(packetID))
{
// We've already received the PUBREC packet so ownership is on the broker.
// We need to resend the PUBREL packet here
Protocol::MQTT::V5::PublishReplyPacket answer(Protocol::MQTT::V5::PUBREL);
// Only the low 16 bits of the stored value are used as the packet identifier on the wire
answer.fixedVariableHeader.packetID = (uint16)(packetID & 0xFFFF);
if (ErrorType err = prepareSAR(answer, true))
return err;
} else
{
// No PUBACK or no PUBREC received, we need to resend the packet
uint16 id = packetID & 0xFFFF;
// The saved packet is loaded as two chunks (H and T), the second one possibly being empty (sizeT == 0)
const uint8 * packetH = 0, * packetT = 0; uint32 sizeH = 0, sizeT = 0;
if (!storage->loadPacketBuffer(id, packetH, sizeH, packetT, sizeT))
return ErrorType::StorageError;
// Send the packet and run the event loop to purge the acknowledgement.
// NOTE(review): the last argument is true only for the final chunk; presumably it marks the end of the
// packet - confirm against sendAndReceive()
ErrorType ret = ErrorType::Success;
if ((ret = sendAndReceive(packetH, sizeH, sizeT == 0)))
return ret;
if (sizeT && (ret = sendAndReceive(packetT, sizeT, true)))
return ret;
}
// As per 4.9 flow control, we can't send all other packets without processing the
// QoS dance. At this step of communication, we can't receive any PUBLISH packet since
// we haven't subscribed yet. We can only receive DISCONNECT & QoS packets here
if (ErrorType ret = dealWithNoise())
return ret;
}
// The end test is done before incrementing, so the entry at index buffers.end() is processed too
if (i == buffers.end()) break;
i++; // This works because the buffers is a ring buffer, so when an ID is released in the dealWithNoise() above, the
// position of the next ID doesn't move. At this step, any new ID will be processed later on.
}
}
#else
// No resending in this configuration: the ID buffers are simply reset
buffers.reset();
#endif
return ErrorType::Success;
}
// The CONNACK packet could not be extracted
return Protocol::MQTT::V5::ProtocolError;
}
};
#if MQTTOnlyBSDSocket != 1
/* Socket type used for the plain socket operations.
   ClassPath provides a default implementation for Berkeley sockets and (Open)SSL sockets, but any
   other network library (lwIP, for example) can be used instead: point this alias at your own class
   if you do so. */
using Socket = Network::Socket::BerkeleySocket;
#if MQTTUseTLS == 1
/* Socket type used for SSL/TLS connections.
   ClassPath provides a default implementation for (Open/Libre)SSL, but a class built on another
   library (MBEDTLS, for example) can be used instead: point this alias at your own class if you do so. */
using SSLSocket = Network::Socket::SSL_TLS;
/** The SSL context to (re)use. If you need to skip negotiating, you'll need to modify this context */
using SSLContext = SSLSocket::SSLContext;
#endif
#if MQTTDumpCommunication == 1
static void dumpBufferAsPacket(const char * prompt, const uint8* buffer, uint32 length)
{