Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve memory management of AMSMessage #101

Merged
merged 2 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions scripts/gitlab/setup-env.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/usr/bin/env bash

# Need to use modules
source /etc/profile.d/z00_lmod.sh

host=$(hostname)
host=${host//[0-9]/}

Expand Down
21 changes: 11 additions & 10 deletions src/AMSlib/wf/basedb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ namespace fs = std::experimental::filesystem;
#include <chrono>
#include <deque>
#include <future>
#include <list>
#include <random>
#include <thread>
#include <tuple>
Expand Down Expand Up @@ -847,7 +848,7 @@ class AMSMessage

AMSMessage(const AMSMessage& other)
{
DBG(AMSMessage, "Copy AMSMessage : %p -- %d", other._data, other._id);
DBG(AMSMessage, "Copy AMSMessage (%d, %p) <- (%d, %p)", _id, _data, other._id, other._data);
swap(other);
};

Expand All @@ -863,7 +864,7 @@ class AMSMessage

AMSMessage& operator=(AMSMessage&& other) noexcept
{
// DBG(AMSMessage, "Move AMSMessage : %p -- %d", other._data, other._id);
DBG(AMSMessage, "Move AMSMessage (%d, %p) <- (%d, %p)", _id, _data, other._id, other._data);
if (this != &other) {
swap(other);
other._data = nullptr;
Expand Down Expand Up @@ -1325,7 +1326,7 @@ class RMQPublisherHandler final : public RMQHandler
/** @brief Mutex protecting concurrent accesses to _messages */
std::mutex _mutex;
/** @brief Messages that have not been successfully acknowledged */
std::vector<AMSMessage> _messages;
std::list<AMSMessage> _messages;

public:
/**
Expand All @@ -1351,7 +1352,7 @@ class RMQPublisherHandler final : public RMQHandler
* @brief Return the messages that have NOT been acknowledged by the RabbitMQ server.
* @return A reference to the container of unacknowledged AMSMessage objects
*/
std::vector<AMSMessage>& msgBuffer();
std::list<AMSMessage>& msgBuffer();

/**
* @brief Free AMSMessages held by the handler
Expand Down Expand Up @@ -1395,13 +1396,13 @@ class RMQPublisherHandler final : public RMQHandler
* @param[in] msg_id ID of the message to free.
* @param[in] buffer The container holding the messages
*/
void freeMessage(int msg_id, std::vector<AMSMessage>& buf);
void freeMessage(int msg_id, std::list<AMSMessage>& buffer);

/**
* @brief Free the data of each AMSMessage in the buffer.
* @param[in] buffer The container holding the messages
*/
void freeAllMessages(std::vector<AMSMessage>& buffer);
void freeAllMessages(std::list<AMSMessage>& buffer);

}; // class RMQPublisherHandler

Expand All @@ -1428,7 +1429,7 @@ class RMQPublisher
/** @brief The handler which contains various callbacks for the sender */
std::shared_ptr<RMQPublisherHandler> _handler;
/** @brief Buffer holding unacknowledged messages in case of crash */
std::vector<AMSMessage> _buffer_msg;
std::list<AMSMessage> _buffer_msg;

public:
RMQPublisher(const RMQPublisher&) = delete;
Expand All @@ -1439,13 +1440,13 @@ class RMQPublisher
const AMQP::Address& address,
std::string cacert,
std::string queue,
std::vector<AMSMessage>&& msgs_to_send = std::vector<AMSMessage>());
std::list<AMSMessage>&& msgs_to_send = std::list<AMSMessage>());

/**
* @brief Check if the underlying RabbitMQ connection is ready and usable
* @return True if the publisher is ready to publish
*/
bool ready_publish();
bool readyPublish();

/**
* @brief Wait until the connection is ready (blocking call)
Expand Down Expand Up @@ -1481,7 +1482,7 @@ class RMQPublisher
* acknowledgements have not arrived yet.
* @return A reference to the container of AMSMessage objects
*/
std::vector<AMSMessage>& getMsgBuffer();
std::list<AMSMessage>& getMsgBuffer();

/**
* @brief Total number of messages successfully acknowledged
Expand Down
39 changes: 10 additions & 29 deletions src/AMSlib/wf/rmqdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,33 +634,14 @@ RMQPublisherHandler::RMQPublisherHandler(
{
}

/**
* @brief Return the messages that have NOT been acknowledged by the RabbitMQ server.
* @return A reference to the container of unacknowledged AMSMessage objects
*/
std::vector<AMSMessage>& RMQPublisherHandler::msgBuffer() { return _messages; }
std::list<AMSMessage>& RMQPublisherHandler::msgBuffer() { return _messages; }

/**
 * @brief Release every AMSMessage still held by the handler.
 *
 * Delegates to freeAllMessages() on the handler's own _messages buffer.
 */
void RMQPublisherHandler::cleanup()
{
  freeAllMessages(_messages);
}

/**
 * @brief Total number of messages sent.
 * @return The current value of the _nb_msg counter
 */
int RMQPublisherHandler::msgSent() const
{
  return _nb_msg;
}

/**
 * @brief Total number of messages successfully acknowledged.
 * @return The current value of the _nb_msg_ack counter
 */
int RMQPublisherHandler::msgAcknowledged() const
{
  return _nb_msg_ack;
}

/**
* @brief Total number of messages unacknowledged
* @return Number of messages unacknowledged
*/
unsigned RMQPublisherHandler::unacknowledged() const
{
return _rchannel->unacknowledged();
Expand Down Expand Up @@ -770,26 +751,26 @@ void RMQPublisherHandler::onReady(AMQP::TcpConnection* connection)
});
}

void RMQPublisherHandler::freeMessage(int msg_id, std::vector<AMSMessage>& buf)
void RMQPublisherHandler::freeMessage(int msg_id, std::list<AMSMessage>& buffer)
{
const std::lock_guard<std::mutex> lock(_mutex);
auto it =
std::find_if(buf.begin(), buf.end(), [&msg_id](const AMSMessage& obj) {
std::find_if(buffer.begin(), buffer.end(), [&msg_id](const AMSMessage& obj) {
return obj.id() == msg_id;
});
CFATAL(RMQPublisherHandler,
it == buf.end(),
it == buffer.end(),
"Failed to deallocate msg #%d: not found",
msg_id)
auto& msg = *it;
auto& rm = ams::ResourceManager::getInstance();
rm.deallocate(msg.data(), AMSResourceType::AMS_HOST);

DBG(RMQPublisherHandler, "Deallocated msg #%d (%p)", msg.id(), msg.data())
buf.erase(it);
buffer.erase(it);
}

void RMQPublisherHandler::freeAllMessages(std::vector<AMSMessage>& buffer)
void RMQPublisherHandler::freeAllMessages(std::list<AMSMessage>& buffer)
{
const std::lock_guard<std::mutex> lock(_mutex);
auto& rm = ams::ResourceManager::getInstance();
Expand Down Expand Up @@ -822,7 +803,7 @@ RMQPublisher::RMQPublisher(uint64_t rId,
const AMQP::Address& address,
std::string cacert,
std::string queue,
std::vector<AMSMessage>&& msgs_to_send)
std::list<AMSMessage>&& msgs_to_send)
: _rId(rId),
_queue(queue),
_cacert(cacert),
Expand Down Expand Up @@ -880,7 +861,7 @@ void RMQPublisher::publish(AMSMessage&& message)
_handler->publish(std::move(message));
}

bool RMQPublisher::ready_publish()
bool RMQPublisher::readyPublish()
{
return _connection->ready() && _connection->usable();
}
Expand All @@ -901,7 +882,7 @@ void RMQPublisher::stop() { event_base_loopexit(_loop.get(), NULL); }

/**
 * @brief Report whether the connection is valid.
 * @return The result of connectionValid() on the underlying handler
 */
bool RMQPublisher::connectionValid()
{
  return _handler->connectionValid();
}

std::vector<AMSMessage>& RMQPublisher::getMsgBuffer()
std::list<AMSMessage>& RMQPublisher::getMsgBuffer()
{
return _handler->msgBuffer();
}
Expand Down Expand Up @@ -991,7 +972,7 @@ void RMQInterface::restartPublisher()
if (_publisher->connectionValid()) return;

CALIPER(CALI_MARK_BEGIN("RMQ_RESTART_PUBLISHER");)
std::vector<AMSMessage> messages = _publisher->getMsgBuffer();
auto messages = _publisher->getMsgBuffer();
_publisher_connected = false;
if (messages.size() > 0) {
AMSMessage& msg_min =
Expand Down