DUPLICATE_CLIENTID disconnect error #495
-
Hi all, I am getting the error "disconnectReason": "DUPLICATE_CLIENTID" each time I publish a message to the topic. The lifecycle events go as below:
I think the issue is that, as soon as the payload is published, something in the code triggers another connect, which fails with DUPLICATE_CLIENTID, and the return payload is never received on the subscribed topic, because all of this happens in the same window. Can someone look into this issue? Also, the disconnect function is not working.
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import sys
import threading
import time
import json
import os
from starter_app.utils import Constants
class AWSIoT:
    """One-shot MQTT client for a single hole's laser device on AWS IoT Core.

    The endpoint, certificate paths and per-hole topic names are read from
    ``Constants``. ``call_iot`` opens a mutual-TLS connection, subscribes to
    the hole's reply topic, publishes one payload and disconnects again.
    """

    # Payload published when the caller does not supply one (local testing).
    _LOCAL_TEST_PAYLOAD = {
        "TransactionType": "point",
        "Hole": 1,
        "Tee": 1,
        "Laser": {
            "Mode": 1,
            "LX": 20,
            "LY": 0,
            "LZ": 0,
            "LH": 192,
            "LXA": 0,
            "LYA": 0
        },
        "Ball": {
            "BX": 143,
            "BY": 110,
            "BZ": 0
        }
    }

    def __init__(self, hole_number):
        """Load connection settings and the topic set for *hole_number*.

        Args:
            hole_number: Hole identifier; converted to ``str`` and used as
                the key into ``Constants.TOPICS``.

        Raises:
            AttributeError: If ``Constants.TOPICS`` has no entry for the hole
                (``.get`` returns ``None`` and the lookups below fail).
        """
        self.target_ep = Constants.TARGET_EP
        self.cert_filepath = Constants.CERT_FILEPATH
        self.private_key_filepath = Constants.PRIVATE_KEY_FILEPATH
        self.ca_filepath = Constants.CA_FILEPATH
        self.hole_number = str(hole_number)
        laser_topic = Constants.TOPICS.get(self.hole_number)
        # The thing name doubles as the MQTT client ID in call_iot().
        self.thing_name = laser_topic.get("thing_name")
        self.pub_topic = laser_topic.get("pub_topic")
        self.sub_topic = laser_topic.get("sub_topic")

    # Callback when connection is accidentally lost.
    def on_connection_interrupted(self, connection, error, **kwargs):
        """Report that the connection was interrupted with *error*."""
        print("Connection interrupted. error: {}".format(error))

    # Callback when an interrupted connection is re-established.
    def on_connection_resumed(self, connection, return_code, session_present, **kwargs):
        """Report the resumed connection and resubscribe if the session was lost."""
        print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))
        if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
            print("Session did not persist. Resubscribing to existing topics...")
            resubscribe_future, _ = connection.resubscribe_existing_topics()
            # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
            # evaluate result with a callback instead.
            resubscribe_future.add_done_callback(self.on_resubscribe_complete)

    def on_resubscribe_complete(self, resubscribe_future):
        """Print the resubscribe results and exit if any topic was rejected.

        Args:
            resubscribe_future: Completed future whose result is a mapping
                with a ``'topics'`` list of ``(topic, qos)`` pairs.

        Raises:
            SystemExit: If any topic comes back with ``qos`` of ``None``.
        """
        resubscribe_results = resubscribe_future.result()
        print("Resubscribe results: {}".format(resubscribe_results))
        for topic, qos in resubscribe_results['topics']:
            if qos is None:
                sys.exit("Server rejected resubscribe to topic: {}".format(topic))

    # Callback when the subscribed topic receives a message
    def on_message_received(self, topic, payload, dup, qos, retain, **kwargs):
        """Print a message received on a subscribed topic."""
        print("Received message from topic '{}': {}".format(topic, payload))

    # Callback when the connection successfully connects
    def on_connection_success(self, connection, callback_data):
        """Print the return code and session flag of a successful connect."""
        assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
        print("Connection Successful with return code: {} session present: {}".format(
            callback_data.return_code,
            callback_data.session_present)
        )

    # Callback when a connection attempt fails
    def on_connection_failure(self, connection, callback_data):
        """Print the error carried by a failed connection attempt."""
        # Fixed class name: it was misspelled "OnConnectionFailuredata", which
        # made this callback raise AttributeError instead of reporting.
        assert isinstance(callback_data, mqtt.OnConnectionFailureData)
        print("Connection failed with error code: {}".format(callback_data.error))

    # Callback when a connection has been disconnected or shutdown successfully
    def on_connection_closed(self, connection, callback_data):
        """Report that the connection has been closed."""
        print("Connection closed")

    def call_iot(self, laser_payload=None):
        """Connect, subscribe to the reply topic, publish one payload, disconnect.

        Args:
            laser_payload: JSON-serialisable payload to publish on
                ``self.pub_topic``. When ``None``, the built-in local test
                payload is published instead.

        Raises:
            Exception: Whatever the subscribe or disconnect futures raise;
                the disconnect is attempted even if subscribe/publish fails.
        """
        # The argument used to be overwritten unconditionally by the test
        # payload; now the test payload is only a fallback.
        if laser_payload is None:
            laser_payload = self._LOCAL_TEST_PAYLOAD

        # Spin up resources
        event_loop_group = io.EventLoopGroup(1)
        host_resolver = io.DefaultHostResolver(event_loop_group)
        client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
        proxy_options = None
        # NOTE(review): every call builds a new connection with the same
        # client_id (the thing name). Presumably a second connection with an
        # already-connected client ID is what triggers DUPLICATE_CLIENTID;
        # confirm that no other process or earlier call is still connected.
        mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=self.target_ep,
            port=8883,
            cert_filepath=self.cert_filepath,
            pri_key_filepath=self.private_key_filepath,
            client_bootstrap=client_bootstrap,
            ca_filepath=self.ca_filepath,
            on_connection_interrupted=self.on_connection_interrupted,
            on_connection_resumed=self.on_connection_resumed,
            client_id=self.thing_name,
            clean_session=True,
            keep_alive_secs=5,
            http_proxy_options=proxy_options,
            on_connection_success=self.on_connection_success,
            on_connection_failure=self.on_connection_failure,
            on_connection_closed=self.on_connection_closed,
        )
        print("Connecting to {} with client ID '{}'...".format(
            self.target_ep, self.thing_name))

        # Connect to the gateway, retrying until it succeeds.
        while True:
            try:
                connect_future = mqtt_connection.connect()
                print("connect_future called..")
                # Future.result() waits until a result is available
                connect_future.result()
            except Exception as exc:
                # Not a bare except: Ctrl-C / SystemExit must be able to
                # break out of this otherwise endless retry loop.
                print("Connection to IoT Core failed... retrying in 5s.")
                print("Connection error: {}".format(exc))
                time.sleep(5)
            else:
                print("Connected!")
                break

        try:
            # Unsubscribe first; the returned (future, packet_id) is
            # deliberately not waited on, as before.
            print("Un Subscribing to topic " + self.sub_topic)
            mqtt_connection.unsubscribe(
                topic=self.sub_topic,
            )
            print("unsubscribe_future")

            # Subscribe
            print("Subscribing to topic " + self.sub_topic)
            subscribe_future, packet_id = mqtt_connection.subscribe(
                topic=self.sub_topic,
                qos=mqtt.QoS.AT_MOST_ONCE,
                callback=self.on_message_received,
            )
            subscribe_result = subscribe_future.result()
            print("Subscribed with {}".format(str(subscribe_result['qos'])))
            print("Packet id for subscribe: {}".format(packet_id))

            print('Publishing message on topic {}'.format(self.pub_topic))
            # Check on clearing the message queue
            # Send Message to the PW Admin if hole doesn't respond
            # This particular hole is down (Course Manager Email)
            #
            # - QoS - 0 - At most once
            # - QoS - 1 - At least once
            # - QoS - 2 - Exactly once
            print("QOS {}".format(mqtt.QoS.AT_MOST_ONCE))  # Should be 0
            message_json = json.dumps(laser_payload)
            print(message_json)
            mqtt_connection.publish(
                topic=self.pub_topic,
                payload=message_json,
                qos=mqtt.QoS.AT_MOST_ONCE,
            )
            # Leave a short window for the reply to arrive on sub_topic.
            time.sleep(2)
        finally:
            # Always disconnect so a failed subscribe/publish does not leave
            # a connection open under this client ID.
            print("Disconnecting.....")
            disconnect_future = mqtt_connection.disconnect()
            disconnect_future.result()
            print("Sleep 5 second")
            time.sleep(5)
            print("Disconnected")
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 29 replies
-
@github-actions proposed-answer connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
print("Connected!")
Let me know if you are having any connection issues after changing this.
Beta Was this translation helpful? Give feedback.
Sorry, I didn't notice you had posted your policy already. No, that is not the issue then. `iot:*` just limits the policy to all IoT actions, while `*` allows all actions, including the IoT actions. Can you try running the sample with a different client ID? As @MikeDombo said: