Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7103,9 +7103,6 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
Allow Iceberg read optimization based on Iceberg metadata.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
Allow retries in cluster request, when one node goes offline
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
Expand Down Expand Up @@ -7227,6 +7224,7 @@ Sets the evaluation time to be used with promql dialect. 'auto' means the curren
MAKE_OBSOLETE(M, Bool, allow_experimental_shared_set_join, true) \
MAKE_OBSOLETE(M, UInt64, min_external_sort_block_bytes, 100_MiB) \
MAKE_OBSOLETE(M, UInt64, distributed_cache_read_alignment, 0) \
MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \
/** The section above is for obsolete settings. Do not add anything there. */
#endif /// __CLION_IDE__

Expand Down
21 changes: 7 additions & 14 deletions src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ namespace Setting
extern const SettingsBool use_hedged_requests;
extern const SettingsBool push_external_roles_in_interserver_queries;
extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
extern const SettingsBool allow_retries_in_cluster_requests;
}

namespace ErrorCodes
Expand Down Expand Up @@ -83,7 +82,6 @@ RemoteQueryExecutor::RemoteQueryExecutor(
, extension(extension_)
, priority_func(priority_func_)
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
{
if (stage == QueryProcessingStage::QueryPlan && !query_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
Expand Down Expand Up @@ -468,8 +466,7 @@ int RemoteQueryExecutor::sendQueryAsync()
read_context = std::make_unique<ReadContext>(
*this,
/*suspend_when_query_sent*/ true,
read_packet_type_separately,
allow_retries_in_cluster_requests);
read_packet_type_separately);

/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
/// because we can still be in process of sending scalars or external tables.
Expand Down Expand Up @@ -542,8 +539,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
read_context = std::make_unique<ReadContext>(
*this,
/*suspend_when_query_sent*/ false,
read_packet_type_separately,
allow_retries_in_cluster_requests);
read_packet_type_separately);
recreate_read_context = false;
}

Expand Down Expand Up @@ -734,16 +730,13 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
break;

case Protocol::Server::ConnectionLost:
if (allow_retries_in_cluster_requests)
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
{
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
{
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
{
finished = true;
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
return ReadResult(Block{});
}
finished = true;
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
return ReadResult(Block{});
}
}
packet.exception->rethrow();
Expand Down
2 changes: 0 additions & 2 deletions src/QueryPipeline/RemoteQueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,6 @@ class RemoteQueryExecutor

const bool read_packet_type_separately = false;

const bool allow_retries_in_cluster_requests = false;

std::unordered_set<size_t> replica_has_processed_data;

/// Send all scalars to remote servers
Expand Down
60 changes: 25 additions & 35 deletions src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ namespace ErrorCodes
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
RemoteQueryExecutor & executor_,
bool suspend_when_query_sent_,
bool read_packet_type_separately_,
bool allow_retries_in_cluster_requests_)
bool read_packet_type_separately_)
: AsyncTaskExecutor(std::make_unique<Task>(*this))
, executor(executor_)
, suspend_when_query_sent(suspend_when_query_sent_)
, read_packet_type_separately(read_packet_type_separately_)
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
{
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
Expand Down Expand Up @@ -63,46 +61,38 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
{
while (true)
{
try
{
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
{
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
suspend_callback();
}
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;
if (read_context.packet.type == Protocol::Server::Data)
read_context.has_data_packets = true;
}
catch (const Exception & e)
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
{
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
/// If initiator did not process any data packets before, this fact can be ignored.
/// Unprocessed tasks will be executed on other nodes.
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
{
read_context.has_read_packet_part = PacketPart::None;
}
else
throw;
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
suspend_callback();
}
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;
if (read_context.packet.type == Protocol::Server::Data)
read_context.has_data_packets = true;

suspend_callback();
}
}
catch (const Exception &)
catch (const Exception & e)
{
if (!read_context.allow_retries_in_cluster_requests)
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
/// If initiator did not process any data packets before, this fact can be ignored.
/// Unprocessed tasks will be executed on other nodes.
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
&& !read_context.has_data_packets.load()
&& read_context.executor.skipUnavailableShards())
{
read_context.packet.type = Protocol::Server::ConnectionLost;
Comment on lines +85 to +89

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve skip_unavailable_shards behavior on early EOF

With skip_unavailable_shards enabled, an ATTEMPT_TO_READ_AFTER_EOF before any data now produces a ConnectionLost packet that is rethrown in RemoteQueryExecutor (case Protocol::Server::ConnectionLost around lines 732–742 of RemoteQueryExecutor.cpp). The previous logic swallowed this exception when no data had been read so other shards could continue. This regression causes queries that should skip an unreachable shard to fail whenever the shard drops the socket before sending data.

Useful? React with 👍 / 👎.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception is rethrown only when tasks were not rescheduled on other replicas. Without rescheduling tasks can be lost forever, and response contain only part of data.

read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
read_context.has_read_packet_part = PacketPart::Body;
suspend_callback();
}
else
throw;
read_context.packet.type = Protocol::Server::ConnectionLost;
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
read_context.has_read_packet_part = PacketPart::Body;
suspend_callback();
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/QueryPipeline/RemoteQueryExecutorReadContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
explicit RemoteQueryExecutorReadContext(
RemoteQueryExecutor & executor_,
bool suspend_when_query_sent_,
bool read_packet_type_separately_,
bool allow_retries_in_cluster_requests_);
bool read_packet_type_separately_);

~RemoteQueryExecutorReadContext() override;

Expand Down Expand Up @@ -112,7 +111,6 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
bool suspend_when_query_sent = false;
bool is_query_sent = false;
const bool read_packet_type_separately = false;
const bool allow_retries_in_cluster_requests = false;
};

}
Expand Down
Loading