Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agents/grpc/proto/info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ message InfoBody {
repeated string tags = 13;
uint64 totalMem = 14;
map<string, string> versions = 15;
uint32 kernelVersion = 16;
optional string appVersion = 17;
}

message InfoEvent {
Expand Down
4 changes: 2 additions & 2 deletions agents/grpc/src/command_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ CommandStream::~CommandStream() {
nsuv::ns_mutex::scoped_lock lock(lock_);
// try cancel and wait until OnDone is called
if (!write_state_.done) {
cancelling_for_destruction_ = true;
context_.TryCancel();
do {
uv_cond_wait(&on_done_cond_, lock_.base());
Expand All @@ -58,12 +59,11 @@ void CommandStream::OnDone(const Status& s) {
nsuv::ns_mutex::scoped_lock lock(lock_);
write_state_.done = true;
uv_cond_signal(&on_done_cond_);
if (!obs) {
if (!obs || cancelling_for_destruction_) {
return;
}
}

// Don't notify the observer if the stream was cancelled (destroyed)
obs->on_command_stream_done(s);
}

Expand Down
1 change: 1 addition & 0 deletions agents/grpc/src/command_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class CommandStream:
TSQueue<grpcagent::CommandResponse> response_q_;
nsuv::ns_mutex lock_;
uv_cond_t on_done_cond_;
bool cancelling_for_destruction_ = false;
};

} // namespace grpc
Expand Down
32 changes: 22 additions & 10 deletions agents/grpc/src/grpc_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "nsolid/nsolid_api.h"
#include "nsolid/nsolid_util.h"
#include "../../otlp/src/otlp_common.h"
#include "../../src/root_certs.h"
#include "../../src/span_collector.h"
#include "absl/log/initialize.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
Expand All @@ -21,6 +22,9 @@
#include "opentelemetry/exporters/otlp/otlp_metric_utils.h"
#include "opentelemetry/trace/semantic_conventions.h"

using std::chrono::duration_cast;
using std::chrono::nanoseconds;
using std::chrono::seconds;
using std::chrono::system_clock;
using std::chrono::time_point;
using google::protobuf::Arena;
Expand Down Expand Up @@ -73,17 +77,14 @@ const size_t GRPC_MAX_SIZE = 4L * 1024 * 1024; // 4GB
const int PUB_KEY_SIZE = 40;
const int CONSOLE_ID_SIZE = 36;

static const char* const root_certs[] = {
#include "node_root_certs.h" // NOLINT(build/include_order)
};
const seconds DEFAULT_GRPC_TIMEOUT = seconds{ 60 };

template <typename... Args>
inline void Debug(Args&&... args) {
per_process::Debug(DebugCategory::NSOLID_GRPC_AGENT,
std::forward<Args>(args)...);
}


inline void DebugJSON(const char* str, const json& msg) {
if (UNLIKELY(per_process::enabled_debug_list.enabled(
DebugCategory::NSOLID_GRPC_AGENT))) {
Expand All @@ -97,10 +98,6 @@ JSThreadMetrics::JSThreadMetrics(SharedEnvInst envinst):

std::pair<int64_t, int64_t>
create_recorded(const time_point<system_clock>& ts) {
using std::chrono::duration_cast;
using std::chrono::seconds;
using std::chrono::nanoseconds;

system_clock::duration dur = ts.time_since_epoch();
return { duration_cast<seconds>(dur).count(),
duration_cast<nanoseconds>(dur % seconds(1)).count() };
Expand Down Expand Up @@ -204,6 +201,9 @@ void PopulateInfoEvent(grpcagent::InfoEvent* info_event,
if (info.find("app") != info.end()) {
body->set_app(info["app"].get<std::string>());
}
if (info.find("appVersion") != info.end()) {
body->set_appversion(info["appVersion"].get<std::string>());
}
if (info.find("arch") != info.end()) {
body->set_arch(info["arch"].get<std::string>());
}
Expand Down Expand Up @@ -253,6 +253,10 @@ void PopulateInfoEvent(grpcagent::InfoEvent* info_event,
(*body->mutable_versions())[key] = value.get<std::string>();
}
}

if (info.find("kernelVersion") != info.end()) {
body->set_kernelversion(info["kernelVersion"].get<uint32_t>());
}
}
}

Expand Down Expand Up @@ -457,8 +461,8 @@ GrpcAgent::GrpcAgent(): hooks_init_(false),

if (custom_certs_.empty()) {
Debug("Using default certs\n");
for (size_t i = 0; i < sizeof(root_certs) / sizeof(root_certs[0]); i++) {
cacert_ += root_certs[i];
for (size_t i = 0; i < GetRootCertsCount(); i++) {
cacert_ += GetRootCerts()[i];
cacert_ += "\n";
}
}
Expand Down Expand Up @@ -1024,9 +1028,11 @@ int GrpcAgent::config(const json& config) {
endpoint.c_str(), static_cast<unsigned>(insecure));

OtlpGrpcClientOptions opts;
opts.compression = "gzip";
opts.endpoint = endpoint;
opts.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
opts.timeout = DEFAULT_GRPC_TIMEOUT;
// Make sure the client is initialized. We set it to the same
// default value as ax_concurrent_requests as the exporters.
opts.max_concurrent_requests = 64;
Expand All @@ -1049,9 +1055,11 @@ int GrpcAgent::config(const json& config) {

{
OtlpGrpcExporterOptions options;
options.compression = "gzip";
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
options.timeout = DEFAULT_GRPC_TIMEOUT;
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
Expand All @@ -1065,9 +1073,11 @@ int GrpcAgent::config(const json& config) {
}
{
OtlpGrpcMetricExporterOptions options;
options.compression = "gzip";
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
options.timeout = DEFAULT_GRPC_TIMEOUT;
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
Expand All @@ -1082,9 +1092,11 @@ int GrpcAgent::config(const json& config) {
}
{
OtlpGrpcLogRecordExporterOptions options;
options.compression = "gzip";
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
options.timeout = DEFAULT_GRPC_TIMEOUT;
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
Expand Down
Loading
Loading