Skip to content

Commit eb4514b

Browse files
committed
Add transaction commit and abort functionalities
1 parent 0e20334 commit eb4514b

File tree

5 files changed

+205
-96
lines changed

5 files changed

+205
-96
lines changed

cpp/examples/tx_send.cpp

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,21 @@
3333
#include <map>
3434
#include <string>
3535

36+
#include <chrono>
37+
#include <thread>
38+
3639
class tx_send : public proton::messaging_handler, proton::transaction_handler {
3740
private:
3841
proton::sender sender;
3942
std::string url;
4043
int total;
4144
int batch_size;
4245
int sent;
46+
int batch_index = 0;
4347
int current_batch = 0;
4448
int committed = 0;
4549
int confirmed = 0;
50+
4651
proton::container *container;
4752
// proton::transaction_handler transaction_handler;
4853
proton::transaction transaction;
@@ -56,68 +61,90 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
5661
sender = c.open_sender(url);
5762
connection = sender.connection();
5863
std::cout << " [on_container_start] declare_txn started..." << std::endl;
59-
transaction = c.declare_transaction(connection, *this);
60-
std::cout << " [on_container_start] completed!! txn: " << &transaction << std::endl;
64+
c.declare_transaction(connection, *this);
65+
std::cout << " [on_container_start] completed!!" << &transaction
66+
<< std::endl;
6167
}
6268

63-
void on_transaction_aborted(proton::transaction) {}
6469
void on_transaction_declare_failed(proton::transaction) {}
65-
void on_transaction_commit_failed(proton::transaction) {}
70+
void on_transaction_commit_failed(proton::transaction) {
71+
std::cout << "Transaction Commit Failed" << std::endl;
72+
connection.close();
73+
exit(-1);
74+
}
6675

6776
void on_transaction_declared(proton::transaction t) override {
68-
std::cout << "[on_transaction_declared] txn: " << (&transaction)
69-
<< " new_txn: " << (t._impl->id) << std::endl;
70-
connection.close();
71-
// transaction = &t;
72-
// ASSUME: THIS FUNCTION DOESN"T WORK
73-
// send();
77+
std::cout << "[on_transaction_declared] txn called " << (&t)
78+
<< std::endl;
79+
// connection.close();
80+
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
81+
<< "\t" << transaction.is_empty() << std::endl;
82+
transaction = t;
83+
84+
send(sender);
7485
}
7586

7687
void on_sendable(proton::sender &s) override {
7788
// send();
78-
// std::cout<<" [OnSendable] transaction: "<< &transaction << std::endl;
79-
// send(s);
89+
std::cout << " [OnSendable] transaction: " << &transaction
90+
<< std::endl;
91+
send(s);
8092
}
8193

8294
void send(proton::sender &s) {
8395
// TODO: Add more condition in while loop
84-
// transaction != null
85-
while ( sender.credit() && (committed + current_batch) < total)
86-
{
96+
while (!transaction.is_empty() && sender.credit() &&
97+
(committed + current_batch) < total) {
8798
proton::message msg;
8899
std::map<std::string, int> m;
89100
m["sequence"] = committed + current_batch;
90101

91102
msg.id(committed + current_batch + 1);
92103
msg.body(m);
104+
std::cout << " [example] transaction send msg: " << msg
105+
<< std::endl;
93106
transaction.send(sender, msg);
94107
current_batch += 1;
95108
if(current_batch == batch_size)
96109
{
97-
transaction.commit();
98-
// transaction = NULL;
110+
std::cout << " >> Txn attempt commit" << std::endl;
111+
if (batch_index % 2 == 0) {
112+
transaction.commit();
113+
} else {
114+
transaction.abort();
115+
}
116+
117+
transaction = proton::transaction();
118+
batch_index++;
99119
}
100120
}
101-
102121
}
103122

104123
void on_tracker_accept(proton::tracker &t) override {
105124
confirmed += 1;
125+
std::cout << " [example] on_tracker_accept:" << confirmed
126+
<< std::endl;
106127
}
107128

108129
void on_transaction_committed(proton::transaction t) override {
109130
committed += current_batch;
131+
current_batch = 0;
110132
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
111133
if(committed == total) {
112-
std::cout << "All messages committed";
113-
// connection.close();
134+
std::cout << "All messages committed" << std::endl;
135+
connection.close();
114136
}
115137
else {
116-
// current_batch = 0;
117-
// container->declare_transaction(connection, transaction_handler);
138+
container->declare_transaction(connection, *this);
118139
}
119140
}
120141

142+
void on_transaction_aborted(proton::transaction t) override {
143+
std::cout << "Meesages Aborted ....." << std::endl;
144+
current_batch = 0;
145+
container->declare_transaction(connection, *this);
146+
}
147+
121148
void on_sender_close(proton::sender &s) override {
122149
current_batch = 0;
123150
}
@@ -126,8 +153,8 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
126153

127154
int main(int argc, char **argv) {
128155
std::string address("127.0.0.1:5672/examples");
129-
int message_count = 10;
130-
int batch_size = 10;
156+
int message_count = 6;
157+
int batch_size = 3;
131158
example::options opts(argc, argv);
132159

133160
opts.add_value(address, 'a', "address", "connect and send to URL", "URL");

cpp/include/proton/transaction.hpp

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class transaction_handler;
4040
// TODO: This should not be accessible to users.
4141
class transaction_impl {
4242
public:
43-
proton::sender *txn_ctrl = nullptr;
43+
proton::sender txn_ctrl;
4444
proton::transaction_handler *handler = nullptr;
4545
proton::binary id;
4646
proton::tracker _declare;
@@ -54,6 +54,11 @@ class transaction_impl {
5454
proton::tracker send(proton::sender s, proton::message msg);
5555

5656
void discharge(bool failed);
57+
void release_pending();
58+
void accept(tracker &d);
59+
void update(tracker &d, uint64_t state);
60+
void set_id(binary _id);
61+
5762
proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
5863
void handle_outcome(proton::tracker t);
5964
transaction_impl(proton::sender &_txn_ctrl,
@@ -67,19 +72,21 @@ class transaction_impl {
6772

6873
class
6974
PN_CPP_CLASS_EXTERN transaction {
70-
// private:
71-
// PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl,
72-
// proton::transaction_handler& _handler, bool _settle_before_discharge);
75+
private:
76+
// PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl,
77+
// proton::transaction_handler& _handler, bool _settle_before_discharge);
78+
79+
static transaction mk_transaction_impl(sender &s, transaction_handler &h,
80+
bool f);
81+
PN_CPP_EXTERN transaction(transaction_impl *impl);
82+
transaction_impl *_impl;
7383

74-
static transaction mk_transaction_impl(sender &s, transaction_handler &h,
75-
bool f);
76-
PN_CPP_EXTERN transaction(transaction_impl* impl);
7784
public:
78-
transaction_impl* _impl;
79-
// TODO:
85+
// TODO:
8086
// PN_CPP_EXTERN transaction(transaction &o);
8187
PN_CPP_EXTERN transaction();
8288
PN_CPP_EXTERN ~transaction();
89+
PN_CPP_EXTERN bool is_empty();
8390
PN_CPP_EXTERN void commit();
8491
PN_CPP_EXTERN void abort();
8592
PN_CPP_EXTERN void declare();

cpp/src/messaging_adapter.cpp

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,8 @@ void on_link_flow(messaging_handler& handler, pn_event_t* event) {
7171
// TODO: process session flow data, if no link-specific data, just return.
7272
if (!lnk) return;
7373
int state = pn_link_state(lnk);
74-
if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
75-
std::cout << " on_link_flow, type: PN_COORDINATOR" << std::endl;
76-
return;
77-
78-
}
79-
if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) {
74+
if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR ||
75+
((state & PN_LOCAL_ACTIVE) && (state & PN_REMOTE_ACTIVE))) {
8076
link_context& lctx = link_context::get(lnk);
8177
if (pn_link_is_sender(lnk)) {
8278
if (pn_link_credit(lnk) > 0) {
@@ -123,13 +119,36 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) {
123119
Tracing& ot = Tracing::getTracing();
124120
if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
125121
// delivery d(make_wrapper<delivery>(dlv));
126-
pn_disposition_t *disposition = pn_delivery_remote(dlv);
127-
proton::value val(pn_disposition_data(disposition));
128-
std::cout << " on_delivery: COOORINDATOR.. tracker: " << val
129-
<< std::endl;
130-
tracker t(make_wrapper<tracker>(dlv));
122+
// pn_disposition_t *disposition = pn_delivery_remote(dlv);
123+
// proton::value val(pn_disposition_data(disposition));
124+
// std::cout << " on_delivery: COOORINDATOR.. tracker: " << val
125+
// << std::endl;
126+
// tracker t(make_wrapper<tracker>(dlv));
131127
std::cout << " on_delivery: COOORINDATOR.. TRACKER MADE: "
132128
<< std::endl;
129+
130+
if (pn_delivery_updated(dlv)) {
131+
tracker t(make_wrapper<tracker>(dlv));
132+
ot.on_settled_span(t);
133+
switch (pn_delivery_remote_state(dlv)) {
134+
case PN_ACCEPTED:
135+
handler.on_tracker_accept(t);
136+
break;
137+
case PN_REJECTED:
138+
handler.on_tracker_reject(t);
139+
break;
140+
case PN_RELEASED:
141+
case PN_MODIFIED:
142+
handler.on_tracker_release(t);
143+
break;
144+
}
145+
if (t.settled()) {
146+
handler.on_tracker_settle(t);
147+
if (lctx.auto_settle)
148+
t.settle();
149+
}
150+
}
151+
133152
// t.user_data = val; // not
134153

135154
// proton::disposition _disposition = make_wrapper(disposition); // #
@@ -140,7 +159,7 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) {
140159
// std::cout<< " on_delivery: COOORINDATOR with TXN IN :"
141160
// << val2 << std::endl;
142161

143-
handler.on_tracker_settle(t);
162+
// handler.on_tracker_settle(t);
144163
} else if (pn_link_is_receiver(lnk)) {
145164
delivery d(make_wrapper<delivery>(dlv));
146165
if (pn_delivery_aborted(dlv)) {

cpp/src/proactor_container_impl.cpp

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -872,52 +872,29 @@ void container::impl::stop(const proton::error_condition& err) {
872872
transaction container::impl::declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge) {
873873
class InternalTransactionHandler : public proton::messaging_handler {
874874
// TODO: auto_settle
875+
875876
void on_tracker_settle(proton::tracker &t) override {
876877
std::cout<<" [InternalTransactionHandler][on_tracker_settle] called with tracker.txn"
877878
<< std::endl;
878-
t.transaction().handle_outcome(t);
879-
880-
// t.user_data = val; // not
881-
882-
// proton::disposition _disposition = make_wrapper(disposition);
883-
// // # t.remote();
884-
885-
// proton::value val2 = _disposition.data();
886-
887-
// proton::disposition _disposition = t.remote();
888-
889-
// proton::value val = _disposition.data();
890-
891-
// std::cout<< " declare_transaction: on_tracker_settle with
892-
// TXN IN :" << val << std::endl;
893-
894-
// if(t.transaction()) {
895-
// t.transaction().handle_outcome(t);
896-
// }
879+
if (!t.transaction().is_empty()) {
880+
t.transaction().handle_outcome(t);
881+
}
897882
}
898-
899-
// TODO: Add on_unhandled function
900883
};
901884

902-
// TODO: Sender should be created only once. (May be use Singleton Class)
903-
// proton::target_options t;
904-
905885
proton::target_options t;
906886
std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
907887
t.capabilities(cap);
908888
t.type(PN_COORDINATOR);
909889

910890
proton::sender_options so;
911891
so.name("txn-ctrl");
912-
// Todo: Check the value, Or by deafult null?
913-
//so.source() ?
914892
so.target(t);
915-
// TODO: FIX STATIC
916893
static InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it
917894
so.handler(internal_handler);
918895
std::cout<<" [declare_transaction] txn-name sender open with handler: " << &internal_handler << std::endl;
919896

920-
proton::sender s = conn.open_sender("does not matter", so);
897+
static proton::sender s = conn.open_sender("does not matter", so);
921898

922899
settle_before_discharge = false;
923900

0 commit comments

Comments
 (0)