Skip to content

Commit

Permalink
udp_proxy_filter: Fix crash when accepting dynamic cds for UDP. Fixes #…
Browse files Browse the repository at this point in the history
…34195. (#37151)

Please see #34195 and #26206 for details. This change fixes a usage of
cluster info resource after the udp cluster be destroyed.

I've got an approval from envoy-security list for the fix.
Risk Level: Low
Testing: mentioned in both tickets
Docs Changes: N/A

---------

Signed-off-by: sinxccc <[email protected]>
  • Loading branch information
railwaycat authored Dec 13, 2024
1 parent 6878af7 commit 19dcf98
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
38 changes: 16 additions & 22 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ PerPacketLoadBalancingUdpProxyFilter::onDataInternal(Network::UdpRecvData& data)
auto host = cluster->chooseHost(data.addresses_.peer_, nullptr);
if (host == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster->cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
cluster->cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
return Network::FilterStatus::StopIteration;
}

Expand Down Expand Up @@ -204,7 +204,7 @@ void UdpProxyFilter::removeSession(ActiveSession* session) {
UdpProxyFilter::ClusterInfo::ClusterInfo(UdpProxyFilter& filter,
Upstream::ThreadLocalCluster& cluster,
absl::flat_hash_set<ActiveSession*>&& sessions)
: filter_(filter), cluster_(cluster),
: filter_(filter), cluster_(cluster), cluster_info_(cluster.info()),
cluster_stats_(generateStats(cluster.info()->statsScope())), sessions_(std::move(sessions)),
member_update_cb_handle_(cluster.prioritySet().addMemberUpdateCb(
[this](const Upstream::HostVector&, const Upstream::HostVector& hosts_removed) {
Expand Down Expand Up @@ -269,7 +269,7 @@ UdpProxyFilter::createSession(Network::UdpRecvData::LocalPeerAddresses&& address
auto host = cluster->chooseHost(addresses.peer_, nullptr);
if (host == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster->cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
cluster->cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
return nullptr;
}

Expand Down Expand Up @@ -345,8 +345,7 @@ void UdpProxyFilter::ActiveSession::onSessionComplete() {

filter_.config_->stats().downstream_sess_active_.dec();
if (cluster_connections_inc_) {
cluster_->cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.dec();
}
Expand Down Expand Up @@ -385,7 +384,7 @@ void UdpProxyFilter::ActiveSession::fillSessionStreamInfo() {
ProtobufWkt::Struct stats_obj;
auto& fields_map = *stats_obj.mutable_fields();
if (cluster_ != nullptr) {
fields_map["cluster_name"] = ValueUtil::stringValue(cluster_->cluster_.info()->name());
fields_map["cluster_name"] = ValueUtil::stringValue(cluster_->cluster_info_->name());
}
fields_map["bytes_sent"] = ValueUtil::numberValue(session_stats_.downstream_sess_tx_bytes_);
fields_map["bytes_received"] = ValueUtil::numberValue(session_stats_.downstream_sess_rx_bytes_);
Expand Down Expand Up @@ -551,7 +550,7 @@ void UdpProxyFilter::UdpActiveSession::writeUpstream(Network::UdpRecvData& data)
cluster_->cluster_stats_.sess_tx_errors_.inc();
} else {
cluster_->cluster_stats_.sess_tx_datagrams_.inc();
cluster_->cluster_.info()->trafficStats()->upstream_cx_tx_bytes_total_.add(tx_buffer_length);
cluster_->cluster_info_->trafficStats()->upstream_cx_tx_bytes_total_.add(tx_buffer_length);
}
}

Expand Down Expand Up @@ -592,7 +591,7 @@ bool UdpProxyFilter::UdpActiveSession::createUpstream() {
host_ = cluster_->chooseHost(addresses_.peer_, &udp_session_info_);
if (host_ == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster_->cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc();
cluster_->cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
return false;
}
}
Expand Down Expand Up @@ -687,7 +686,7 @@ void UdpProxyFilter::UdpActiveSession::processPacket(
host_ != nullptr ? host_->address()->asStringView() : "unknown");

cluster_->cluster_stats_.sess_rx_datagrams_.inc();
cluster_->cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);
cluster_->cluster_info_->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);

Network::UdpRecvData recv_data{{std::move(local_address), std::move(peer_address)},
std::move(buffer),
Expand Down Expand Up @@ -784,20 +783,16 @@ bool UdpProxyFilter::ActiveSession::setClusterInfo() {
return false;
}

if (!cluster_->cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
if (!cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.canCreate()) {
ENVOY_LOG(debug, "cannot create new connection.");
udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
cluster_->cluster_.info()->trafficStats()->upstream_cx_overflow_.inc();
cluster_->cluster_info_->trafficStats()->upstream_cx_overflow_.inc();
return false;
}

cluster_->cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.inc();
cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default).connections().inc();

cluster_connections_inc_ = true;
return true;
Expand Down Expand Up @@ -1003,21 +998,20 @@ bool UdpProxyFilter::TunnelingActiveSession::createConnectionPool() {

// Check this here because the TCP conn pool will queue our request waiting for a connection that
// will never be released.
if (!cluster_->cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
if (!cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.canCreate()) {
udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
cluster_->cluster_.info()->trafficStats()->upstream_cx_overflow_.inc();
cluster_->cluster_info_->trafficStats()->upstream_cx_overflow_.inc();
return false;
}

if (connect_attempts_ >= filter_.config_->tunnelingConfig()->maxConnectAttempts()) {
udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
cluster_->cluster_.info()->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
cluster_->cluster_info_->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
return false;
} else if (connect_attempts_ >= 1) {
cluster_->cluster_.info()->trafficStats()->upstream_rq_retry_.inc();
cluster_->cluster_info_->trafficStats()->upstream_rq_retry_.inc();
}

conn_pool_ = conn_pool_factory_->createConnPool(cluster_->cluster_, load_balancer_context_.get(),
Expand Down Expand Up @@ -1170,7 +1164,7 @@ void UdpProxyFilter::TunnelingActiveSession::onUpstreamData(Buffer::Instance& da

ASSERT(cluster_);
cluster_->cluster_stats_.sess_rx_datagrams_.inc();
cluster_->cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);
cluster_->cluster_info_->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length);
resetIdleTimer();

Network::UdpRecvData recv_data{{addresses_.local_, addresses_.peer_},
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,

UdpProxyFilter& filter_;
Upstream::ThreadLocalCluster& cluster_;
Upstream::ClusterInfoConstSharedPtr cluster_info_;
UdpProxyUpstreamStats cluster_stats_;
absl::flat_hash_set<ActiveSession*> sessions_;

Expand Down
12 changes: 12 additions & 0 deletions test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,18 @@ stat_prefix: foo
expectSessionCreate(upstream_address_);
test_sessions_[0].expectWriteToUpstream("hello", 0, nullptr, true);
recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello");

// Push a new cluster again, we expect this to trigger active session removal.
{
NiceMock<Upstream::MockThreadLocalCluster> other_thread_local_cluster;
other_thread_local_cluster.cluster_.info_->name_ = "fake_cluster";
Upstream::ThreadLocalClusterCommand command =
[&other_thread_local_cluster]() -> Upstream::ThreadLocalCluster& {
return other_thread_local_cluster;
};
cluster_update_callbacks_->onClusterAddOrUpdate(other_thread_local_cluster.info()->name(),
command);
}
}

// Hitting the maximum per-cluster connection/session circuit breaker.
Expand Down

0 comments on commit 19dcf98

Please sign in to comment.