Skip to content

Commit a88f739

Browse files
committed
loopback: support local delivery for same-session messages
Deliver PUT/DELETE, query, reply, and reply-final messages for a key expression that is defined in the same local session locally, without causing network activity.
1 parent 7263484 commit a88f739

File tree

7 files changed

+846
-3
lines changed

7 files changed

+846
-3
lines changed

CMakeLists.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,7 @@ if(UNIX OR MSVC)
684684
add_executable(z_api_source_info_test ${PROJECT_SOURCE_DIR}/tests/z_api_source_info_test.c)
685685
add_executable(z_api_queryable_test ${PROJECT_SOURCE_DIR}/tests/z_api_queryable_test.c)
686686
add_executable(z_api_scheduler_test ${PROJECT_SOURCE_DIR}/tests/z_api_scheduler_test.c)
687+
add_executable(z_local_loopback_test ${PROJECT_SOURCE_DIR}/tests/z_local_loopback_test.c)
687688
if(UNIX)
688689
add_executable(z_api_advanced_pubsub_test ${PROJECT_SOURCE_DIR}/tests/z_api_advanced_pubsub_test.c
689690
${PROJECT_SOURCE_DIR}/tests/utils/tcp_proxy.c)
@@ -701,6 +702,15 @@ endif()
701702
target_link_libraries(z_api_source_info_test zenohpico::lib)
702703
target_link_libraries(z_api_queryable_test zenohpico::lib)
703704
target_link_libraries(z_api_scheduler_test zenohpico::lib)
705+
target_link_libraries(z_local_loopback_test zenohpico::lib)
706+
target_include_directories(z_local_loopback_test PRIVATE ${PROJECT_SOURCE_DIR}/include)
707+
target_compile_definitions(z_local_loopback_test PRIVATE Z_LOOPBACK_TESTING=1)
708+
if(PICO_SHARED)
709+
target_compile_definitions(${Libname}_shared PRIVATE Z_LOOPBACK_TESTING=1)
710+
endif()
711+
if(PICO_STATIC)
712+
target_compile_definitions(${Libname}_static PRIVATE Z_LOOPBACK_TESTING=1)
713+
endif()
704714
target_link_libraries(z_api_advanced_pubsub_test zenohpico::lib)
705715

706716
configure_file(${PROJECT_SOURCE_DIR}/tests/routed.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/routed.sh COPYONLY)
@@ -722,6 +732,7 @@ endif()
722732
add_test(z_api_source_info_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_source_info_test)
723733
add_test(z_api_queryable_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_queryable_test)
724734
add_test(z_api_scheduler_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_scheduler_test)
735+
add_test(z_local_loopback_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_local_loopback_test)
725736
add_test(z_api_advanced_pubsub_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_advanced_pubsub_test)
726737
endif()
727738
endif()
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
//
2+
// Copyright (c) 2025 ZettaScale Technology
3+
//
4+
// This program and the accompanying materials are made available under the
5+
// terms of the Eclipse Public License 2.0 which is available at
6+
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
//
9+
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
//
11+
// Contributors:
12+
// ZettaScale Zenoh Team, <[email protected]>
13+
//
14+
15+
#ifndef ZENOH_PICO_SESSION_LOOPBACK_H
16+
#define ZENOH_PICO_SESSION_LOOPBACK_H
17+
18+
#include <stdbool.h>
19+
20+
#include "zenoh-pico/net/query.h"
21+
#include "zenoh-pico/protocol/core.h"
22+
#include "zenoh-pico/protocol/definitions/network.h"
23+
#include "zenoh-pico/session/session.h"
24+
#include "zenoh-pico/transport/transport.h"
25+
26+
#ifdef __cplusplus
27+
extern "C" {
28+
#endif
29+
30+
bool _z_session_deliver_push_locally(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload,
31+
const _z_encoding_t *encoding, z_sample_kind_t kind, _z_n_qos_t qos,
32+
const _z_timestamp_t *timestamp, const _z_bytes_t *attachment,
33+
z_reliability_t reliability, const _z_source_info_t *source_info);
34+
35+
bool _z_session_deliver_query_locally(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_slice_t *parameters,
36+
z_consolidation_mode_t consolidation, const _z_bytes_t *payload,
37+
const _z_encoding_t *encoding, const _z_bytes_t *attachment,
38+
const _z_source_info_t *source_info, _z_zint_t qid, _z_n_qos_t qos);
39+
40+
bool _z_session_deliver_reply_locally(const _z_query_t *query, const _z_session_rc_t *responder,
41+
const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload,
42+
const _z_encoding_t *encoding, z_sample_kind_t kind, _z_n_qos_t qos,
43+
const _z_timestamp_t *timestamp, const _z_bytes_t *attachment,
44+
const _z_source_info_t *source_info);
45+
46+
bool _z_session_deliver_reply_err_locally(const _z_query_t *query, const _z_session_rc_t *responder,
47+
const _z_bytes_t *payload, const _z_encoding_t *encoding, _z_n_qos_t qos);
48+
49+
bool _z_session_has_remote_targets(const _z_session_t *zn);
50+
bool _z_session_deliver_reply_final_locally(_z_session_t *zn, _z_zint_t rid);
51+
52+
#if defined(Z_LOOPBACK_TESTING)
53+
typedef _z_transport_common_t *(*_z_session_transport_override_fn)(_z_session_t *);
54+
void _z_session_set_transport_common_override(_z_session_transport_override_fn fn);
55+
56+
typedef z_result_t (*_z_session_send_override_fn)(_z_session_t *zn, const _z_network_message_t *n_msg,
57+
z_reliability_t reliability, z_congestion_control_t cong_ctrl,
58+
void *peer, bool *handled);
59+
void _z_transport_set_send_n_msg_override(_z_session_send_override_fn fn);
60+
#endif
61+
62+
#ifdef __cplusplus
63+
}
64+
#endif
65+
66+
#endif /* ZENOH_PICO_SESSION_LOOPBACK_H */

src/net/primitives.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "zenoh-pico/protocol/keyexpr.h"
3434
#include "zenoh-pico/session/interest.h"
3535
#include "zenoh-pico/session/liveliness.h"
36+
#include "zenoh-pico/session/loopback.h"
3637
#include "zenoh-pico/session/query.h"
3738
#include "zenoh-pico/session/queryable.h"
3839
#include "zenoh-pico/session/resource.h"
@@ -195,8 +196,15 @@ z_result_t _z_write(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_byte
195196
z_priority_t priority, bool is_express, const _z_timestamp_t *timestamp,
196197
const _z_bytes_t *attachment, z_reliability_t reliability, const _z_source_info_t *source_info) {
197198
z_result_t ret = _Z_RES_OK;
198-
_z_network_message_t msg;
199199
_z_qos_t qos = _z_n_qos_make(is_express, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority);
200+
201+
bool handled_locally = _z_session_deliver_push_locally(zn, keyexpr, payload, encoding, kind, qos, timestamp,
202+
attachment, reliability, source_info);
203+
if (handled_locally && !_z_session_has_remote_targets(zn)) {
204+
return _Z_RES_OK;
205+
}
206+
207+
_z_network_message_t msg;
200208
switch (kind) {
201209
case Z_SAMPLE_KIND_PUT:
202210
_z_n_msg_make_push_put(&msg, keyexpr, payload, encoding, qos, timestamp, attachment, reliability,
@@ -378,6 +386,12 @@ z_result_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, c
378386
}
379387
// Build the reply context decorator. This is NOT the final reply.
380388
_z_n_qos_t qos = _z_n_qos_make(is_express, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority);
389+
bool handled_locally = _z_session_deliver_reply_locally(query, zsrc, keyexpr, payload, encoding, kind, qos,
390+
timestamp, att, source_info);
391+
if (handled_locally && !_z_session_has_remote_targets(zn)) {
392+
return _Z_RES_OK;
393+
}
394+
381395
_z_zenoh_message_t z_msg;
382396
switch (kind) {
383397
case Z_SAMPLE_KIND_PUT:
@@ -408,6 +422,11 @@ z_result_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsr
408422
// Build the reply context decorator. This is NOT the final reply.
409423
_z_n_qos_t qos = _z_n_qos_make(false, true, Z_PRIORITY_DEFAULT);
410424
_z_source_info_t source_info = _z_source_info_null();
425+
bool handled_locally = _z_session_deliver_reply_err_locally(query, zsrc, payload, encoding, qos);
426+
if (handled_locally && !_z_session_has_remote_targets(zn)) {
427+
return _Z_RES_OK;
428+
}
429+
411430
_z_zenoh_message_t msg;
412431
_z_n_msg_make_reply_err(&msg, &zn->_local_zid, query->_request_id, Z_RELIABILITY_DEFAULT, qos, payload, encoding,
413432
&source_info);
@@ -501,6 +520,13 @@ z_result_t _z_query(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const char *p
501520
_z_n_msg_make_query(&z_msg, keyexpr, &params, pq->_id, Z_RELIABILITY_DEFAULT, pq->_consolidation, payload, encoding,
502521
timeout_ms, attachment, qos, &source_info);
503522

523+
bool handled_locally = _z_session_deliver_query_locally(zn, keyexpr, &params, pq->_consolidation, payload, encoding,
524+
attachment, &source_info, pq->_id, qos);
525+
if (handled_locally && !_z_session_has_remote_targets(zn)) {
526+
_z_trigger_query_reply_final(zn, pq->_id);
527+
return _Z_RES_OK;
528+
}
529+
504530
if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, cong_ctrl, NULL) != _Z_RES_OK) {
505531
_z_unregister_pending_query(zn, pq);
506532
_Z_ERROR_RETURN(_Z_ERR_TRANSPORT_TX_FAILED);

src/net/query.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "zenoh-pico/net/query.h"
1515

1616
#include "zenoh-pico/net/session.h"
17+
#include "zenoh-pico/session/loopback.h"
1718
#include "zenoh-pico/transport/common/tx.h"
1819
#include "zenoh-pico/utils/logging.h"
1920

@@ -31,10 +32,16 @@ z_result_t _z_query_send_reply_final(_z_query_t *q) {
3132
if (_Z_RC_IS_NULL(&sess_rc)) {
3233
_Z_ERROR_RETURN(_Z_ERR_TRANSPORT_TX_FAILED);
3334
}
35+
36+
_z_session_t *session = _Z_RC_IN_VAL(&sess_rc);
37+
if (_z_session_deliver_reply_final_locally(session, q->_request_id)) {
38+
_z_session_rc_drop(&sess_rc);
39+
return _Z_RES_OK;
40+
}
41+
3442
_z_zenoh_message_t z_msg;
3543
_z_n_msg_make_response_final(&z_msg, q->_request_id);
36-
z_result_t ret =
37-
_z_send_n_msg(_Z_RC_IN_VAL(&sess_rc), &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK, NULL);
44+
z_result_t ret = _z_send_n_msg(session, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK, NULL);
3845
_z_msg_clear(&z_msg);
3946
_z_session_rc_drop(&sess_rc);
4047
return ret;

0 commit comments

Comments
 (0)