Skip to content

Commit 6eb58f4

Browse files
astitcherDreamPearl
authored andcommitted
Implement local transaction in CPP
1 parent d323082 commit 6eb58f4

17 files changed

+849
-17
lines changed

cpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ set(qpid-proton-cpp-source
120120
src/terminus.cpp
121121
src/timestamp.cpp
122122
src/tracker.cpp
123+
src/transaction_handler.cpp
123124
src/transfer.cpp
124125
src/transport.cpp
125126
src/type_id.cpp

cpp/examples/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ foreach(example
6060
scheduled_send
6161
service_bus
6262
multithreaded_client
63-
multithreaded_client_flow_control)
63+
multithreaded_client_flow_control
64+
tx_send
65+
tx_recv)
6466
add_executable(${example} ${example}.cpp)
6567
target_link_libraries(${example} Proton::cpp Threads::Threads)
6668
endforeach()

cpp/examples/simple_recv.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ class simple_recv : public proton::messaging_handler {
4343
proton::receiver receiver;
4444
int expected;
4545
int received;
46+
bool verbose;
4647

4748
public:
48-
simple_recv(const std::string &s, const std::string &u, const std::string &p, int c) :
49-
url(s), user(u), password(p), expected(c), received(0) {}
49+
simple_recv(const std::string &s, const std::string &u, const std::string &p, int c, bool verbose) :
50+
url(s), user(u), password(p), expected(c), received(0), verbose(verbose) {}
5051

5152
void on_container_start(proton::container &c) override {
5253
proton::connection_options co;
@@ -61,6 +62,9 @@ class simple_recv : public proton::messaging_handler {
6162
}
6263

6364
if (expected == 0 || received < expected) {
65+
if (verbose) {
66+
std::cout << msg << ": ";
67+
}
6468
std::cout << msg.body() << std::endl;
6569
received++;
6670

@@ -77,18 +81,20 @@ int main(int argc, char **argv) {
7781
std::string user;
7882
std::string password;
7983
int message_count = 100;
84+
bool verbose;
8085
example::options opts(argc, argv);
8186

8287
opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL");
8388
opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
8489
opts.add_value(user, 'u', "user", "authenticate as USER", "USER");
8590
opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD");
91+
opts.add_flag(verbose, 'v', "verbose", "show whole message contents");
8692

8793

8894
try {
8995
opts.parse();
9096

91-
simple_recv recv(address, user, password, message_count);
97+
simple_recv recv(address, user, password, message_count, verbose);
9298
proton::container(recv).run();
9399

94100
return 0;

cpp/examples/tx_recv.cpp

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
#include "options.hpp"
23+
24+
#include <proton/connection.hpp>
25+
#include <proton/container.hpp>
26+
#include <proton/delivery.hpp>
27+
#include <proton/source.hpp>
28+
#include <proton/message.hpp>
29+
#include <proton/message_id.hpp>
30+
#include <proton/messaging_handler.hpp>
31+
#include <proton/types.hpp>
32+
#include <proton/transaction_handler.hpp>
33+
34+
#include <iostream>
35+
#include <map>
36+
#include <string>
37+
38+
#include <atomic>
39+
#include <chrono>
40+
#include <thread>
41+
42+
class tx_recv : public proton::messaging_handler {
43+
private:
44+
proton::sender sender;
45+
proton::receiver receiver;
46+
std::string conn_url_;
47+
std::string addr_;
48+
int expected;
49+
int received = 0;
50+
std::atomic<int> unique_msg_id;
51+
52+
public:
53+
tx_recv(const std::string& u, const std::string &a, int c):
54+
conn_url_(u), addr_(a), expected(c), unique_msg_id(20000) {}
55+
56+
void on_container_start(proton::container &c) override {
57+
c.connect(conn_url_);
58+
}
59+
60+
void on_connection_open(proton::connection& c) override {
61+
receiver = c.open_receiver(addr_);
62+
sender = c.open_sender(addr_);
63+
}
64+
65+
void on_session_open(proton::session &s) override {
66+
std::cout << "New session is open" << std::endl;
67+
}
68+
69+
void on_message(proton::delivery &d, proton::message &msg) override {
70+
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl;
71+
d.accept();
72+
proton::message reply_message;
73+
74+
reply_message.id(std::atomic_fetch_add(&unique_msg_id, 1));
75+
reply_message.body(msg.body());
76+
reply_message.reply_to(receiver.source().address());
77+
78+
sender.send(reply_message);
79+
80+
received += 1;
81+
if (received == expected) {
82+
receiver.connection().close();
83+
}
84+
}
85+
};
86+
87+
int main(int argc, char **argv) {
88+
std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672";
89+
std::string addr = argc > 2 ? argv[2] : "examples";
90+
int message_count = 6;
91+
example::options opts(argc, argv);
92+
93+
opts.add_value(conn_url, 'u', "url", "connect and send to URL", "URL");
94+
opts.add_value(addr, 'a', "address", "connect and send to address", "URL");
95+
opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT");
96+
97+
try {
98+
opts.parse();
99+
100+
tx_recv recv(conn_url, addr, message_count);
101+
proton::container(recv).run();
102+
103+
return 0;
104+
} catch (const example::bad_option& e) {
105+
std::cout << opts << std::endl << e.what() << std::endl;
106+
} catch (const std::exception& e) {
107+
std::cerr << e.what() << std::endl;
108+
}
109+
110+
return 1;
111+
}

cpp/examples/tx_send.cpp

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
#include "options.hpp"
23+
24+
#include <proton/connection.hpp>
25+
#include <proton/container.hpp>
26+
#include <proton/message.hpp>
27+
#include <proton/message_id.hpp>
28+
#include <proton/messaging_handler.hpp>
29+
#include <proton/sender_options.hpp>
30+
#include <proton/types.hpp>
31+
#include <proton/transaction_handler.hpp>
32+
33+
#include <iostream>
34+
#include <map>
35+
#include <string>
36+
37+
#include <atomic>
38+
#include <chrono>
39+
#include <thread>
40+
41+
class tx_send : public proton::messaging_handler, proton::transaction_handler {
42+
private:
43+
proton::sender sender;
44+
std::string conn_url_;
45+
std::string addr_;
46+
int total;
47+
int batch_size;
48+
int sent;
49+
int batch_index = 0;
50+
int current_batch = 0;
51+
int committed = 0;
52+
std::atomic<int> unique_msg_id;
53+
54+
public:
55+
tx_send(const std::string& u, const std::string& a, int c, int b):
56+
conn_url_(u), addr_(a), total(c), batch_size(b), sent(0), unique_msg_id(10000) {}
57+
58+
void on_container_start(proton::container &c) override {
59+
c.connect(conn_url_);
60+
}
61+
62+
void on_connection_open(proton::connection& c) override {
63+
sender = c.open_sender(addr_);
64+
}
65+
66+
void on_session_open(proton::session& s) override {
67+
std::cout << "New session is open, declaring transaction now..." << std::endl;
68+
s.transaction_declare(*this);
69+
}
70+
71+
void on_transaction_declare_failed(proton::session s) override {
72+
std::cout << "Transaction declarion failed" << std::endl;
73+
s.connection().close();
74+
exit(-1);
75+
}
76+
77+
void on_transaction_commit_failed(proton::session s) override {
78+
std::cout << "Transaction commit failed!" << std::endl;
79+
s.connection().close();
80+
exit(-1);
81+
}
82+
83+
void on_transaction_declared(proton::session s) override {
84+
std::cout << "Transaction is declared: " << s.transaction_id() << std::endl;
85+
send();
86+
}
87+
88+
void on_sendable(proton::sender&) override {
89+
send();
90+
}
91+
92+
void send() {
93+
proton::session session = sender.session();
94+
while (session.transaction_is_declared() && sender.credit() &&
95+
(committed + current_batch) < total) {
96+
proton::message msg;
97+
std::map<std::string, int> m;
98+
m["sequence"] = committed + current_batch;
99+
100+
msg.id(std::atomic_fetch_add(&unique_msg_id, 1));
101+
msg.body(m);
102+
std::cout << "Sending [sender batch " << batch_index << "]: " << msg << std::endl;
103+
sender.send(msg);
104+
current_batch += 1;
105+
if(current_batch == batch_size)
106+
{
107+
std::cout << "In this example we commit even batch index and abort otherwise." << std::endl;
108+
if (batch_index % 2 == 0) {
109+
std::cout << "Commiting transaction..." << std::endl;
110+
session.transaction_commit();
111+
} else {
112+
std::cout << "Aborting transaction..." << std::endl;
113+
session.transaction_abort();
114+
}
115+
batch_index++;
116+
}
117+
}
118+
}
119+
120+
void on_transaction_committed(proton::session s) override {
121+
committed += current_batch;
122+
current_batch = 0;
123+
std::cout << "Transaction commited" << std::endl;
124+
if(committed == total) {
125+
std::cout << "All messages committed, closing connection." << std::endl;
126+
s.connection().close();
127+
}
128+
else {
129+
std::cout << "Re-declaring transaction now..." << std::endl;
130+
s.transaction_declare(*this);
131+
}
132+
}
133+
134+
void on_transaction_aborted(proton::session s) override {
135+
std::cout << "Transaction aborted!" << std::endl;
136+
std::cout << "Re-delaring transaction now..." << std::endl;
137+
current_batch = 0;
138+
s.transaction_declare(*this);
139+
}
140+
141+
void on_sender_close(proton::sender &s) override {
142+
current_batch = 0;
143+
}
144+
145+
};
146+
147+
int main(int argc, char **argv) {
148+
std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672";
149+
std::string addr = argc > 2 ? argv[2] : "examples";
150+
int message_count = 6;
151+
int batch_size = 3;
152+
example::options opts(argc, argv);
153+
154+
opts.add_value(conn_url, 'u', "url", "connect and send to URL", "URL");
155+
opts.add_value(addr, 'a', "address", "connect and send to address", "URL");
156+
opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT");
157+
opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE");
158+
159+
try {
160+
opts.parse();
161+
162+
tx_send send(conn_url, addr, message_count, batch_size);
163+
proton::container(send).run();
164+
165+
return 0;
166+
} catch (const example::bad_option& e) {
167+
std::cout << opts << std::endl << e.what() << std::endl;
168+
} catch (const std::exception& e) {
169+
std::cerr << e.what() << std::endl;
170+
}
171+
172+
return 1;
173+
}

cpp/include/proton/fwd.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class source_options;
5555
class ssl;
5656
class target_options;
5757
class tracker;
58+
class transaction_handler;
5859
class transport;
5960
class url;
6061
class void_function0;

0 commit comments

Comments
 (0)