Skip to content

Commit 55906e4

Browse files
authored
Merge pull request grpc#16912 from markdroth/fail_rpcs_on_transient_failure
Fail wait_for_ready=false RPCs when channel is in TRANSIENT_FAILURE.
2 parents 17f1000 + b6059e2 commit 55906e4

File tree

3 files changed

+49
-11
lines changed

3 files changed

+49
-11
lines changed

Diff for: src/core/ext/filters/client_channel/client_channel.cc

+34
Original file line numberDiff line numberDiff line change
@@ -2951,13 +2951,37 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
29512951
}
29522952
}
29532953

2954+
// If the channel is in TRANSIENT_FAILURE and the call is not
2955+
// wait_for_ready=true, fails the call and returns true.
2956+
static bool fail_call_if_in_transient_failure(grpc_call_element* elem) {
2957+
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2958+
call_data* calld = static_cast<call_data*>(elem->call_data);
2959+
grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch;
2960+
if (grpc_connectivity_state_check(&chand->state_tracker) ==
2961+
GRPC_CHANNEL_TRANSIENT_FAILURE &&
2962+
(batch->payload->send_initial_metadata.send_initial_metadata_flags &
2963+
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
2964+
pending_batches_fail(
2965+
elem,
2966+
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2967+
"channel is in state TRANSIENT_FAILURE"),
2968+
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
2969+
true /* yield_call_combiner */);
2970+
return true;
2971+
}
2972+
return false;
2973+
}
2974+
29542975
// Invoked once resolver results are available.
29552976
static void process_service_config_and_start_lb_pick_locked(
29562977
grpc_call_element* elem) {
29572978
call_data* calld = static_cast<call_data*>(elem->call_data);
29582979
// Only get service config data on the first attempt.
29592980
if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
29602981
apply_service_config_to_call_locked(elem);
2982+
// Check this after applying service config, since it may have
2983+
// affected the call's wait_for_ready value.
2984+
if (fail_call_if_in_transient_failure(elem)) return;
29612985
}
29622986
// Start LB pick.
29632987
grpc_core::LbPicker::StartLocked(elem);
@@ -3127,6 +3151,16 @@ static void start_pick_locked(void* arg, grpc_error* ignored) {
31273151
// We do not yet have an LB policy, so wait for a resolver result.
31283152
if (GPR_UNLIKELY(!chand->started_resolving)) {
31293153
start_resolving_locked(chand);
3154+
} else {
3155+
// Normally, we want to do this check in
3156+
// process_service_config_and_start_lb_pick_locked(), so that we
3157+
// can honor the wait_for_ready setting in the service config.
3158+
// However, if the channel is in TRANSIENT_FAILURE at this point, that
3159+
// means that the resolver has returned a failure, so we're not going
3160+
// to get a service config right away. In that case, we fail the
3161+
// call now based on the wait_for_ready value passed in from the
3162+
// application.
3163+
if (fail_call_if_in_transient_failure(elem)) return;
31303164
}
31313165
// Create a new waiter, which will delete itself when done.
31323166
grpc_core::New<grpc_core::ResolverResultWaiter>(elem);

Diff for: test/cpp/end2end/client_lb_end2end_test.cc

+10-8
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,14 @@ class ClientLbEnd2endTest : public ::testing::Test {
212212
bool SendRpc(
213213
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
214214
EchoResponse* response = nullptr, int timeout_ms = 1000,
215-
Status* result = nullptr) {
215+
Status* result = nullptr, bool wait_for_ready = false) {
216216
const bool local_response = (response == nullptr);
217217
if (local_response) response = new EchoResponse;
218218
EchoRequest request;
219219
request.set_message(kRequestMessage_);
220220
ClientContext context;
221221
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
222+
if (wait_for_ready) context.set_wait_for_ready(true);
222223
Status status = stub->Echo(&context, request, response);
223224
if (result != nullptr) *result = status;
224225
if (local_response) delete response;
@@ -227,10 +228,11 @@ class ClientLbEnd2endTest : public ::testing::Test {
227228

228229
void CheckRpcSendOk(
229230
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
230-
const grpc_core::DebugLocation& location) {
231+
const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
231232
EchoResponse response;
232233
Status status;
233-
const bool success = SendRpc(stub, &response, 2000, &status);
234+
const bool success =
235+
SendRpc(stub, &response, 2000, &status, wait_for_ready);
234236
ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
235237
<< "\n"
236238
<< "Error: " << status.error_message() << " "
@@ -301,7 +303,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
301303
if (ignore_failure) {
302304
SendRpc(stub);
303305
} else {
304-
CheckRpcSendOk(stub, location);
306+
CheckRpcSendOk(stub, location, true);
305307
}
306308
} while (servers_[server_idx]->service_.request_count() == 0);
307309
ResetCounters();
@@ -506,7 +508,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
506508
do {
507509
channel_state = channel->GetState(true /* try to connect */);
508510
} while (channel_state == GRPC_CHANNEL_READY);
509-
GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
511+
ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
510512
servers_[0]->service_.ResetCounters();
511513

512514
// Next update introduces servers_[1], making the channel recover.
@@ -830,7 +832,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
830832
do {
831833
channel_state = channel->GetState(true /* try to connect */);
832834
} while (channel_state == GRPC_CHANNEL_READY);
833-
GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
835+
ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
834836
servers_[0]->service_.ResetCounters();
835837

836838
// Next update introduces servers_[1], making the channel recover.
@@ -839,7 +841,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
839841
SetNextResolution(ports);
840842
WaitForServer(stub, 1, DEBUG_LOCATION);
841843
channel_state = channel->GetState(false /* try to connect */);
842-
GPR_ASSERT(channel_state == GRPC_CHANNEL_READY);
844+
ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
843845

844846
// Check LB policy name for the channel.
845847
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
@@ -952,7 +954,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
952954
if (SendRpc(stub)) break;
953955
now = gpr_now(GPR_CLOCK_MONOTONIC);
954956
}
955-
GPR_ASSERT(gpr_time_cmp(deadline, now) > 0);
957+
ASSERT_GT(gpr_time_cmp(deadline, now), 0);
956958
}
957959

958960
TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {

Diff for: test/cpp/end2end/grpclb_end2end_test.cc

+5-3
Original file line numberDiff line numberDiff line change
@@ -539,13 +539,15 @@ class GrpclbEnd2endTest : public ::testing::Test {
539539
balancers_.at(i)->add_response(response, delay_ms);
540540
}
541541

542-
Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) {
542+
Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000,
543+
bool wait_for_ready = false) {
543544
const bool local_response = (response == nullptr);
544545
if (local_response) response = new EchoResponse;
545546
EchoRequest request;
546547
request.set_message(kRequestMessage_);
547548
ClientContext context;
548549
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
550+
if (wait_for_ready) context.set_wait_for_ready(true);
549551
Status status = stub_->Echo(&context, request, response);
550552
if (local_response) delete response;
551553
return status;
@@ -1366,7 +1368,7 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
13661368
{}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
13671369
{"load_balancing", num_of_drop_by_load_balancing_addresses}}),
13681370
0);
1369-
const Status status = SendRpc();
1371+
const Status status = SendRpc(nullptr, 1000, true);
13701372
EXPECT_FALSE(status.ok());
13711373
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
13721374
}
@@ -1391,7 +1393,7 @@ TEST_F(SingleBalancerTest, DropAll) {
13911393
// fail.
13921394
Status status;
13931395
do {
1394-
status = SendRpc();
1396+
status = SendRpc(nullptr, 1000, true);
13951397
} while (status.ok());
13961398
EXPECT_FALSE(status.ok());
13971399
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");

0 commit comments

Comments (0)