Skip to content

Commit 82bb6d4

Browse files
committed
Moving from vector to list to minimize data movements when buffering AMSMessage
Signed-off-by: Loic Pottier <[email protected]>
1 parent 9f8e96b commit 82bb6d4

File tree

2 files changed

+21
-39
lines changed

2 files changed

+21
-39
lines changed

src/AMSlib/wf/basedb.hpp

+11-10
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ namespace fs = std::experimental::filesystem;
6565
#include <chrono>
6666
#include <deque>
6767
#include <future>
68+
#include <list>
6869
#include <random>
6970
#include <thread>
7071
#include <tuple>
@@ -847,7 +848,7 @@ class AMSMessage
847848

848849
AMSMessage(const AMSMessage& other)
849850
{
850-
DBG(AMSMessage, "Copy AMSMessage : %p -- %d", other._data, other._id);
851+
DBG(AMSMessage, "Copy AMSMessage (%d, %p) <- (%d, %p)", _id, _data, other._id, other._data);
851852
swap(other);
852853
};
853854

@@ -863,7 +864,7 @@ class AMSMessage
863864

864865
AMSMessage& operator=(AMSMessage&& other) noexcept
865866
{
866-
// DBG(AMSMessage, "Move AMSMessage : %p -- %d", other._data, other._id);
867+
DBG(AMSMessage, "Move AMSMessage (%d, %p) <- (%d, %p)", _id, _data, other._id, other._data);
867868
if (this != &other) {
868869
swap(other);
869870
other._data = nullptr;
@@ -1325,7 +1326,7 @@ class RMQPublisherHandler final : public RMQHandler
13251326
/** @brief Mutex to protect multithread accesses to _messages */
13261327
std::mutex _mutex;
13271328
/** @brief Messages that have not been successfully acknowledged */
1328-
std::vector<AMSMessage> _messages;
1329+
std::list<AMSMessage> _messages;
13291330

13301331
public:
13311332
/**
@@ -1351,7 +1352,7 @@ class RMQPublisherHandler final : public RMQHandler
13511352
* @brief Return the messages that have NOT been acknowledged by the RabbitMQ server.
13521353
* @return A vector of AMSMessage
13531354
*/
1354-
std::vector<AMSMessage>& msgBuffer();
1355+
std::list<AMSMessage>& msgBuffer();
13551356

13561357
/**
13571358
* @brief Free AMSMessages held by the handler
@@ -1395,13 +1396,13 @@ class RMQPublisherHandler final : public RMQHandler
13951396
* @param[in] addr Address of memory to free.
13961397
* @param[in] buffer The vector containing memory buffers
13971398
*/
1398-
void freeMessage(int msg_id, std::vector<AMSMessage>& buf);
1399+
void freeMessage(int msg_id, std::list<AMSMessage>& buffer);
13991400

14001401
/**
14011402
* @brief Free the data pointed by each pointer in a vector.
14021403
* @param[in] buffer The vector containing memory buffers
14031404
*/
1404-
void freeAllMessages(std::vector<AMSMessage>& buffer);
1405+
void freeAllMessages(std::list<AMSMessage>& buffer);
14051406

14061407
}; // class RMQPublisherHandler
14071408

@@ -1428,7 +1429,7 @@ class RMQPublisher
14281429
/** @brief The handler which contains various callbacks for the sender */
14291430
std::shared_ptr<RMQPublisherHandler> _handler;
14301431
/** @brief Buffer holding unacknowledged messages in case of crash */
1431-
std::vector<AMSMessage> _buffer_msg;
1432+
std::list<AMSMessage> _buffer_msg;
14321433

14331434
public:
14341435
RMQPublisher(const RMQPublisher&) = delete;
@@ -1439,13 +1440,13 @@ class RMQPublisher
14391440
const AMQP::Address& address,
14401441
std::string cacert,
14411442
std::string queue,
1442-
std::vector<AMSMessage>&& msgs_to_send = std::vector<AMSMessage>());
1443+
std::list<AMSMessage>&& msgs_to_send = std::list<AMSMessage>());
14431444

14441445
/**
14451446
* @brief Check if the underlying RabbitMQ connection is ready and usable
14461447
* @return True if the publisher is ready to publish
14471448
*/
1448-
bool ready_publish();
1449+
bool readyPublish();
14491450

14501451
/**
14511452
* @brief Wait that the connection is ready (blocking call)
@@ -1481,7 +1482,7 @@ class RMQPublisher
14811482
* acknowledgements have not arrived yet.
14821483
* @return A vector of AMSMessage
14831484
*/
1484-
std::vector<AMSMessage>& getMsgBuffer();
1485+
std::list<AMSMessage>& getMsgBuffer();
14851486

14861487
/**
14871488
* @brief Total number of messages successfully acknowledged

src/AMSlib/wf/rmqdb.cpp

+10-29
Original file line numberDiff line numberDiff line change
@@ -634,33 +634,14 @@ RMQPublisherHandler::RMQPublisherHandler(
634634
{
635635
}
636636

637-
/**
638-
* @brief Return the messages that have NOT been acknowledged by the RabbitMQ server.
639-
* @return A vector of AMSMessage
640-
*/
641-
std::vector<AMSMessage>& RMQPublisherHandler::msgBuffer() { return _messages; }
637+
std::list<AMSMessage>& RMQPublisherHandler::msgBuffer() { return _messages; }
642638

643-
/**
644-
* @brief Free AMSMessages held by the handler
645-
*/
646639
void RMQPublisherHandler::cleanup() { freeAllMessages(_messages); }
647640

648-
/**
649-
* @brief Total number of messages sent
650-
* @return Number of messages
651-
*/
652641
int RMQPublisherHandler::msgSent() const { return _nb_msg; }
653642

654-
/**
655-
* @brief Total number of messages successfully acknowledged
656-
* @return Number of messages
657-
*/
658643
int RMQPublisherHandler::msgAcknowledged() const { return _nb_msg_ack; }
659644

660-
/**
661-
* @brief Total number of messages unacknowledged
662-
* @return Number of messages unacknowledged
663-
*/
664645
unsigned RMQPublisherHandler::unacknowledged() const
665646
{
666647
return _rchannel->unacknowledged();
@@ -770,26 +751,26 @@ void RMQPublisherHandler::onReady(AMQP::TcpConnection* connection)
770751
});
771752
}
772753

773-
void RMQPublisherHandler::freeMessage(int msg_id, std::vector<AMSMessage>& buf)
754+
void RMQPublisherHandler::freeMessage(int msg_id, std::list<AMSMessage>& buffer)
774755
{
775756
const std::lock_guard<std::mutex> lock(_mutex);
776757
auto it =
777-
std::find_if(buf.begin(), buf.end(), [&msg_id](const AMSMessage& obj) {
758+
std::find_if(buffer.begin(), buffer.end(), [&msg_id](const AMSMessage& obj) {
778759
return obj.id() == msg_id;
779760
});
780761
CFATAL(RMQPublisherHandler,
781-
it == buf.end(),
762+
it == buffer.end(),
782763
"Failed to deallocate msg #%d: not found",
783764
msg_id)
784765
auto& msg = *it;
785766
auto& rm = ams::ResourceManager::getInstance();
786767
rm.deallocate(msg.data(), AMSResourceType::AMS_HOST);
787768

788769
DBG(RMQPublisherHandler, "Deallocated msg #%d (%p)", msg.id(), msg.data())
789-
buf.erase(it);
770+
buffer.erase(it);
790771
}
791772

792-
void RMQPublisherHandler::freeAllMessages(std::vector<AMSMessage>& buffer)
773+
void RMQPublisherHandler::freeAllMessages(std::list<AMSMessage>& buffer)
793774
{
794775
const std::lock_guard<std::mutex> lock(_mutex);
795776
auto& rm = ams::ResourceManager::getInstance();
@@ -822,7 +803,7 @@ RMQPublisher::RMQPublisher(uint64_t rId,
822803
const AMQP::Address& address,
823804
std::string cacert,
824805
std::string queue,
825-
std::vector<AMSMessage>&& msgs_to_send)
806+
std::list<AMSMessage>&& msgs_to_send)
826807
: _rId(rId),
827808
_queue(queue),
828809
_cacert(cacert),
@@ -880,7 +861,7 @@ void RMQPublisher::publish(AMSMessage&& message)
880861
_handler->publish(std::move(message));
881862
}
882863

883-
bool RMQPublisher::ready_publish()
864+
bool RMQPublisher::readyPublish()
884865
{
885866
return _connection->ready() && _connection->usable();
886867
}
@@ -901,7 +882,7 @@ void RMQPublisher::stop() { event_base_loopexit(_loop.get(), NULL); }
901882

902883
bool RMQPublisher::connectionValid() { return _handler->connectionValid(); }
903884

904-
std::vector<AMSMessage>& RMQPublisher::getMsgBuffer()
885+
std::list<AMSMessage>& RMQPublisher::getMsgBuffer()
905886
{
906887
return _handler->msgBuffer();
907888
}
@@ -991,7 +972,7 @@ void RMQInterface::restartPublisher()
991972
if (_publisher->connectionValid()) return;
992973

993974
CALIPER(CALI_MARK_BEGIN("RMQ_RESTART_PUBLISHER");)
994-
std::vector<AMSMessage> messages = _publisher->getMsgBuffer();
975+
auto messages = _publisher->getMsgBuffer();
995976
_publisher_connected = false;
996977
if (messages.size() > 0) {
997978
AMSMessage& msg_min =

0 commit comments

Comments
 (0)