diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 5916fb81978..67dbd5ebc02 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -367,6 +367,10 @@ Locator_t& RTPSParticipantImpl::applyLocatorAdaptRule( { metatraffic_unicast_port_ += delta; } + else if (default_unicast_port_ == loc.port) + { + default_unicast_port_ += delta; + } loc.port += delta; return loc; } @@ -726,16 +730,17 @@ void RTPSParticipantImpl::setup_meta_traffic() void RTPSParticipantImpl::setup_user_traffic() { // Creation of user locator and receiver resources - //If no default locators are defined we define some. + // If no default locators are defined we define some. /* The reasoning here is the following. If the parameters of the RTPS Participant don't hold default listening locators for the creation of Endpoints, we make some for Unicast only. If there is at least one listen locator of any kind, we do not create any default ones. If there are no sending locators defined, we create default ones for the transports we implement. */ + default_unicast_port_ = metatraffic_unicast_port_ + m_att.port.offsetd3 - m_att.port.offsetd1; if (m_att.defaultUnicastLocatorList.empty() && m_att.defaultMulticastLocatorList.empty()) { - //Default Unicast Locators in case they have not been provided + // Default Unicast Locators in case they have not been provided /* INSERT DEFAULT UNICAST LOCATORS FOR THE PARTICIPANT */ get_default_unicast_locators(); internal_default_locators_ = true; @@ -746,11 +751,10 @@ void RTPSParticipantImpl::setup_user_traffic() else { // Locator with port 0, calculate port. - uint32_t unicast_port = metatraffic_unicast_port_ + m_att.port.offsetd3 - m_att.port.offsetd1; std::for_each(m_att.defaultUnicastLocatorList.begin(), m_att.defaultUnicastLocatorList.end(), [&](Locator_t& loc) { - m_network_Factory.fill_default_locator_port(loc, unicast_port); + m_network_Factory.fill_default_locator_port(loc, default_unicast_port_); }); m_network_Factory.NormalizeLocators(m_att.defaultUnicastLocatorList); @@ -918,7 +922,7 @@ RTPSParticipantImpl::~RTPSParticipantImpl() delete mp_mutex; } -template +template bool RTPSParticipantImpl::preprocess_endpoint_attributes( const EntityId_t& entity_id, std::atomic& id_counter, @@ -1200,7 +1204,7 @@ bool RTPSParticipantImpl::create_writer( return true; } -template +template bool RTPSParticipantImpl::create_reader( RTPSReader** reader_out, ReaderAttributes& param, @@ -1360,8 +1364,8 @@ bool RTPSParticipantImpl::createWriter( bool isBuiltin) { auto callback = [hist, listen, this] - (const GUID_t& guid, WriterAttributes& param, fastdds::rtps::FlowController* flow_controller, - IPersistenceService* persistence, bool is_reliable) -> RTPSWriter* + (const GUID_t& guid, WriterAttributes& param, fastdds::rtps::FlowController* flow_controller, + IPersistenceService* persistence, bool is_reliable) -> RTPSWriter* { if (is_reliable) { @@ -1409,8 +1413,8 @@ bool RTPSParticipantImpl::createWriter( } auto callback = [hist, listen, entityId, &payload_pool, this] - (const GUID_t& guid, WriterAttributes& param, fastdds::rtps::FlowController* flow_controller, - IPersistenceService* persistence, bool is_reliable) -> RTPSWriter* + (const GUID_t& guid, WriterAttributes& param, fastdds::rtps::FlowController* flow_controller, + IPersistenceService* persistence, bool is_reliable) -> RTPSWriter* { if (is_reliable) { @@ -1464,8 +1468,8 @@ bool RTPSParticipantImpl::create_writer( } auto callback = [hist, listen, &payload_pool, &change_pool, this] - (const GUID_t& guid, WriterAttributes& watt, fastdds::rtps::FlowController* flow_controller, - IPersistenceService* persistence, bool is_reliable) -> RTPSWriter* + (const GUID_t& guid, WriterAttributes& watt, fastdds::rtps::FlowController* flow_controller, + IPersistenceService* persistence, bool is_reliable) -> RTPSWriter* { if (is_reliable) { @@ -1507,8 +1511,8 @@ bool RTPSParticipantImpl::createReader( bool enable) { auto callback = [hist, listen, this] - (const GUID_t& guid, ReaderAttributes& param, IPersistenceService* persistence, - bool is_reliable) -> RTPSReader* + (const GUID_t& guid, ReaderAttributes& param, IPersistenceService* persistence, + bool is_reliable) -> RTPSReader* { if (is_reliable) { @@ -1553,8 +1557,8 @@ bool RTPSParticipantImpl::createReader( } auto callback = [hist, listen, &payload_pool, this] - (const GUID_t& guid, ReaderAttributes& param, IPersistenceService* persistence, - bool is_reliable) -> RTPSReader* + (const GUID_t& guid, ReaderAttributes& param, IPersistenceService* persistence, + bool is_reliable) -> RTPSReader* { if (is_reliable) { @@ -2500,7 +2504,7 @@ void RTPSParticipantImpl::normalize_endpoint_locators( EndpointAttributes& endpoint_att) { // Locators with port 0, calculate port. - uint32_t unicast_port = metatraffic_unicast_port_ + m_att.port.offsetd3 - m_att.port.offsetd1; + uint32_t unicast_port = default_unicast_port_; for (Locator_t& loc : endpoint_att.unicastLocatorList) { m_network_Factory.fill_default_locator_port(loc, unicast_port); @@ -2616,11 +2620,11 @@ uint32_t RTPSParticipantImpl::getMaxMessageSize() const #endif // if HAVE_SECURITY return (std::min)( - { - max_output_message_size_, - m_network_Factory.get_max_message_size_between_transports(), - max_receiver_buffer_size - }); + { + max_output_message_size_, + m_network_Factory.get_max_message_size_between_transports(), + max_receiver_buffer_size + }); } uint32_t RTPSParticipantImpl::getMaxDataSize() @@ -2840,7 +2844,7 @@ std::unique_ptr RTPSParticipantImpl::get_send_buffer( } void RTPSParticipantImpl::return_send_buffer( - std::unique_ptr && buffer) + std::unique_ptr&& buffer) { send_buffers_->return_buffer(std::move(buffer)); } @@ -3096,8 +3100,7 @@ void RTPSParticipantImpl::get_default_unicast_locators() void RTPSParticipantImpl::get_default_unicast_locators( RTPSParticipantAttributes& att) { - uint32_t unicast_port = metatraffic_unicast_port_ + att.port.offsetd3 - att.port.offsetd1; - m_network_Factory.getDefaultUnicastLocators(att.defaultUnicastLocatorList, unicast_port); + m_network_Factory.getDefaultUnicastLocators(att.defaultUnicastLocatorList, default_unicast_port_); m_network_Factory.NormalizeLocators(att.defaultUnicastLocatorList); } diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index cd6b5bdc794..3111246bb89 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -148,14 +148,14 @@ class RTPSParticipantImpl MessageReceiver* mp_receiver; //Associated Readers/Writers inside of MessageReceiver ReceiverControlBlock( - std::shared_ptr& rec) + std::shared_ptr& rec) : Receiver(rec) , mp_receiver(nullptr) { } ReceiverControlBlock( - ReceiverControlBlock&& origen) + ReceiverControlBlock && origen) : Receiver(origen.Receiver) , mp_receiver(origen.mp_receiver) { @@ -174,7 +174,7 @@ class RTPSParticipantImpl private: ReceiverControlBlock( - const ReceiverControlBlock&) = delete; + const ReceiverControlBlock&) = delete; const ReceiverControlBlock& operator =( const ReceiverControlBlock&) = delete; @@ -515,7 +515,7 @@ class RTPSParticipantImpl std::unique_ptr get_send_buffer( const std::chrono::steady_clock::time_point& max_blocking_time); void return_send_buffer( - std::unique_ptr && buffer); + std::unique_ptr&& buffer); uint32_t get_domain_id() const; @@ -556,6 +556,8 @@ class RTPSParticipantImpl RTPSParticipantAttributes m_att; //! Metatraffic unicast port used by default on this participant uint32_t metatraffic_unicast_port_ = 0; + //! Default unicast port used by default on this participant + uint32_t default_unicast_port_ = 0; //!Guid of the RTPSParticipant. GUID_t m_guid; //! String containing the RTPSParticipant Guid. @@ -1141,7 +1143,7 @@ class RTPSParticipantImpl */ std::vector get_netmask_filter_info() const; - template + template static bool preprocess_endpoint_attributes( const EntityId_t& entity_id, std::atomic& id_count, diff --git a/test/dds/communication/CMakeLists.txt b/test/dds/communication/CMakeLists.txt index 9a48b083599..26838cc4060 100644 --- a/test/dds/communication/CMakeLists.txt +++ b/test/dds/communication/CMakeLists.txt @@ -116,6 +116,7 @@ list(APPEND TEST_DEFINITIONS mix_zero_copy_communication close_TCP_client simple_data_sharing_stress + custom_metatraffic_but_empty_default ) @@ -130,6 +131,8 @@ list(APPEND XML_CONFIGURATION_FILES liveliness_assertion_profile.xml liveliness_assertion.360_profile.xml shm_communication_subscriber_dies_while_processing_message_profile.xml + custom_metatraffic_but_empty_default_pub.xml + custom_metatraffic_but_empty_default_sub.xml ) list(APPEND PYTHON_FILES diff --git a/test/dds/communication/SubscriberMain.cpp b/test/dds/communication/SubscriberMain.cpp index eee9b3e0e1e..8a32be06ab7 100644 --- a/test/dds/communication/SubscriberMain.cpp +++ b/test/dds/communication/SubscriberMain.cpp @@ -107,7 +107,7 @@ int main( { if (++arg_count >= argc) { - std::cout << "--run-for expects a parameter" << std::endl; + std::cout << "--timeout expects a parameter" << std::endl; return -1; } diff --git a/test/dds/communication/custom_metatraffic_but_empty_default.json b/test/dds/communication/custom_metatraffic_but_empty_default.json new file mode 100644 index 00000000000..ffe30c4e6b8 --- /dev/null +++ b/test/dds/communication/custom_metatraffic_but_empty_default.json @@ -0,0 +1,19 @@ +{ + "description" : "Test that an additional call to set_qos() does not break communication if only the defaultUnicastLocatorList is not configured", + "participants" : [ + { + "kind" : "publisher", + "xmlfile" : "custom_metatraffic_but_empty_default_pub.xml", + "samples" : "60", + "seed" : "261" + }, + { + "kind" : "subscriber", + "samples" : "40", + "xmlfile" : "custom_metatraffic_but_empty_default_sub.xml", + "rescan" : "5", + "seed" : "261", + "sleep_before_exec" : "2" + } + ] +} diff --git a/test/dds/communication/custom_metatraffic_but_empty_default_pub.xml b/test/dds/communication/custom_metatraffic_but_empty_default_pub.xml new file mode 100644 index 00000000000..ee98bb23a63 --- /dev/null +++ b/test/dds/communication/custom_metatraffic_but_empty_default_pub.xml @@ -0,0 +1,29 @@ + + + + + udp_transport + UDPv4 + + + + + + false + + udp_transport + + + + + + + + RELIABLE + + + OFF + + + + diff --git a/test/dds/communication/custom_metatraffic_but_empty_default_sub.xml b/test/dds/communication/custom_metatraffic_but_empty_default_sub.xml new file mode 100644 index 00000000000..25a111135a5 --- /dev/null +++ b/test/dds/communication/custom_metatraffic_but_empty_default_sub.xml @@ -0,0 +1,55 @@ + + + + + udp_transport + UDPv4 + + + + + + false + + udp_transport + + + + + SIMPLE + + + + + +
239.255.0.1
+ 15150 +
+
+
+ + + + + 15162 + + + +
+ + + +
+
+ + + + + RELIABLE + + + OFF + + + +
diff --git a/test/dds/communication/test_build.py b/test/dds/communication/test_build.py index 509a7e1c01f..340cc5b17eb 100644 --- a/test/dds/communication/test_build.py +++ b/test/dds/communication/test_build.py @@ -21,6 +21,8 @@ import sys import time +SLEEP_TAG = "--sleep_before_exec" + script_dir = os.path.dirname(os.path.realpath(__file__)) seed = str(os.getpid()) @@ -60,7 +62,11 @@ def define_args(tests_definition): 'publishers', 'sleep_before_exec', 'interval', - 'timeout'] + 'timeout', + 'rescan', + 'seed'] + # Note that arg 'seed' is automatically set, but we can override it by + # passing the arg again. This is useful if we need to determine a specific domain for argument in possible_arguments: if argument in test.keys(): @@ -172,23 +178,72 @@ def execute_command(command): return subprocess.Popen(command) +class ScheduledCmd: + def __init__(self, when, kind, cmd): + self.when = when + self.kind = kind + self.cmd = cmd + + +def _extract_sleep(command: list[str]) -> tuple[list[str], int]: + """Return (cleaned_command, sleep_seconds).""" + if SLEEP_TAG not in command: + return command, 0 + + cmd = list(command) + i = cmd.index(SLEEP_TAG) + try: + sleep_s = int(cmd[i + 1]) + except (IndexError, ValueError): + raise ValueError(f"{SLEEP_TAG} must be followed by an integer seconds value: {command}") + + # remove tag and value + del cmd[i:i+2] + return cmd, sleep_s + + def execute_commands(pub_commands, sub_commands, pubsub_commands, logger): """Get test definitions in command lists and execute each process.""" - pubs_proc = [] - subs_proc = [] - pubsubs_proc = [] + # Manually program execution of subprocess + now = time.monotonic() + scheduled: list[ScheduledCmd] = [] for subscriber_command in sub_commands: logger.info(f'Executing subcriber: {subscriber_command}') - subs_proc.append(execute_command(subscriber_command)) + cmd, s = _extract_sleep(subscriber_command) + scheduled.append(ScheduledCmd(when=now + s, kind="sub", cmd=cmd)) for pubsub_command in pubsub_commands: logger.info(f'Executing pubsub: {pubsub_command}') - pubsubs_proc.append(execute_command(pubsub_command)) + cmd, s = _extract_sleep(pubsub_command) + scheduled.append(ScheduledCmd(when=now + s, kind="pubsub", cmd=cmd)) for publisher_command in pub_commands: logger.info(f'Executing publisher: {publisher_command}') - pubs_proc.append(execute_command(publisher_command)) + cmd, s = _extract_sleep(publisher_command) + scheduled.append(ScheduledCmd(when=now + s, kind="pub", cmd=cmd)) + + # Sort scheduled subprocesses + scheduled.sort(key=lambda x: x.when) + pubs_proc = [] + subs_proc = [] + pubsubs_proc = [] + + # Execution of subprocess with scheduling + for item in scheduled: + delay = item.when - time.monotonic() + if delay > 0: + time.sleep(delay) + + logger.info(f"Executing {item.kind}: {item.cmd}") + p = subprocess.Popen(item.cmd) + + if item.kind == "sub": + subs_proc.append(p) + elif item.kind == "pubsub": + pubsubs_proc.append(p) + else: + pubs_proc.append(p) ret_value = 0