Skip to content

Commit 05d6fae

Browse files
committed
loopback: support locality and local delivery for same-session messages
- Deliver PUT/DELETE, query, reply, and reply-final messages for a key expression that is defined in the same local session locally, without causing network activity. - Extend z_api_source_info_test to validate deliveries both to remote and local subscribers. - Support locality of samples to be received by subscribers or targeted by publishers. - Enable the locality-specific CMake options in CI. User can now set allowed_origin / allowed_destination in z_put_options_t, z_publisher_put_options_t, z_subscriber_options_t, z_queryable_options_t, etc. with the following values: - ZP_LOCALITY_SESSION_LOCAL - stay inside the session (don't send anything to network) - ZP_LOCALITY_REMOTE - send/accept only over the transport (no same-session samples) - ZP_LOCALITY_ANY - allow both, exactly as with zenoh-c's zc_locality_t. Co-developed-by: Denis Biryukov <[email protected]>
1 parent 04d2611 commit 05d6fae

36 files changed

+2454
-186
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ jobs:
3232
- name: Build & run tests
3333
run: |
3434
sudo apt install -y ninja-build libmbedtls-dev
35-
Z_FEATURE_LINK_TLS=1 Z_FEATURE_UNSTABLE_API=1 Z_FEATURE_PERIODIC_TASKS=1 CMAKE_GENERATOR=Ninja ASAN=ON make BUILD_TYPE=Debug test
35+
Z_FEATURE_LINK_TLS=1 Z_FEATURE_LOCAL_QUERYABLE=1 Z_FEATURE_LOCAL_SUBSCRIBER=1 Z_FEATURE_UNSTABLE_API=1 Z_FEATURE_PERIODIC_TASKS=1 CMAKE_GENERATOR=Ninja ASAN=ON make BUILD_TYPE=Debug test
3636
3737
run_windows_test:
3838
name: Run peer unicast test on windows-latest
@@ -62,7 +62,7 @@ jobs:
6262
- name: Build tests
6363
run: |
6464
sudo apt install -y ninja-build libmbedtls-dev
65-
Z_FEATURE_LINK_TLS=1 Z_FEATURE_UNSTABLE_API=1 Z_FEATURE_PERIODIC_TASKS=1 CMAKE_GENERATOR=Ninja make BUILD_TYPE=Debug
65+
Z_FEATURE_LINK_TLS=1 Z_FEATURE_LOCAL_QUERYABLE=1 Z_FEATURE_LOCAL_SUBSCRIBER=1 Z_FEATURE_UNSTABLE_API=1 Z_FEATURE_PERIODIC_TASKS=1 CMAKE_GENERATOR=Ninja make BUILD_TYPE=Debug
6666
6767
- name: Install valgrind
6868
run: |
@@ -423,7 +423,7 @@ jobs:
423423
- name: Build project
424424
run: |
425425
sudo apt install -y ninja-build
426-
Z_FEATURE_UNSTABLE_API=1 Z_FEATURE_LIVELINESS=1 CMAKE_GENERATOR=Ninja make
426+
Z_FEATURE_LINK_TLS=1 Z_FEATURE_LOCAL_QUERYABLE=1 Z_FEATURE_LOCAL_SUBSCRIBER=1 Z_FEATURE_UNSTABLE_API=1 Z_FEATURE_LIVELINESS=1 CMAKE_GENERATOR=Ninja make
427427
428428
- name: Run test
429429
run: python3 -u ./build/tests/memory_leak.py

.github/workflows/integration.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ jobs:
4747
BUILD_INTEGRATION: ON
4848
Z_FEATURE_LINK_TLS: 1
4949
Z_FEATURE_UNSTABLE_API: 1
50+
Z_FEATURE_LOCAL_QUERYABLE: 1
5051
Z_FEATURE_LOCAL_SUBSCRIBER: 1
5152
Z_FEATURE_PERIODIC_TASKS: 1
5253
Z_FEATURE_ADVANCED_PUBLICATION: 1
@@ -60,8 +61,9 @@ jobs:
6061
BUILD_TESTING: OFF # Workaround for Windows as it seems the previous step is being ignored
6162
BUILD_INTEGRATION: ON # Workaround for Windows as it seems the previous step is being ignored
6263
Z_FEATURE_LINK_TLS: 1
63-
Z_FEATURE_UNSTABLE_API: 1
64+
Z_FEATURE_LOCAL_QUERYABLE: 1
6465
Z_FEATURE_LOCAL_SUBSCRIBER: 1
66+
Z_FEATURE_UNSTABLE_API: 1
6567
ZENOH_BRANCH: main
6668

6769
asan-build:

CMakeLists.txt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ set(Z_FEATURE_UNICAST_PEER 1 CACHE STRING "Toggle Unicast peer mode")
278278
set(Z_FEATURE_AUTO_RECONNECT 1 CACHE STRING "Toggle automatic reconnection")
279279
set(Z_FEATURE_MULTICAST_DECLARATIONS 0 CACHE STRING "Toggle multicast resource declarations")
280280
set(Z_FEATURE_PERIODIC_TASKS 0 CACHE STRING "Toggle periodic task support")
281+
set(Z_FEATURE_LOCAL_QUERYABLE 0 CACHE STRING "Toggle local queriables")
281282

282283
# Add a warning message if someone tries to enable Z_FEATURE_LINK_SERIAL_USB directly
283284
if(Z_FEATURE_LINK_SERIAL_USB AND NOT Z_FEATURE_UNSTABLE_API)
@@ -695,6 +696,7 @@ if(UNIX OR MSVC)
695696
add_executable(z_api_queryable_test ${PROJECT_SOURCE_DIR}/tests/z_api_queryable_test.c)
696697
add_executable(z_api_scheduler_test ${PROJECT_SOURCE_DIR}/tests/z_api_scheduler_test.c)
697698
add_executable(z_api_cancellation_test ${PROJECT_SOURCE_DIR}/tests/z_api_cancellation_test.c)
699+
add_executable(z_local_loopback_test ${PROJECT_SOURCE_DIR}/tests/z_local_loopback_test.c)
698700
if(UNIX)
699701
add_executable(z_api_advanced_pubsub_test ${PROJECT_SOURCE_DIR}/tests/z_api_advanced_pubsub_test.c
700702
${PROJECT_SOURCE_DIR}/tests/utils/tcp_proxy.c)
@@ -712,6 +714,15 @@ endif()
712714
target_link_libraries(z_api_source_info_test zenohpico::lib)
713715
target_link_libraries(z_api_queryable_test zenohpico::lib)
714716
target_link_libraries(z_api_scheduler_test zenohpico::lib)
717+
target_link_libraries(z_local_loopback_test zenohpico::lib)
718+
target_include_directories(z_local_loopback_test PRIVATE ${PROJECT_SOURCE_DIR}/include)
719+
target_compile_definitions(z_local_loopback_test PRIVATE Z_LOOPBACK_TESTING=1)
720+
if(PICO_SHARED)
721+
target_compile_definitions(${Libname}_shared PRIVATE Z_LOOPBACK_TESTING=1)
722+
endif()
723+
if(PICO_STATIC)
724+
target_compile_definitions(${Libname}_static PRIVATE Z_LOOPBACK_TESTING=1)
725+
endif()
715726
target_link_libraries(z_api_advanced_pubsub_test zenohpico::lib)
716727
target_link_libraries(z_api_cancellation_test zenohpico::lib)
717728

@@ -735,6 +746,7 @@ endif()
735746
add_test(z_api_source_info_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_source_info_test)
736747
add_test(z_api_queryable_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_queryable_test)
737748
add_test(z_api_scheduler_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_scheduler_test)
749+
add_test(z_local_loopback_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_local_loopback_test)
738750
add_test(z_api_advanced_pubsub_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_advanced_pubsub_test)
739751
endif()
740752
endif()

docs/api.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,7 @@ Constants
11281128
.. autocenum:: constants.h::z_congestion_control_t
11291129
.. autocenum:: constants.h::z_priority_t
11301130
.. autocenum:: constants.h::z_reliability_t
1131+
.. autocenum:: constants.h::z_locality_t
11311132
11321133
Functions
11331134
---------

include/zenoh-pico/api/constants.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,22 @@ typedef enum z_whatami_t {
5454
} z_whatami_t;
5555
#define Z_WHATAMI_DEFAULT Z_WHATAMI_ROUTER;
5656

57+
/**
58+
* The locality of samples to be received by subscribers or targeted by publishers.
59+
*
60+
* Enumerators:
61+
* Z_LOCALITY_ANY: Allow both session-local and remote traffic.
62+
* Z_LOCALITY_SESSION_LOCAL: Allow session-local traffic only.
63+
* Z_LOCALITY_REMOTE: Allow remote traffic only.
64+
*/
65+
typedef enum z_locality_t {
66+
Z_LOCALITY_ANY = 0,
67+
Z_LOCALITY_SESSION_LOCAL = 1,
68+
Z_LOCALITY_REMOTE = 2,
69+
} z_locality_t;
70+
71+
static inline z_locality_t z_locality_default(void) { return Z_LOCALITY_ANY; }
72+
5773
/**
5874
* Status values for keyexpr canonization operation.
5975
* Used as return value of canonization-related functions,

include/zenoh-pico/api/types.h

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,11 @@ typedef _z_matching_status_t z_matching_status_t;
165165
* Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`.
166166
*/
167167
typedef struct {
168-
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
168+
#if Z_FEATURE_LOCAL_SUBSCRIBER == 1
169+
z_locality_t allowed_origin;
170+
#else
171+
uint8_t __dummy; // keep struct non-empty when locality is disabled
172+
#endif
169173
} z_subscriber_options_t;
170174

171175
/**
@@ -218,6 +222,9 @@ typedef struct {
218222
#ifdef Z_FEATURE_UNSTABLE_API
219223
z_reliability_t reliability;
220224
#endif
225+
#if Z_FEATURE_LOCAL_SUBSCRIBER == 1
226+
z_locality_t allowed_destination;
227+
#endif
221228
} z_publisher_options_t;
222229

223230
/**
@@ -241,6 +248,9 @@ typedef struct z_querier_options_t {
241248
z_query_consolidation_t consolidation;
242249
z_congestion_control_t congestion_control;
243250
bool is_express;
251+
#if Z_FEATURE_LOCAL_QUERYABLE == 1
252+
z_locality_t allowed_destination;
253+
#endif
244254
z_priority_t priority;
245255
uint64_t timeout_ms;
246256
} z_querier_options_t;
@@ -271,6 +281,9 @@ typedef struct z_querier_get_options_t {
271281
*/
272282
typedef struct {
273283
bool complete;
284+
#if Z_FEATURE_LOCAL_QUERYABLE == 1
285+
z_locality_t allowed_origin;
286+
#endif
274287
} z_queryable_options_t;
275288

276289
/**
@@ -349,6 +362,9 @@ typedef struct {
349362
z_timestamp_t *timestamp;
350363
bool is_express;
351364
z_moved_bytes_t *attachment;
365+
#if Z_FEATURE_LOCAL_SUBSCRIBER == 1
366+
z_locality_t allowed_destination;
367+
#endif
352368
#ifdef Z_FEATURE_UNSTABLE_API
353369
z_reliability_t reliability;
354370
z_moved_source_info_t *source_info;
@@ -371,6 +387,9 @@ typedef struct {
371387
z_priority_t priority;
372388
bool is_express;
373389
z_timestamp_t *timestamp;
390+
#if Z_FEATURE_LOCAL_SUBSCRIBER == 1
391+
z_locality_t allowed_destination;
392+
#endif
374393
#ifdef Z_FEATURE_UNSTABLE_API
375394
z_reliability_t reliability;
376395
z_moved_source_info_t *source_info;
@@ -433,6 +452,9 @@ typedef struct {
433452
z_congestion_control_t congestion_control;
434453
z_priority_t priority;
435454
bool is_express;
455+
#if Z_FEATURE_LOCAL_QUERYABLE == 1
456+
z_locality_t allowed_destination;
457+
#endif
436458
z_query_target_t target;
437459
uint64_t timeout_ms;
438460
z_moved_bytes_t *attachment;

include/zenoh-pico/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
#define Z_FEATURE_ENCODING_VALUES 1
5555
#define Z_FEATURE_TCP_NODELAY 1
5656
#define Z_FEATURE_LOCAL_SUBSCRIBER 0
57+
#define Z_FEATURE_LOCAL_QUERYABLE 0
5758
#define Z_FEATURE_SESSION_CHECK 1
5859
#define Z_FEATURE_BATCHING 1
5960
#define Z_FEATURE_BATCH_TX_MUTEX 0

include/zenoh-pico/config.h.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
#define Z_FEATURE_ENCODING_VALUES @Z_FEATURE_ENCODING_VALUES@
5555
#define Z_FEATURE_TCP_NODELAY @Z_FEATURE_TCP_NODELAY@
5656
#define Z_FEATURE_LOCAL_SUBSCRIBER @Z_FEATURE_LOCAL_SUBSCRIBER@
57+
#define Z_FEATURE_LOCAL_QUERYABLE @Z_FEATURE_LOCAL_QUERYABLE@
5758
#define Z_FEATURE_SESSION_CHECK @Z_FEATURE_SESSION_CHECK@
5859
#define Z_FEATURE_BATCHING @Z_FEATURE_BATCHING@
5960
#define Z_FEATURE_BATCH_TX_MUTEX @Z_FEATURE_BATCH_TX_MUTEX@

include/zenoh-pico/net/filtering.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "zenoh-pico/api/constants.h"
2020
#include "zenoh-pico/net/session.h"
2121
#include "zenoh-pico/protocol/core.h"
22+
#include "zenoh-pico/utils/locality.h"
2223

2324
#ifdef __cplusplus
2425
extern "C" {
@@ -39,6 +40,13 @@ typedef enum {
3940
WRITE_FILTER_OFF = 1,
4041
} _z_write_filter_state_t;
4142

43+
struct _z_write_filter_registration_t;
44+
45+
typedef enum {
46+
_Z_WRITE_FILTER_SUBSCRIBER = 0,
47+
_Z_WRITE_FILTER_QUERYABLE = 1,
48+
} _z_write_filter_target_type_t;
49+
4250
typedef struct {
4351
_z_session_weak_t zn;
4452
#if Z_FEATURE_MULTI_THREAD == 1
@@ -52,6 +60,11 @@ typedef struct {
5260
uint8_t state;
5361
bool is_complete;
5462
bool is_aggregate;
63+
bool allow_local;
64+
bool allow_remote;
65+
_z_write_filter_target_type_t target_type;
66+
size_t local_targets;
67+
struct _z_write_filter_registration_t *registration;
5568
} _z_write_filter_ctx_t;
5669

5770
z_result_t _z_write_filter_ctx_clear(_z_write_filter_ctx_t *filter);
@@ -67,8 +80,12 @@ typedef struct _z_write_filter_t {
6780
} _z_write_filter_t;
6881

6982
z_result_t _z_write_filter_create(const _z_session_rc_t *zn, _z_write_filter_t *filter, _z_keyexpr_t keyexpr,
70-
uint8_t interest_flag, bool complete);
83+
uint8_t interest_flag, bool complete, z_locality_t locality);
7184
z_result_t _z_write_filter_clear(_z_write_filter_t *filter);
85+
void _z_write_filter_notify_subscriber(struct _z_session_t *session, const _z_keyexpr_t *key,
86+
z_locality_t allowed_origin, bool add);
87+
void _z_write_filter_notify_queryable(struct _z_session_t *session, const _z_keyexpr_t *key,
88+
z_locality_t allowed_origin, bool is_complete, bool add);
7289

7390
#if Z_FEATURE_MATCHING
7491
z_result_t _z_write_filter_ctx_add_callback(_z_write_filter_ctx_t *filter, size_t id, _z_closure_matching_status_t *v);

include/zenoh-pico/net/primitives.h

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ _z_keyexpr_t _z_update_keyexpr_to_declared(_z_session_t *zs, _z_keyexpr_t keyexp
110110
*/
111111
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding,
112112
z_congestion_control_t congestion_control, z_priority_t priority, bool is_express,
113-
z_reliability_t reliability);
113+
z_reliability_t reliability, z_locality_t allowed_destination);
114114

115115
/**
116116
* Undeclare a :c:type:`_z_publisher_t`.
@@ -141,13 +141,15 @@ z_result_t _z_undeclare_publisher(_z_publisher_t *pub);
141141
* attachment: An optional attachment to this write.
142142
* reliability: The message reliability.
143143
* source_info: The message source info.
144+
* allowed_destination: The allowed destination locality.
144145
* Returns:
145146
* ``0`` in case of success, ``-1`` in case of failure.
146147
*/
147148
z_result_t _z_write(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload,
148149
const _z_encoding_t *encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
149150
z_priority_t priority, bool is_express, const _z_timestamp_t *timestamp,
150-
const _z_bytes_t *attachment, z_reliability_t reliability, const _z_source_info_t *source_info);
151+
const _z_bytes_t *attachment, z_reliability_t reliability, const _z_source_info_t *source_info,
152+
z_locality_t allowed_destination);
151153
#endif
152154

153155
#if Z_FEATURE_SUBSCRIPTION == 1
@@ -165,7 +167,8 @@ z_result_t _z_write(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_byte
165167
* The created :c:type:`_z_subscriber_t` (in null state if the declaration failed).
166168
*/
167169
_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
168-
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper, void *arg);
170+
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper, void *arg,
171+
z_locality_t allowed_origin);
169172

170173
/**
171174
* Undeclare a :c:type:`_z_subscriber_t`.
@@ -195,7 +198,8 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub);
195198
* The created :c:type:`_z_queryable_t` (in null state if the declaration failed)..
196199
*/
197200
_z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, bool complete,
198-
_z_closure_query_callback_t callback, _z_drop_handler_t dropper, void *arg);
201+
_z_closure_query_callback_t callback, _z_drop_handler_t dropper, void *arg,
202+
z_locality_t allowed_origin);
199203

200204
/**
201205
* Undeclare a :c:type:`_z_queryable_t`.
@@ -270,7 +274,7 @@ z_result_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsr
270274
_z_querier_t _z_declare_querier(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
271275
z_consolidation_mode_t consolidation_mode, z_congestion_control_t congestion_control,
272276
z_query_target_t target, z_priority_t priority, bool is_express, uint64_t timeout_ms,
273-
_z_encoding_t *encoding, z_reliability_t reliability);
277+
_z_encoding_t *encoding, z_reliability_t reliability, z_locality_t allowed_destination);
274278

275279
/**
276280
* Undeclare a :c:type:`_z_querier_t`.
@@ -301,14 +305,15 @@ z_result_t _z_undeclare_querier(_z_querier_t *querier);
301305
* timeout_ms: The timeout value of this query.
302306
* attachment: An optional attachment to this query.
303307
* qos: QoS to apply when routing this query.
308+
* allowed_destination: Locality restrictions for delivery.
304309
* out_id: In case of success the query id will be written to it.
305310
*
306311
*/
307312
z_result_t _z_query(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const char *parameters, size_t parameters_len,
308313
z_query_target_t target, z_consolidation_mode_t consolidation, const _z_bytes_t *payload,
309314
const _z_encoding_t *encoding, _z_closure_reply_callback_t callback, _z_drop_handler_t dropper,
310-
void *arg, uint64_t timeout_ms, const _z_bytes_t *attachment, _z_n_qos_t qos, _z_zint_t *out_id);
311-
315+
void *arg, uint64_t timeout_ms, const _z_bytes_t *attachment, _z_n_qos_t qos,
316+
z_locality_t allowed_destination, _z_zint_t *out_id);
312317
#endif
313318

314319
#if Z_FEATURE_INTEREST == 1

0 commit comments

Comments
 (0)