Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/cpp/fastdds/publisher/DataWriterHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fastdds/rtps/common/Time_t.hpp>
#include <fastdds/rtps/writer/RTPSWriter.hpp>

#include <rtps/history/HistoryAttributesExtension.hpp>
#include <rtps/writer/BaseWriter.hpp>

namespace eprosima {
Expand All @@ -48,13 +49,23 @@ HistoryAttributes DataWriterHistory::to_history_attributes(

if (history_qos.kind != KEEP_ALL_HISTORY_QOS)
{
max_samples = history_qos.depth;
max_samples = get_min_max_samples(history_qos.depth, resource_limits_qos.max_samples_per_instance);
if (topic_kind != NO_KEY)
{
max_samples *= resource_limits_qos.max_instances;
if (0 < resource_limits_qos.max_instances)
{
max_samples *= resource_limits_qos.max_instances;
}
else
{
max_samples = LENGTH_UNLIMITED;
}
}

initial_samples = std::min(initial_samples, max_samples);
if (0 < initial_samples)
{
initial_samples = get_min_max_samples(initial_samples, max_samples);
}
}

return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples, extra_samples);
Expand Down
31 changes: 23 additions & 8 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,20 @@ ReturnCode_t DataWriterImpl::enable()
datasharing.add_domain_id(utils::default_domain_id());
}
w_att.endpoint.set_data_sharing_configuration(datasharing);

// Update pool config for KEEP_ALL when max_samples is infinite
if ((0 == pool_config_.maximum_size) && (KEEP_ALL_HISTORY_QOS == qos_.history().kind))
{
// Override infinite with old default value for max_samples + extra samples
pool_config_.maximum_size = 5000;
if (0 < qos_.resource_limits().extra_samples)
{
pool_config_.maximum_size += static_cast<uint32_t>(qos_.resource_limits().extra_samples);
}
EPROSIMA_LOG_ERROR(DATA_WRITER,
"DataWriter with KEEP_ALL history and infinite max_samples is not compatible with DataSharing. "
"Setting max_samples to " << pool_config_.maximum_size);
}
}
else
{
Expand Down Expand Up @@ -2043,7 +2057,8 @@ ReturnCode_t DataWriterImpl::check_qos(
{
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"HISTORY DEPTH '" << qos.history().depth <<
"' is inconsistent with max_samples_per_instance: '" << qos.resource_limits().max_samples_per_instance <<
"' is inconsistent with max_samples_per_instance: '" <<
qos.resource_limits().max_samples_per_instance <<
"'. Consistency rule: depth <= max_samples_per_instance." <<
" Effectively using max_samples_per_instance as depth.");
}
Expand All @@ -2053,19 +2068,19 @@ ReturnCode_t DataWriterImpl::check_qos(
ReturnCode_t DataWriterImpl::check_allocation_consistency(
const DataWriterQos& qos)
{
if ((qos.resource_limits().max_samples > 0) &&
(qos.resource_limits().max_samples <
(qos.resource_limits().max_instances * qos.resource_limits().max_samples_per_instance)))
if ((qos.resource_limits().max_instances <= 0 || qos.resource_limits().max_samples_per_instance <= 0) &&
(qos.resource_limits().max_samples > 0))
{
EPROSIMA_LOG_ERROR(DDS_QOS_CHECK,
"max_samples should be greater than max_instances * max_samples_per_instance");
"max_samples should be infinite when max_instances or max_samples_per_instance are infinite");
return RETCODE_INCONSISTENT_POLICY;
}
if ((qos.resource_limits().max_instances <= 0 || qos.resource_limits().max_samples_per_instance <= 0) &&
(qos.resource_limits().max_samples > 0))
if ((qos.resource_limits().max_samples > 0) &&
(qos.resource_limits().max_samples <
(qos.resource_limits().max_instances * qos.resource_limits().max_samples_per_instance)))
{
EPROSIMA_LOG_ERROR(DDS_QOS_CHECK,
"max_samples should be infinite when max_instances or max_samples_per_instance are infinite");
"max_samples should be greater than max_instances * max_samples_per_instance");
return RETCODE_INCONSISTENT_POLICY;
}
return RETCODE_OK;
Expand Down
17 changes: 9 additions & 8 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1616,7 +1616,8 @@ ReturnCode_t DataReaderImpl::check_qos(
{
EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
"HISTORY DEPTH '" << qos.history().depth <<
"' is inconsistent with max_samples_per_instance: '" << qos.resource_limits().max_samples_per_instance <<
"' is inconsistent with max_samples_per_instance: '" <<
qos.resource_limits().max_samples_per_instance <<
"'. Consistency rule: depth <= max_samples_per_instance." <<
" Effectively using max_samples_per_instance as depth.");
}
Expand All @@ -1626,19 +1627,19 @@ ReturnCode_t DataReaderImpl::check_qos(
ReturnCode_t DataReaderImpl::check_allocation_consistency(
const DataReaderQos& qos)
{
if ((qos.resource_limits().max_samples > 0) &&
(qos.resource_limits().max_samples <
(qos.resource_limits().max_instances * qos.resource_limits().max_samples_per_instance)))
if ((qos.resource_limits().max_instances <= 0 || qos.resource_limits().max_samples_per_instance <= 0) &&
(qos.resource_limits().max_samples > 0))
{
EPROSIMA_LOG_ERROR(DDS_QOS_CHECK,
"max_samples should be greater than max_instances * max_samples_per_instance");
"max_samples should be infinite when max_instances or max_samples_per_instance are infinite");
return RETCODE_INCONSISTENT_POLICY;
}
if ((qos.resource_limits().max_instances <= 0 || qos.resource_limits().max_samples_per_instance <= 0) &&
(qos.resource_limits().max_samples > 0))
if ((qos.resource_limits().max_samples > 0) &&
(qos.resource_limits().max_samples <
(qos.resource_limits().max_instances * qos.resource_limits().max_samples_per_instance)))
{
EPROSIMA_LOG_ERROR(DDS_QOS_CHECK,
"max_samples should be infinite when max_instances or max_samples_per_instance are infinite");
"max_samples should be greater than max_instances * max_samples_per_instance");
return RETCODE_INCONSISTENT_POLICY;
}
return RETCODE_OK;
Expand Down
17 changes: 13 additions & 4 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,16 @@ DataReaderHistory::DataReaderHistory(
{
resource_limited_qos_.max_instances = 1;
resource_limited_qos_.max_samples_per_instance = resource_limited_qos_.max_samples;
key_changes_allocation_.initial = resource_limited_qos_.allocated_samples;
key_changes_allocation_.maximum = resource_limited_qos_.max_samples;

if (0 < resource_limited_qos_.allocated_samples)
{
key_changes_allocation_.initial = resource_limited_qos_.allocated_samples;
}

if (resource_limited_qos_.max_samples_per_instance < std::numeric_limits<int32_t>::max())
{
key_changes_allocation_.maximum = resource_limited_qos_.max_samples_per_instance;
}
instances_.emplace(c_InstanceHandle_Unknown,
std::make_shared<DataReaderInstance>(key_changes_allocation_, key_writers_allocation_));
data_available_instances_[c_InstanceHandle_Unknown] = instances_[c_InstanceHandle_Unknown];
Expand Down Expand Up @@ -260,7 +267,8 @@ bool DataReaderHistory::received_change_keep_last(
if (find_key(a_change->instanceHandle, vit))
{
DataReaderInstance::ChangeCollection& instance_changes = vit->second->cache_changes;
if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
auto effective_depth = std::min(history_qos_.depth, resource_limited_qos_.max_samples_per_instance);
if (instance_changes.size() < static_cast<size_t>(effective_depth))
{
ret_value = true;
}
Expand Down Expand Up @@ -795,7 +803,8 @@ bool DataReaderHistory::completed_change_keep_last(
{
bool ret_value = false;
DataReaderInstance::ChangeCollection& instance_changes = instance.cache_changes;
if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
auto effective_depth = std::min(history_qos_.depth, resource_limited_qos_.max_samples_per_instance);
if (instance_changes.size() < static_cast<size_t>(effective_depth))
{
ret_value = true;
}
Expand Down
15 changes: 15 additions & 0 deletions src/cpp/rtps/history/HistoryAttributesExtension.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ static inline ResourceLimitedContainerConfig resource_limits_from_history(
};
}

/**
* Get the minimum value between two sample counts, considering that <= 0 means unlimited.
*
* @param a First sample count.
* @param b Second sample count.
*
* @return Minimum sample count.
*/
static constexpr int32_t get_min_max_samples(
int32_t a,
int32_t b)
{
return (a > 0 && b > 0) ? (a < b ? a : b) : (a > 0 ? a : b);
}

} // namespace rtps
} // namespace fastdds
} // namespace eprosima
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

#include <fastdds/publisher/history/DataWriterInstance.hpp>

#include <rtps/history/HistoryAttributesExtension.hpp>

namespace eprosima {
namespace fastdds {
namespace dds {
Expand All @@ -57,13 +59,20 @@ class DataWriterHistory : public WriterHistory

if (history_qos.kind != KEEP_ALL_HISTORY_QOS)
{
max_samples = history_qos.depth;
max_samples = get_min_max_samples(history_qos.depth, resource_limits_qos.max_samples_per_instance);
if (topic_kind != NO_KEY)
{
max_samples *= resource_limits_qos.max_instances;
if (0 < resource_limits_qos.max_instances)
{
max_samples *= resource_limits_qos.max_instances;
}
else
{
max_samples = LENGTH_UNLIMITED;
}
}

initial_samples = std::min(initial_samples, max_samples);
initial_samples = get_min_max_samples(initial_samples, max_samples);
}

return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples, extra_samples);
Expand Down
Loading
Loading