Skip to content

Commit adc416d

Browse files
authored
CXXCBC-328: fix restarting MCBP sessions (#406)
Fix socket reconnection during the rebalance process. Several improvements have been implemented to make the library resilient to rapid topology changes (in particular in Cloud environments) when DNS-SRV bootstrap is used together with alternative addresses. The changes include: * take into account alternative hostnames and ports when detecting added/removed nodes on configuration update * replace node index tracking with hostname/port matching when restarting connections; this way we can be sure that no duplicate connections are left and no live connections are replaced by a restarted session * improved logging of critical events during rebalance: restarting, preserving and removing connections. Tools ----- * Added `--verbose` switch to `cbc-get` and `cbc-pillowfight` that reports the error context of every failed operation to the standard error stream in JSON format. It might be useful for collecting/diagnosing errors of long-running workloads. * Added batching switches `--batch-size` and `--batch-wait` to `cbc-pillowfight`. `--batch-size` changes the number of operations scheduled at once before waiting for completion. `--batch-wait` inserts delays between batches in the workload thread. * Added `--number-of-keys-to-populate` switch, which allows preloading a set of keys before running the workload. The switch sets the number of keys per thread.
1 parent cb25284 commit adc416d

23 files changed

+692
-324
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ set(couchbase_cxx_client_FILES
303303
core/impl/internal_search_row_locations.cxx
304304
core/impl/internal_term_facet_result.cxx
305305
core/impl/key_value_error_category.cxx
306+
core/impl/key_value_error_context.cxx
306307
core/impl/lookup_in.cxx
307308
core/impl/management_error_category.cxx
308309
core/impl/match_all_query.cxx
@@ -322,6 +323,7 @@ set(couchbase_cxx_client_FILES
322323
core/impl/prepend.cxx
323324
core/impl/query.cxx
324325
core/impl/query_error_category.cxx
326+
core/impl/query_error_context.cxx
325327
core/impl/query_string_query.cxx
326328
core/impl/regexp_query.cxx
327329
core/impl/remove.cxx

core/bucket.cxx

Lines changed: 183 additions & 152 deletions
Large diffs are not rendered by default.

core/bucket.hxx

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class bucket
8383
auto cmd = std::make_shared<operations::mcbp_command<bucket, Request>>(ctx_, shared_from_this(), request, default_timeout());
8484
cmd->start([cmd, handler = std::forward<Handler>(handler)](std::error_code ec, std::optional<io::mcbp_message>&& msg) mutable {
8585
using encoded_response_type = typename Request::encoded_response_type;
86-
std::uint16_t status_code = msg ? msg->header.status() : 0U;
86+
std::uint16_t status_code = msg ? msg->header.status() : 0xffffU;
8787
auto resp = msg ? encoded_response_type(std::move(*msg)) : encoded_response_type{};
8888
auto ctx = make_key_value_error_context(ec, status_code, cmd, resp);
8989
handler(cmd->request.make_response(std::move(ctx), std::move(resp)));
@@ -107,7 +107,7 @@ class bucket
107107
auto [partition, server] = map_id(cmd->request.id);
108108
if (!server.has_value()) {
109109
CB_LOG_TRACE(
110-
R"({} unable to map key=\"{}\" to the node, id={}, partition={})", log_prefix(), cmd->request.id, cmd->id_, partition);
110+
R"({} unable to map key="{}" to the node, id={}, partition={})", log_prefix(), cmd->request.id, cmd->id_, partition);
111111
return io::retry_orchestrator::maybe_retry(
112112
cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled);
113113
}
@@ -116,19 +116,32 @@ class bucket
116116
}
117117
auto session = find_session_by_index(index);
118118
if (!session || !session->has_config()) {
119-
CB_LOG_TRACE(R"({} defer operation id={}, session={}, has_config={})",
119+
CB_LOG_TRACE(R"({} defer operation id={}, key="{}", partition={}, index={}, session={}, address="{}", has_config={})",
120120
log_prefix(),
121121
cmd->id_,
122+
cmd->request.id,
123+
cmd->request.partition,
124+
index,
122125
session.has_value(),
126+
session.has_value() ? session->bootstrap_address() : "",
123127
session.has_value() && session->has_config());
124128
return defer_command([self = shared_from_this(), cmd]() { self->map_and_send(cmd); });
125129
}
126130
if (session->is_stopped()) {
127131
CB_LOG_TRACE(
128-
R"({} the session has been found, but it is stopped, retrying id={}, session={})", log_prefix(), cmd->id_, session->id());
132+
R"({} the session has been found for idx={}, but it is stopped, retrying id={}, key="{}", partition={}, session={}, address="{}")",
133+
log_prefix(),
134+
index,
135+
cmd->id_,
136+
cmd->request.id,
137+
cmd->request.partition,
138+
session->id(),
139+
session->bootstrap_address());
129140
return io::retry_orchestrator::maybe_retry(
130141
cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled);
131142
}
143+
cmd->last_dispatched_from_ = session->local_address();
144+
cmd->last_dispatched_to_ = session->bootstrap_address();
132145
cmd->send_to(session.value());
133146
}
134147

core/cluster.hxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ class cluster : public std::enable_shared_from_this<cluster>
483483
}
484484
self->session_manager_->set_configuration(config, self->origin_.options());
485485
self->session_->on_configuration_update(self->session_manager_);
486-
self->session_->on_stop([self](retry_reason) {
486+
self->session_->on_stop([self]() {
487487
if (self->session_) {
488488
self->session_.reset();
489489
}

core/error_context/key_value.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ key_value_error_context
2424
make_key_value_error_context(std::error_code ec, const document_id& id)
2525
{
2626
return {
27-
ec, {}, {}, 0, {}, id.key(), id.bucket(), id.scope(), id.collection(), 0, {}, {}, {}, {},
27+
{}, ec, {}, {}, 0, {}, id.key(), id.bucket(), id.scope(), id.collection(), 0, {}, {}, {}, {},
2828
};
2929
}
3030

@@ -36,6 +36,7 @@ make_subdocument_error_context(const key_value_error_context& ctx,
3636
bool deleted)
3737
{
3838
return {
39+
ctx.operation_id(),
3940
ec,
4041
ctx.last_dispatched_to(),
4142
ctx.last_dispatched_from(),

core/error_context/key_value.hxx

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,21 @@ make_key_value_error_context(std::error_code ec, std::uint16_t status_code, cons
4343
const auto& scope = command->request.id.scope();
4444
const auto& bucket = command->request.id.bucket();
4545
std::uint32_t opaque = (ec && response.opaque() == 0) ? command->request.opaque : response.opaque();
46-
auto status = response.status();
47-
auto retry_attempts = command->request.retries.retry_attempts();
48-
auto retry_reasons = command->request.retries.retry_reasons();
49-
std::optional<std::string> last_dispatched_from{};
50-
std::optional<std::string> last_dispatched_to{};
46+
std::optional<key_value_status_code> status{};
5147
std::optional<key_value_error_map_info> error_map_info{};
52-
if (command->session_) {
53-
last_dispatched_from = command->session_->local_address();
54-
last_dispatched_to = command->session_->remote_address();
55-
if (status_code) {
48+
if (status_code != 0xffffU) {
49+
status = response.status();
50+
if (command->session_ && status_code > 0) {
5651
error_map_info = command->session_->decode_error_code(status_code);
5752
}
5853
}
54+
auto retry_attempts = command->request.retries.retry_attempts();
55+
auto retry_reasons = command->request.retries.retry_reasons();
5956

60-
return { ec,
61-
std::move(last_dispatched_from),
62-
std::move(last_dispatched_to),
57+
return { command->id_,
58+
ec,
59+
command->last_dispatched_to_,
60+
command->last_dispatched_from_,
6361
retry_attempts,
6462
std::move(retry_reasons),
6563
key,
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2020-Present Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include <couchbase/key_value_error_context.hxx>
19+
20+
#include <couchbase/fmt/cas.hxx>
21+
#include <couchbase/fmt/key_value_error_map_attribute.hxx>
22+
#include <couchbase/fmt/key_value_status_code.hxx>
23+
#include <couchbase/fmt/retry_reason.hxx>
24+
25+
#include <tao/json/to_string.hpp>
26+
27+
namespace couchbase
28+
{
29+
auto
30+
key_value_error_context::to_json() const -> std::string
31+
{
32+
tao::json::value json = {
33+
{
34+
"ec",
35+
tao::json::value{
36+
{ "value", ec().value() },
37+
{ "message", ec().message() },
38+
},
39+
},
40+
{ "operation_id", operation_id() },
41+
{ "id", id_ },
42+
{ "bucket", bucket_ },
43+
{ "scope", scope_ },
44+
{ "collection", collection_ },
45+
};
46+
47+
if (auto val = retry_attempts(); val > 0) {
48+
json["retry_attempts"] = val;
49+
}
50+
if (opaque_ > 0) {
51+
json["opaque"] = opaque_;
52+
}
53+
54+
if (!cas_.empty()) {
55+
json["cas"] = fmt::format("{}", cas_);
56+
}
57+
58+
if (const auto& reasons = retry_reasons(); !reasons.empty()) {
59+
tao::json::value reasons_json = tao::json::empty_array;
60+
for (const auto& reason : reasons) {
61+
reasons_json.emplace_back(fmt::format("{}", reason));
62+
}
63+
json["retry_reasons"] = reasons_json;
64+
}
65+
if (const auto& val = last_dispatched_from(); val.has_value()) {
66+
json["last_dispatched_from"] = val.value();
67+
}
68+
if (const auto& val = last_dispatched_to(); val.has_value()) {
69+
json["last_dispatched_to"] = val.value();
70+
}
71+
if (const auto& val = status_code_; val.has_value()) {
72+
json["status_code"] = fmt::format("{}", val.value());
73+
}
74+
if (const auto& val = extended_error_info_; val.has_value()) {
75+
json["extended_error_info"] = tao::json::value{
76+
{ "context", val->context() },
77+
{ "reference", val->reference() },
78+
};
79+
}
80+
if (const auto& val = error_map_info_; val.has_value()) {
81+
tao::json::value info{
82+
{ "code", val->code() },
83+
{ "name", val->name() },
84+
{ "description", val->description() },
85+
};
86+
if (const auto& attributes = val->attributes(); !attributes.empty()) {
87+
tao::json::value attrs_json = tao::json::empty_array;
88+
for (const auto& attr : attributes) {
89+
attrs_json.emplace_back(fmt::format("{}", attr));
90+
}
91+
info["attributes"] = attrs_json;
92+
}
93+
json["error_map_info"] = info;
94+
}
95+
96+
return tao::json::to_string(json, 2);
97+
}
98+
} // namespace couchbase

core/impl/query_error_context.cxx

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2020-Present Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include <couchbase/query_error_context.hxx>
19+
20+
#include <couchbase/fmt/retry_reason.hxx>
21+
22+
#include <tao/json/to_string.hpp>
23+
24+
namespace couchbase
25+
{
26+
auto
27+
query_error_context::to_json() const -> std::string
28+
{
29+
tao::json::value json = {
30+
{
31+
"ec",
32+
tao::json::value{
33+
{ "value", ec().value() },
34+
{ "message", ec().message() },
35+
},
36+
},
37+
{ "operation_id", operation_id() },
38+
{ "retry_attempts", retry_attempts() },
39+
{ "client_context_id", client_context_id_ },
40+
{ "statement", statement_ },
41+
{ "method", statement_ },
42+
{ "path", statement_ },
43+
{ "http_status", http_status_ },
44+
{ "http_body", http_body_ },
45+
{ "hostname", hostname_ },
46+
{ "port", port_ },
47+
};
48+
49+
if (const auto& val = parameters_; val.has_value()) {
50+
json["parameters"] = val.value();
51+
}
52+
if (first_error_code_ > 0) {
53+
json["first_error_code"] = first_error_code_;
54+
}
55+
if (!first_error_message_.empty()) {
56+
json["first_error_message"] = first_error_message_;
57+
}
58+
59+
if (const auto& reasons = retry_reasons(); !reasons.empty()) {
60+
tao::json::value reasons_json = tao::json::empty_array;
61+
for (const auto& reason : reasons) {
62+
reasons_json.emplace_back(fmt::format("{}", reason));
63+
}
64+
json["retry_reasons"] = reasons_json;
65+
}
66+
if (const auto& val = last_dispatched_from(); val.has_value()) {
67+
json["last_dispatched_from"] = val.value();
68+
}
69+
if (const auto& val = last_dispatched_to(); val.has_value()) {
70+
json["last_dispatched_to"] = val.value();
71+
}
72+
73+
return tao::json::to_string(json, 2);
74+
}
75+
} // namespace couchbase

core/io/mcbp_command.hxx

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,13 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
5959
mcbp_command_handler handler_{};
6060
std::shared_ptr<Manager> manager_{};
6161
std::chrono::milliseconds timeout_{};
62-
std::string id_{ uuid::to_string(uuid::random()) };
62+
std::string id_{
63+
fmt::format("{:02x}/{}", static_cast<std::uint8_t>(encoded_request_type::body_type::opcode), uuid::to_string(uuid::random()))
64+
};
6365
std::shared_ptr<couchbase::tracing::request_span> span_{ nullptr };
6466
std::shared_ptr<couchbase::tracing::request_span> parent_span{ nullptr };
67+
std::optional<std::string> last_dispatched_from_{};
68+
std::optional<std::string> last_dispatched_to_{};
6569

6670
mcbp_command(asio::io_context& ctx, std::shared_ptr<Manager> manager, Request req, std::chrono::milliseconds default_timeout)
6771
: deadline(ctx)
@@ -109,7 +113,10 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
109113
handler_ = nullptr;
110114
}
111115
}
112-
invoke_handler(request.retries.idempotent() ? errc::common::unambiguous_timeout : errc::common::ambiguous_timeout);
116+
invoke_handler(request.retries.idempotent() || !opaque_.has_value()
117+
? errc::common::unambiguous_timeout // safe to retry or has not been sent to the server
118+
: errc::common::ambiguous_timeout // non-idempotent and has been sent to the server
119+
);
113120
}
114121

115122
void invoke_handler(std::error_code ec, std::optional<io::mcbp_message>&& msg = {})

0 commit comments

Comments
 (0)