diff --git a/src/cpp/fastdds/publisher/DataWriterHistory.cpp b/src/cpp/fastdds/publisher/DataWriterHistory.cpp index 844f63c8cfb..5679bff0348 100644 --- a/src/cpp/fastdds/publisher/DataWriterHistory.cpp +++ b/src/cpp/fastdds/publisher/DataWriterHistory.cpp @@ -27,6 +27,7 @@ #include #include +#include #include namespace eprosima { @@ -48,13 +49,23 @@ HistoryAttributes DataWriterHistory::to_history_attributes( if (history_qos.kind != KEEP_ALL_HISTORY_QOS) { - max_samples = history_qos.depth; + max_samples = get_min_max_samples(history_qos.depth, resource_limits_qos.max_samples_per_instance); if (topic_kind != NO_KEY) { - max_samples *= resource_limits_qos.max_instances; + if (0 < resource_limits_qos.max_instances) + { + max_samples *= resource_limits_qos.max_instances; + } + else + { + max_samples = LENGTH_UNLIMITED; + } } - initial_samples = std::min(initial_samples, max_samples); + if (0 < initial_samples) + { + initial_samples = get_min_max_samples(initial_samples, max_samples); + } } return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples, extra_samples); diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 5f32b3903cf..08b2ae481b3 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -336,6 +336,20 @@ ReturnCode_t DataWriterImpl::enable() datasharing.add_domain_id(utils::default_domain_id()); } w_att.endpoint.set_data_sharing_configuration(datasharing); + + // Update pool config for KEEP_ALL when max_samples is infinite + if ((0 == pool_config_.maximum_size) && (KEEP_ALL_HISTORY_QOS == qos_.history().kind)) + { + // Override infinite with old default value for max_samples + extra samples + pool_config_.maximum_size = 5000; + if (0 < qos_.resource_limits().extra_samples) + { + pool_config_.maximum_size += static_cast(qos_.resource_limits().extra_samples); + } + EPROSIMA_LOG_ERROR(DATA_WRITER, + "DataWriter with KEEP_ALL history and infinite max_samples is not compatible with DataSharing. " + "Setting max_samples to " << pool_config_.maximum_size); + } } else { @@ -2043,7 +2057,8 @@ ReturnCode_t DataWriterImpl::check_qos( { EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "HISTORY DEPTH '" << qos.history().depth << - "' is inconsistent with max_samples_per_instance: '" << qos.resource_limits().max_samples_per_instance << + "' is inconsistent with max_samples_per_instance: '" << + qos.resource_limits().max_samples_per_instance << "'. Consistency rule: depth <= max_samples_per_instance." << " Effectively using max_samples_per_instance as depth."); } @@ -2053,19 +2068,19 @@ ReturnCode_t DataWriterImpl::check_qos( ReturnCode_t DataWriterImpl::check_allocation_consistency( const DataWriterQos& qos) { - if ((qos.resource_limits().max_samples > 0) && - (qos.resource_limits().max_samples < - (qos.resource_limits().max_instances * qos.resource_limits().max_samples_per_instance))) + if ((qos.resource_limits().max_instances <= 0 || qos.resource_limits().max_samples_per_instance <= 0) && + (qos.resource_limits().max_samples > 0)) { EPROSIMA_LOG_ERROR(DDS_QOS_CHECK, - "max_samples should be greater than max_instances * max_samples_per_instance"); + "max_samples should be infinite when max_instances or max_samples_per_instance are infinite"); return RETCODE_INCONSISTENT_POLICY; } - if ((qos.resource_limits().max_instances <= 0 || qos.resource_limits().max_samples_per_instance <= 0) && - (qos.resource_limits().max_samples > 0)) + if ((qos.resource_limits().max_samples > 0) && + (qos.resource_limits().max_samples < + (qos.resource_limits().max_instances * qos.resource_limits().max_samples_per_instance))) { EPROSIMA_LOG_ERROR(DDS_QOS_CHECK, - "max_samples should be infinite when max_instances or max_samples_per_instance are infinite"); + "max_samples should be greater than max_instances * max_samples_per_instance"); return RETCODE_INCONSISTENT_POLICY; } return RETCODE_OK; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 7b2ddd59002..22a5feb2c9a 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -1616,7 +1616,8 @@ ReturnCode_t DataReaderImpl::check_qos( { EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "HISTORY DEPTH '" << qos.history().depth << - "' is inconsistent with max_samples_per_instance: '" << qos.resource_limits().max_samples_per_instance << + "' is inconsistent with max_samples_per_instance: '" << + qos.resource_limits().max_samples_per_instance << "'. Consistency rule: depth <= max_samples_per_instance." << " Effectively using max_samples_per_instance as depth."); } @@ -1626,19 +1627,19 @@ ReturnCode_t DataReaderImpl::check_qos( ReturnCode_t DataReaderImpl::check_allocation_consistency( const DataReaderQos& qos) { - if ((qos.resource_limits().max_samples > 0) && - (qos.resource_limits().max_samples < - (qos.resource_limits().max_instances * qos.resource_limits().max_samples_per_instance))) + if ((qos.resource_limits().max_instances <= 0 || qos.resource_limits().max_samples_per_instance <= 0) && + (qos.resource_limits().max_samples > 0)) { EPROSIMA_LOG_ERROR(DDS_QOS_CHECK, - "max_samples should be greater than max_instances * max_samples_per_instance"); + "max_samples should be infinite when max_instances or max_samples_per_instance are infinite"); return RETCODE_INCONSISTENT_POLICY; } - if ((qos.resource_limits().max_instances <= 0 || qos.resource_limits().max_samples_per_instance <= 0) && - (qos.resource_limits().max_samples > 0)) + if ((qos.resource_limits().max_samples > 0) && + (qos.resource_limits().max_samples < + (qos.resource_limits().max_instances * qos.resource_limits().max_samples_per_instance))) { EPROSIMA_LOG_ERROR(DDS_QOS_CHECK, - "max_samples should be infinite when max_instances or max_samples_per_instance are infinite"); + "max_samples should be greater than max_instances * max_samples_per_instance"); return RETCODE_INCONSISTENT_POLICY; } return RETCODE_OK; diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index 6eb2c2b8e38..728decc1442 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -96,9 +96,16 @@ DataReaderHistory::DataReaderHistory( { resource_limited_qos_.max_instances = 1; resource_limited_qos_.max_samples_per_instance = resource_limited_qos_.max_samples; - key_changes_allocation_.initial = resource_limited_qos_.allocated_samples; - key_changes_allocation_.maximum = resource_limited_qos_.max_samples; + if (0 < resource_limited_qos_.allocated_samples) + { + key_changes_allocation_.initial = resource_limited_qos_.allocated_samples; + } + + if (resource_limited_qos_.max_samples_per_instance < std::numeric_limits::max()) + { + key_changes_allocation_.maximum = resource_limited_qos_.max_samples_per_instance; + } instances_.emplace(c_InstanceHandle_Unknown, std::make_shared(key_changes_allocation_, key_writers_allocation_)); data_available_instances_[c_InstanceHandle_Unknown] = instances_[c_InstanceHandle_Unknown]; @@ -260,7 +267,8 @@ bool DataReaderHistory::received_change_keep_last( if (find_key(a_change->instanceHandle, vit)) { DataReaderInstance::ChangeCollection& instance_changes = vit->second->cache_changes; - if (instance_changes.size() < static_cast(history_qos_.depth)) + auto effective_depth = std::min(history_qos_.depth, resource_limited_qos_.max_samples_per_instance); + if (instance_changes.size() < static_cast(effective_depth)) { ret_value = true; } @@ -795,7 +803,8 @@ bool DataReaderHistory::completed_change_keep_last( { bool ret_value = false; DataReaderInstance::ChangeCollection& instance_changes = instance.cache_changes; - if (instance_changes.size() < static_cast(history_qos_.depth)) + auto effective_depth = std::min(history_qos_.depth, resource_limited_qos_.max_samples_per_instance); + if (instance_changes.size() < static_cast(effective_depth)) { ret_value = true; } diff --git a/src/cpp/rtps/history/HistoryAttributesExtension.hpp b/src/cpp/rtps/history/HistoryAttributesExtension.hpp index 44443207b6c..4dbc55f2df6 100644 --- a/src/cpp/rtps/history/HistoryAttributesExtension.hpp +++ b/src/cpp/rtps/history/HistoryAttributesExtension.hpp @@ -49,6 +49,21 @@ static inline ResourceLimitedContainerConfig resource_limits_from_history( }; } +/** + * Get the minimum value between two sample counts, considering that <= 0 means unlimited. + * + * @param a First sample count. + * @param b Second sample count. + * + * @return Minimum sample count. + */ +static constexpr int32_t get_min_max_samples( + int32_t a, + int32_t b) +{ + return (a > 0 && b > 0) ? (a < b ? a : b) : (a > 0 ? a : b); +} + } // namespace rtps } // namespace fastdds } // namespace eprosima diff --git a/test/mock/dds/DataWriterHistory/fastdds/publisher/DataWriterHistory.hpp b/test/mock/dds/DataWriterHistory/fastdds/publisher/DataWriterHistory.hpp index f04652bcf75..1c6c91391f0 100644 --- a/test/mock/dds/DataWriterHistory/fastdds/publisher/DataWriterHistory.hpp +++ b/test/mock/dds/DataWriterHistory/fastdds/publisher/DataWriterHistory.hpp @@ -34,6 +34,8 @@ #include +#include + namespace eprosima { namespace fastdds { namespace dds { @@ -57,13 +59,20 @@ class DataWriterHistory : public WriterHistory if (history_qos.kind != KEEP_ALL_HISTORY_QOS) { - max_samples = history_qos.depth; + max_samples = get_min_max_samples(history_qos.depth, resource_limits_qos.max_samples_per_instance); if (topic_kind != NO_KEY) { - max_samples *= resource_limits_qos.max_instances; + if (0 < resource_limits_qos.max_instances) + { + max_samples *= resource_limits_qos.max_instances; + } + else + { + max_samples = LENGTH_UNLIMITED; + } } - initial_samples = std::min(initial_samples, max_samples); + initial_samples = get_min_max_samples(initial_samples, max_samples); } return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples, extra_samples); diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index a88925674e2..e2075137f79 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -2058,7 +2058,7 @@ TEST_F(DataReaderTests, sample_info) struct arraybuf : public std::streambuf { - template arraybuf( + template arraybuf( std::array& array) { this->setp(array.data(), array.data() + Size - 1); @@ -2069,7 +2069,7 @@ struct arraybuf : public std::streambuf struct oarraystream : virtual arraybuf, std::ostream { - template oarraystream( + template oarraystream( std::array& array) : arraybuf(array) , std::ostream(this) @@ -3901,6 +3901,277 @@ TEST_F(DataReaderTests, data_type_is_plain_data_representation) DomainParticipantFactory::get_instance()->delete_participant(participant); } +// Test parameter structure +struct HistoryDepthTestParams +{ + std::string test_name; + bool use_keyed_type; + HistoryQosPolicyKind history_kind; + int32_t depth; + int32_t max_samples; + int32_t max_samples_per_instance; + int32_t num_samples_to_write; + int32_t expected_samples; + std::string description; +}; + +class HistoryDepthParameterizedTest : + public DataReaderTests, + public ::testing::WithParamInterface +{ +protected: + + void SetUp() override + { + DataReaderTests::SetUp(); + + // Override type based on parameter + if (!GetParam().use_keyed_type) + { + type_.reset(new FooBoundedTypeSupport()); + } + } + + void RunHistoryDepthTest() + { + const auto& params = GetParam(); + + // Configure writer QoS + DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; + writer_qos.history().kind = params.history_kind; + writer_qos.history().depth = params.depth; + writer_qos.resource_limits().max_samples = params.max_samples; + writer_qos.resource_limits().max_instances = params.use_keyed_type ? 10 : 1; + writer_qos.resource_limits().max_samples_per_instance = params.max_samples_per_instance; + writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS; + + // Configure reader QoS + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + reader_qos.history().kind = params.history_kind; + reader_qos.history().depth = params.depth; + reader_qos.resource_limits().max_samples = params.max_samples; + reader_qos.resource_limits().max_instances = params.use_keyed_type ? 10 : 1; + reader_qos.resource_limits().max_samples_per_instance = params.max_samples_per_instance; + reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS; + + // Create entities (handles for keyed types) + if (params.use_keyed_type) + { + create_instance_handles(); + } + create_entities(nullptr, reader_qos, SUBSCRIBER_QOS_DEFAULT, writer_qos); + + // Write samples, wait for some time, and take them from the reader. + int valid_samples = 0; + if (params.use_keyed_type) + { + WriteSamplesKeyed(params.num_samples_to_write); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + valid_samples = TakeSamplesAndCount(); + } + else + { + WriteSamplesNoKey(params.num_samples_to_write); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + valid_samples = TakeSamplesAndCount(); + } + + // Verify expected samples + ASSERT_EQ(params.expected_samples, valid_samples) + << params.description; + } + +private: + + void WriteSamplesKeyed( + int32_t num_samples) + { + FooType data; + data.index(0); + int samples_written = 0; + for (int32_t i = 0; i < num_samples; ++i) + { + data.message()[0] = static_cast('0' + i); + data.message()[1] = '\0'; + ReturnCode_t ret = data_writer_->write(&data, handle_ok_); + if (ret == RETCODE_OK) + { + samples_written++; + } + else if (ret == RETCODE_TIMEOUT) + { + // With KEEP_ALL, this is expected after reaching the limit + EXPECT_EQ(KEEP_ALL_HISTORY_QOS, GetParam().history_kind) + << "Unexpected TIMEOUT with KEEP_LAST"; + EXPECT_GE(samples_written, GetParam().max_samples_per_instance) + << "TIMEOUT before reaching max_samples_per_instance"; + break; // Stop trying to write more + } + else + { + FAIL() << "Unexpected return code: " << ret; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + + void WriteSamplesNoKey( + int32_t num_samples) + { + FooBoundedType data; + + for (int32_t i = 0; i < num_samples; ++i) + { + data.index(i); + ASSERT_EQ(RETCODE_OK, data_writer_->write(&data, HANDLE_NIL)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + + template + int TakeSamplesAndCount() + { + int valid_samples = 0; + SeqType data_seq; + SampleInfoSeq info_seq; + EXPECT_EQ(RETCODE_OK, data_reader_->take(data_seq, info_seq, LENGTH_UNLIMITED)); + for (LoanableCollection::size_type i = 0; i < info_seq.length(); ++i) + { + if (info_seq[i].valid_data) + { + valid_samples++; + } + } + EXPECT_EQ(RETCODE_OK, data_reader_->return_loan(data_seq, info_seq)); + return valid_samples; + } + +}; + +// The following tests are only available in 3.4.x and backwards since in 3.5.0 +// a history depth higher than max_samples_per_instance throws an error now +// Parameterized test +TEST_P(HistoryDepthParameterizedTest, VerifyHistoryBehavior) +{ + RunHistoryDepthTest(); +} + +// Test parameters +INSTANTIATE_TEST_SUITE_P( + HistoryDepthTests, + HistoryDepthParameterizedTest, + ::testing::Values( + // Keyed type: max_samples_per_instance takes precedence over depth + HistoryDepthTestParams{ + "KeyedType_MaxSamplesPerInstance", + true, // use_keyed_type + KEEP_LAST_HISTORY_QOS, + 100, // depth + 400, // max_samples + 5, // max_samples_per_instance + 120, // num_samples_to_write + 5, // expected_samples + "Expected max_samples_per_instance (5) samples, not depth (100)" + }, + + // Keyed type: unlimited resources, depth takes precedence + HistoryDepthTestParams{ + "KeyedType_UnlimitedResources", + true, // use_keyed_type + KEEP_LAST_HISTORY_QOS, + 10, // depth + -1, // max_samples (unlimited) + -1, // max_samples_per_instance (unlimited) + 12, // num_samples_to_write + 10, // expected_samples + "Expected depth (10) samples, not num_samples_to_write (12)" + }, + + // No-key type: max_samples overrides max_samples_per_instance + HistoryDepthTestParams{ + "NoKeyType_MaxSamplesOverride", + false, // use_keyed_type + KEEP_LAST_HISTORY_QOS, + 10, // depth + 8, // max_samples + 5, // max_samples_per_instance + 12, // num_samples_to_write + 8, // expected_samples + "NO_KEY topic should respect max_samples (8) not max_samples_per_instance (5) or depth (10)" + }, + + // No-key type: unlimited resources, depth takes precedence + HistoryDepthTestParams{ + "NoKeyType_UnlimitedResources", + false, // use_keyed_type + KEEP_LAST_HISTORY_QOS, + 10, // depth + -1, // max_samples (unlimited) + -1, // max_samples_per_instance (unlimited) + 12, // num_samples_to_write + 10, // expected_samples + "Expected depth (10) not num_samples_to_write (12)" + }, + + // Keyed type: max_samples_per_instance takes precedence over depth + HistoryDepthTestParams{ + "KeyedType_KeepAllMaxSamplesPerInstance", + true, // use_keyed_type + KEEP_ALL_HISTORY_QOS, + -1, // depth + 400, // max_samples + 5, // max_samples_per_instance + 12, // num_samples_to_write + 5, // expected_samples + "Expected max_samples_per_instance (5)" + }, + + + // Keyed type: unlimited resources + HistoryDepthTestParams{ + "KeyedType_KeepAllUnlimitedResources", + true, // use_keyed_type + KEEP_ALL_HISTORY_QOS, + -1, // depth + -1, // max_samples (unlimited) + -1, // max_samples_per_instance (unlimited) + 12, // num_samples_to_write + 12, // expected_samples + "Expected num_samples_to_write (12)" + }, + + // No-key type: KEEP_ALL with unlimited depth + HistoryDepthTestParams{ + "NoKeyType_KeepAllUnlimited", + false, // use_keyed_type + KEEP_ALL_HISTORY_QOS, + -1, // depth (unlimited for KEEP_ALL) + -1, // max_samples (unlimited) + -1, // max_samples_per_instance (unlimited) + 12, // num_samples_to_write + 12, // expected_samples + "Expected num_samples_to_write (12)" + }, + + // No-key type: KEEP_ALL with limited max_samples + HistoryDepthTestParams{ + "NoKeyType_KeepAllLimitedMaxSamples", + false, // use_keyed_type + KEEP_ALL_HISTORY_QOS, + -1, // depth (unlimited for KEEP_ALL) + 10, // max_samples + 5, // max_samples_per_instance + 12, // num_samples_to_write + 10, // expected_samples + "Expected max_samples (10) not max_samples_per_instance (5)" + } + ), + [](const testing::TestParamInfo& info) + { + return info.param.test_name; + } + ); + int main( int argc, char** argv)