diff --git a/scripts/gitlab/setup-env.sh b/scripts/gitlab/setup-env.sh index 805f4e4..7b3c2fd 100644 --- a/scripts/gitlab/setup-env.sh +++ b/scripts/gitlab/setup-env.sh @@ -1,5 +1,8 @@ #!/usr/bin/env bash +# Need to use modules +source /etc/profile.d/z00_lmod.sh + host=$(hostname) host=${host//[0-9]/} diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp index 8ecc370..6c7953b 100644 --- a/src/AMSlib/wf/basedb.hpp +++ b/src/AMSlib/wf/basedb.hpp @@ -65,6 +65,7 @@ namespace fs = std::experimental::filesystem; #include #include #include +#include #include #include #include @@ -847,7 +848,7 @@ class AMSMessage AMSMessage(const AMSMessage& other) { - DBG(AMSMessage, "Copy AMSMessage : %p -- %d", other._data, other._id); + DBG(AMSMessage, "Copy AMSMessage (%d, %p) <- (%d, %p)", _id, _data, other._id, other._data); swap(other); }; @@ -863,7 +864,7 @@ class AMSMessage AMSMessage& operator=(AMSMessage&& other) noexcept { - // DBG(AMSMessage, "Move AMSMessage : %p -- %d", other._data, other._id); + DBG(AMSMessage, "Move AMSMessage (%d, %p) <- (%d, %p)", _id, _data, other._id, other._data); if (this != &other) { swap(other); other._data = nullptr; @@ -1325,7 +1326,7 @@ class RMQPublisherHandler final : public RMQHandler /** @brief Mutex to protect multithread accesses to _messages */ std::mutex _mutex; /** @brief Messages that have not been successfully acknowledged */ - std::vector _messages; + std::list _messages; public: /** @@ -1351,7 +1352,7 @@ class RMQPublisherHandler final : public RMQHandler * @brief Return the messages that have NOT been acknowledged by the RabbitMQ server. * @return A vector of AMSMessage */ - std::vector& msgBuffer(); + std::list& msgBuffer(); /** * @brief Free AMSMessages held by the handler @@ -1395,13 +1396,13 @@ class RMQPublisherHandler final : public RMQHandler * @param[in] addr Address of memory to free. * @param[in] buffer The vector containing memory buffers */ - void freeMessage(int msg_id, std::vector& buf); + void freeMessage(int msg_id, std::list& buffer); /** * @brief Free the data pointed by each pointer in a vector. * @param[in] buffer The vector containing memory buffers */ - void freeAllMessages(std::vector& buffer); + void freeAllMessages(std::list& buffer); }; // class RMQPublisherHandler @@ -1428,7 +1429,7 @@ class RMQPublisher /** @brief The handler which contains various callbacks for the sender */ std::shared_ptr _handler; /** @brief Buffer holding unacknowledged messages in case of crash */ - std::vector _buffer_msg; + std::list _buffer_msg; public: RMQPublisher(const RMQPublisher&) = delete; @@ -1439,13 +1440,13 @@ class RMQPublisher const AMQP::Address& address, std::string cacert, std::string queue, - std::vector&& msgs_to_send = std::vector()); + std::list&& msgs_to_send = std::list()); /** * @brief Check if the underlying RabbitMQ connection is ready and usable * @return True if the publisher is ready to publish */ - bool ready_publish(); + bool readyPublish(); /** * @brief Wait that the connection is ready (blocking call) @@ -1481,7 +1482,7 @@ class RMQPublisher * acknowledgements have not arrived yet. * @return A vector of AMSMessage */ - std::vector& getMsgBuffer(); + std::list& getMsgBuffer(); /** * @brief Total number of messages successfully acknowledged diff --git a/src/AMSlib/wf/rmqdb.cpp b/src/AMSlib/wf/rmqdb.cpp index 97e89b6..7363172 100644 --- a/src/AMSlib/wf/rmqdb.cpp +++ b/src/AMSlib/wf/rmqdb.cpp @@ -634,33 +634,14 @@ RMQPublisherHandler::RMQPublisherHandler( { } -/** - * @brief Return the messages that have NOT been acknowledged by the RabbitMQ server. - * @return A vector of AMSMessage - */ -std::vector& RMQPublisherHandler::msgBuffer() { return _messages; } +std::list& RMQPublisherHandler::msgBuffer() { return _messages; } -/** - * @brief Free AMSMessages held by the handler - */ void RMQPublisherHandler::cleanup() { freeAllMessages(_messages); } -/** - * @brief Total number of messages sent - * @return Number of messages - */ int RMQPublisherHandler::msgSent() const { return _nb_msg; } -/** - * @brief Total number of messages successfully acknowledged - * @return Number of messages - */ int RMQPublisherHandler::msgAcknowledged() const { return _nb_msg_ack; } -/** - * @brief Total number of messages unacknowledged - * @return Number of messages unacknowledged - */ unsigned RMQPublisherHandler::unacknowledged() const { return _rchannel->unacknowledged(); @@ -770,15 +751,15 @@ void RMQPublisherHandler::onReady(AMQP::TcpConnection* connection) }); } -void RMQPublisherHandler::freeMessage(int msg_id, std::vector& buf) +void RMQPublisherHandler::freeMessage(int msg_id, std::list& buffer) { const std::lock_guard lock(_mutex); auto it = - std::find_if(buf.begin(), buf.end(), [&msg_id](const AMSMessage& obj) { + std::find_if(buffer.begin(), buffer.end(), [&msg_id](const AMSMessage& obj) { return obj.id() == msg_id; }); CFATAL(RMQPublisherHandler, - it == buf.end(), + it == buffer.end(), "Failed to deallocate msg #%d: not found", msg_id) auto& msg = *it; @@ -786,10 +767,10 @@ void RMQPublisherHandler::freeMessage(int msg_id, std::vector& buf) rm.deallocate(msg.data(), AMSResourceType::AMS_HOST); DBG(RMQPublisherHandler, "Deallocated msg #%d (%p)", msg.id(), msg.data()) - buf.erase(it); + buffer.erase(it); } -void RMQPublisherHandler::freeAllMessages(std::vector& buffer) +void RMQPublisherHandler::freeAllMessages(std::list& buffer) { const std::lock_guard lock(_mutex); auto& rm = ams::ResourceManager::getInstance(); @@ -822,7 +803,7 @@ RMQPublisher::RMQPublisher(uint64_t rId, const AMQP::Address& address, std::string cacert, std::string queue, - std::vector&& msgs_to_send) + std::list&& msgs_to_send) : _rId(rId), _queue(queue), _cacert(cacert), @@ -880,7 +861,7 @@ void RMQPublisher::publish(AMSMessage&& message) _handler->publish(std::move(message)); } -bool RMQPublisher::ready_publish() +bool RMQPublisher::readyPublish() { return _connection->ready() && _connection->usable(); } @@ -901,7 +882,7 @@ void RMQPublisher::stop() { event_base_loopexit(_loop.get(), NULL); } bool RMQPublisher::connectionValid() { return _handler->connectionValid(); } -std::vector& RMQPublisher::getMsgBuffer() +std::list& RMQPublisher::getMsgBuffer() { return _handler->msgBuffer(); } @@ -991,7 +972,7 @@ void RMQInterface::restartPublisher() if (_publisher->connectionValid()) return; CALIPER(CALI_MARK_BEGIN("RMQ_RESTART_PUBLISHER");) - std::vector messages = _publisher->getMsgBuffer(); + auto messages = _publisher->getMsgBuffer(); _publisher_connected = false; if (messages.size() > 0) { AMSMessage& msg_min =