Skip to content

Commit f6163fc

Browse files
authored
feat: Add exchange.max-buffer-size to system config (#26488)
Summary: Expose the velox query config property to prestissimo system config Differential Revision: D85894641 ``` == NO RELEASE NOTE == ```
1 parent 4781438 commit f6163fc

File tree

4 files changed

+120
-0
lines changed

4 files changed

+120
-0
lines changed

presto-native-execution/presto_cpp/main/PrestoToVeloxQueryConfig.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ void updateFromSystemConfigs(
153153
{std::string{SystemConfig::kTaskPartitionedWriterCount},
154154
velox::core::QueryConfig::kTaskPartitionedWriterCount},
155155

156+
{std::string{SystemConfig::kExchangeMaxBufferSize},
157+
velox::core::QueryConfig::kMaxExchangeBufferSize},
158+
156159
{std::string(SystemConfig::kSinkMaxBufferSize),
157160
velox::core::QueryConfig::kMaxOutputBufferSize,
158161
[](const auto& value) {

presto-native-execution/presto_cpp/main/common/Configs.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ SystemConfig::SystemConfig() {
242242
BOOL_PROP(kExchangeEnableConnectionPool, true),
243243
BOOL_PROP(kExchangeEnableBufferCopy, true),
244244
BOOL_PROP(kExchangeImmediateBufferTransfer, true),
245+
NUM_PROP(kExchangeMaxBufferSize, 32UL << 20),
245246
NUM_PROP(kTaskRunTimeSliceMicros, 50'000),
246247
BOOL_PROP(kIncludeNodeInSpillPath, false),
247248
NUM_PROP(kOldTaskCleanUpMs, 60'000),
@@ -898,6 +899,10 @@ bool SystemConfig::exchangeImmediateBufferTransfer() const {
898899
return optionalProperty<bool>(kExchangeImmediateBufferTransfer).value();
899900
}
900901

902+
uint64_t SystemConfig::exchangeMaxBufferSize() const {
903+
return optionalProperty<uint64_t>(kExchangeMaxBufferSize).value();
904+
}
905+
901906
int32_t SystemConfig::taskRunTimeSliceMicros() const {
902907
return optionalProperty<int32_t>(kTaskRunTimeSliceMicros).value();
903908
}

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,11 @@ class SystemConfig : public ConfigBase {
710710
kExchangeHttpClientNumCpuThreadsHwMultiplier{
711711
"exchange.http-client.num-cpu-threads-hw-multiplier"};
712712

713+
/// Maximum size in bytes to accumulate in ExchangeQueue. Enforced
714+
/// approximately, not strictly.
715+
static constexpr std::string_view kExchangeMaxBufferSize{
716+
"exchange.max-buffer-size"};
717+
713718
/// The maximum timeslice for a task on thread if there are threads queued.
714719
static constexpr std::string_view kTaskRunTimeSliceMicros{
715720
"task-run-timeslice-micros"};
@@ -1067,6 +1072,8 @@ class SystemConfig : public ConfigBase {
10671072

10681073
bool exchangeImmediateBufferTransfer() const;
10691074

1075+
uint64_t exchangeMaxBufferSize() const;
1076+
10701077
int32_t taskRunTimeSliceMicros() const;
10711078

10721079
bool includeNodeInSpillPath() const;

presto-native-execution/presto_cpp/main/tests/PrestoToVeloxQueryConfigTest.cpp

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,3 +571,108 @@ TEST_F(PrestoToVeloxQueryConfigTest, sessionStartTimeConfiguration) {
571571
EXPECT_EQ(
572572
std::numeric_limits<int64_t>::max(), veloxConfig5.sessionStartTimeMs());
573573
}
574+
575+
TEST_F(PrestoToVeloxQueryConfigTest, systemConfigsWithoutSessionOverride) {
576+
// Verifies system configs are properly applied when no session properties
577+
// override them. Uses exact count matching to catch any config additions or
578+
// removals.
579+
580+
auto session = createBasicSession();
581+
session.systemProperties.clear();
582+
auto veloxConfigs = toVeloxConfigs(session);
583+
584+
struct SystemConfigMapping {
585+
std::string veloxConfigKey;
586+
std::string systemConfigKey;
587+
};
588+
589+
// MUST match veloxToPrestoConfigMapping in PrestoToVeloxQueryConfig.cpp
590+
std::vector<SystemConfigMapping> expectedMappings = {
591+
{core::QueryConfig::kQueryMaxMemoryPerNode,
592+
std::string(SystemConfig::kQueryMaxMemoryPerNode)},
593+
{core::QueryConfig::kSpillFileCreateConfig,
594+
std::string(SystemConfig::kSpillerFileCreateConfig)},
595+
{core::QueryConfig::kSpillEnabled,
596+
std::string(SystemConfig::kSpillEnabled)},
597+
{core::QueryConfig::kJoinSpillEnabled,
598+
std::string(SystemConfig::kJoinSpillEnabled)},
599+
{core::QueryConfig::kOrderBySpillEnabled,
600+
std::string(SystemConfig::kOrderBySpillEnabled)},
601+
{core::QueryConfig::kAggregationSpillEnabled,
602+
std::string(SystemConfig::kAggregationSpillEnabled)},
603+
{core::QueryConfig::kRequestDataSizesMaxWaitSec,
604+
std::string(SystemConfig::kRequestDataSizesMaxWaitSec)},
605+
{core::QueryConfig::kMaxSplitPreloadPerDriver,
606+
std::string(SystemConfig::kDriverMaxSplitPreload)},
607+
{core::QueryConfig::kMaxLocalExchangePartitionBufferSize,
608+
std::string(SystemConfig::kMaxLocalExchangePartitionBufferSize)},
609+
{core::QueryConfig::kPrestoArrayAggIgnoreNulls,
610+
std::string(SystemConfig::kUseLegacyArrayAgg)},
611+
{core::QueryConfig::kTaskWriterCount,
612+
std::string(SystemConfig::kTaskWriterCount)},
613+
{core::QueryConfig::kTaskPartitionedWriterCount,
614+
std::string(SystemConfig::kTaskPartitionedWriterCount)},
615+
{core::QueryConfig::kMaxExchangeBufferSize,
616+
std::string(SystemConfig::kExchangeMaxBufferSize)},
617+
{core::QueryConfig::kMaxOutputBufferSize,
618+
std::string(SystemConfig::kSinkMaxBufferSize)},
619+
{core::QueryConfig::kMaxPartitionedOutputBufferSize,
620+
std::string(SystemConfig::kDriverMaxPagePartitioningBufferSize)},
621+
{core::QueryConfig::kMaxPartialAggregationMemory,
622+
std::string(SystemConfig::kTaskMaxPartialAggregationMemory)},
623+
};
624+
625+
const size_t kExpectedSystemConfigMappingCount = 16;
626+
EXPECT_EQ(kExpectedSystemConfigMappingCount, expectedMappings.size())
627+
<< "Update expectedMappings to match veloxToPrestoConfigMapping";
628+
629+
// Verify each system config mapping is present when it has a value
630+
auto* systemConfig = SystemConfig::instance();
631+
for (const auto& mapping : expectedMappings) {
632+
auto systemValue = systemConfig->optionalProperty(mapping.systemConfigKey);
633+
if (systemValue.hasValue()) {
634+
EXPECT_TRUE(veloxConfigs.count(mapping.veloxConfigKey) > 0)
635+
<< "Expected '" << mapping.veloxConfigKey << "' when system config '"
636+
<< mapping.systemConfigKey << "' = " << systemValue.value();
637+
}
638+
}
639+
640+
// Verify special case configs (always added)
641+
EXPECT_TRUE(
642+
veloxConfigs.count(core::QueryConfig::kAdjustTimestampToTimezone) > 0);
643+
EXPECT_EQ(
644+
"true", veloxConfigs.at(core::QueryConfig::kAdjustTimestampToTimezone));
645+
646+
EXPECT_TRUE(
647+
veloxConfigs.count(core::QueryConfig::kDriverCpuTimeSliceLimitMs) > 0);
648+
EXPECT_EQ(
649+
"1000", veloxConfigs.at(core::QueryConfig::kDriverCpuTimeSliceLimitMs));
650+
651+
// Verify session-specific configs
652+
EXPECT_TRUE(veloxConfigs.count(core::QueryConfig::kSessionStartTime) > 0);
653+
EXPECT_EQ(
654+
"1234567890", veloxConfigs.at(core::QueryConfig::kSessionStartTime));
655+
656+
// Calculate expected exact count
657+
size_t expectedExactConfigs = 0;
658+
for (const auto& mapping : expectedMappings) {
659+
if (systemConfig->optionalProperty(mapping.systemConfigKey).hasValue()) {
660+
expectedExactConfigs++;
661+
}
662+
}
663+
expectedExactConfigs += 2; // kAdjustTimestampToTimezone,
664+
// kDriverCpuTimeSliceLimitMs
665+
expectedExactConfigs += 1; // kSessionStartTime
666+
667+
// Use exact matching to catch any config additions/removals
668+
EXPECT_EQ(veloxConfigs.size(), expectedExactConfigs)
669+
<< "Config count mismatch indicates mapping change. Expected "
670+
<< expectedExactConfigs << ", got " << veloxConfigs.size();
671+
672+
// Debug output
673+
std::cout << "System configs (no session overrides):" << std::endl;
674+
for (const auto& [key, value] : veloxConfigs) {
675+
std::cout << " " << key << " = " << value << std::endl;
676+
}
677+
std::cout << "Total: " << veloxConfigs.size() << std::endl;
678+
}

0 commit comments

Comments
 (0)