From 4e40be56517cc92f52b27171ee107811a5ce459d Mon Sep 17 00:00:00 2001 From: Rakhi Kumari Date: Sat, 4 Oct 2025 10:50:17 +0530 Subject: [PATCH 1/5] Implement local transaction in CPP --- cpp/CMakeLists.txt | 1 + cpp/examples/CMakeLists.txt | 4 +- cpp/examples/simple_recv.cpp | 12 +- cpp/examples/tx_recv.cpp | 111 +++++++++++ cpp/examples/tx_send.cpp | 173 ++++++++++++++++ cpp/include/proton/fwd.hpp | 1 + cpp/include/proton/session.hpp | 20 +- cpp/include/proton/target_options.hpp | 4 + cpp/include/proton/transaction_handler.hpp | 61 ++++++ cpp/src/contexts.hpp | 2 + cpp/src/delivery.cpp | 8 + cpp/src/messaging_adapter.cpp | 41 +++- cpp/src/node_options.cpp | 8 +- cpp/src/sender.cpp | 9 + cpp/src/session.cpp | 219 +++++++++++++++++++++ cpp/src/transaction_handler.cpp | 44 +++++ cpp/src/transaction_test.cpp | 148 ++++++++++++++ 17 files changed, 849 insertions(+), 17 deletions(-) create mode 100644 cpp/examples/tx_recv.cpp create mode 100644 cpp/examples/tx_send.cpp create mode 100644 cpp/include/proton/transaction_handler.hpp create mode 100644 cpp/src/transaction_handler.cpp create mode 100644 cpp/src/transaction_test.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 287bbb5af7..f79590e9ef 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -120,6 +120,7 @@ set(qpid-proton-cpp-source src/terminus.cpp src/timestamp.cpp src/tracker.cpp + src/transaction_handler.cpp src/transfer.cpp src/transport.cpp src/type_id.cpp diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index 8e59f0adb5..685a5ed369 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -60,7 +60,9 @@ foreach(example scheduled_send service_bus multithreaded_client - multithreaded_client_flow_control) + multithreaded_client_flow_control + tx_send + tx_recv) add_executable(${example} ${example}.cpp) target_link_libraries(${example} Proton::cpp Threads::Threads) endforeach() diff --git a/cpp/examples/simple_recv.cpp b/cpp/examples/simple_recv.cpp index 0dadf75b86..fece839586 100644 --- a/cpp/examples/simple_recv.cpp +++ b/cpp/examples/simple_recv.cpp @@ -43,10 +43,11 @@ class simple_recv : public proton::messaging_handler { proton::receiver receiver; int expected; int received; + bool verbose; public: - simple_recv(const std::string &s, const std::string &u, const std::string &p, int c) : - url(s), user(u), password(p), expected(c), received(0) {} + simple_recv(const std::string &s, const std::string &u, const std::string &p, int c, bool verbose) : + url(s), user(u), password(p), expected(c), received(0), verbose(verbose) {} void on_container_start(proton::container &c) override { proton::connection_options co; @@ -61,6 +62,9 @@ class simple_recv : public proton::messaging_handler { } if (expected == 0 || received < expected) { + if (verbose) { + std::cout << msg << ": "; + } std::cout << msg.body() << std::endl; received++; @@ -77,18 +81,20 @@ int main(int argc, char **argv) { std::string user; std::string password; int message_count = 100; + bool verbose; example::options opts(argc, argv); opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL"); opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT"); opts.add_value(user, 'u', "user", "authenticate as USER", "USER"); opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD"); + opts.add_flag(verbose, 'v', "verbose", "show whole message contents"); try { opts.parse(); - simple_recv recv(address, user, password, message_count); + simple_recv recv(address, user, password, message_count, verbose); proton::container(recv).run(); return 0; diff --git a/cpp/examples/tx_recv.cpp b/cpp/examples/tx_recv.cpp new file mode 100644 index 0000000000..0da433b78a --- /dev/null +++ b/cpp/examples/tx_recv.cpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +class tx_recv : public proton::messaging_handler { + private: + proton::sender sender; + proton::receiver receiver; + std::string conn_url_; + std::string addr_; + int expected; + int received = 0; + std::atomic unique_msg_id; + + public: + tx_recv(const std::string& u, const std::string &a, int c): + conn_url_(u), addr_(a), expected(c), unique_msg_id(20000) {} + + void on_container_start(proton::container &c) override { + c.connect(conn_url_); + } + + void on_connection_open(proton::connection& c) override { + receiver = c.open_receiver(addr_); + sender = c.open_sender(addr_); + } + + void on_session_open(proton::session &s) override { + std::cout << "New session is open" << std::endl; + } + + void on_message(proton::delivery &d, proton::message &msg) override { + std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; + d.accept(); + proton::message reply_message; + + reply_message.id(std::atomic_fetch_add(&unique_msg_id, 1)); + reply_message.body(msg.body()); + reply_message.reply_to(receiver.source().address()); + + sender.send(reply_message); + + received += 1; + if (received == expected) { + receiver.connection().close(); + } + } +}; + +int main(int argc, char **argv) { + std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672"; + std::string addr = argc > 2 ? argv[2] : "examples"; + int message_count = 6; + example::options opts(argc, argv); + + opts.add_value(conn_url, 'u', "url", "connect and send to URL", "URL"); + opts.add_value(addr, 'a', "address", "connect and send to address", "URL"); + opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT"); + + try { + opts.parse(); + + tx_recv recv(conn_url, addr, message_count); + proton::container(recv).run(); + + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} diff --git a/cpp/examples/tx_send.cpp b/cpp/examples/tx_send.cpp new file mode 100644 index 0000000000..26d1cefc63 --- /dev/null +++ b/cpp/examples/tx_send.cpp @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +class tx_send : public proton::messaging_handler, proton::transaction_handler { + private: + proton::sender sender; + std::string conn_url_; + std::string addr_; + int total; + int batch_size; + int sent; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + std::atomic unique_msg_id; + + public: + tx_send(const std::string& u, const std::string& a, int c, int b): + conn_url_(u), addr_(a), total(c), batch_size(b), sent(0), unique_msg_id(10000) {} + + void on_container_start(proton::container &c) override { + c.connect(conn_url_); + } + + void on_connection_open(proton::connection& c) override { + sender = c.open_sender(addr_); + } + + void on_session_open(proton::session& s) override { + std::cout << "New session is open, declaring transaction now..." << std::endl; + s.transaction_declare(*this); + } + + void on_transaction_declare_failed(proton::session s) override { + std::cout << "Transaction declarion failed" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_commit_failed(proton::session s) override { + std::cout << "Transaction commit failed!" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_declared(proton::session s) override { + std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; + send(); + } + + void on_sendable(proton::sender&) override { + send(); + } + + void send() { + proton::session session = sender.session(); + while (session.transaction_is_declared() && sender.credit() && + (committed + current_batch) < total) { + proton::message msg; + std::map m; + m["sequence"] = committed + current_batch; + + msg.id(std::atomic_fetch_add(&unique_msg_id, 1)); + msg.body(m); + std::cout << "Sending [sender batch " << batch_index << "]: " << msg << std::endl; + sender.send(msg); + current_batch += 1; + if(current_batch == batch_size) + { + std::cout << "In this example we commit even batch index and abort otherwise." << std::endl; + if (batch_index % 2 == 0) { + std::cout << "Commiting transaction..." << std::endl; + session.transaction_commit(); + } else { + std::cout << "Aborting transaction..." << std::endl; + session.transaction_abort(); + } + batch_index++; + } + } + } + + void on_transaction_committed(proton::session s) override { + committed += current_batch; + current_batch = 0; + std::cout << "Transaction commited" << std::endl; + if(committed == total) { + std::cout << "All messages committed, closing connection." << std::endl; + s.connection().close(); + } + else { + std::cout << "Re-declaring transaction now..." << std::endl; + s.transaction_declare(*this); + } + } + + void on_transaction_aborted(proton::session s) override { + std::cout << "Transaction aborted!" << std::endl; + std::cout << "Re-delaring transaction now..." << std::endl; + current_batch = 0; + s.transaction_declare(*this); + } + + void on_sender_close(proton::sender &s) override { + current_batch = 0; + } + +}; + +int main(int argc, char **argv) { + std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672"; + std::string addr = argc > 2 ? argv[2] : "examples"; + int message_count = 6; + int batch_size = 3; + example::options opts(argc, argv); + + opts.add_value(conn_url, 'u', "url", "connect and send to URL", "URL"); + opts.add_value(addr, 'a', "address", "connect and send to address", "URL"); + opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT"); + opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE"); + + try { + opts.parse(); + + tx_send send(conn_url, addr, message_count, batch_size); + proton::container(send).run(); + + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} diff --git a/cpp/include/proton/fwd.hpp b/cpp/include/proton/fwd.hpp index 801d269323..31e4d2c49e 100644 --- a/cpp/include/proton/fwd.hpp +++ b/cpp/include/proton/fwd.hpp @@ -55,6 +55,7 @@ class source_options; class ssl; class target_options; class tracker; +class transaction_handler; class transport; class url; class void_function0; diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp index 60522c817b..710b2ad870 100644 --- a/cpp/include/proton/session.hpp +++ b/cpp/include/proton/session.hpp @@ -37,6 +37,10 @@ struct pn_session_t; namespace proton { +/// @cond INTERNAL +class transaction_impl; +/// @endcond + /// A container of senders and receivers. class PN_CPP_CLASS_EXTERN session : public internal::object, public endpoint { @@ -105,14 +109,23 @@ PN_CPP_CLASS_EXTERN session : public internal::object, public endp /// Get user data from this session. PN_CPP_EXTERN void* user_data() const; + PN_CPP_EXTERN void transaction_declare(proton::transaction_handler &handler, bool settle_before_discharge = false); + PN_CPP_EXTERN bool transaction_is_declared(); + PN_CPP_EXTERN proton::binary transaction_id() const; + PN_CPP_EXTERN void transaction_commit(); + PN_CPP_EXTERN void transaction_abort(); + + + /// @cond INTERNAL - friend class internal::factory; - friend class session_iterator; + friend class internal::factory; + friend class sender; + friend class session_iterator; + friend class transaction_impl; /// @endcond }; /// @cond INTERNAL - /// An iterator of sessions. class session_iterator : public internal::iter_base { public: @@ -126,7 +139,6 @@ class session_iterator : public internal::iter_base { typedef internal::iter_range session_range; /// @endcond - } // proton #endif // PROTON_SESSION_HPP diff --git a/cpp/include/proton/target_options.hpp b/cpp/include/proton/target_options.hpp index f5fe991777..b612671ab7 100644 --- a/cpp/include/proton/target_options.hpp +++ b/cpp/include/proton/target_options.hpp @@ -60,6 +60,10 @@ class target_options { /// address is ignored if dynamic() is true. PN_CPP_EXTERN target_options& address(const std::string& addr); + /// Set the target be of type coordinator. + /// This immediately override the currently assigned type. + PN_CPP_EXTERN target_options& make_coordinator(); + /// Request that a node be dynamically created by the remote peer. /// The default is false. Any specified target address() is /// ignored if true. diff --git a/cpp/include/proton/transaction_handler.hpp b/cpp/include/proton/transaction_handler.hpp new file mode 100644 index 0000000000..1e229fbf56 --- /dev/null +++ b/cpp/include/proton/transaction_handler.hpp @@ -0,0 +1,61 @@ +#ifndef PROTON_TRANSACTION_HPP +#define PROTON_TRANSACTION_HPP + + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "./fwd.hpp" +#include "./internal/export.hpp" +#include "./sender.hpp" +#include "./tracker.hpp" +#include "./container.hpp" + +/// @file +/// @copybrief proton::transaction + +namespace proton { + +class +PN_CPP_CLASS_EXTERN transaction_handler { + public: + PN_CPP_EXTERN virtual ~transaction_handler(); + + /// Called when a local transaction is declared. + PN_CPP_EXTERN virtual void on_transaction_declared(session); + + /// Called when a local transaction is discharged successfully. + PN_CPP_EXTERN virtual void on_transaction_committed(session); + + /// Called when a local transaction is discharged unsuccessfully (aborted). + PN_CPP_EXTERN virtual void on_transaction_aborted(session); + + /// Called when a local transaction declare fails. + PN_CPP_EXTERN virtual void on_transaction_declare_failed(session); + + /// Called when the commit of a local transaction fails. + PN_CPP_EXTERN virtual void on_transaction_commit_failed(session); +}; + +} // namespace proton + +#endif // PROTON_TRANSACTION_HPP diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp index 7ebab1d7b9..9025483e6b 100644 --- a/cpp/src/contexts.hpp +++ b/cpp/src/contexts.hpp @@ -25,6 +25,7 @@ #include "reconnect_options_impl.hpp" #include "proton/work_queue.hpp" +#include "proton/session.hpp" #include "proton/message.hpp" #include "proton/object.h" @@ -152,6 +153,7 @@ class session_context : public context { session_context() : handler(0), user_data_(nullptr) {} static session_context& get(pn_session_t* s); + transaction_impl* _txn_impl; messaging_handler* handler; void* user_data_; }; diff --git a/cpp/src/delivery.cpp b/cpp/src/delivery.cpp index 9f03eaf734..ec986a5f4a 100644 --- a/cpp/src/delivery.cpp +++ b/cpp/src/delivery.cpp @@ -34,7 +34,15 @@ namespace { void settle_delivery(pn_delivery_t* o, uint64_t state) { + proton::session session = proton::make_wrapper(o).session(); + if(session.transaction_is_declared()) { + // Transactional disposition + auto disp = pn_transactional_disposition(pn_delivery_local(o)); + pn_transactional_disposition_set_id(disp, pn_bytes(session.transaction_id())); + pn_transactional_disposition_set_outcome_type(disp, state); + } else { pn_delivery_update(o, state); + } pn_delivery_settle(o); } diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp index f90cd7613c..85f9a2c085 100644 --- a/cpp/src/messaging_adapter.cpp +++ b/cpp/src/messaging_adapter.cpp @@ -30,6 +30,7 @@ #include "proton/receiver_options.hpp" #include "proton/sender.hpp" #include "proton/sender_options.hpp" +#include "proton/target_options.hpp" #include "proton/session.hpp" #include "proton/tracker.hpp" #include "proton/transport.hpp" @@ -40,6 +41,7 @@ #include #include +#include #include #include #include @@ -69,7 +71,8 @@ void on_link_flow(messaging_handler& handler, pn_event_t* event) { // TODO: process session flow data, if no link-specific data, just return. if (!lnk) return; int state = pn_link_state(lnk); - if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) { + if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR || + ((state & PN_LOCAL_ACTIVE) && (state & PN_REMOTE_ACTIVE))) { link_context& lctx = link_context::get(lnk); if (pn_link_is_sender(lnk)) { if (pn_link_credit(lnk) > 0) { @@ -116,7 +119,29 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { link_context& lctx = link_context::get(lnk); Tracing& ot = Tracing::getTracing(); - if (pn_link_is_receiver(lnk)) { + if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) { + if (pn_delivery_updated(dlv)) { + tracker t(make_wrapper(dlv)); + ot.on_settled_span(t); + switch (pn_delivery_remote_state(dlv)) { + case PN_ACCEPTED: + handler.on_tracker_accept(t); + break; + case PN_REJECTED: + handler.on_tracker_reject(t); + break; + case PN_RELEASED: + case PN_MODIFIED: + handler.on_tracker_release(t); + break; + } + if (t.settled()) { + handler.on_tracker_settle(t); + if (lctx.auto_settle) + t.settle(); + } + } + } else if (pn_link_is_receiver(lnk)) { delivery d(make_wrapper(dlv)); if (pn_delivery_aborted(dlv)) { pn_delivery_settle(dlv); @@ -274,11 +299,13 @@ void on_link_local_open(messaging_handler& handler, pn_event_t* event) { void on_link_remote_open(messaging_handler& handler, pn_event_t* event) { auto lnk = pn_event_link(event); - // Currently don't implement (transaction) coordinator - if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) { - auto error = pn_link_condition(lnk); - pn_condition_set_name(error, "amqp:not-implemented"); - pn_link_close(lnk); + if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) { + auto cond = pn_link_condition(lnk); + if (pn_condition_is_set(cond)) { + pn_condition_set_name(cond, "amqp:on_link_remote_open:FAILED"); + pn_link_close(lnk); + return; + } return; } if (pn_link_state(lnk) & PN_LOCAL_UNINIT) { // Incoming link diff --git a/cpp/src/node_options.cpp b/cpp/src/node_options.cpp index fd489baf39..2c172e4a1f 100644 --- a/cpp/src/node_options.cpp +++ b/cpp/src/node_options.cpp @@ -162,6 +162,7 @@ class target_options::impl { option expiry_policy; option > capabilities; option dynamic_properties; + option is_coordinator; void apply(target& t) { node_address(t, address, dynamic, anonymous); @@ -175,6 +176,9 @@ class target_options::impl { get(dynamic_properties.value, target_map); value(pn_terminus_properties(unwrap(t))) = target_map; } + if (is_coordinator.set && is_coordinator.value) { + pn_terminus_set_type(unwrap(t), pn_terminus_type_t(PN_COORDINATOR)); + } } }; @@ -201,8 +205,8 @@ target_options& target_options::dynamic_properties(const target::dynamic_propert return *this; } -void target_options::apply(target& s) const { impl_->apply(s); } - +target_options& target_options::make_coordinator() { impl_->is_coordinator = true; return *this; } +void target_options::apply(target& s) const { impl_->apply(s); } } // namespace proton diff --git a/cpp/src/sender.cpp b/cpp/src/sender.cpp index 942e755b0a..4a6bcd148a 100644 --- a/cpp/src/sender.cpp +++ b/cpp/src/sender.cpp @@ -26,10 +26,12 @@ #include "proton/source.hpp" #include "proton/target.hpp" #include "proton/tracker.hpp" +#include "types_internal.hpp" #include #include #include +#include #include "proton_bits.hpp" #include "contexts.hpp" @@ -84,6 +86,13 @@ tracker sender::send(const message &message, const binary &tag) { pn_delivery_settle(dlv); if (!pn_link_credit(pn_object())) link_context::get(pn_object()).draining = false; + + // If transaction is declared + if (session().transaction_is_declared()) { + auto disp = pn_transactional_disposition(pn_delivery_local(unwrap(track))); + pn_transactional_disposition_set_id(disp, pn_bytes(session().transaction_id())); + } + return track; } diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp index b8f777a005..6520d36f26 100644 --- a/cpp/src/session.cpp +++ b/cpp/src/session.cpp @@ -21,13 +21,23 @@ #include "proton/session.hpp" #include "proton/connection.hpp" +#include "proton/delivery.h" +#include "proton/delivery.hpp" +#include "proton/error.hpp" #include "proton/receiver_options.hpp" #include "proton/sender_options.hpp" #include "proton/session_options.hpp" +#include "proton/target_options.hpp" +#include "proton/transaction_handler.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/tracker.hpp" +#include "proton/transfer.hpp" +#include "types_internal.hpp" #include "contexts.hpp" #include "link_namer.hpp" #include "proton_bits.hpp" +#include #include #include @@ -70,6 +80,7 @@ std::string next_link_name(const connection& c) { return ln ? ln->link_name() : uuid::random().str(); } + } sender session::open_sender(const std::string &addr) { @@ -148,4 +159,212 @@ void* session::user_data() const { return sctx.user_data_; } +class transaction_impl { + public: + proton::sender txn_ctrl; + proton::transaction_handler *handler = nullptr; + proton::binary transaction_id; + bool failed = false; + enum State { + FREE, + DECLARING, + DECLARED, + DISCHARGING, + }; + enum State state = State::FREE; + std::vector pending; + + void commit(); + void abort(); + void declare(); + + void discharge(bool failed); + void release_pending(); + + proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); + void handle_outcome(proton::tracker t); + transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge); + ~transaction_impl(); +}; + + +namespace { + +bool transaction_is_empty(const session& s) { return session_context::get(unwrap(s))._txn_impl == NULL; } + +void transaction_handle_outcome(const session& s, proton::tracker t) { + session_context::get(unwrap(s))._txn_impl->handle_outcome(t); +} + +void transaction_delete(const session& s) { auto &_txn_impl = session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = nullptr;} + +} + +void session::transaction_declare(proton::transaction_handler &handler, bool settle_before_discharge) { + auto &txn_impl = session_context::get(pn_object())._txn_impl; + if (txn_impl == nullptr) { + // Create _txn_impl + proton::connection conn = this->connection(); + class InternalTransactionHandler : public proton::messaging_handler { + + void on_tracker_settle(proton::tracker &t) override { + if (!transaction_is_empty(t.session())) { + transaction_handle_outcome(t.session(), t); + } + } + }; + + proton::target_options opts; + std::vector cap = {proton::symbol("amqp:local-transactions")}; + opts.capabilities(cap); + opts.make_coordinator(); + + proton::sender_options so; + so.name("txn-ctrl"); + so.target(opts); + + static InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it + so.handler(internal_handler); + + static proton::sender s = conn.open_sender("does not matter", so); + + settle_before_discharge = false; + + txn_impl = new transaction_impl(s, handler, settle_before_discharge); + } + // Declare txn + txn_impl->declare(); +} + + +proton::binary session::transaction_id() const { return session_context::get(pn_object())._txn_impl->transaction_id; } +void session::transaction_commit() { session_context::get(pn_object())._txn_impl->commit(); } +void session::transaction_abort() { session_context::get(pn_object())._txn_impl->abort(); } +bool session::transaction_is_declared() { return (!transaction_is_empty(*this)) && session_context::get(pn_object())._txn_impl->state == transaction_impl::State::DECLARED; } + +transaction_impl::transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge) + : txn_ctrl(_txn_ctrl), handler(&_handler) { +} +transaction_impl::~transaction_impl() {} + +void transaction_impl::commit() { + discharge(false); +} + +void transaction_impl::abort() { + discharge(true); +} + +void transaction_impl::declare() { + if (state != transaction_impl::State::FREE) + throw proton::error("This session has some associcated transaction already"); + state = State::DECLARING; + + proton::symbol descriptor("amqp:declare:list"); + std::list vd; + proton::value i_am_null; + vd.push_back(i_am_null); + proton::value _value = vd; + send_ctrl(descriptor, _value); +} + +void transaction_impl::discharge(bool _failed) { + if (state != transaction_impl::State::DECLARED) + throw proton::error("Only a declared txn can be discharged."); + state = State::DISCHARGING; + + failed = _failed; + proton::symbol descriptor("amqp:discharge:list"); + std::list vd; + vd.push_back(transaction_id); + vd.push_back(failed); + proton::value _value = vd; + send_ctrl(descriptor, _value); +} + +proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::value _value) { + proton::value msg_value; + proton::codec::encoder enc(msg_value); + enc << proton::codec::start::described() + << descriptor + << _value + << proton::codec::finish(); + + + proton::message msg = msg_value; + proton::tracker delivery = txn_ctrl.send(msg); + return delivery; +} + +void transaction_impl::release_pending() { + for (auto d : pending) { + delivery d2(make_wrapper(unwrap(d))); + d2.release(); + } + pending.clear(); +} + +void transaction_impl::handle_outcome(proton::tracker t) { + pn_disposition_t *disposition = pn_delivery_remote(unwrap(t)); + if (state == State::DECLARING) { + // Attempting to declare transaction + proton::value val(pn_disposition_data(disposition)); + auto vd = get>(val); + if (vd.size() > 0) { + transaction_id = vd[0]; + state = State::DECLARED; + handler->on_transaction_declared(t.session()); + return; + } else if (pn_disposition_is_failed(disposition)) { + state = State::FREE; + transaction_delete(t.session()); + handler->on_transaction_declare_failed(t.session()); + return; + } else { + state = State::FREE; + transaction_delete(t.session()); + handler->on_transaction_declare_failed(t.session()); + return; + } + } else if (state == State::DISCHARGING) { + // Attempting to commit/abort transaction + if (pn_disposition_is_failed(disposition)) { + if (!failed) { + state = State::FREE; + transaction_delete(t.session()); + handler->on_transaction_commit_failed(t.session()); + release_pending(); + return; + } else { + state = State::FREE; + transaction_delete(t.session()); + // Transaction abort failed. + return; + } + } else { + if (failed) { + // Transaction abort is successful + state = State::FREE; + transaction_delete(t.session()); + handler->on_transaction_aborted(t.session()); + release_pending(); + return; + } else { + // Transaction commit is successful + state = State::FREE; + transaction_delete(t.session()); + handler->on_transaction_committed(t.session()); + return; + } + } + pending.clear(); + return; + } + throw proton::error("reached unintended state in local transaction handler"); +} + } // namespace proton diff --git a/cpp/src/transaction_handler.cpp b/cpp/src/transaction_handler.cpp new file mode 100644 index 0000000000..bee861f9db --- /dev/null +++ b/cpp/src/transaction_handler.cpp @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/transaction_handler.hpp" +#include "proton/delivery.h" +#include "proton/delivery.hpp" +#include "proton/message.hpp" +#include "proton/target_options.hpp" +#include "proton/tracker.hpp" +#include "proton/transfer.hpp" + +#include "proton_bits.hpp" +#include + +#include + +namespace proton { + +transaction_handler::~transaction_handler() = default; +void transaction_handler::on_transaction_declared(session) {} +void transaction_handler::on_transaction_committed(session) {} +void transaction_handler::on_transaction_aborted(session) {} +void transaction_handler::on_transaction_declare_failed(session) {} +void transaction_handler::on_transaction_commit_failed(session) {} + +} diff --git a/cpp/src/transaction_test.cpp b/cpp/src/transaction_test.cpp new file mode 100644 index 0000000000..cb4207f711 --- /dev/null +++ b/cpp/src/transaction_test.cpp @@ -0,0 +1,148 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_bits.hpp" + +#include +#include +#include +#include + +namespace { +std::mutex m; +std::condition_variable cv; +bool listener_ready = false; +int listener_port; +} //namespace + +class test_recv : public proton::messaging_handler { + private: + class listener_ready_handler : public proton::listen_handler { + void on_open(proton::listener &l) override { + { + std::lock_guard lk(m); + listener_port = l.port(); + listener_ready = true; + } + cv.notify_one(); + } + }; + + std::string url; + proton::listener listener; + listener_ready_handler listen_handler; + + public: + test_recv(const std::string &s) : url(s) {} + + void on_container_start(proton::container &c) override { + listener = c.listen(url, listen_handler); + } + + void on_message(proton::delivery &d, proton::message &msg) override { + + } +}; + +class test_send : public proton::messaging_handler, proton::transaction_handler { + private: + std::string url; + proton::sender sender; + proton::session session; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + int confirmed = 0; + + int batch_size = 3; + int total = 6; + public: + test_send(const std::string &s) : url(s) {} + + void on_container_start(proton::container &c) override { + proton::connection_options co; + sender = c.open_sender(url, co); + } + + + void on_session_open(proton::session &s) override { + session = s; + s.declare_transaction(*this); + } + + + void on_transaction_declared(proton::session s) override { + send(sender); + } + + void on_sendable(proton::sender &s) override { + send(s); + } + + void send(proton::sender &s) { + static int unique_id = 10000; + while (session.transaction_is_declared() && sender.credit() && + (committed + current_batch) < total) { + proton::message msg; + std::map m; + m["sequence"] = committed + current_batch; + + msg.id(unique_id++); + msg.body(m); + s.send(msg); + current_batch += 1; + if(current_batch == batch_size) + { + std::cout << " >> Txn attempt commit" << std::endl; + if (batch_index % 2 == 0) { + session.transaction_commit(); + } else { + session.transaction_abort(); + } + batch_index++; + } + } + } +}; + +int test_transaction() { + + std::string recv_address("127.0.0.1:0/test"); + test_recv recv(recv_address); + proton::container c(recv); + + std::thread thread_recv([&c]() -> void { c.run(); }); + + // wait until listener is ready + std::unique_lock lk(m); + cv.wait(lk, [] { return listener_ready; }); + + std::string send_address = + "127.0.0.1:" + std::to_string(listener_port) + "/test"; + test_send send(send_address); + proton::container(send).run(); + thread_recv.join(); + + return 0; +} + +int main(int argc, char **argv) { + int failed = 0; + RUN_ARGV_TEST(failed, test_transaction()); + return failed; +} From 41419db8e1b7618e3b0e0c5ebff61696997991fe Mon Sep 17 00:00:00 2001 From: Rakhi Kumari Date: Sun, 5 Oct 2025 00:51:05 +0530 Subject: [PATCH 2/5] Local Transaction tx_recv.cpp example shows abort() function --- cpp/examples/tx_recv.cpp | 74 ++++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 18 deletions(-) diff --git a/cpp/examples/tx_recv.cpp b/cpp/examples/tx_recv.cpp index 0da433b78a..d94e3c680c 100644 --- a/cpp/examples/tx_recv.cpp +++ b/cpp/examples/tx_recv.cpp @@ -39,19 +39,22 @@ #include #include -class tx_recv : public proton::messaging_handler { +class tx_recv : public proton::messaging_handler, proton::transaction_handler { private: proton::sender sender; proton::receiver receiver; + proton::session session; std::string conn_url_; std::string addr_; - int expected; + int total; + int batch_size; int received = 0; - std::atomic unique_msg_id; + int current_batch = 0; + int batch_index = 0; public: - tx_recv(const std::string& u, const std::string &a, int c): - conn_url_(u), addr_(a), expected(c), unique_msg_id(20000) {} + tx_recv(const std::string& u, const std::string &a, int c, int b): + conn_url_(u), addr_(a), total(c), batch_size(b) {} void on_container_start(proton::container &c) override { c.connect(conn_url_); @@ -59,27 +62,60 @@ class tx_recv : public proton::messaging_handler { void on_connection_open(proton::connection& c) override { receiver = c.open_receiver(addr_); - sender = c.open_sender(addr_); + // sender = c.open_sender(addr_); } void on_session_open(proton::session &s) override { std::cout << "New session is open" << std::endl; + s.transaction_declare(*this); + session = s; } - void on_message(proton::delivery &d, proton::message &msg) override { - std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; - d.accept(); - proton::message reply_message; + void on_transaction_declare_failed(proton::session s) override { + std::cout << "Transaction declarion failed" << std::endl; + s.connection().close(); + exit(-1); + } - reply_message.id(std::atomic_fetch_add(&unique_msg_id, 1)); - reply_message.body(msg.body()); - reply_message.reply_to(receiver.source().address()); + void on_transaction_declared(proton::session s) override { + std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; + } - sender.send(reply_message); + void on_transaction_committed(proton::session s) override { + std::cout << "Transaction commited" << std::endl; + received += batch_size; + if (received == total) { + std::cout << "All received messages committed, closing connection." << std::endl; + s.connection().close(); + } + else { + std::cout << "Re-declaring transaction now... to receive next batch." << std::endl; + s.transaction_declare(*this); + } + } + + void on_transaction_aborted(proton::session s) override { + std::cout << "Transaction aborted!" << std::endl; + std::cout << "Re-delaring transaction now..." << std::endl; + current_batch = 0; + s.transaction_declare(*this); + } - received += 1; - if (received == expected) { - receiver.connection().close(); + void on_message(proton::delivery &d, proton::message &msg) override { + std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; + d.accept(); + current_batch += 1; + if (current_batch == batch_size) { + // Batch complete + std::cout << "In this example we abort even batch index and commit otherwise." << std::endl; + if (batch_index % 2 == 1) { + std::cout << "Commiting transaction..." << std::endl; + session.transaction_commit(); + } else { + std::cout << "Aborting transaction..." << std::endl; + session.transaction_abort(); + } + batch_index++; } } }; @@ -88,16 +124,18 @@ int main(int argc, char **argv) { std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672"; std::string addr = argc > 2 ? argv[2] : "examples"; int message_count = 6; + int batch_size = 3; example::options opts(argc, argv); opts.add_value(conn_url, 'u', "url", "connect and send to URL", "URL"); opts.add_value(addr, 'a', "address", "connect and send to address", "URL"); opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT"); + opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE"); try { opts.parse(); - tx_recv recv(conn_url, addr, message_count); + tx_recv recv(conn_url, addr, message_count, batch_size); proton::container(recv).run(); return 0; From cb6b0645ad31bd845397fbee990950f6b9829ed1 Mon Sep 17 00:00:00 2001 From: Rakhi Kumari Date: Mon, 13 Oct 2025 17:22:24 +0530 Subject: [PATCH 3/5] Update tx_send and tx_recv examples to fully work --- cpp/examples/tx_recv.cpp | 19 +++++++++++-------- cpp/examples/tx_send.cpp | 2 +- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/cpp/examples/tx_recv.cpp b/cpp/examples/tx_recv.cpp index d94e3c680c..328571fb4b 100644 --- a/cpp/examples/tx_recv.cpp +++ b/cpp/examples/tx_recv.cpp @@ -24,12 +24,13 @@ #include #include #include -#include -#include #include +#include #include -#include +#include +#include #include +#include #include #include @@ -41,7 +42,6 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler { private: - proton::sender sender; proton::receiver receiver; proton::session session; std::string conn_url_; @@ -61,8 +61,10 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler { } void on_connection_open(proton::connection& c) override { - receiver = c.open_receiver(addr_); - // sender = c.open_sender(addr_); + // NOTE:credit_window(0) disables automatic flow control. + // We will use flow control to receive batches of messages in a transaction. + std::cout << "In this example we abort/commit transaction alternatively." << std::endl; + receiver = c.open_receiver(addr_, proton::receiver_options().credit_window(0)); } void on_session_open(proton::session &s) override { @@ -79,11 +81,13 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler { void on_transaction_declared(proton::session s) override { std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; + receiver.add_credit(batch_size); } void on_transaction_committed(proton::session s) override { std::cout << "Transaction commited" << std::endl; - received += batch_size; + received += current_batch; + current_batch = 0; if (received == total) { std::cout << "All received messages committed, closing connection." << std::endl; s.connection().close(); @@ -107,7 +111,6 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler { current_batch += 1; if (current_batch == batch_size) { // Batch complete - std::cout << "In this example we abort even batch index and commit otherwise." << std::endl; if (batch_index % 2 == 1) { std::cout << "Commiting transaction..." << std::endl; session.transaction_commit(); diff --git a/cpp/examples/tx_send.cpp b/cpp/examples/tx_send.cpp index 26d1cefc63..8498c1a69d 100644 --- a/cpp/examples/tx_send.cpp +++ b/cpp/examples/tx_send.cpp @@ -60,6 +60,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { } void on_connection_open(proton::connection& c) override { + std::cout << "In this example we abort/commit transaction alternatively." << std::endl; sender = c.open_sender(addr_); } @@ -104,7 +105,6 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { current_batch += 1; if(current_batch == batch_size) { - std::cout << "In this example we commit even batch index and abort otherwise." << std::endl; if (batch_index % 2 == 0) { std::cout << "Commiting transaction..." << std::endl; session.transaction_commit(); From 63d16bb60e0d9db5201e247c9e08da83d1b37eaf Mon Sep 17 00:00:00 2001 From: Rakhi Kumari Date: Wed, 22 Oct 2025 22:52:47 +0530 Subject: [PATCH 4/5] Add a test file and few improvements --- cpp/CMakeLists.txt | 1 - cpp/examples/tx_recv.cpp | 27 +- cpp/examples/tx_send.cpp | 27 +- cpp/include/proton/fwd.hpp | 1 - cpp/include/proton/messaging_handler.hpp | 9 + cpp/include/proton/session.hpp | 7 +- cpp/include/proton/transaction_handler.hpp | 61 ---- cpp/src/contexts.hpp | 1 + cpp/src/delivery.cpp | 4 +- cpp/src/handler.cpp | 4 + cpp/src/messaging_adapter.cpp | 33 +- cpp/src/session.cpp | 35 +- cpp/src/transaction_handler.cpp | 44 --- cpp/src/transaction_test.cpp | 401 +++++++++++++++------ cpp/tests.cmake | 3 + 15 files changed, 348 insertions(+), 310 deletions(-) delete mode 100644 cpp/include/proton/transaction_handler.hpp delete mode 100644 cpp/src/transaction_handler.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index f79590e9ef..287bbb5af7 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -120,7 +120,6 @@ set(qpid-proton-cpp-source src/terminus.cpp src/timestamp.cpp src/tracker.cpp - src/transaction_handler.cpp src/transfer.cpp src/transport.cpp src/type_id.cpp diff --git a/cpp/examples/tx_recv.cpp b/cpp/examples/tx_recv.cpp index 328571fb4b..df550dd825 100644 --- a/cpp/examples/tx_recv.cpp +++ b/cpp/examples/tx_recv.cpp @@ -29,7 +29,6 @@ #include #include #include -#include #include #include @@ -40,7 +39,7 @@ #include #include -class tx_recv : public proton::messaging_handler, proton::transaction_handler { +class tx_recv : public proton::messaging_handler { private: proton::receiver receiver; proton::session session; @@ -68,23 +67,23 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler { } void on_session_open(proton::session &s) override { - std::cout << "New session is open" << std::endl; - s.transaction_declare(*this); - session = s; + if(!s.transaction_is_declared()) { + std::cout << "New session is open" << std::endl; + s.transaction_declare(*this); + session = s; + } else { + std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; + receiver.add_credit(batch_size); + } } - void on_transaction_declare_failed(proton::session s) override { - std::cout << "Transaction declarion failed" << std::endl; + void on_session_error(proton::session &s) override { + std::cout << "Session error: " << s.error().what() << std::endl; s.connection().close(); exit(-1); } - void on_transaction_declared(proton::session s) override { - std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; - receiver.add_credit(batch_size); - } - - void on_transaction_committed(proton::session s) override { + void on_session_transaction_committed(proton::session &s) override { std::cout << "Transaction commited" << std::endl; received += current_batch; current_batch = 0; @@ -98,7 +97,7 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler { } } - void on_transaction_aborted(proton::session s) override { + void on_session_transaction_aborted(proton::session &s) override { std::cout << "Transaction aborted!" << std::endl; std::cout << "Re-delaring transaction now..." << std::endl; current_batch = 0; diff --git a/cpp/examples/tx_send.cpp b/cpp/examples/tx_send.cpp index 8498c1a69d..0a268c80cf 100644 --- a/cpp/examples/tx_send.cpp +++ b/cpp/examples/tx_send.cpp @@ -28,7 +28,6 @@ #include #include #include -#include #include #include @@ -38,7 +37,7 @@ #include #include -class tx_send : public proton::messaging_handler, proton::transaction_handler { +class tx_send : public proton::messaging_handler { private: proton::sender sender; std::string conn_url_; @@ -65,27 +64,27 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { } void on_session_open(proton::session& s) override { - std::cout << "New session is open, declaring transaction now..." << std::endl; - s.transaction_declare(*this); + if(!s.transaction_is_declared()) { + std::cout << "New session is open, declaring transaction now..." << std::endl; + s.transaction_declare(*this); + } else { + std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; + send(); + } } - void on_transaction_declare_failed(proton::session s) override { - std::cout << "Transaction declarion failed" << std::endl; + void on_session_error(proton::session &s) override { + std::cout << "Session error: " << s.error().what() << std::endl; s.connection().close(); exit(-1); } - void on_transaction_commit_failed(proton::session s) override { + void on_session_transaction_commit_failed(proton::session &s) override { std::cout << "Transaction commit failed!" << std::endl; s.connection().close(); exit(-1); } - void on_transaction_declared(proton::session s) override { - std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; - send(); - } - void on_sendable(proton::sender&) override { send(); } @@ -117,7 +116,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { } } - void on_transaction_committed(proton::session s) override { + void on_session_transaction_committed(proton::session &s) override { committed += current_batch; current_batch = 0; std::cout << "Transaction commited" << std::endl; @@ -131,7 +130,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { } } - void on_transaction_aborted(proton::session s) override { + void on_session_transaction_aborted(proton::session &s) override { std::cout << "Transaction aborted!" << std::endl; std::cout << "Re-delaring transaction now..." << std::endl; current_batch = 0; diff --git a/cpp/include/proton/fwd.hpp b/cpp/include/proton/fwd.hpp index 31e4d2c49e..801d269323 100644 --- a/cpp/include/proton/fwd.hpp +++ b/cpp/include/proton/fwd.hpp @@ -55,7 +55,6 @@ class source_options; class ssl; class target_options; class tracker; -class transaction_handler; class transport; class url; class void_function0; diff --git a/cpp/include/proton/messaging_handler.hpp b/cpp/include/proton/messaging_handler.hpp index 213dbe73ef..f41b990533 100644 --- a/cpp/include/proton/messaging_handler.hpp +++ b/cpp/include/proton/messaging_handler.hpp @@ -172,6 +172,15 @@ PN_CPP_CLASS_EXTERN messaging_handler { /// The remote peer closed the session with an error condition. PN_CPP_EXTERN virtual void on_session_error(session&); + /// Called when a local transaction is discharged successfully. + PN_CPP_EXTERN virtual void on_session_transaction_committed(session&); + + /// Called when the commit of a local transaction fails. + PN_CPP_EXTERN virtual void on_session_transaction_commit_failed(session&); + + /// Called when a local transaction is discharged unsuccessfully (aborted). + PN_CPP_EXTERN virtual void on_session_transaction_aborted(session&); + /// The remote peer opened the link. PN_CPP_EXTERN virtual void on_receiver_open(receiver&); diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp index 710b2ad870..350c9dfa00 100644 --- a/cpp/include/proton/session.hpp +++ b/cpp/include/proton/session.hpp @@ -37,10 +37,6 @@ struct pn_session_t; namespace proton { -/// @cond INTERNAL -class transaction_impl; -/// @endcond - /// A container of senders and receivers. class PN_CPP_CLASS_EXTERN session : public internal::object, public endpoint { @@ -109,7 +105,7 @@ PN_CPP_CLASS_EXTERN session : public internal::object, public endp /// Get user data from this session. PN_CPP_EXTERN void* user_data() const; - PN_CPP_EXTERN void transaction_declare(proton::transaction_handler &handler, bool settle_before_discharge = false); + PN_CPP_EXTERN void transaction_declare(proton::messaging_handler &handler, bool settle_before_discharge = false); PN_CPP_EXTERN bool transaction_is_declared(); PN_CPP_EXTERN proton::binary transaction_id() const; PN_CPP_EXTERN void transaction_commit(); @@ -121,7 +117,6 @@ PN_CPP_CLASS_EXTERN session : public internal::object, public endp friend class internal::factory; friend class sender; friend class session_iterator; - friend class transaction_impl; /// @endcond }; diff --git a/cpp/include/proton/transaction_handler.hpp b/cpp/include/proton/transaction_handler.hpp deleted file mode 100644 index 1e229fbf56..0000000000 --- a/cpp/include/proton/transaction_handler.hpp +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef PROTON_TRANSACTION_HPP -#define PROTON_TRANSACTION_HPP - - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -#include "./fwd.hpp" -#include "./internal/export.hpp" -#include "./sender.hpp" -#include "./tracker.hpp" -#include "./container.hpp" - -/// @file -/// @copybrief proton::transaction - -namespace proton { - -class -PN_CPP_CLASS_EXTERN transaction_handler { - public: - PN_CPP_EXTERN virtual ~transaction_handler(); - - /// Called when a local transaction is declared. - PN_CPP_EXTERN virtual void on_transaction_declared(session); - - /// Called when a local transaction is discharged successfully. - PN_CPP_EXTERN virtual void on_transaction_committed(session); - - /// Called when a local transaction is discharged unsuccessfully (aborted). - PN_CPP_EXTERN virtual void on_transaction_aborted(session); - - /// Called when a local transaction declare fails. - PN_CPP_EXTERN virtual void on_transaction_declare_failed(session); - - /// Called when the commit of a local transaction fails. - PN_CPP_EXTERN virtual void on_transaction_commit_failed(session); -}; - -} // namespace proton - -#endif // PROTON_TRANSACTION_HPP diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp index 9025483e6b..5f0bdc9230 100644 --- a/cpp/src/contexts.hpp +++ b/cpp/src/contexts.hpp @@ -42,6 +42,7 @@ namespace proton { class proton_handler; class connector; +class transaction_impl; namespace io {class link_namer;} diff --git a/cpp/src/delivery.cpp b/cpp/src/delivery.cpp index ec986a5f4a..25b631d6a7 100644 --- a/cpp/src/delivery.cpp +++ b/cpp/src/delivery.cpp @@ -36,12 +36,12 @@ namespace { void settle_delivery(pn_delivery_t* o, uint64_t state) { proton::session session = proton::make_wrapper(o).session(); if(session.transaction_is_declared()) { - // Transactional disposition + // Transactional disposition auto disp = pn_transactional_disposition(pn_delivery_local(o)); pn_transactional_disposition_set_id(disp, pn_bytes(session.transaction_id())); pn_transactional_disposition_set_outcome_type(disp, state); } else { - pn_delivery_update(o, state); + pn_delivery_update(o, state); } pn_delivery_settle(o); } diff --git a/cpp/src/handler.cpp b/cpp/src/handler.cpp index 1632efda64..c62966cd91 100644 --- a/cpp/src/handler.cpp +++ b/cpp/src/handler.cpp @@ -65,6 +65,10 @@ void messaging_handler::on_session_open(session &s) { pn_session_open(unwrap(s)); } } +void messaging_handler::on_session_transaction_committed(session &) {} +void messaging_handler::on_session_transaction_commit_failed(session &) {} +void messaging_handler::on_session_transaction_aborted(session &) {} + void messaging_handler::on_receiver_close(receiver &) {} void messaging_handler::on_receiver_error(receiver &l) { on_error(l.error()); } void messaging_handler::on_receiver_open(receiver &l) { diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp index 85f9a2c085..b68f49fe22 100644 --- a/cpp/src/messaging_adapter.cpp +++ b/cpp/src/messaging_adapter.cpp @@ -119,29 +119,7 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { link_context& lctx = link_context::get(lnk); Tracing& ot = Tracing::getTracing(); - if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) { - if (pn_delivery_updated(dlv)) { - tracker t(make_wrapper(dlv)); - ot.on_settled_span(t); - switch (pn_delivery_remote_state(dlv)) { - case PN_ACCEPTED: - handler.on_tracker_accept(t); - break; - case PN_REJECTED: - handler.on_tracker_reject(t); - break; - case PN_RELEASED: - case PN_MODIFIED: - handler.on_tracker_release(t); - break; - } - if (t.settled()) { - handler.on_tracker_settle(t); - if (lctx.auto_settle) - t.settle(); - } - } - } else if (pn_link_is_receiver(lnk)) { + if (pn_link_is_receiver(lnk)) { delivery d(make_wrapper(dlv)); if (pn_delivery_aborted(dlv)) { pn_delivery_settle(dlv); @@ -299,15 +277,6 @@ void on_link_local_open(messaging_handler& handler, pn_event_t* event) { void on_link_remote_open(messaging_handler& handler, pn_event_t* event) { auto lnk = pn_event_link(event); - if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) { - auto cond = pn_link_condition(lnk); - if (pn_condition_is_set(cond)) { - pn_condition_set_name(cond, "amqp:on_link_remote_open:FAILED"); - pn_link_close(lnk); - return; - } - return; - } if (pn_link_state(lnk) & PN_LOCAL_UNINIT) { // Incoming link // Copy source and target from remote end. pn_terminus_copy(pn_link_source(lnk), pn_link_remote_source(lnk)); diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp index 6520d36f26..13d43229cf 100644 --- a/cpp/src/session.cpp +++ b/cpp/src/session.cpp @@ -28,7 +28,6 @@ #include "proton/sender_options.hpp" #include "proton/session_options.hpp" #include "proton/target_options.hpp" -#include "proton/transaction_handler.hpp" #include "proton/messaging_handler.hpp" #include "proton/tracker.hpp" #include "proton/transfer.hpp" @@ -162,7 +161,7 @@ void* session::user_data() const { class transaction_impl { public: proton::sender txn_ctrl; - proton::transaction_handler *handler = nullptr; + proton::messaging_handler *handler = nullptr; proton::binary transaction_id; bool failed = false; enum State { @@ -184,7 +183,7 @@ class transaction_impl { proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); void handle_outcome(proton::tracker t); transaction_impl(proton::sender &_txn_ctrl, - proton::transaction_handler &_handler, + proton::messaging_handler &_handler, bool _settle_before_discharge); ~transaction_impl(); }; @@ -202,7 +201,7 @@ void transaction_delete(const session& s) { auto &_txn_impl = session_context::g } -void session::transaction_declare(proton::transaction_handler &handler, bool settle_before_discharge) { +void session::transaction_declare(proton::messaging_handler &handler, bool settle_before_discharge) { auto &txn_impl = session_context::get(pn_object())._txn_impl; if (txn_impl == nullptr) { // Create _txn_impl @@ -245,7 +244,7 @@ void session::transaction_abort() { session_context::get(pn_object())._txn_impl- bool session::transaction_is_declared() { return (!transaction_is_empty(*this)) && session_context::get(pn_object())._txn_impl->state == transaction_impl::State::DECLARED; } transaction_impl::transaction_impl(proton::sender &_txn_ctrl, - proton::transaction_handler &_handler, + proton::messaging_handler &_handler, bool _settle_before_discharge) : txn_ctrl(_txn_ctrl), handler(&_handler) { } @@ -310,6 +309,7 @@ void transaction_impl::release_pending() { void transaction_impl::handle_outcome(proton::tracker t) { pn_disposition_t *disposition = pn_delivery_remote(unwrap(t)); + proton::session session = t.session(); if (state == State::DECLARING) { // Attempting to declare transaction proton::value val(pn_disposition_data(disposition)); @@ -317,17 +317,18 @@ void transaction_impl::handle_outcome(proton::tracker t) { if (vd.size() > 0) { transaction_id = vd[0]; state = State::DECLARED; - handler->on_transaction_declared(t.session()); + handler->on_session_open(session); return; } else if (pn_disposition_is_failed(disposition)) { state = State::FREE; - transaction_delete(t.session()); - handler->on_transaction_declare_failed(t.session()); + transaction_delete(session); + // on_transaction_declare_failed + handler->on_session_error(session); return; } else { state = State::FREE; - transaction_delete(t.session()); - handler->on_transaction_declare_failed(t.session()); + transaction_delete(session); + handler->on_session_error(session); return; } } else if (state == State::DISCHARGING) { @@ -335,13 +336,13 @@ void transaction_impl::handle_outcome(proton::tracker t) { if (pn_disposition_is_failed(disposition)) { if (!failed) { state = State::FREE; - transaction_delete(t.session()); - handler->on_transaction_commit_failed(t.session()); + transaction_delete(session); + handler->on_session_transaction_commit_failed(session); release_pending(); return; } else { state = State::FREE; - transaction_delete(t.session()); + transaction_delete(session); // Transaction abort failed. return; } @@ -349,15 +350,15 @@ void transaction_impl::handle_outcome(proton::tracker t) { if (failed) { // Transaction abort is successful state = State::FREE; - transaction_delete(t.session()); - handler->on_transaction_aborted(t.session()); + transaction_delete(session); + handler->on_session_transaction_aborted(session); release_pending(); return; } else { // Transaction commit is successful state = State::FREE; - transaction_delete(t.session()); - handler->on_transaction_committed(t.session()); + transaction_delete(session); + handler->on_session_transaction_committed(session); return; } } diff --git a/cpp/src/transaction_handler.cpp b/cpp/src/transaction_handler.cpp deleted file mode 100644 index bee861f9db..0000000000 --- a/cpp/src/transaction_handler.cpp +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/transaction_handler.hpp" -#include "proton/delivery.h" -#include "proton/delivery.hpp" -#include "proton/message.hpp" -#include "proton/target_options.hpp" -#include "proton/tracker.hpp" -#include "proton/transfer.hpp" - -#include "proton_bits.hpp" -#include - -#include - -namespace proton { - -transaction_handler::~transaction_handler() = default; -void transaction_handler::on_transaction_declared(session) {} -void transaction_handler::on_transaction_committed(session) {} -void transaction_handler::on_transaction_aborted(session) {} -void transaction_handler::on_transaction_declare_failed(session) {} -void transaction_handler::on_transaction_commit_failed(session) {} - -} diff --git a/cpp/src/transaction_test.cpp b/cpp/src/transaction_test.cpp index cb4207f711..1ae7fd95bd 100644 --- a/cpp/src/transaction_test.cpp +++ b/cpp/src/transaction_test.cpp @@ -1,148 +1,313 @@ - +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ #include #include +#include #include #include -#include -#include +#include "proton/codec/decoder.hpp" +#include "proton/codec/encoder.hpp" #include #include -#include #include -#include -#include +#include +#include +#include #include -#include +#include "proton_bits.hpp" +#include +#include #include -#include - -#include "test_bits.hpp" +#include +#include // C++ API doesn't export disposition +#include "test_bits.hpp" // For RUN_ARGV_TEST and ASSERT_EQUAL +#include "types_internal.hpp" +#include +#include #include -#include +#include +#include +#include #include #include +#include namespace { std::mutex m; std::condition_variable cv; bool listener_ready = false; int listener_port; -} //namespace - -class test_recv : public proton::messaging_handler { - private: - class listener_ready_handler : public proton::listen_handler { - void on_open(proton::listener &l) override { - { - std::lock_guard lk(m); - listener_port = l.port(); - listener_ready = true; - } - cv.notify_one(); - } - }; - - std::string url; - proton::listener listener; - listener_ready_handler listen_handler; - - public: - test_recv(const std::string &s) : url(s) {} - - void on_container_start(proton::container &c) override { - listener = c.listen(url, listen_handler); - } - - void on_message(proton::delivery &d, proton::message &msg) override { - - } +const proton::binary fake_txn_id("prqs5678-abcd-efgh-1a2b-3c4d5e6f7g8e"); +} // namespace + +void wait_for_promise_or_fail(std::promise& prom, const std::string& what) { + if (prom.get_future().wait_for(std::chrono::seconds(5)) == std::future_status::timeout) { + FAIL("Test FAILED: Did not receive '" << what << "' in time."); + } +} + +class FakeBroker : public proton::messaging_handler { + private: + class listener_ready_handler : public proton::listen_handler { + void on_open(proton::listener& l) override { + std::lock_guard lk(m); + listener_port = l.port(); + listener_ready = true; + cv.notify_one(); + } + }; + std::string url; + listener_ready_handler listen_handler; + proton::receiver coordinator_link; + + public: + proton::listener listener; + std::map> transactions_messages; + std::promise declare_promise; + std::promise commit_promise; + std::promise abort_promise; + + FakeBroker(const std::string& s) : url(s) {} + + void on_container_start(proton::container& c) override { + listener = c.listen(url, listen_handler); + } + + void on_connection_open(proton::connection& c) override { + c.open(proton::connection_options{}.offered_capabilities({"ANONYMOUS-RELAY"})); + } + + void on_receiver_open(proton::receiver& r) override { + // Identify the transaction link. + if(r.target().capabilities().size() > 0 && + r.target().capabilities()[0] == proton::symbol("amqp:local-transactions")) { + coordinator_link = r; + } + r.open(); + } + + void on_message(proton::delivery& d, proton::message& m) override { + if (coordinator_link.active() && d.receiver() == coordinator_link) { + handle_transaction_control(d, m); + } else { + handle_application_message(d, m); + } + } + + void handle_application_message(proton::delivery& d, proton::message& m) { + auto disp = pn_transactional_disposition(pn_delivery_remote(unwrap(d))); + if (disp != NULL) { + // transactional message + proton::binary txn_id = proton::bin(pn_transactional_disposition_get_id(disp)); + transactions_messages[txn_id].push_back(m); + } + } + + void handle_transaction_control(proton::delivery& d, proton::message& m) { + proton::codec::decoder dec(m.body()); + proton::symbol descriptor; + proton::value _value; + proton::type_id _t = dec.next_type(); + + if (_t == proton::type_id::DESCRIBED) { + proton::codec::start s; + dec >> s >> descriptor >> _value >> proton::codec::finish(); + } else { + std::cerr << "[fake_broker] Invalid transaction control message format: " << to_string(m) << std::endl; + d.reject(); + return; + } + + if (descriptor == "amqp:declare:list") { + pn_delivery_t* pd = proton::unwrap(d); + pn_disposition_t* disp = pn_delivery_local(pd); + pn_custom_disposition_t *custom_disp = pn_custom_disposition(disp); + pn_custom_disposition_set_type(custom_disp, 0x33); + pn_data_t* pn_data = pn_custom_disposition_data(custom_disp); + pn_data_put_list(pn_data); + pn_data_enter(pn_data); + pn_data_put_binary(pn_data, pn_bytes(fake_txn_id.size(), reinterpret_cast(&fake_txn_id[0]))); + pn_data_exit(pn_data); + pn_delivery_settle(pd); + + std::cout << "[BROKER] transaction declared: " << fake_txn_id << std::endl; + declare_promise.set_value(); + } else if (descriptor == "amqp:discharge:list") { + // Commit / Abort transaction. + std::vector vd; + proton::get(_value, vd); + ASSERT_EQUAL(vd.size(), 2u); + proton::binary txn_id = vd[0].get(); + bool is_abort = vd[1].get(); + if (!is_abort) { + // Commit + std::cout << "[BROKER] transaction commited:" << txn_id << std::endl; + // As part of this test, we don't need to forward transactions_messages. + // We are leaving the messages here to count them later on. + commit_promise.set_value(); + d.accept(); + } else { + // Abort + std::cout << "[BROKER] transaction aborted:" << txn_id << std::endl; + transactions_messages.erase(txn_id); + abort_promise.set_value(); + d.accept(); + } + // Closing the connection as we are testing till commit/abort. + d.receiver().close(); + d.connection().close(); + listener.stop(); + } + } }; -class test_send : public proton::messaging_handler, proton::transaction_handler { - private: - std::string url; - proton::sender sender; - proton::session session; - int batch_index = 0; - int current_batch = 0; - int committed = 0; - int confirmed = 0; - - int batch_size = 3; - int total = 6; - public: - test_send(const std::string &s) : url(s) {} - - void on_container_start(proton::container &c) override { - proton::connection_options co; - sender = c.open_sender(url, co); - } - - - void on_session_open(proton::session &s) override { - session = s; - s.declare_transaction(*this); - } - - - void on_transaction_declared(proton::session s) override { - send(sender); - } - - void on_sendable(proton::sender &s) override { - send(s); - } - - void send(proton::sender &s) { - static int unique_id = 10000; - while (session.transaction_is_declared() && sender.credit() && - (committed + current_batch) < total) { - proton::message msg; - std::map m; - m["sequence"] = committed + current_batch; - - msg.id(unique_id++); - msg.body(m); - s.send(msg); - current_batch += 1; - if(current_batch == batch_size) - { - std::cout << " >> Txn attempt commit" << std::endl; - if (batch_index % 2 == 0) { - session.transaction_commit(); - } else { - session.transaction_abort(); - } - batch_index++; - } - } - } +class test_client : public proton::messaging_handler { + private: + std::string server_address_; + int messages_left_; + bool is_commit_; + + public: + proton::binary last_txn_id; + proton::sender sender_; + std::promise block_declare_transaction_on_session; + std::promise transaction_finished_promise; + + test_client(const std::string& s) : + server_address_(s), messages_left_(0) {} + + void on_container_start(proton::container& c) override { + c.connect(server_address_); + } + + void on_connection_open(proton::connection& c) override { + sender_ = c.open_sender("/test"); + } + + void on_session_open(proton::session& s) override { + if (!s.transaction_is_declared()) { + wait_for_promise_or_fail(block_declare_transaction_on_session, "waiting on test to be ready"); + s.transaction_declare(*this); + } else { + last_txn_id = s.transaction_id(); + std::cout << "Client: Transaction declared successfully: " << last_txn_id << std::endl; + send(); + } + } + + void schedule_messages_in_transaction(int count, bool is_commit) { + messages_left_ = count; + is_commit_ = is_commit; + } + + void on_sendable(proton::sender&) override { + send(); + } + + void send() { + proton::session session = sender_.session(); + while (session.transaction_is_declared() && sender_.credit() && + messages_left_ > 0) { + proton::message msg("hello"); + sender_.send(msg); + messages_left_--; + if (messages_left_ == 0) { + if (is_commit_) { + std::cout << "Client: Committing transaction." << std::endl; + session.transaction_commit(); + } else { + std::cout << "Client: Aborting transaction." << std::endl; + session.transaction_abort(); + } + } + } + } + + void on_session_transaction_committed(proton::session &s) override { + std::cout << "Client: Transaction committed" << std::endl; + transaction_finished_promise.set_value(); + s.connection().close(); + } + + void on_session_transaction_aborted(proton::session &s) override { + std::cout << "Client: Transaction aborted" << std::endl; + transaction_finished_promise.set_value(); + s.connection().close(); + } }; -int test_transaction() { - - std::string recv_address("127.0.0.1:0/test"); - test_recv recv(recv_address); - proton::container c(recv); +void test_transaction_commit(FakeBroker &broker, test_client &client) { + std::cout << "Starting test_transaction_commit..." << std::endl; + + const unsigned int messages_in_txn = 5; + client.schedule_messages_in_transaction(messages_in_txn, true); + client.block_declare_transaction_on_session.set_value(); + + wait_for_promise_or_fail(broker.declare_promise, "declare in broker"); + wait_for_promise_or_fail(broker.commit_promise, "commit in broker"); - std::thread thread_recv([&c]() -> void { c.run(); }); + // Only one transaction + ASSERT_EQUAL(broker.transactions_messages.size(), 1u); + // Check message count inside broker + ASSERT_EQUAL(broker.transactions_messages[fake_txn_id].size(), messages_in_txn); +} + +void test_transaction_abort(FakeBroker &broker, test_client &client) { + std::cout << "Starting test_transaction_abort..." << std::endl; - // wait until listener is ready - std::unique_lock lk(m); - cv.wait(lk, [] { return listener_ready; }); + const unsigned int messages_in_txn = 5; + client.schedule_messages_in_transaction(messages_in_txn, false); + client.block_declare_transaction_on_session.set_value(); - std::string send_address = - "127.0.0.1:" + std::to_string(listener_port) + "/test"; - test_send send(send_address); - proton::container(send).run(); - thread_recv.join(); + wait_for_promise_or_fail(broker.declare_promise, "declare in broker"); + wait_for_promise_or_fail(broker.commit_promise, "commit in broker"); - return 0; + // Only zero transactions + ASSERT_EQUAL(broker.transactions_messages.size(), 0u); } -int main(int argc, char **argv) { - int failed = 0; - RUN_ARGV_TEST(failed, test_transaction()); - return failed; +int main(int argc, char** argv) { + int tests_failed = 0; + + std::string broker_address("127.0.0.1:0"); + FakeBroker broker(broker_address); + + proton::container broker_container(broker); + std::thread broker_thread([&broker_container]() -> void { broker_container.run(); }); + + // Wait for the listener + std::unique_lock lk(m); + cv.wait(lk, [] { return listener_ready; }); + + + std::string server_address = "127.0.0.1:" + std::to_string(listener_port); + test_client client(server_address); + + proton::container client_container(client); + std::thread client_thread([&client_container]() -> void { client_container.run(); }); + + RUN_ARGV_TEST(tests_failed, test_transaction_commit(broker, client)); + // RUN_ARGV_TEST(tests_failed, test_transaction_abort(broker, client)); + + broker_thread.join(); + client_thread.join(); + + return tests_failed; } diff --git a/cpp/tests.cmake b/cpp/tests.cmake index b00d501295..85f1ae2696 100644 --- a/cpp/tests.cmake +++ b/cpp/tests.cmake @@ -63,6 +63,9 @@ add_cpp_test(link_test) add_cpp_test(credit_test) add_cpp_test(delivery_test) add_cpp_test(context_test) +add_cpp_test(transaction_test) +target_link_libraries(transaction_test qpid-proton-core) + if (ENABLE_JSONCPP) add_cpp_test(connect_config_test) target_link_libraries(connect_config_test qpid-proton-core) # For pn_sasl_enabled From ac1950fa03d1f3308550fe4849f0aed582a149dc Mon Sep 17 00:00:00 2001 From: Rakhi Kumari Date: Mon, 3 Nov 2025 11:46:48 +0530 Subject: [PATCH 5/5] Rebase and fix error 'no data' --- cpp/src/session.cpp | 17 ++++++----------- cpp/src/transaction_test.cpp | 12 ++++-------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp index 13d43229cf..aabf421613 100644 --- a/cpp/src/session.cpp +++ b/cpp/src/session.cpp @@ -309,26 +309,21 @@ void transaction_impl::release_pending() { void transaction_impl::handle_outcome(proton::tracker t) { pn_disposition_t *disposition = pn_delivery_remote(unwrap(t)); + pn_declared_disposition_t *declared_disp = pn_declared_disposition(disposition); proton::session session = t.session(); if (state == State::DECLARING) { // Attempting to declare transaction - proton::value val(pn_disposition_data(disposition)); - auto vd = get>(val); - if (vd.size() > 0) { - transaction_id = vd[0]; - state = State::DECLARED; - handler->on_session_open(session); - return; - } else if (pn_disposition_is_failed(disposition)) { + if (pn_disposition_is_failed(disposition)) { state = State::FREE; transaction_delete(session); // on_transaction_declare_failed handler->on_session_error(session); return; } else { - state = State::FREE; - transaction_delete(session); - handler->on_session_error(session); + pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp); + transaction_id = proton::bin(txn_id); + state = State::DECLARED; + handler->on_session_open(session); return; } } else if (state == State::DISCHARGING) { diff --git a/cpp/src/transaction_test.cpp b/cpp/src/transaction_test.cpp index 1ae7fd95bd..df7d61689a 100644 --- a/cpp/src/transaction_test.cpp +++ b/cpp/src/transaction_test.cpp @@ -136,15 +136,12 @@ class FakeBroker : public proton::messaging_handler { } if (descriptor == "amqp:declare:list") { + pn_bytes_t txn_id = pn_bytes(fake_txn_id.size(), reinterpret_cast(&fake_txn_id[0])); + pn_delivery_t* pd = proton::unwrap(d); pn_disposition_t* disp = pn_delivery_local(pd); - pn_custom_disposition_t *custom_disp = pn_custom_disposition(disp); - pn_custom_disposition_set_type(custom_disp, 0x33); - pn_data_t* pn_data = pn_custom_disposition_data(custom_disp); - pn_data_put_list(pn_data); - pn_data_enter(pn_data); - pn_data_put_binary(pn_data, pn_bytes(fake_txn_id.size(), reinterpret_cast(&fake_txn_id[0]))); - pn_data_exit(pn_data); + pn_declared_disposition_t* declared_disp = pn_declared_disposition(disp); + pn_declared_disposition_set_id(declared_disp, txn_id); pn_delivery_settle(pd); std::cout << "[BROKER] transaction declared: " << fake_txn_id << std::endl; @@ -304,7 +301,6 @@ int main(int argc, char** argv) { std::thread client_thread([&client_container]() -> void { client_container.run(); }); RUN_ARGV_TEST(tests_failed, test_transaction_commit(broker, client)); - // RUN_ARGV_TEST(tests_failed, test_transaction_abort(broker, client)); broker_thread.join(); client_thread.join();