diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 30728305319c..79ff0d5adeff 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -7103,9 +7103,6 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri
 )", EXPERIMENTAL) \
     DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
 Allow Iceberg read optimization based on Iceberg metadata.
-)", EXPERIMENTAL) \
-    DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
-Allow retries in cluster request, when one node goes offline
 )", EXPERIMENTAL) \
     DECLARE(Bool, object_storage_remote_initiator, false, R"(
 Execute request to object storage as remote on one of object_storage_cluster nodes.
@@ -7227,6 +7224,7 @@ Sets the evaluation time to be used with promql dialect. 'auto' means the curren
     MAKE_OBSOLETE(M, Bool, allow_experimental_shared_set_join, true) \
     MAKE_OBSOLETE(M, UInt64, min_external_sort_block_bytes, 100_MiB) \
     MAKE_OBSOLETE(M, UInt64, distributed_cache_read_alignment, 0) \
+    MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \
     /** The section above is for obsolete settings. Do not add anything there. */
 
 #endif /// __CLION_IDE__
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index 83f33a52de41..d51a5aecf8b5 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -52,7 +52,6 @@ namespace Setting
     extern const SettingsBool use_hedged_requests;
     extern const SettingsBool push_external_roles_in_interserver_queries;
     extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
-    extern const SettingsBool allow_retries_in_cluster_requests;
 }
 
 namespace ErrorCodes
@@ -83,7 +82,6 @@ RemoteQueryExecutor::RemoteQueryExecutor(
     , extension(extension_)
     , priority_func(priority_func_)
     , read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
-    , allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
 {
     if (stage == QueryProcessingStage::QueryPlan && !query_plan)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
@@ -468,8 +466,7 @@ int RemoteQueryExecutor::sendQueryAsync()
         read_context = std::make_unique<ReadContext>(
             *this,
             /*suspend_when_query_sent*/ true,
-            read_packet_type_separately,
-            allow_retries_in_cluster_requests);
+            read_packet_type_separately);
 
     /// If query already sent, do nothing. Note that we cannot use sent_query flag here,
     /// because we can still be in process of sending scalars or external tables.
@@ -542,8 +539,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
         read_context = std::make_unique<ReadContext>(
             *this,
             /*suspend_when_query_sent*/ false,
-            read_packet_type_separately,
-            allow_retries_in_cluster_requests);
+            read_packet_type_separately);
         recreate_read_context = false;
     }
 
@@ -734,16 +730,13 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
             break;
 
         case Protocol::Server::ConnectionLost:
-            if (allow_retries_in_cluster_requests)
+            if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
             {
-                if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
+                if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
                 {
-                    if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
-                    {
-                        finished = true;
-                        extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
-                        return ReadResult(Block{});
-                    }
+                    finished = true;
+                    extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
+                    return ReadResult(Block{});
                 }
             }
             packet.exception->rethrow();
diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h
index 7ef8be9e27cc..17bff1573ee4 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.h
+++ b/src/QueryPipeline/RemoteQueryExecutor.h
@@ -339,8 +339,6 @@ class RemoteQueryExecutor
 
     const bool read_packet_type_separately = false;
 
-    const bool allow_retries_in_cluster_requests = false;
-
     std::unordered_set<size_t> replica_has_processed_data;
 
     /// Send all scalars to remote servers
diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
index bd9c0f4966e4..2fe4cab35003 100644
--- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
@@ -22,13 +22,11 @@ namespace ErrorCodes
 RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
     RemoteQueryExecutor & executor_,
     bool suspend_when_query_sent_,
-    bool read_packet_type_separately_,
-    bool allow_retries_in_cluster_requests_)
+    bool read_packet_type_separately_)
     : AsyncTaskExecutor(std::make_unique<Task>(*this))
     , executor(executor_)
     , suspend_when_query_sent(suspend_when_query_sent_)
     , read_packet_type_separately(read_packet_type_separately_)
-    , allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
 {
     if (-1 == pipe2(pipe_fd, O_NONBLOCK))
         throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -63,46 +61,38 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
     {
         while (true)
         {
-            try
-            {
-                read_context.has_read_packet_part = PacketPart::None;
-
-                if (read_context.read_packet_type_separately)
-                {
-                    read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
-                    read_context.has_read_packet_part = PacketPart::Type;
-                    suspend_callback();
-                }
-                read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
-                read_context.has_read_packet_part = PacketPart::Body;
-                if (read_context.packet.type == Protocol::Server::Data)
-                    read_context.has_data_packets = true;
-            }
-            catch (const Exception & e)
+            read_context.has_read_packet_part = PacketPart::None;
+
+            if (read_context.read_packet_type_separately)
             {
-                /// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
-                /// If initiator did not process any data packets before, this fact can be ignored.
-                /// Unprocessed tasks will be executed on other nodes.
-                if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
-                    && !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
-                {
-                    read_context.has_read_packet_part = PacketPart::None;
-                }
-                else
-                    throw;
+                read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
+                read_context.has_read_packet_part = PacketPart::Type;
+                suspend_callback();
             }
+            read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
+            read_context.has_read_packet_part = PacketPart::Body;
+            if (read_context.packet.type == Protocol::Server::Data)
+                read_context.has_data_packets = true;
             suspend_callback();
         }
     }
-    catch (const Exception &)
+    catch (const Exception & e)
     {
-        if (!read_context.allow_retries_in_cluster_requests)
+        /// If a cluster node unexpectedly shuts down (kill/segfault/power off/etc.) the socket just closes.
+        /// If the initiator did not process any data packets before, this fact can be ignored.
+        /// Unprocessed tasks will be executed on other nodes.
+        if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
+            && !read_context.has_data_packets.load()
+            && read_context.executor.skipUnavailableShards())
+        {
+            read_context.packet.type = Protocol::Server::ConnectionLost;
+            read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
+            read_context.has_read_packet_part = PacketPart::Body;
+            suspend_callback();
+        }
+        else
             throw;
-        read_context.packet.type = Protocol::Server::ConnectionLost;
-        read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
-        read_context.has_read_packet_part = PacketPart::Body;
-        suspend_callback();
     }
 }
diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h
index 82bb28f81264..16ff5dc67a69 100644
--- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h
+++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h
@@ -28,8 +28,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
     explicit RemoteQueryExecutorReadContext(
         RemoteQueryExecutor & executor_,
         bool suspend_when_query_sent_,
-        bool read_packet_type_separately_,
-        bool allow_retries_in_cluster_requests_);
+        bool read_packet_type_separately_);
 
     ~RemoteQueryExecutorReadContext() override;
 
@@ -112,7 +111,6 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
     bool suspend_when_query_sent = false;
    bool is_query_sent = false;
     const bool read_packet_type_separately = false;
-    const bool allow_retries_in_cluster_requests = false;
 };
 
 }
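Net effect of the patch: the experimental allow_retries_in_cluster_requests setting is retired (moved to the obsolete section so existing configs still parse), and the retry path is taken unconditionally. The read context now downgrades an ATTEMPT_TO_READ_AFTER_EOF error to a synthetic ConnectionLost packet only when no data packets were received and skip_unavailable_shards applies; processPacket then reschedules the lost replica's tasks whenever the extension exposes a task iterator that supports re-running them and that replica had not yet produced data. Below is a minimal, self-contained sketch of that rescheduling decision; TaskIterator, ReplicaInfo, Extension and tryRescheduleAfterConnectionLost are simplified stand-ins for illustration, not the real ClickHouse types.

```cpp
#include <cstddef>
#include <optional>
#include <unordered_set>

/// Simplified stand-ins for the executor extension types (hypothetical, for illustration only).
struct TaskIterator
{
    bool supports_rerun = false;
    bool supportRerunTask() const { return supports_rerun; }
    void rescheduleTasksFromReplica(size_t /*replica_number*/) { /* hand unfinished tasks back to the task pool */ }
};

struct ReplicaInfo
{
    size_t number_of_current_replica = 0;
};

struct Extension
{
    TaskIterator * task_iterator = nullptr;
    std::optional<ReplicaInfo> replica_info;
};

/// Mirrors the shape of the new ConnectionLost branch: returns true when the lost replica's
/// tasks were rescheduled and the executor may finish cleanly with an empty block;
/// returns false when the original exception should be rethrown instead.
bool tryRescheduleAfterConnectionLost(
    const std::optional<Extension> & extension,
    const std::unordered_set<size_t> & replica_has_processed_data)
{
    if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
    {
        /// Rescheduling is only safe if the initiator has not consumed any data packet from this
        /// replica yet; otherwise part of the result could be produced twice on another node.
        if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
        {
            extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
            return true;
        }
    }
    return false;
}
```

The replica_has_processed_data guard is what keeps the behavior safe without a setting: a replica that already streamed blocks to the initiator cannot have its tasks re-run elsewhere without duplicating results, so in that case the exception is still rethrown.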