3939#include < chrono>
4040#include < thread>
4141
42- class tx_recv : public proton ::messaging_handler {
42+ class tx_recv : public proton ::messaging_handler, proton::transaction_handler {
4343 private:
4444 proton::sender sender;
4545 proton::receiver receiver;
46+ proton::session session;
4647 std::string conn_url_;
4748 std::string addr_;
48- int expected;
49+ int total;
50+ int batch_size;
4951 int received = 0 ;
50- std::atomic<int > unique_msg_id;
52+ int current_batch = 0 ;
53+ int batch_index = 0 ;
5154
5255 public:
53- tx_recv (const std::string& u, const std::string &a, int c):
54- conn_url_ (u), addr_(a), expected (c), unique_msg_id( 20000 ) {}
56+ tx_recv (const std::string& u, const std::string &a, int c, int b ):
57+ conn_url_ (u), addr_(a), total (c), batch_size(b ) {}
5558
5659 void on_container_start (proton::container &c) override {
5760 c.connect (conn_url_);
5861 }
5962
6063 void on_connection_open (proton::connection& c) override {
6164 receiver = c.open_receiver (addr_);
62- sender = c.open_sender (addr_);
65+ // sender = c.open_sender(addr_);
6366 }
6467
6568 void on_session_open (proton::session &s) override {
6669 std::cout << " New session is open" << std::endl;
70+ s.transaction_declare (*this );
71+ session = s;
6772 }
6873
69- void on_message (proton::delivery &d, proton::message &msg) override {
70- std::cout<<" # MESSAGE: " << msg.id () <<" : " << msg.body () << std::endl;
71- d.accept ();
72- proton::message reply_message;
74+ void on_transaction_declare_failed (proton::session s) override {
75+ std::cout << " Transaction declarion failed" << std::endl;
76+ s.connection ().close ();
77+ exit (-1 );
78+ }
7379
74- reply_message. id ( std::atomic_fetch_add (&unique_msg_id, 1 ));
75- reply_message. body (msg. body ()) ;
76- reply_message. reply_to (receiver. source (). address ());
80+ void on_transaction_declared (proton::session s) override {
81+ std::cout << " Transaction is declared: " << s. transaction_id () << std::endl ;
82+ }
7783
78- sender.send (reply_message);
84+ void on_transaction_committed (proton::session s) override {
85+ std::cout << " Transaction commited" << std::endl;
86+ received += batch_size;
87+ if (received == total) {
88+ std::cout << " All received messages committed, closing connection." << std::endl;
89+ s.connection ().close ();
90+ }
91+ else {
92+ std::cout << " Re-declaring transaction now... to receive next batch." << std::endl;
93+ s.transaction_declare (*this );
94+ }
95+ }
96+
97+ void on_transaction_aborted (proton::session s) override {
98+ std::cout << " Transaction aborted!" << std::endl;
99+ std::cout << " Re-delaring transaction now..." << std::endl;
100+ current_batch = 0 ;
101+ s.transaction_declare (*this );
102+ }
79103
80- received += 1 ;
81- if (received == expected) {
82- receiver.connection ().close ();
104+ void on_message (proton::delivery &d, proton::message &msg) override {
105+ std::cout<<" # MESSAGE: " << msg.id () <<" : " << msg.body () << std::endl;
106+ d.accept ();
107+ current_batch += 1 ;
108+ if (current_batch == batch_size) {
109+ // Batch complete
110+ std::cout << " In this example we abort even batch index and commit otherwise." << std::endl;
111+ if (batch_index % 2 == 1 ) {
112+ std::cout << " Commiting transaction..." << std::endl;
113+ session.transaction_commit ();
114+ } else {
115+ std::cout << " Aborting transaction..." << std::endl;
116+ session.transaction_abort ();
117+ }
118+ batch_index++;
83119 }
84120 }
85121};
@@ -88,16 +124,18 @@ int main(int argc, char **argv) {
88124 std::string conn_url = argc > 1 ? argv[1 ] : " //127.0.0.1:5672" ;
89125 std::string addr = argc > 2 ? argv[2 ] : " examples" ;
90126 int message_count = 6 ;
127+ int batch_size = 3 ;
91128 example::options opts (argc, argv);
92129
93130 opts.add_value (conn_url, ' u' , " url" , " connect and send to URL" , " URL" );
94131 opts.add_value (addr, ' a' , " address" , " connect and send to address" , " URL" );
95132 opts.add_value (message_count, ' m' , " messages" , " number of messages to send" , " COUNT" );
133+ opts.add_value (batch_size, ' b' , " batch_size" , " number of messages in each transaction" , " BATCH_SIZE" );
96134
97135 try {
98136 opts.parse ();
99137
100- tx_recv recv (conn_url, addr, message_count);
138+ tx_recv recv (conn_url, addr, message_count, batch_size );
101139 proton::container (recv).run ();
102140
103141 return 0 ;
0 commit comments