Skip to content

Commit 30b1771

Browse files
committed
loopback: support locality and local delivery for same-session messages
- Deliver PUT/DELETE, query, reply, and reply-final messages for a key expression that is defined in the same local session locally, without causing network activity. - Extend z_api_source_info_test to validate deliveries both to remote and local subscribers. - Support locality of samples to be received by subscribers or targeted by publishers. User can now set allowed_origin / allowed_destination in z_put_options_t, z_publisher_put_options_t, z_subscriber_options_t, z_queryable_options_t, etc. with the following values: - ZP_LOCALITY_SESSION_LOCAL - stay inside the session (don't send anything to network) - ZP_LOCALITY_REMOTE - send/accept only over the transport (no same-session samples) - ZP_LOCALITY_ANY - allow both, exactly as with zenoh-c's zc_locality_t.
1 parent ef600d8 commit 30b1771

File tree

23 files changed

+1648
-118
lines changed

23 files changed

+1648
-118
lines changed

CMakeLists.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,7 @@ if(UNIX OR MSVC)
684684
add_executable(z_api_source_info_test ${PROJECT_SOURCE_DIR}/tests/z_api_source_info_test.c)
685685
add_executable(z_api_queryable_test ${PROJECT_SOURCE_DIR}/tests/z_api_queryable_test.c)
686686
add_executable(z_api_scheduler_test ${PROJECT_SOURCE_DIR}/tests/z_api_scheduler_test.c)
687+
add_executable(z_local_loopback_test ${PROJECT_SOURCE_DIR}/tests/z_local_loopback_test.c)
687688
if(UNIX)
688689
add_executable(z_api_advanced_pubsub_test ${PROJECT_SOURCE_DIR}/tests/z_api_advanced_pubsub_test.c
689690
${PROJECT_SOURCE_DIR}/tests/utils/tcp_proxy.c)
@@ -701,6 +702,15 @@ endif()
701702
target_link_libraries(z_api_source_info_test zenohpico::lib)
702703
target_link_libraries(z_api_queryable_test zenohpico::lib)
703704
target_link_libraries(z_api_scheduler_test zenohpico::lib)
705+
target_link_libraries(z_local_loopback_test zenohpico::lib)
706+
target_include_directories(z_local_loopback_test PRIVATE ${PROJECT_SOURCE_DIR}/include)
707+
target_compile_definitions(z_local_loopback_test PRIVATE Z_LOOPBACK_TESTING=1)
708+
if(PICO_SHARED)
709+
target_compile_definitions(${Libname}_shared PRIVATE Z_LOOPBACK_TESTING=1)
710+
endif()
711+
if(PICO_STATIC)
712+
target_compile_definitions(${Libname}_static PRIVATE Z_LOOPBACK_TESTING=1)
713+
endif()
704714
target_link_libraries(z_api_advanced_pubsub_test zenohpico::lib)
705715

706716
configure_file(${PROJECT_SOURCE_DIR}/tests/routed.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/routed.sh COPYONLY)
@@ -722,6 +732,7 @@ endif()
722732
add_test(z_api_source_info_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_source_info_test)
723733
add_test(z_api_queryable_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_queryable_test)
724734
add_test(z_api_scheduler_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_scheduler_test)
735+
add_test(z_local_loopback_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_local_loopback_test)
725736
add_test(z_api_advanced_pubsub_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_advanced_pubsub_test)
726737
endif()
727738
endif()

include/zenoh-pico/api/constants.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
#ifndef ZENOH_PICO_API_CONSTANTS_H
1515
#define ZENOH_PICO_API_CONSTANTS_H
1616

17+
#include <stdbool.h>
18+
1719
#define Z_SELECTOR_TIME "_time="
1820
#define Z_SELECTOR_QUERY_MATCH "_anyke"
1921

@@ -54,6 +56,44 @@ typedef enum z_whatami_t {
5456
} z_whatami_t;
5557
#define Z_WHATAMI_DEFAULT Z_WHATAMI_ROUTER;
5658

59+
/**
60+
* The locality of samples to be received by subscribers or targeted by publishers.
61+
*
62+
* Enumerators:
63+
* ZP_LOCALITY_ANY: Allow both session-local and remote traffic.
64+
* ZP_LOCALITY_SESSION_LOCAL: Allow session-local traffic only.
65+
* ZP_LOCALITY_REMOTE: Allow remote traffic only.
66+
*/
67+
typedef enum zp_locality_t {
68+
ZP_LOCALITY_ANY = 0,
69+
ZP_LOCALITY_SESSION_LOCAL = 1,
70+
ZP_LOCALITY_REMOTE = 2,
71+
} zp_locality_t;
72+
73+
static inline zp_locality_t zp_locality_default(void) { return ZP_LOCALITY_ANY; }
74+
75+
static inline bool zp_locality_allows_local(zp_locality_t locality) {
76+
switch (locality) {
77+
case ZP_LOCALITY_REMOTE:
78+
return false;
79+
case ZP_LOCALITY_ANY:
80+
case ZP_LOCALITY_SESSION_LOCAL:
81+
default:
82+
return true;
83+
}
84+
}
85+
86+
static inline bool zp_locality_allows_remote(zp_locality_t locality) {
87+
switch (locality) {
88+
case ZP_LOCALITY_SESSION_LOCAL:
89+
return false;
90+
case ZP_LOCALITY_ANY:
91+
case ZP_LOCALITY_REMOTE:
92+
default:
93+
return true;
94+
}
95+
}
96+
5797
/**
5898
* Status values for keyexpr canonization operation.
5999
* Used as return value of canonization-related functions,

include/zenoh-pico/api/types.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ typedef _z_matching_status_t z_matching_status_t;
164164
* Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`.
165165
*/
166166
typedef struct {
167-
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
167+
zp_locality_t allowed_origin;
168168
} z_subscriber_options_t;
169169

170170
/**
@@ -210,6 +210,7 @@ typedef struct {
210210
#ifdef Z_FEATURE_UNSTABLE_API
211211
z_reliability_t reliability;
212212
#endif
213+
zp_locality_t allowed_destination;
213214
} z_publisher_options_t;
214215

215216
/**
@@ -233,6 +234,7 @@ typedef struct z_querier_options_t {
233234
z_query_consolidation_t consolidation;
234235
z_congestion_control_t congestion_control;
235236
bool is_express;
237+
zp_locality_t allowed_destination;
236238
z_priority_t priority;
237239
uint64_t timeout_ms;
238240
} z_querier_options_t;
@@ -259,6 +261,7 @@ typedef struct z_querier_get_options_t {
259261
*/
260262
typedef struct {
261263
bool complete;
264+
zp_locality_t allowed_origin;
262265
} z_queryable_options_t;
263266

264267
/**
@@ -337,6 +340,7 @@ typedef struct {
337340
z_timestamp_t *timestamp;
338341
bool is_express;
339342
z_moved_bytes_t *attachment;
343+
zp_locality_t allowed_destination;
340344
#ifdef Z_FEATURE_UNSTABLE_API
341345
z_reliability_t reliability;
342346
z_moved_source_info_t *source_info;
@@ -359,6 +363,7 @@ typedef struct {
359363
z_priority_t priority;
360364
bool is_express;
361365
z_timestamp_t *timestamp;
366+
zp_locality_t allowed_destination;
362367
#ifdef Z_FEATURE_UNSTABLE_API
363368
z_reliability_t reliability;
364369
z_moved_source_info_t *source_info;
@@ -419,6 +424,7 @@ typedef struct {
419424
z_congestion_control_t congestion_control;
420425
z_priority_t priority;
421426
bool is_express;
427+
zp_locality_t allowed_destination;
422428
z_query_target_t target;
423429
uint64_t timeout_ms;
424430
z_moved_bytes_t *attachment;

include/zenoh-pico/net/primitives.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ _z_keyexpr_t _z_update_keyexpr_to_declared(_z_session_t *zs, _z_keyexpr_t keyexp
110110
*/
111111
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding,
112112
z_congestion_control_t congestion_control, z_priority_t priority, bool is_express,
113-
z_reliability_t reliability);
113+
z_reliability_t reliability, zp_locality_t allowed_destination);
114114

115115
/**
116116
* Undeclare a :c:type:`_z_publisher_t`.
@@ -147,7 +147,8 @@ z_result_t _z_undeclare_publisher(_z_publisher_t *pub);
147147
z_result_t _z_write(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload,
148148
const _z_encoding_t *encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
149149
z_priority_t priority, bool is_express, const _z_timestamp_t *timestamp,
150-
const _z_bytes_t *attachment, z_reliability_t reliability, const _z_source_info_t *source_info);
150+
const _z_bytes_t *attachment, z_reliability_t reliability, const _z_source_info_t *source_info,
151+
zp_locality_t allowed_destination);
151152
#endif
152153

153154
#if Z_FEATURE_SUBSCRIPTION == 1
@@ -165,7 +166,8 @@ z_result_t _z_write(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_byte
165166
* The created :c:type:`_z_subscriber_t` (in null state if the declaration failed).
166167
*/
167168
_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
168-
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper, void *arg);
169+
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper, void *arg,
170+
zp_locality_t allowed_origin);
169171

170172
/**
171173
* Undeclare a :c:type:`_z_subscriber_t`.
@@ -195,7 +197,8 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub);
195197
* The created :c:type:`_z_queryable_t` (in null state if the declaration failed)..
196198
*/
197199
_z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, bool complete,
198-
_z_closure_query_callback_t callback, _z_drop_handler_t dropper, void *arg);
200+
_z_closure_query_callback_t callback, _z_drop_handler_t dropper, void *arg,
201+
zp_locality_t allowed_origin);
199202

200203
/**
201204
* Undeclare a :c:type:`_z_queryable_t`.
@@ -270,7 +273,8 @@ z_result_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsr
270273
_z_querier_t _z_declare_querier(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
271274
z_consolidation_mode_t consolidation_mode, z_congestion_control_t congestion_control,
272275
z_query_target_t target, z_priority_t priority, bool is_express, uint64_t timeout_ms,
273-
_z_encoding_t *encoding, z_reliability_t reliability);
276+
_z_encoding_t *encoding, z_reliability_t reliability,
277+
zp_locality_t allowed_destination);
274278

275279
/**
276280
* Undeclare a :c:type:`_z_querier_t`.
@@ -308,7 +312,7 @@ z_result_t _z_query(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const char *p
308312
z_query_target_t target, z_consolidation_mode_t consolidation, const _z_bytes_t *payload,
309313
const _z_encoding_t *encoding, _z_closure_reply_callback_t callback, _z_drop_handler_t dropper,
310314
void *arg, uint64_t timeout_ms, const _z_bytes_t *attachment, _z_n_qos_t qos,
311-
z_congestion_control_t cong_ctrl);
315+
z_congestion_control_t cong_ctrl, zp_locality_t allowed_destination);
312316
#endif
313317

314318
#if Z_FEATURE_INTEREST == 1

include/zenoh-pico/net/publish.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef INCLUDE_ZENOH_PICO_NET_PUBLISH_H
1616
#define INCLUDE_ZENOH_PICO_NET_PUBLISH_H
1717

18+
#include "zenoh-pico/api/constants.h"
1819
#include "zenoh-pico/net/filtering.h"
1920
#include "zenoh-pico/net/session.h"
2021
#include "zenoh-pico/protocol/core.h"
@@ -35,6 +36,7 @@ typedef struct _z_publisher_t {
3536
z_priority_t _priority;
3637
z_reliability_t reliability;
3738
bool _is_express;
39+
zp_locality_t _allowed_destination;
3840
_z_write_filter_t _filter;
3941
} _z_publisher_t;
4042

include/zenoh-pico/net/query.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ typedef struct _z_querier_t {
6767
z_reliability_t reliability;
6868
bool _is_express;
6969
uint64_t _timeout_ms;
70+
zp_locality_t _allowed_destination;
7071
_z_write_filter_t _filter;
7172
} _z_querier_t;
7273

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//
2+
// Copyright (c) 2025 ZettaScale Technology
3+
//
4+
// This program and the accompanying materials are made available under the
5+
// terms of the Eclipse Public License 2.0 which is available at
6+
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
//
9+
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
//
11+
// Contributors:
12+
// ZettaScale Zenoh Team, <[email protected]>
13+
//
14+
15+
#ifndef ZENOH_PICO_SESSION_LOOPBACK_H
16+
#define ZENOH_PICO_SESSION_LOOPBACK_H
17+
18+
#include <stdbool.h>
19+
#include <stdint.h>
20+
21+
#include "zenoh-pico/api/constants.h"
22+
#include "zenoh-pico/net/query.h"
23+
#include "zenoh-pico/protocol/core.h"
24+
#include "zenoh-pico/protocol/definitions/network.h"
25+
#include "zenoh-pico/session/session.h"
26+
#include "zenoh-pico/transport/transport.h"
27+
28+
#ifdef __cplusplus
29+
extern "C" {
30+
#endif
31+
32+
bool _z_session_deliver_push_locally(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload,
33+
const _z_encoding_t *encoding, z_sample_kind_t kind, _z_n_qos_t qos,
34+
const _z_timestamp_t *timestamp, const _z_bytes_t *attachment,
35+
z_reliability_t reliability, const _z_source_info_t *source_info,
36+
zp_locality_t allowed_destination);
37+
38+
bool _z_session_deliver_query_locally(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_slice_t *parameters,
39+
z_consolidation_mode_t consolidation, const _z_bytes_t *payload,
40+
const _z_encoding_t *encoding, const _z_bytes_t *attachment,
41+
const _z_source_info_t *source_info, _z_zint_t qid, uint64_t timeout_ms,
42+
_z_n_qos_t qos, zp_locality_t allowed_destination);
43+
44+
bool _z_session_deliver_reply_locally(const _z_query_t *query, const _z_session_rc_t *responder,
45+
const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload,
46+
const _z_encoding_t *encoding, z_sample_kind_t kind, _z_n_qos_t qos,
47+
const _z_timestamp_t *timestamp, const _z_bytes_t *attachment,
48+
const _z_source_info_t *source_info);
49+
50+
bool _z_session_deliver_reply_err_locally(const _z_query_t *query, const _z_session_rc_t *responder,
51+
const _z_bytes_t *payload, const _z_encoding_t *encoding, _z_n_qos_t qos);
52+
53+
bool _z_session_deliver_reply_final_locally(_z_session_t *zn, _z_zint_t rid, bool allow_remote);
54+
55+
#if defined(Z_LOOPBACK_TESTING)
56+
typedef _z_transport_common_t *(*_z_session_transport_override_fn)(_z_session_t *);
57+
void _z_session_set_transport_common_override(_z_session_transport_override_fn fn);
58+
59+
typedef z_result_t (*_z_session_send_override_fn)(_z_session_t *zn, const _z_network_message_t *n_msg,
60+
z_reliability_t reliability, z_congestion_control_t cong_ctrl,
61+
void *peer, bool *handled);
62+
void _z_transport_set_send_n_msg_override(_z_session_send_override_fn fn);
63+
#endif
64+
65+
#ifdef __cplusplus
66+
}
67+
#endif
68+
69+
#endif /* ZENOH_PICO_SESSION_LOOPBACK_H */

include/zenoh-pico/session/query.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_cont
3636
_z_transport_peer_common_t *peer);
3737
z_result_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *msg,
3838
_z_entity_global_id_t *replier_id);
39-
z_result_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
39+
z_result_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id, bool finalize);
4040
void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
4141
void _z_flush_pending_queries(_z_session_t *zn);
4242
#endif

include/zenoh-pico/session/queryable.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ typedef struct {
3939
_z_keyexpr_t ke_out;
4040
_z_queryable_infos_svec_t infos;
4141
size_t qle_nb;
42+
bool is_remote;
4243
} _z_queryable_cache_data_t;
4344

4445
void _z_queryable_cache_invalidate(_z_session_t *zn);

include/zenoh-pico/session/session.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <stdbool.h>
1919
#include <stdint.h>
2020

21+
#include "zenoh-pico/api/constants.h"
2122
#include "zenoh-pico/collections/element.h"
2223
#include "zenoh-pico/collections/list.h"
2324
#include "zenoh-pico/collections/refcount.h"
@@ -73,6 +74,7 @@ typedef struct {
7374
_z_keyexpr_t _declared_key;
7475
uint16_t _key_id;
7576
uint32_t _id;
77+
zp_locality_t _allowed_origin;
7678
_z_closure_sample_callback_t _callback;
7779
_z_drop_handler_t _dropper;
7880
void *_arg;
@@ -109,6 +111,7 @@ typedef struct {
109111
_z_drop_handler_t _dropper;
110112
void *_arg;
111113
bool _complete;
114+
zp_locality_t _allowed_origin;
112115
} _z_session_queryable_t;
113116

114117
bool _z_session_queryable_eq(const _z_session_queryable_t *one, const _z_session_queryable_t *two);
@@ -137,6 +140,7 @@ typedef struct {
137140
_z_zint_t _id;
138141
_z_closure_reply_callback_t _callback;
139142
_z_drop_handler_t _dropper;
143+
zp_locality_t _allowed_destination;
140144
z_clock_t _start_time;
141145
uint64_t _timeout;
142146
void *_arg;

0 commit comments

Comments
 (0)