Skip to content

Commit

Permalink
Fix crash when accepting dynamic cds for UDP envoyproxy#34195
Browse files Browse the repository at this point in the history
Signed-off-by: sinxccc <[email protected]>
  • Loading branch information
railwaycat committed Nov 14, 2024
1 parent 8b38c57 commit 6d65dd9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 22 deletions.
38 changes: 16 additions & 22 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ PerPacketLoadBalancingUdpProxyFilter::onDataInternal(Network::UdpRecvData& data)
auto host = cluster->chooseHost(data.addresses_.peer_, nullptr);
if (host == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster->cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
cluster->cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
return Network::FilterStatus::StopIteration;
}

Expand Down Expand Up @@ -204,7 +204,7 @@ void UdpProxyFilter::removeSession(ActiveSession* session) {
UdpProxyFilter::ClusterInfo::ClusterInfo(UdpProxyFilter& filter,
Upstream::ThreadLocalCluster& cluster,
absl::flat_hash_set<ActiveSession*>&& sessions)
: filter_(filter), cluster_(cluster),
: filter_(filter), cluster_(cluster), cluster_info_(cluster.info()),
cluster_stats_(generateStats(cluster.info()->statsScope())), sessions_(std::move(sessions)),
member_update_cb_handle_(cluster.prioritySet().addMemberUpdateCb(
[this](const Upstream::HostVector&, const Upstream::HostVector& hosts_removed) {
Expand Down Expand Up @@ -269,7 +269,7 @@ UdpProxyFilter::createSession(Network::UdpRecvData::LocalPeerAddresses&& address
auto host = cluster->chooseHost(addresses.peer_, nullptr);
if (host == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster->cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
cluster->cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
return nullptr;
}

Expand Down Expand Up @@ -345,8 +345,7 @@ void UdpProxyFilter::ActiveSession::onSessionComplete() {

filter_.config_->stats().downstream_sess_active_.dec();
if (cluster_connections_inc_) {
cluster_->cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.dec();
}
Expand Down Expand Up @@ -385,7 +384,7 @@ void UdpProxyFilter::ActiveSession::fillSessionStreamInfo() {
ProtobufWkt::Struct stats_obj;
auto& fields_map = *stats_obj.mutable_fields();
if (cluster_ != nullptr) {
fields_map["cluster_name"] = ValueUtil::stringValue(cluster_->cluster_.info()->name());
fields_map["cluster_name"] = ValueUtil::stringValue(cluster_->cluster_info_->name());
}
fields_map["bytes_sent"] = ValueUtil::numberValue(session_stats_.downstream_sess_tx_bytes_);
fields_map["bytes_received"] = ValueUtil::numberValue(session_stats_.downstream_sess_rx_bytes_);
Expand Down Expand Up @@ -551,7 +550,7 @@ void UdpProxyFilter::UdpActiveSession::writeUpstream(Network::UdpRecvData& data)
cluster_->cluster_stats_.sess_tx_errors_.inc();
} else {
cluster_->cluster_stats_.sess_tx_datagrams_.inc();
cluster_->cluster_.info()->trafficStats()->upstream_cx_tx_bytes_total_.add(tx_buffer_length);
cluster_->cluster_info_->trafficStats()->upstream_cx_tx_bytes_total_.add(tx_buffer_length);
}
}

Expand Down Expand Up @@ -592,7 +591,7 @@ bool UdpProxyFilter::UdpActiveSession::createUpstream() {
host_ = cluster_->chooseHost(addresses_.peer_, &udp_session_info_);
if (host_ == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster_->cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
cluster_->cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
return false;
}
}
Expand Down Expand Up @@ -687,7 +686,7 @@ void UdpProxyFilter::UdpActiveSession::processPacket(
host_ != nullptr ? host_->address()->asStringView() : "unknown");

cluster_->cluster_stats_.sess_rx_datagrams_.inc();
cluster_->cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);
cluster_->cluster_info_->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);

Network::UdpRecvData recv_data{{std::move(local_address), std::move(peer_address)},
std::move(buffer),
Expand Down Expand Up @@ -784,20 +783,16 @@ bool UdpProxyFilter::ActiveSession::setClusterInfo() {
return false;
}

if (!cluster_->cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
if (!cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.canCreate()) {
ENVOY_LOG(debug, "cannot create new connection.");
udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
cluster_->cluster_.info()->trafficStats()->upstream_cx_overflow_.inc();
cluster_->cluster_info_->trafficStats()->upstream_cx_overflow_.inc();
return false;
}

cluster_->cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.inc();
cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default).connections().inc();

cluster_connections_inc_ = true;
return true;
Expand Down Expand Up @@ -1002,21 +997,20 @@ bool UdpProxyFilter::TunnelingActiveSession::createConnectionPool() {

// Check this here because the TCP conn pool will queue our request waiting for a connection that
// will never be released.
if (!cluster_->cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
if (!cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.canCreate()) {
udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
cluster_->cluster_.info()->trafficStats()->upstream_cx_overflow_.inc();
cluster_->cluster_info_->trafficStats()->upstream_cx_overflow_.inc();
return false;
}

if (connect_attempts_ >= filter_.config_->tunnelingConfig()->maxConnectAttempts()) {
udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
cluster_->cluster_.info()->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
cluster_->cluster_info_->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
return false;
} else if (connect_attempts_ >= 1) {
cluster_->cluster_.info()->trafficStats()->upstream_rq_retry_.inc();
cluster_->cluster_info_->trafficStats()->upstream_rq_retry_.inc();
}

conn_pool_ = conn_pool_factory_->createConnPool(cluster_->cluster_, load_balancer_context_.get(),
Expand Down Expand Up @@ -1169,7 +1163,7 @@ void UdpProxyFilter::TunnelingActiveSession::onUpstreamData(Buffer::Instance& da

ASSERT(cluster_);
cluster_->cluster_stats_.sess_rx_datagrams_.inc();
cluster_->cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);
cluster_->cluster_info_->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);
resetIdleTimer();

Network::UdpRecvData recv_data{{addresses_.local_, addresses_.peer_},
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,

UdpProxyFilter& filter_;
Upstream::ThreadLocalCluster& cluster_;
Upstream::ClusterInfoConstSharedPtr cluster_info_;
UdpProxyUpstreamStats cluster_stats_;
absl::flat_hash_set<ActiveSession*> sessions_;

Expand Down

0 comments on commit 6d65dd9

Please sign in to comment.