Skip to content

Commit

Permalink
Only log null spooling stats in FTE mode
Browse files Browse the repository at this point in the history
In pipelined mode, the spooling stats are always null, which fills the log needlessly.

#20214 (comment)
  • Loading branch information
dekimir authored and wendigo committed Dec 26, 2023
1 parent 457df6e commit c106cf5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.buffer.SpoolingOutputStats;
import io.trino.metadata.Split;
import io.trino.operator.RetryPolicy;
import io.trino.operator.TaskStats;
import io.trino.server.DynamicFilterService;
import io.trino.server.FailTaskRequest;
Expand Down Expand Up @@ -110,6 +111,7 @@
import static io.trino.SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask;
import static io.trino.SystemSessionProperties.getRemoteTaskGuaranteedSplitsPerRequest;
import static io.trino.SystemSessionProperties.getRemoteTaskRequestSizeHeadroom;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.isRemoteTaskAdaptiveUpdateRequestSizeEnabled;
import static io.trino.execution.DynamicFiltersCollector.INITIAL_DYNAMIC_FILTERS_VERSION;
import static io.trino.execution.TaskInfo.createInitialTask;
Expand Down Expand Up @@ -343,6 +345,7 @@ public HttpRemoteTask(
errorScheduledExecutor,
stats);

RetryPolicy retryPolicy = getRetryPolicy(session);
this.taskInfoFetcher = new TaskInfoFetcher(
this::fatalUnacknowledgedFailure,
taskStatusFetcher,
Expand All @@ -357,7 +360,8 @@ public HttpRemoteTask(
updateScheduledExecutor,
errorScheduledExecutor,
stats,
estimatedMemory);
estimatedMemory,
retryPolicy);

taskStatusFetcher.addStateChangeListener(newStatus -> {
TaskState state = newStatus.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.buffer.SpoolingOutputStats;
import io.trino.operator.RetryPolicy;

import java.net.URI;
import java.util.Optional;
Expand All @@ -52,6 +53,7 @@
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.http.client.Request.Builder.prepareGet;
import static io.airlift.units.Duration.nanosSince;
import static io.trino.operator.RetryPolicy.TASK;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -81,6 +83,8 @@ public class TaskInfoFetcher

private final AtomicReference<SpoolingOutputStats.Snapshot> spoolingOutputStats = new AtomicReference<>();

private final RetryPolicy retryPolicy;

@GuardedBy("this")
private boolean running;

Expand All @@ -104,7 +108,8 @@ public TaskInfoFetcher(
ScheduledExecutorService updateScheduledExecutor,
ScheduledExecutorService errorScheduledExecutor,
RemoteTaskStats stats,
Optional<DataSize> estimatedMemory)
Optional<DataSize> estimatedMemory,
RetryPolicy retryPolicy)
{
requireNonNull(initialTask, "initialTask is null");
requireNonNull(errorScheduledExecutor, "errorScheduledExecutor is null");
Expand All @@ -127,6 +132,7 @@ public TaskInfoFetcher(
this.spanBuilderFactory = requireNonNull(spanBuilderFactory, "spanBuilderFactory is null");
this.stats = requireNonNull(stats, "stats is null");
this.estimatedMemory = requireNonNull(estimatedMemory, "estimatedMemory is null");
this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null");
}

public TaskInfo getTaskInfo()
Expand Down Expand Up @@ -268,7 +274,7 @@ synchronized void updateTaskInfo(TaskInfo newTaskInfo)

if (newTaskInfo.getTaskStatus().getState().isDone()) {
boolean wasSet = spoolingOutputStats.compareAndSet(null, newTaskInfo.getOutputBuffers().getSpoolingOutputStats().orElse(null));
if (wasSet && spoolingOutputStats.get() == null) {
if (retryPolicy == TASK && wasSet && spoolingOutputStats.get() == null) {
log.debug("Task %s was updated to null spoolingOutputStats. Future calls to retrieveAndDropSpoolingOutputStats will fail.", taskId);
}
newTaskInfo = newTaskInfo.pruneSpoolingOutputStats();
Expand Down

0 comments on commit c106cf5

Please sign in to comment.