3
3
4
4
#include " PubSubFeature.h"
5
5
#include " ../../logging/LoggerFactory.h"
6
+ #include " ../../util/FileUtils.h"
6
7
7
8
#include < aws/common/byte_buf.h>
8
9
#include < aws/crt/Api.h>
9
10
#include < aws/iotdevicecommon/IotDevice.h>
10
11
#include < iostream>
12
+ #include < sys/stat.h>
11
13
12
14
#include < utility>
13
15
@@ -22,6 +24,8 @@ using namespace Aws::Iot::DeviceClient::Util;
22
24
using namespace Aws ::Iot::DeviceClient::Logging;
23
25
24
26
constexpr char PubSubFeature::TAG[];
27
+ constexpr char PubSubFeature::DEFAULT_PUBLISH_FILE[];
28
+ constexpr char PubSubFeature::DEFAULT_SUBSCRIBE_FILE[];
25
29
26
30
#define MAX_IOT_CORE_MQTT_MESSAGE_SIZE_BYTES 128000
27
31
@@ -30,6 +34,61 @@ string PubSubFeature::getName()
30
34
return " Pub Sub Sample" ;
31
35
}
32
36
37
+ bool PubSubFeature::createPubSub (const PlainConfig &config, std::string filePath)
38
+ {
39
+ std::string pubSubFileDir = FileUtils::ExtractParentDirectory (filePath);
40
+ LOGM_INFO (TAG, " Creating Pub/Sub file: %s" , filePath.c_str ());
41
+ if (!FileUtils::DirectoryExists (pubSubFileDir))
42
+ {
43
+ // Create an empty directory with the expected permissions.
44
+ if (!FileUtils::CreateDirectoryWithPermissions (pubSubFileDir.c_str (), S_IRWXU | S_IRGRP | S_IROTH | S_IXOTH))
45
+ {
46
+ return false ;
47
+ }
48
+ }
49
+ else
50
+ {
51
+ // Verify the directory permissions.
52
+ auto rcvDirPermissions = FileUtils::GetFilePermissions (pubSubFileDir);
53
+ if (Permissions::PUBSUB_DIR != rcvDirPermissions)
54
+ {
55
+ LOGM_ERROR (
56
+ TAG,
57
+ " Incorrect directory permissions for pubsub file: %s expected: %d received: %d" ,
58
+ Sanitize (pubSubFileDir).c_str (),
59
+ Permissions::PUBSUB_DIR,
60
+ rcvDirPermissions);
61
+ return false ;
62
+ }
63
+ }
64
+
65
+ if (!FileUtils::FileExists (filePath))
66
+ {
67
+ // Create an empty file with the expected permissions.
68
+ if (!FileUtils::CreateEmptyFileWithPermissions (filePath, S_IRUSR | S_IWUSR))
69
+ {
70
+ return false ;
71
+ }
72
+ }
73
+ else
74
+ {
75
+ // Verify the file permissions.
76
+ auto rcvFilePermissions = FileUtils::GetFilePermissions (filePath);
77
+ if (Permissions::PUB_SUB_FILES != rcvFilePermissions)
78
+ {
79
+ LOGM_ERROR (
80
+ TAG,
81
+ " Incorrect file permissions for pubsub file: %s expected: %d received: %d" ,
82
+ Sanitize (filePath).c_str (),
83
+ Permissions::PUB_SUB_FILES,
84
+ rcvFilePermissions);
85
+ return false ;
86
+ }
87
+ }
88
+
89
+ return true ;
90
+ }
91
+
33
92
int PubSubFeature::init (
34
93
shared_ptr<SharedCrtResourceManager> manager,
35
94
shared_ptr<ClientBaseNotifier> notifier,
@@ -40,8 +99,28 @@ int PubSubFeature::init(
40
99
thingName = *config.thingName ;
41
100
pubTopic = config.pubSub .publishTopic .value ();
42
101
subTopic = config.pubSub .subscribeTopic .value ();
43
- pubFile = config.pubSub .publishFile .has_value () ? config.pubSub .publishFile .value () : " " ;
44
- subFile = config.pubSub .subscribeFile .has_value () ? config.pubSub .subscribeFile .value () : " " ;
102
+
103
+ if (config.pubSub .publishFile .has_value () && !config.pubSub .publishFile ->empty ())
104
+ {
105
+ pubFile = config.pubSub .publishFile ->c_str ();
106
+ }
107
+ pubFile = FileUtils::ExtractExpandedPath (pubFile);
108
+
109
+ if (!createPubSub (config, pubFile))
110
+ {
111
+ LOG_ERROR (TAG, " Failed to create publish directory or file" );
112
+ }
113
+
114
+ if (config.pubSub .subscribeFile .has_value () && !config.pubSub .subscribeFile ->empty ())
115
+ {
116
+ subFile = config.pubSub .subscribeFile ->c_str ();
117
+ }
118
+ subFile = FileUtils::ExtractExpandedPath (subFile);
119
+
120
+ if (!createPubSub (config, subFile))
121
+ {
122
+ LOG_ERROR (TAG, " Failed to create subscribe directory or file" );
123
+ }
45
124
46
125
return AWS_OP_SUCCESS;
47
126
}
@@ -52,7 +131,7 @@ int PubSubFeature::getPublishFileData(aws_byte_buf *buf)
52
131
if (publishFileSize > MAX_IOT_CORE_MQTT_MESSAGE_SIZE_BYTES)
53
132
{
54
133
LOGM_ERROR (
55
- TAG, " Publish file too large: %zu > %i bytes" , publishFileSize, MAX_IOT_CORE_MQTT_MESSAGE_SIZE_BYTES);
134
+ TAG, " Publish file too large: %zu > %d bytes" , publishFileSize, MAX_IOT_CORE_MQTT_MESSAGE_SIZE_BYTES);
56
135
return AWS_OP_ERR;
57
136
}
58
137
if (publishFileSize == 0 )
@@ -84,7 +163,7 @@ void PubSubFeature::publishFileData()
84
163
return ;
85
164
}
86
165
auto onPublishComplete = [payload, this ](Mqtt::MqttConnection &, uint16_t packetId, int errorCode) mutable {
87
- LOGM_DEBUG (TAG, " PublishCompAck: PacketId:(%u ), ErrorCode:%i " , getName ().c_str (), errorCode);
166
+ LOGM_DEBUG (TAG, " PublishCompAck: PacketId:(%s ), ErrorCode:%d " , getName ().c_str (), errorCode);
88
167
aws_byte_buf_clean_up_secure (&payload);
89
168
};
90
169
resourceManager->getConnection ()->Publish (
@@ -97,11 +176,11 @@ int PubSubFeature::start()
97
176
98
177
auto onSubAck =
99
178
[&](MqttConnection &connection, uint16_t packetId, const String &topic, QOS qos, int errorCode) -> void {
100
- LOGM_DEBUG (TAG, " SubAck: PacketId:(%u ), ErrorCode:%i " , getName ().c_str (), errorCode);
179
+ LOGM_DEBUG (TAG, " SubAck: PacketId:(%s ), ErrorCode:%d " , getName ().c_str (), errorCode);
101
180
};
102
181
auto onRecvData = [&](MqttConnection &connection, const String &topic, const ByteBuf &payload) -> void {
103
182
LOGM_DEBUG (TAG, " Message received on subscribe topic, size: %zu bytes" , payload.len );
104
- if (string ((char *)payload.buffer ) == PUBLISH_TRIGGER_PAYLOAD)
183
+ if (string ((char *)payload.buffer , payload. len ) == PUBLISH_TRIGGER_PAYLOAD)
105
184
{
106
185
publishFileData ();
107
186
}
@@ -127,7 +206,7 @@ int PubSubFeature::start()
127
206
int PubSubFeature::stop ()
128
207
{
129
208
auto onUnsubscribe = [&](MqttConnection &connection, uint16_t packetId, int errorCode) -> void {
130
- LOGM_DEBUG (TAG, " Unsubscribing: PacketId:%u, ErrorCode:%i " , packetId, errorCode);
209
+ LOGM_DEBUG (TAG, " Unsubscribing: PacketId:%u, ErrorCode:%d " , packetId, errorCode);
131
210
};
132
211
133
212
resourceManager->getConnection ()->Unsubscribe (subTopic.c_str (), onUnsubscribe);
0 commit comments