-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcasablanca_couchdb_sample.cpp
162 lines (123 loc) · 4.83 KB
/
casablanca_couchdb_sample.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Producer/consumer bulk upload to CouchDB using Microsoft Casablanca REST library and Boost
// compiled with locally built casablanca:
// https://casablanca.codeplex.com
// clang++ casablanca_couchdb_sample.cpp -std=c++11 -I../casablanca/Release/include/ -lcpprest -L../casablanca//build.release/Binaries/ -lboost_thread-mt -lboost_system -lboost_chrono -lboost_program_options
#include <cpprest/http_client.h>
#include <cpprest/filestream.h>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/format.hpp>
#include <boost/atomic.hpp>
#include <boost/foreach.hpp>
#include <boost/chrono.hpp>
#include <boost/program_options.hpp>
#include <iostream>
using namespace utility; // Common utilities like string conversions
using namespace web; // Common features like URIs.
using namespace web::http; // Common HTTP functionality
using namespace web::http::client; // HTTP client features
using namespace concurrency::streams; // Asynchronous streams
int iterations = 100000;
int batch_size = 10;
boost::lockfree::spsc_queue<web::json::value> queue(128);
boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);
boost::atomic<bool> done (false);
void send_batch(std::vector<web::json::value> &batch) {
web::json::value v = web::json::value::object();
v["docs"] = web::json::value::array();
int i=0;
BOOST_FOREACH(web::json::value batch_object, batch) {
v["docs"][i++] = batch_object;
}
// Create http_client to send the request.
http_client client(U("http://127.0.0.1:5984/testdb"));
// Build request URI and start the request.
uri_builder builder(U("_bulk_docs"));
pplx::task<void> requestTask = client.request(methods::POST, builder.to_string(), v)
// Handle response headers arriving.
.then([=](http_response response)
{
#ifdef DEBUG
printf("Received response status code:%u\n", response.status_code());
std::cout << response.to_string();
#endif
return;
});
// Wait for all the outstanding I/O to complete and handle any exceptions
try
{
requestTask.wait();
}
catch (const std::exception &e)
{
printf("Error exception:%s\n", e.what());
}
}
void producer(void)
{
for (int i = 0; i != iterations; ++i) {
web::json::value doc = web::json::value::object();
doc["_id"] = web::json::value::string(str(boost::format("FishStew %d") % i));
doc["servings"] = web::json::value::number(4);
doc["subtitle"] = web::json::value::string("Delicious with freshly baked bread");
doc["title"] = web::json::value::string("FishStew");
while (!queue.push(doc))
; // queue full, busy wait
++producer_count;
}
}
void consumer(void)
{
web::json::value value;
int batch = 0;
std::vector<web::json::value> batch_to_send;
while (!done) {
while (queue.pop(value)) {
++consumer_count;
batch_to_send.push_back(value);
if (++batch == batch_size) {
batch = 0;
send_batch(batch_to_send);
batch_to_send.clear();
}
}
}
}
int main(int argc, char* argv[])
{
// Declare the supported options.
boost::program_options::options_description desc("Allowed options");
desc.add_options()
("help", "produce this help message")
("bs", boost::program_options::value<int>(), "batch size")
("d", boost::program_options::value<int>(), "number of documents")
;
boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
boost::program_options::notify(vm);
if (vm.count("help")) {
std::cout << desc << "\n";
return 1;
}
if (vm.count("bs")) {
batch_size = vm["bs"].as<int>();
}
if (vm.count("d")) {
iterations = vm["d"].as<int>();
}
boost::chrono::steady_clock::time_point start=boost::chrono::steady_clock::now();
boost::thread producer_thread(producer);
boost::thread consumer_thread(consumer);
producer_thread.join();
done = true;
consumer_thread.join();
std::cout << "produced " << producer_count << " objects." << std::endl;
std::cout << "consumed " << consumer_count << " objects." << std::endl;
std::cout << "batch size was " << batch_size << " objects." << std::endl;
boost::chrono::duration<double> d = boost::chrono::steady_clock::now() - start;
// d now holds the number of milliseconds from start to end.
std::cout << "took " << d.count() << " seconds\n" << std::endl;
std::cout << "rate " << (consumer_count / d.count()) << " docs / second" << std::endl;
return 0;
}