Skip to content

Commit f8e82c2

Browse files
authored
[Chore][Master] Supplemental workflow instance trace id in log (#17605)
1 parent 1394d58 commit f8e82c2

File tree

2 files changed

+100
-54
lines changed

2 files changed

+100
-54
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2929
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
3030
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
31+
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
3132
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
3233
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
3334
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
@@ -129,10 +130,15 @@ public void run() {
129130

130131
List<CompletableFuture<Void>> allCompleteFutures = new ArrayList<>();
131132
for (Command command : commands) {
132-
CompletableFuture<Void> completableFuture = bootstrapCommand(command)
133+
CompletableFuture<Void> completableFuture = supplyAsync(() -> {
134+
LogUtils.setWorkflowInstanceIdMDC(command.getWorkflowInstanceId());
135+
return command;
136+
}, commandHandleThreadPool)
137+
.thenApply(this::bootstrapCommand)
133138
.thenAccept(this::bootstrapWorkflowExecutionRunnable)
134139
.thenAccept((unused) -> bootstrapSuccess(command))
135-
.exceptionally(throwable -> bootstrapError(command, throwable));
140+
.exceptionally(throwable -> bootstrapError(command, throwable))
141+
.whenComplete((result, throwable) -> LogUtils.removeWorkflowInstanceIdMDC());
136142
allCompleteFutures.add(completableFuture);
137143
}
138144
CompletableFuture.allOf(allCompleteFutures.toArray(new CompletableFuture[0])).join();
@@ -148,14 +154,14 @@ public void run() {
148154
}
149155
}
150156

151-
private CompletableFuture<IWorkflowExecutionRunnable> bootstrapCommand(Command command) {
152-
return supplyAsync(
153-
() -> workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command), commandHandleThreadPool);
157+
private IWorkflowExecutionRunnable bootstrapCommand(Command command) {
158+
return workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command);
154159
}
155160

156161
private CompletableFuture<Void> bootstrapWorkflowExecutionRunnable(IWorkflowExecutionRunnable workflowExecutionRunnable) {
157162
final WorkflowInstance workflowInstance =
158163
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance();
164+
159165
if (workflowInstance.getState() == WorkflowExecutionStatus.SERIAL_WAIT) {
160166
log.info("The workflow {} state is: {} will not be trigger now",
161167
workflowInstance.getName(),

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java

Lines changed: 89 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.dolphinscheduler.server.master.rpc;
1919

2020
import org.apache.dolphinscheduler.extract.master.ITaskExecutorEventListener;
21+
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
2122
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
2223
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
2324
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
@@ -53,79 +54,118 @@ public class TaskExecutorEventListenerImpl implements ITaskExecutorEventListener
5354

5455
@Override
5556
public void onTaskExecutorDispatched(final TaskExecutorDispatchedLifecycleEvent taskExecutorDispatchedLifecycleEvent) {
56-
final ITaskExecutionRunnable taskExecutionRunnable =
57-
getTaskExecutionRunnable(taskExecutorDispatchedLifecycleEvent);
58-
final TaskDispatchedLifecycleEvent taskDispatchedLifecycleEvent = TaskDispatchedLifecycleEvent.builder()
59-
.taskExecutionRunnable(taskExecutionRunnable)
60-
.executorHost(taskExecutorDispatchedLifecycleEvent.getTaskInstanceHost())
61-
.build();
62-
63-
taskExecutionRunnable.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent);
57+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorDispatchedLifecycleEvent.getWorkflowInstanceId());
58+
try {
59+
final ITaskExecutionRunnable taskExecutionRunnable =
60+
getTaskExecutionRunnable(taskExecutorDispatchedLifecycleEvent);
61+
final TaskDispatchedLifecycleEvent taskDispatchedLifecycleEvent = TaskDispatchedLifecycleEvent.builder()
62+
.taskExecutionRunnable(taskExecutionRunnable)
63+
.executorHost(taskExecutorDispatchedLifecycleEvent.getTaskInstanceHost())
64+
.build();
65+
66+
taskExecutionRunnable.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent);
67+
} finally {
68+
LogUtils.removeWorkflowInstanceIdMDC();
69+
}
6470
}
6571

6672
@Override
6773
public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent taskExecutorStartedLifecycleEvent) {
68-
final ITaskExecutionRunnable taskExecutionRunnable =
69-
getTaskExecutionRunnable(taskExecutorStartedLifecycleEvent);
70-
final TaskRunningLifecycleEvent taskRunningEvent = TaskRunningLifecycleEvent.builder()
71-
.taskExecutionRunnable(taskExecutionRunnable)
72-
.startTime(new Date(taskExecutorStartedLifecycleEvent.getStartTime()))
73-
.logPath(taskExecutorStartedLifecycleEvent.getLogPath())
74-
.build();
75-
76-
taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent);
74+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorStartedLifecycleEvent.getWorkflowInstanceId());
75+
try {
76+
final ITaskExecutionRunnable taskExecutionRunnable =
77+
getTaskExecutionRunnable(taskExecutorStartedLifecycleEvent);
78+
final TaskRunningLifecycleEvent taskRunningEvent = TaskRunningLifecycleEvent.builder()
79+
.taskExecutionRunnable(taskExecutionRunnable)
80+
.startTime(new Date(taskExecutorStartedLifecycleEvent.getStartTime()))
81+
.logPath(taskExecutorStartedLifecycleEvent.getLogPath())
82+
.build();
83+
84+
taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent);
85+
} finally {
86+
LogUtils.removeWorkflowInstanceIdMDC();
87+
}
7788
}
7889

7990
@Override
8091
public void onTaskExecutorRuntimeContextChanged(final TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorRuntimeContextChangedLifecycleEventr) {
81-
final ITaskExecutionRunnable taskExecutionRunnable =
82-
getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEventr);
83-
84-
final TaskRuntimeContextChangedEvent taskRuntimeContextChangedEvent = TaskRuntimeContextChangedEvent.builder()
85-
.taskExecutionRunnable(taskExecutionRunnable)
86-
.runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds())
87-
.build();
88-
89-
taskExecutionRunnable.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent);
92+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorRuntimeContextChangedLifecycleEventr.getWorkflowInstanceId());
93+
try {
94+
final ITaskExecutionRunnable taskExecutionRunnable =
95+
getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEventr);
96+
97+
final TaskRuntimeContextChangedEvent taskRuntimeContextChangedEvent =
98+
TaskRuntimeContextChangedEvent.builder()
99+
.taskExecutionRunnable(taskExecutionRunnable)
100+
.runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds())
101+
.build();
102+
103+
taskExecutionRunnable.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent);
104+
} finally {
105+
LogUtils.removeWorkflowInstanceIdMDC();
106+
}
90107
}
91108

92109
@Override
93110
public void onTaskExecutorSuccess(final TaskExecutorSuccessLifecycleEvent taskExecutorSuccessLifecycleEvent) {
94-
final ITaskExecutionRunnable taskExecutionRunnable =
95-
getTaskExecutionRunnable(taskExecutorSuccessLifecycleEvent);
96-
final TaskSuccessLifecycleEvent taskSuccessEvent = TaskSuccessLifecycleEvent.builder()
97-
.taskExecutionRunnable(taskExecutionRunnable)
98-
.endTime(new Date(taskExecutorSuccessLifecycleEvent.getEndTime()))
99-
.varPool(taskExecutorSuccessLifecycleEvent.getVarPool())
100-
.build();
101-
taskExecutionRunnable.getWorkflowEventBus().publish(taskSuccessEvent);
111+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorSuccessLifecycleEvent.getWorkflowInstanceId());
112+
try {
113+
final ITaskExecutionRunnable taskExecutionRunnable =
114+
getTaskExecutionRunnable(taskExecutorSuccessLifecycleEvent);
115+
final TaskSuccessLifecycleEvent taskSuccessEvent = TaskSuccessLifecycleEvent.builder()
116+
.taskExecutionRunnable(taskExecutionRunnable)
117+
.endTime(new Date(taskExecutorSuccessLifecycleEvent.getEndTime()))
118+
.varPool(taskExecutorSuccessLifecycleEvent.getVarPool())
119+
.build();
120+
taskExecutionRunnable.getWorkflowEventBus().publish(taskSuccessEvent);
121+
} finally {
122+
LogUtils.removeWorkflowInstanceIdMDC();
123+
}
102124
}
103125

104126
@Override
105127
public void onTaskExecutorFailed(final TaskExecutorFailedLifecycleEvent taskExecutorFailedLifecycleEvent) {
106-
final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorFailedLifecycleEvent);
107-
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
108-
.taskExecutionRunnable(taskExecutionRunnable)
109-
.endTime(new Date(taskExecutorFailedLifecycleEvent.getEndTime()))
110-
.build();
111-
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
128+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorFailedLifecycleEvent.getWorkflowInstanceId());
129+
try {
130+
final ITaskExecutionRunnable taskExecutionRunnable =
131+
getTaskExecutionRunnable(taskExecutorFailedLifecycleEvent);
132+
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
133+
.taskExecutionRunnable(taskExecutionRunnable)
134+
.endTime(new Date(taskExecutorFailedLifecycleEvent.getEndTime()))
135+
.build();
136+
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
137+
} finally {
138+
LogUtils.removeWorkflowInstanceIdMDC();
139+
}
112140
}
113141

114142
@Override
115143
public void onTaskExecutorKilled(final TaskExecutorKilledLifecycleEvent taskExecutorKilledLifecycleEvent) {
116-
final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorKilledLifecycleEvent);
117-
final TaskKilledLifecycleEvent taskKilledEvent = TaskKilledLifecycleEvent.builder()
118-
.taskExecutionRunnable(taskExecutionRunnable)
119-
.endTime(new Date(taskExecutorKilledLifecycleEvent.getEndTime()))
120-
.build();
121-
taskExecutionRunnable.getWorkflowEventBus().publish(taskKilledEvent);
144+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorKilledLifecycleEvent.getWorkflowInstanceId());
145+
try {
146+
final ITaskExecutionRunnable taskExecutionRunnable =
147+
getTaskExecutionRunnable(taskExecutorKilledLifecycleEvent);
148+
final TaskKilledLifecycleEvent taskKilledEvent = TaskKilledLifecycleEvent.builder()
149+
.taskExecutionRunnable(taskExecutionRunnable)
150+
.endTime(new Date(taskExecutorKilledLifecycleEvent.getEndTime()))
151+
.build();
152+
taskExecutionRunnable.getWorkflowEventBus().publish(taskKilledEvent);
153+
} finally {
154+
LogUtils.removeWorkflowInstanceIdMDC();
155+
}
122156
}
123157

124158
@Override
125159
public void onTaskExecutorPaused(final TaskExecutorPausedLifecycleEvent taskExecutorPausedLifecycleEvent) {
126-
final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorPausedLifecycleEvent);
127-
final TaskPausedLifecycleEvent taskPausedEvent = TaskPausedLifecycleEvent.of(taskExecutionRunnable);
128-
taskExecutionRunnable.getWorkflowEventBus().publish(taskPausedEvent);
160+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorPausedLifecycleEvent.getWorkflowInstanceId());
161+
try {
162+
final ITaskExecutionRunnable taskExecutionRunnable =
163+
getTaskExecutionRunnable(taskExecutorPausedLifecycleEvent);
164+
final TaskPausedLifecycleEvent taskPausedEvent = TaskPausedLifecycleEvent.of(taskExecutionRunnable);
165+
taskExecutionRunnable.getWorkflowEventBus().publish(taskPausedEvent);
166+
} finally {
167+
LogUtils.removeWorkflowInstanceIdMDC();
168+
}
129169
}
130170

131171
private ITaskExecutionRunnable getTaskExecutionRunnable(final IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent) {

0 commit comments

Comments
 (0)