Skip to content

Commit 477437c

Browse files
authored
CXXCBC-738: Add dispatch spans for HTTP operations (#854)
1 parent ee16aa2 commit 477437c

11 files changed

+253
-156
lines changed

core/io/http_command.hxx

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -97,26 +97,12 @@ struct http_command : public std::enable_shared_from_this<http_command<Request>>
9797
}
9898
#endif
9999

100-
void finish_dispatch(const std::string& remote_address, const std::string& local_address)
101-
{
102-
if (span_ == nullptr) {
103-
return;
104-
}
105-
if (span_->uses_tags())
106-
span_->add_tag(tracing::attributes::remote_socket, remote_address);
107-
if (span_->uses_tags())
108-
span_->add_tag(tracing::attributes::local_socket, local_address);
109-
span_->end();
110-
span_ = nullptr;
111-
}
112-
113100
void start(handler_type&& handler)
114101
{
115102
span_ = tracer_->create_span(tracing::span_name_for_http_service(request.type), parent_span_);
116103
if (span_->uses_tags()) {
117104
span_->add_tag(tracing::attributes::service,
118105
tracing::service_name_for_http_service(request.type));
119-
span_->add_tag(tracing::attributes::dispatch::operation_id, client_context_id_);
120106
}
121107

122108
handler_ = std::move(handler);
@@ -164,10 +150,6 @@ struct http_command : public std::enable_shared_from_this<http_command<Request>>
164150
void invoke_handler(std::error_code ec, io::http_response&& msg)
165151
#endif
166152
{
167-
if (span_ != nullptr) {
168-
span_->end();
169-
span_ = nullptr;
170-
}
171153
if (handler_type handler = std::move(handler_); handler) {
172154
const auto& node_uuid = session_ ? session_->node_uuid() : "";
173155
auto telemetry_recorder = app_telemetry_meter_->value_recorder(node_uuid, {});
@@ -214,7 +196,14 @@ struct http_command : public std::enable_shared_from_this<http_command<Request>>
214196
ctx.last_dispatched_to = session_->remote_address();
215197
ctx.hostname = session_->http_context().hostname;
216198
ctx.port = session_->http_context().port;
217-
handler(request.make_response(std::move(ctx), std::move(encoded_resp)));
199+
200+
// Can raise priv::retry_http_request when a retry is required
201+
auto resp = request.make_response(std::move(ctx), std::move(encoded_resp));
202+
203+
span_->end();
204+
span_ = nullptr;
205+
206+
handler(std::move(resp));
218207
}
219208
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
220209
dispatch_deadline_.cancel();
@@ -230,9 +219,6 @@ struct http_command : public std::enable_shared_from_this<http_command<Request>>
230219
if (!handler_) {
231220
return;
232221
}
233-
if (span_->uses_tags()) {
234-
span_->add_tag(tracing::attributes::dispatch::local_id, session_->id());
235-
}
236222
send();
237223
}
238224

@@ -265,6 +251,7 @@ private:
265251
return invoke_handler(ec, {});
266252
}
267253
encoded.headers["client-context-id"] = client_context_id_;
254+
268255
CB_LOG_TRACE(
269256
R"({} HTTP request: {}, method={}, path="{}", client_context_id="{}", timeout={}ms)",
270257
session_->log_prefix(),
@@ -273,18 +260,27 @@ private:
273260
encoded.path,
274261
client_context_id_,
275262
timeout_.count());
263+
264+
auto dispatch_span = create_dispatch_span();
265+
276266
session_->write_and_subscribe(
277267
encoded,
278268
[self = this->shared_from_this(),
269+
dispatch_span = std::move(dispatch_span),
279270
start = std::chrono::steady_clock::now()](std::error_code ec, io::http_response&& msg) {
280271
if (ec == asio::error::operation_aborted) {
272+
dispatch_span->end();
281273
return self->invoke_handler(errc::common::ambiguous_timeout, std::move(msg));
282274
}
283275

284-
auto latency = std::chrono::duration_cast<std::chrono::milliseconds>(
285-
std::chrono::steady_clock::now() - start);
286-
self->app_telemetry_meter_->value_recorder(self->session_->node_uuid(), {})
287-
->record_latency(latency_for_service_type(self->request.type), latency);
276+
dispatch_span->end();
277+
278+
{
279+
auto latency = std::chrono::duration_cast<std::chrono::milliseconds>(
280+
std::chrono::steady_clock::now() - start);
281+
self->app_telemetry_meter_->value_recorder(self->session_->node_uuid(), {})
282+
->record_latency(latency_for_service_type(self->request.type), latency);
283+
}
288284

289285
if (self->meter_) {
290286
metrics::metric_attributes attrs{
@@ -294,8 +290,8 @@ private:
294290
};
295291
self->meter_->record_value(std::move(attrs), start);
296292
}
293+
297294
self->deadline.cancel();
298-
self->finish_dispatch(self->session_->remote_address(), self->session_->local_address());
299295
CB_LOG_TRACE(R"({} HTTP response: {}, client_context_id="{}", ec={}, status={}, body={})",
300296
self->session_->log_prefix(),
301297
self->request.type,
@@ -313,6 +309,28 @@ private:
313309
}
314310
});
315311
}
312+
313+
[[nodiscard]] auto create_dispatch_span() const
314+
-> std::shared_ptr<couchbase::tracing::request_span>
315+
{
316+
std::shared_ptr<couchbase::tracing::request_span> dispatch_span =
317+
tracer_->create_span(tracing::operation::step_dispatch, span_);
318+
if (dispatch_span->uses_tags()) {
319+
dispatch_span->add_tag(tracing::attributes::dispatch::network_transport, "tcp");
320+
dispatch_span->add_tag(tracing::attributes::dispatch::operation_id, client_context_id_);
321+
dispatch_span->add_tag(tracing::attributes::dispatch::local_id, session_->id());
322+
dispatch_span->add_tag(tracing::attributes::dispatch::server_address,
323+
session_->http_context().canonical_hostname);
324+
dispatch_span->add_tag(tracing::attributes::dispatch::server_port,
325+
session_->http_context().canonical_port);
326+
327+
const auto& peer_endpoint = session_->remote_endpoint();
328+
dispatch_span->add_tag(tracing::attributes::dispatch::peer_address,
329+
peer_endpoint.address().to_string());
330+
dispatch_span->add_tag(tracing::attributes::dispatch::peer_port, peer_endpoint.port());
331+
}
332+
return dispatch_span;
333+
}
316334
};
317335

318336
} // namespace couchbase::core::operations

core/io/http_context.hxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ struct http_context {
2929
query_cache& cache;
3030
std::string hostname;
3131
std::uint16_t port;
32+
std::string canonical_hostname;
33+
std::uint16_t canonical_port;
3234
};
3335

3436
namespace priv

core/io/http_session.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@ http_session::local_address() -> std::string
185185
return info_.local_address();
186186
}
187187

188+
auto
189+
http_session::remote_endpoint() -> const asio::ip::tcp::endpoint&
190+
{
191+
const std::scoped_lock lock(info_mutex_);
192+
return info_.remote_endpoint();
193+
}
194+
188195
auto
189196
http_session::diag_info() -> diag::endpoint_diag_info
190197
{

core/io/http_session.hxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public:
9494
[[nodiscard]] auto http_context() -> couchbase::core::http_context&;
9595
[[nodiscard]] auto remote_address() -> std::string;
9696
[[nodiscard]] auto local_address() -> std::string;
97+
[[nodiscard]] auto remote_endpoint() -> const asio::ip::tcp::endpoint&;
9798
[[nodiscard]] auto diag_info() -> diag::endpoint_diag_info;
9899
[[nodiscard]] auto log_prefix() -> std::string;
99100
[[nodiscard]] auto id() const -> const std::string&;

0 commit comments

Comments
 (0)