Skip to content

Commit 9901194

Browse files
yangshangqing95losipiuk
authored andcommitted
Support configurable ExchangeManager
Add support for specifying exchange manager config file location in config.
1 parent 6942821 commit 9901194

19 files changed

+134
-20
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.exchange;
15+
16+
import io.airlift.configuration.Config;
17+
import io.airlift.configuration.validation.FileExists;
18+
19+
import java.io.File;
20+
import java.util.Optional;
21+
22+
public class ExchangeManagerConfig
23+
{
24+
private Optional<File> exchangeManagerConfigFile = Optional.empty();
25+
26+
public Optional<@FileExists File> getExchangeManagerConfigFile()
27+
{
28+
return exchangeManagerConfigFile;
29+
}
30+
31+
@Config("exchange-manager.config-file")
32+
public ExchangeManagerConfig setExchangeManagerConfigFile(File exchangeManagerConfigFile)
33+
{
34+
this.exchangeManagerConfigFile = Optional.ofNullable(exchangeManagerConfigFile);
35+
return this;
36+
}
37+
}

core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
import com.google.inject.Module;
1818
import com.google.inject.Scopes;
1919

20+
import static io.airlift.configuration.ConfigBinder.configBinder;
21+
2022
public class ExchangeManagerModule
2123
implements Module
2224
{
2325
@Override
2426
public void configure(Binder binder)
2527
{
28+
configBinder(binder).bindConfig(ExchangeManagerConfig.class);
2629
binder.bind(ExchangeManagerRegistry.class).in(Scopes.SINGLETON);
2730
}
2831
}

core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.UncheckedIOException;
3030
import java.util.HashMap;
3131
import java.util.Map;
32+
import java.util.Optional;
3233
import java.util.concurrent.ConcurrentHashMap;
3334

3435
import static com.google.common.base.Preconditions.checkArgument;
@@ -52,16 +53,19 @@ public class ExchangeManagerRegistry
5253

5354
private volatile ExchangeManager exchangeManager;
5455
private final SecretsResolver secretsResolver;
56+
private final Optional<File> configFile;
5557

5658
@Inject
5759
public ExchangeManagerRegistry(
5860
OpenTelemetry openTelemetry,
5961
Tracer tracer,
60-
SecretsResolver secretsResolver)
62+
SecretsResolver secretsResolver,
63+
ExchangeManagerConfig config)
6164
{
6265
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
6366
this.tracer = requireNonNull(tracer, "tracer is null");
6467
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
68+
this.configFile = config.getExchangeManagerConfigFile();
6569
}
6670

6771
public void addExchangeManagerFactory(ExchangeManagerFactory factory)
@@ -74,13 +78,14 @@ public void addExchangeManagerFactory(ExchangeManagerFactory factory)
7478

7579
public void loadExchangeManager()
7680
{
77-
if (!CONFIG_FILE.exists()) {
81+
File configFile = this.configFile.orElse(CONFIG_FILE);
82+
if (!configFile.exists()) {
7883
return;
7984
}
8085

81-
Map<String, String> properties = loadProperties(CONFIG_FILE);
86+
Map<String, String> properties = loadProperties(configFile);
8287
String name = properties.remove(EXCHANGE_MANAGER_NAME_PROPERTY);
83-
checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", CONFIG_FILE, EXCHANGE_MANAGER_NAME_PROPERTY);
88+
checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", configFile, EXCHANGE_MANAGER_NAME_PROPERTY);
8489

8590
loadExchangeManager(name, properties);
8691
}

core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import io.trino.dispatcher.DispatchManager;
5151
import io.trino.eventlistener.EventListenerConfig;
5252
import io.trino.eventlistener.EventListenerManager;
53+
import io.trino.exchange.ExchangeManagerConfig;
5354
import io.trino.exchange.ExchangeManagerRegistry;
5455
import io.trino.execution.FailureInjector;
5556
import io.trino.execution.FailureInjector.InjectedFailureType;
@@ -315,6 +316,7 @@ private TestingTrinoServer(
315316
.addBinding()
316317
.to(TracingServletFilter.class);
317318
binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON);
319+
binder.bind(ExchangeManagerConfig.class).in(Scopes.SINGLETON);
318320
binder.bind(AccessControlConfig.class).in(Scopes.SINGLETON);
319321
binder.bind(TestingAccessControlManager.class).in(Scopes.SINGLETON);
320322
binder.bind(TestingGroupProvider.class).in(Scopes.SINGLETON);

core/trino-main/src/main/java/io/trino/testing/PlanTester.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import io.trino.cost.TaskCountEstimator;
6161
import io.trino.eventlistener.EventListenerConfig;
6262
import io.trino.eventlistener.EventListenerManager;
63+
import io.trino.exchange.ExchangeManagerConfig;
6364
import io.trino.exchange.ExchangeManagerRegistry;
6465
import io.trino.execution.DynamicFilterConfig;
6566
import io.trino.execution.NodeTaskMap;
@@ -474,7 +475,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
474475
ImmutableSet.of(),
475476
ImmutableSet.of(new ExcludeColumnsFunction()));
476477

477-
exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver);
478+
exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver, new ExchangeManagerConfig());
478479
spoolingManagerRegistry = new SpoolingManagerRegistry(
479480
new InternalNode("nodeId", URI.create("http://localhost:8080"), NodeVersion.UNKNOWN, false),
480481
new ServerConfig(),
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.exchange;
15+
16+
import com.google.common.collect.ImmutableMap;
17+
import org.junit.jupiter.api.Test;
18+
19+
import java.io.IOException;
20+
import java.nio.file.Files;
21+
import java.nio.file.Path;
22+
import java.util.Map;
23+
24+
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
25+
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
26+
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
27+
28+
final class TestExchangeConfig
29+
{
30+
@Test
31+
void testDefaults()
32+
{
33+
assertRecordedDefaults(recordDefaults(ExchangeManagerConfig.class)
34+
.setExchangeManagerConfigFile(null));
35+
}
36+
37+
@Test
38+
void testExplicitPropertyMappings()
39+
throws IOException
40+
{
41+
Path exchangeConfig = Files.createTempFile(null, null);
42+
43+
Map<String, String> properties = ImmutableMap.of("exchange-manager.config-file", exchangeConfig.toString());
44+
45+
ExchangeManagerConfig expected = new ExchangeManagerConfig()
46+
.setExchangeManagerConfigFile(exchangeConfig.toFile());
47+
48+
assertFullMapping(properties, expected);
49+
}
50+
}

core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void testIsBlockedCancellationIsolationInInitializationPhase()
4545
throw new UnsupportedOperationException();
4646
},
4747
RetryPolicy.NONE,
48-
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())))) {
48+
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()))) {
4949
ListenableFuture<Void> first = source.isBlocked();
5050
ListenableFuture<Void> second = source.isBlocked();
5151
assertThat(first)

core/trino-main/src/test/java/io/trino/execution/BaseTestSqlTaskManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.trino.connector.CatalogHandle;
3030
import io.trino.connector.ConnectorServices;
3131
import io.trino.connector.ConnectorServicesProvider;
32+
import io.trino.exchange.ExchangeManagerConfig;
3233
import io.trino.exchange.ExchangeManagerRegistry;
3334
import io.trino.execution.buffer.BufferResult;
3435
import io.trino.execution.buffer.BufferState;
@@ -337,7 +338,7 @@ private SqlTaskManager createSqlTaskManager(TaskManagerConfig taskManagerConfig,
337338
new NodeSpillConfig(),
338339
new TestingGcMonitor(),
339340
noopTracer(),
340-
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())));
341+
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()));
341342
}
342343

343344
private TaskInfo createTask(SqlTaskManager sqlTaskManager, TaskId taskId, Set<ScheduledSplit> splits, OutputBuffers outputBuffers)

core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.opentelemetry.api.trace.Span;
3232
import io.trino.Session;
3333
import io.trino.cost.StatsAndCosts;
34+
import io.trino.exchange.ExchangeManagerConfig;
3435
import io.trino.exchange.ExchangeManagerRegistry;
3536
import io.trino.execution.NodeTaskMap.PartitionedSplitCountTracker;
3637
import io.trino.execution.buffer.LazyOutputBuffer;
@@ -235,7 +236,7 @@ public MockRemoteTask(
235236
DataSize.ofBytes(1),
236237
() -> new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test"),
237238
() -> {},
238-
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())));
239+
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()));
239240

240241
this.fragment = requireNonNull(fragment, "fragment is null");
241242
this.nodeId = requireNonNull(nodeId, "nodeId is null");

core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.trino.connector.CatalogHandle;
2323
import io.trino.connector.CatalogServiceProvider;
2424
import io.trino.cost.StatsAndCosts;
25+
import io.trino.exchange.ExchangeManagerConfig;
2526
import io.trino.exchange.ExchangeManagerRegistry;
2627
import io.trino.execution.BaseTestSqlTaskManager.MockDirectExchangeClientSupplier;
2728
import io.trino.execution.buffer.OutputBuffers;
@@ -180,7 +181,7 @@ public static LocalExecutionPlanner createTestingPlanner()
180181
blockTypeOperators,
181182
PLANNER_CONTEXT.getTypeOperators(),
182183
new TableExecuteContextManager(),
183-
new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of())),
184+
new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
184185
new NodeVersion("test"),
185186
new CompilerConfig());
186187
}

0 commit comments

Comments
 (0)