diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index a75fe7d5e22b..d0e941b2a347 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -133,7 +133,7 @@ PerPacketLoadBalancingUdpProxyFilter::onDataInternal(Network::UdpRecvData& data) auto host = cluster->chooseHost(data.addresses_.peer_, nullptr); if (host == nullptr) { ENVOY_LOG(debug, "cannot find any valid host."); - cluster->cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc(); + cluster->cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc(); return Network::FilterStatus::StopIteration; } @@ -204,7 +204,7 @@ void UdpProxyFilter::removeSession(ActiveSession* session) { UdpProxyFilter::ClusterInfo::ClusterInfo(UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster, absl::flat_hash_set&& sessions) - : filter_(filter), cluster_(cluster), + : filter_(filter), cluster_(cluster), cluster_info_(cluster.info()), cluster_stats_(generateStats(cluster.info()->statsScope())), sessions_(std::move(sessions)), member_update_cb_handle_(cluster.prioritySet().addMemberUpdateCb( [this](const Upstream::HostVector&, const Upstream::HostVector& hosts_removed) { @@ -269,7 +269,7 @@ UdpProxyFilter::createSession(Network::UdpRecvData::LocalPeerAddresses&& address auto host = cluster->chooseHost(addresses.peer_, nullptr); if (host == nullptr) { ENVOY_LOG(debug, "cannot find any valid host."); - cluster->cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc(); + cluster->cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc(); return nullptr; } @@ -345,8 +345,7 @@ void UdpProxyFilter::ActiveSession::onSessionComplete() { filter_.config_->stats().downstream_sess_active_.dec(); if (cluster_connections_inc_) { - cluster_->cluster_.info() - ->resourceManager(Upstream::ResourcePriority::Default) + cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default) .connections() .dec(); } @@ -385,7 +384,7 @@ void UdpProxyFilter::ActiveSession::fillSessionStreamInfo() { ProtobufWkt::Struct stats_obj; auto& fields_map = *stats_obj.mutable_fields(); if (cluster_ != nullptr) { - fields_map["cluster_name"] = ValueUtil::stringValue(cluster_->cluster_.info()->name()); + fields_map["cluster_name"] = ValueUtil::stringValue(cluster_->cluster_info_->name()); } fields_map["bytes_sent"] = ValueUtil::numberValue(session_stats_.downstream_sess_tx_bytes_); fields_map["bytes_received"] = ValueUtil::numberValue(session_stats_.downstream_sess_rx_bytes_); @@ -551,7 +550,7 @@ void UdpProxyFilter::UdpActiveSession::writeUpstream(Network::UdpRecvData& data) cluster_->cluster_stats_.sess_tx_errors_.inc(); } else { cluster_->cluster_stats_.sess_tx_datagrams_.inc(); - cluster_->cluster_.info()->trafficStats()->upstream_cx_tx_bytes_total_.add(tx_buffer_length); + cluster_->cluster_info_->trafficStats()->upstream_cx_tx_bytes_total_.add(tx_buffer_length); } } @@ -592,7 +591,7 @@ bool UdpProxyFilter::UdpActiveSession::createUpstream() { host_ = cluster_->chooseHost(addresses_.peer_, &udp_session_info_); if (host_ == nullptr) { ENVOY_LOG(debug, "cannot find any valid host."); - cluster_->cluster_.info()->trafficStats()->upstream_cx_none_healthy_.inc(); + cluster_->cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc(); return false; } } @@ -687,7 +686,7 @@ void UdpProxyFilter::UdpActiveSession::processPacket( host_ != nullptr ? host_->address()->asStringView() : "unknown"); cluster_->cluster_stats_.sess_rx_datagrams_.inc(); - cluster_->cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length); + cluster_->cluster_info_->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length); Network::UdpRecvData recv_data{{std::move(local_address), std::move(peer_address)}, std::move(buffer), @@ -784,20 +783,16 @@ bool UdpProxyFilter::ActiveSession::setClusterInfo() { return false; } - if (!cluster_->cluster_.info() - ->resourceManager(Upstream::ResourcePriority::Default) + if (!cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default) .connections() .canCreate()) { ENVOY_LOG(debug, "cannot create new connection."); udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow); - cluster_->cluster_.info()->trafficStats()->upstream_cx_overflow_.inc(); + cluster_->cluster_info_->trafficStats()->upstream_cx_overflow_.inc(); return false; } - cluster_->cluster_.info() - ->resourceManager(Upstream::ResourcePriority::Default) - .connections() - .inc(); + cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default).connections().inc(); cluster_connections_inc_ = true; return true; @@ -1003,21 +998,20 @@ bool UdpProxyFilter::TunnelingActiveSession::createConnectionPool() { // Check this here because the TCP conn pool will queue our request waiting for a connection that // will never be released. - if (!cluster_->cluster_.info() - ->resourceManager(Upstream::ResourcePriority::Default) + if (!cluster_->cluster_info_->resourceManager(Upstream::ResourcePriority::Default) .connections() .canCreate()) { udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow); - cluster_->cluster_.info()->trafficStats()->upstream_cx_overflow_.inc(); + cluster_->cluster_info_->trafficStats()->upstream_cx_overflow_.inc(); return false; } if (connect_attempts_ >= filter_.config_->tunnelingConfig()->maxConnectAttempts()) { udp_session_info_.setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded); - cluster_->cluster_.info()->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc(); + cluster_->cluster_info_->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc(); return false; } else if (connect_attempts_ >= 1) { - cluster_->cluster_.info()->trafficStats()->upstream_rq_retry_.inc(); + cluster_->cluster_info_->trafficStats()->upstream_rq_retry_.inc(); } conn_pool_ = conn_pool_factory_->createConnPool(cluster_->cluster_, load_balancer_context_.get(), @@ -1170,7 +1164,7 @@ void UdpProxyFilter::TunnelingActiveSession::onUpstreamData(Buffer::Instance& da ASSERT(cluster_); cluster_->cluster_stats_.sess_rx_datagrams_.inc(); - cluster_->cluster_.info()->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length); + cluster_->cluster_info_->trafficStats()->upstream_cx_rx_bytes_total_.add(rx_buffer_length); resetIdleTimer(); Network::UdpRecvData recv_data{{addresses_.local_, addresses_.peer_}, diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index 361164cf9022..b4e46ca5cd74 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -862,6 +862,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, UdpProxyFilter& filter_; Upstream::ThreadLocalCluster& cluster_; + Upstream::ClusterInfoConstSharedPtr cluster_info_; UdpProxyUpstreamStats cluster_stats_; absl::flat_hash_set sessions_; diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index 80269d9a5642..219dbe974929 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -927,6 +927,18 @@ stat_prefix: foo expectSessionCreate(upstream_address_); test_sessions_[0].expectWriteToUpstream("hello", 0, nullptr, true); recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + + // Push a new cluster again, we expect this to trigger active session removal. + { + NiceMock other_thread_local_cluster; + other_thread_local_cluster.cluster_.info_->name_ = "fake_cluster"; + Upstream::ThreadLocalClusterCommand command = + [&other_thread_local_cluster]() -> Upstream::ThreadLocalCluster& { + return other_thread_local_cluster; + }; + cluster_update_callbacks_->onClusterAddOrUpdate(other_thread_local_cluster.info()->name(), + command); + } } // Hitting the maximum per-cluster connection/session circuit breaker.