@@ -12,7 +12,8 @@ PubSubClient::PubSubClient(Client& c) :
1212 _client(c),
1313 _parser(c),
1414 _max_retries(10 ),
15- isSubAckFound(false )
15+ isSubAckFound(false ),
16+ _getCurrentTime(millis)
1617{}
1718
1819PubSubClient::PubSubClient (Client& c, IPAddress &ip, uint16_t port) :
@@ -22,7 +23,8 @@ PubSubClient::PubSubClient(Client& c, IPAddress &ip, uint16_t port) :
2223 _max_retries(10 ),
2324 isSubAckFound(false ),
2425 server_ip(ip),
25- server_port(port)
26+ server_port(port),
27+ _getCurrentTime(millis)
2628{}
2729
2830PubSubClient::PubSubClient (Client& c, String hostname, uint16_t port) :
@@ -32,7 +34,8 @@ PubSubClient::PubSubClient(Client& c, String hostname, uint16_t port) :
3234 _max_retries(10 ),
3335 isSubAckFound(false ),
3436 server_port(port),
35- server_hostname(hostname)
37+ server_hostname(hostname),
38+ _getCurrentTime(millis)
3639{}
3740
3841PubSubClient& PubSubClient::set_server (IPAddress &ip, uint16_t port) {
@@ -51,7 +54,7 @@ PubSubClient& PubSubClient::set_server(String hostname, uint16_t port) {
5154MQTT::Message* PubSubClient::_recv_message (void ) {
5255 MQTT::Message *msg = _parser.parse ();
5356 if (msg != nullptr )
54- lastInActivity = millis ();
57+ lastInActivity = _getCurrentTime ();
5558 return msg;
5659}
5760
@@ -68,7 +71,7 @@ bool PubSubClient::_send_message(MQTT::Message& msg) {
6871 }
6972 return false ;
7073 }
71- lastOutActivity = millis ();
74+ lastOutActivity = _getCurrentTime ();
7275
7376 return true ;
7477}
@@ -86,7 +89,7 @@ MQTT::Message* PubSubClient::_send_message_with_response(MQTT::Message& msg) {
8689 }
8790 return nullptr ;
8891 }
89- lastOutActivity = millis ();
92+ lastOutActivity = _getCurrentTime ();
9093
9194 MQTT::Message *response = _wait_for (msg.response_type (), msg.packet_id ());
9295 if (response == nullptr ) {
@@ -145,12 +148,12 @@ void PubSubClient::_process_message(MQTT::Message* msg) {
145148
146149MQTT::Message* PubSubClient::_wait_for (MQTT::message_type wait_type, uint16_t wait_pid) {
147150 while (!_client.available ()) {
148- if (millis () - lastInActivity > keepalive * 1000UL )
151+ if (_getCurrentTime () - lastInActivity > keepalive * 1000UL )
149152 return nullptr ;
150153 yield ();
151154 }
152155
153- while (millis () < lastInActivity + (keepalive * 1000 )) {
156+ while (_getCurrentTime () < lastInActivity + (keepalive * 1000 )) {
154157 // Read the packet and check it
155158 MQTT::Message *msg = _recv_message ();
156159 if (msg != nullptr ) {
@@ -213,7 +216,7 @@ bool PubSubClient::connect(MQTT::Connect &conn) {
213216
214217 pingOutstanding = false ;
215218 nextMsgId = 1 ; // Init the next packet id
216- lastInActivity = millis (); // Init this so that _wait_for() doesn't think we've already timed-out
219+ lastInActivity = _getCurrentTime (); // Init this so that _wait_for() doesn't think we've already timed-out
217220 keepalive = conn.keepalive (); // Store the keepalive period from this connection
218221
219222 MQTT::Message *response = _send_message_with_response (conn);
@@ -239,7 +242,7 @@ bool PubSubClient::loop() {
239242 if (!connected ())
240243 return false ;
241244
242- unsigned long t = millis ();
245+ unsigned long t = _getCurrentTime ();
243246 if ((t - lastInActivity > keepalive * 1000UL ) || (t - lastOutActivity > keepalive * 1000UL )) {
244247 if (pingOutstanding) {
245248 _client.stop ();
@@ -396,3 +399,8 @@ bool PubSubClient::connected() {
396399
397400 return rc;
398401}
402+
403+ void PubSubClient::setTimeGetter (TimeGetter getter)
404+ {
405+ _getCurrentTime = getter;
406+ }
0 commit comments