2424#include < proton/connection.hpp>
2525#include < proton/container.hpp>
2626#include < proton/delivery.hpp>
27- #include < proton/source.hpp>
28- #include < proton/message.hpp>
2927#include < proton/message_id.hpp>
28+ #include < proton/message.hpp>
3029#include < proton/messaging_handler.hpp>
31- #include < proton/types.hpp>
30+ #include < proton/receiver_options.hpp>
31+ #include < proton/source.hpp>
3232#include < proton/transaction_handler.hpp>
33+ #include < proton/types.hpp>
3334
3435#include < iostream>
3536#include < map>
4142
4243class tx_recv : public proton ::messaging_handler, proton::transaction_handler {
4344 private:
44- proton::sender sender;
4545 proton::receiver receiver;
4646 proton::session session;
4747 std::string conn_url_;
@@ -61,8 +61,10 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler {
6161 }
6262
6363 void on_connection_open (proton::connection& c) override {
64- receiver = c.open_receiver (addr_);
65- // sender = c.open_sender(addr_);
64+ // NOTE:credit_window(0) disables automatic flow control.
65+ // We will use flow control to receive batches of messages in a transaction.
66+ std::cout << " In this example we abort/commit transaction alternatively." << std::endl;
67+ receiver = c.open_receiver (addr_, proton::receiver_options ().credit_window (0 ));
6668 }
6769
6870 void on_session_open (proton::session &s) override {
@@ -79,11 +81,13 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler {
7981
8082 void on_transaction_declared (proton::session s) override {
8183 std::cout << " Transaction is declared: " << s.transaction_id () << std::endl;
84+ receiver.add_credit (batch_size);
8285 }
8386
8487 void on_transaction_committed (proton::session s) override {
8588 std::cout << " Transaction commited" << std::endl;
86- received += batch_size;
89+ received += current_batch;
90+ current_batch = 0 ;
8791 if (received == total) {
8892 std::cout << " All received messages committed, closing connection." << std::endl;
8993 s.connection ().close ();
@@ -107,7 +111,6 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler {
107111 current_batch += 1 ;
108112 if (current_batch == batch_size) {
109113 // Batch complete
110- std::cout << " In this example we abort even batch index and commit otherwise." << std::endl;
111114 if (batch_index % 2 == 1 ) {
112115 std::cout << " Commiting transaction..." << std::endl;
113116 session.transaction_commit ();
0 commit comments