Skip to content

Commit 844c28c

Browse files
crossoverJiesteveraoheyams
authored
Add support for PowerJob (#12086)
Co-authored-by: Steve Rao <[email protected]> Co-authored-by: Helen <[email protected]>
1 parent 7ecc678 commit 844c28c

16 files changed

+851
-0
lines changed

docs/supported-libraries.md

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ These are the supported libraries and frameworks:
107107
| [OSHI](https://github.com/oshi/oshi/) | 5.3.1+ | [opentelemetry-oshi](../instrumentation/oshi/library) | [System Metrics] (partial support) |
108108
| [Play MVC](https://github.com/playframework/playframework) | 2.4+ | N/A | Provides `http.route` [2], Controller Spans [3] |
109109
| [Play WS](https://github.com/playframework/play-ws) | 1.0+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] |
110+
| [PowerJob](http://www.powerjob.tech/) | 4.0.0+ | N/A | none |
110111
| [Quarkus Resteasy Reactive](https://quarkus.io/extensions/io.quarkus/quarkus-resteasy-reactive/) | 2.16.7+ | N/A | Provides `http.route` [2] |
111112
| [Quartz](https://www.quartz-scheduler.org/) | 2.0+ | [opentelemetry-quartz-2.0](../instrumentation/quartz-2.0/library) | none |
112113
| [R2DBC](https://r2dbc.io/) | 1.0+ | [opentelemetry-r2dbc-1.0](../instrumentation/r2dbc-1.0/library) | [Database Client Spans] |
+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Settings for the PowerJob instrumentation
2+
3+
| System property | Type | Default | Description |
4+
|--------------------------------------------------------------|---------|---------|-----------------------------------------------------|
5+
| `otel.instrumentation.powerjob.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("tech.powerjob")
8+
module.set("powerjob-worker")
9+
versions.set("[4.0.0,)")
10+
assertInverse.set(true)
11+
extraDependency("tech.powerjob:powerjob-official-processors:1.1.0")
12+
}
13+
}
14+
15+
dependencies {
16+
library("tech.powerjob:powerjob-worker:4.0.0")
17+
library("tech.powerjob:powerjob-official-processors:1.1.0")
18+
}
19+
20+
tasks.withType<Test>().configureEach {
21+
// required on jdk17
22+
jvmArgs("--add-opens=java.base/java.lang=ALL-UNNAMED")
23+
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")
24+
jvmArgs("-Dotel.instrumentation.powerjob.experimental-span-attributes=true")
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
7+
8+
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
9+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
10+
import static io.opentelemetry.javaagent.instrumentation.powerjob.v4_0.PowerJobSingletons.instrumenter;
11+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
12+
import static net.bytebuddy.matcher.ElementMatchers.named;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
14+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
15+
16+
import io.opentelemetry.context.Context;
17+
import io.opentelemetry.context.Scope;
18+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
19+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
20+
import net.bytebuddy.asm.Advice;
21+
import net.bytebuddy.description.type.TypeDescription;
22+
import net.bytebuddy.matcher.ElementMatcher;
23+
import tech.powerjob.worker.core.processor.ProcessResult;
24+
import tech.powerjob.worker.core.processor.TaskContext;
25+
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
26+
27+
public class BasicProcessorInstrumentation implements TypeInstrumentation {
28+
@Override
29+
public ElementMatcher<TypeDescription> typeMatcher() {
30+
return implementsInterface(named("tech.powerjob.worker.core.processor.sdk.BasicProcessor"));
31+
}
32+
33+
@Override
34+
public void transform(TypeTransformer transformer) {
35+
transformer.applyAdviceToMethod(
36+
named("process")
37+
.and(isPublic())
38+
.and(
39+
takesArguments(1)
40+
.and(
41+
takesArgument(
42+
0, named("tech.powerjob.worker.core.processor.TaskContext")))),
43+
BasicProcessorInstrumentation.class.getName() + "$ProcessAdvice");
44+
}
45+
46+
public static class ProcessAdvice {
47+
48+
@SuppressWarnings("unused")
49+
@Advice.OnMethodEnter(suppress = Throwable.class)
50+
public static void onSchedule(
51+
@Advice.This BasicProcessor handler,
52+
@Advice.Argument(0) TaskContext taskContext,
53+
@Advice.Local("otelRequest") PowerJobProcessRequest request,
54+
@Advice.Local("otelContext") Context context,
55+
@Advice.Local("otelScope") Scope scope) {
56+
Context parentContext = currentContext();
57+
request =
58+
PowerJobProcessRequest.createRequest(
59+
taskContext.getJobId(),
60+
handler,
61+
"process",
62+
taskContext.getJobParams(),
63+
taskContext.getInstanceParams());
64+
65+
if (!instrumenter().shouldStart(parentContext, request)) {
66+
return;
67+
}
68+
context = instrumenter().start(parentContext, request);
69+
scope = context.makeCurrent();
70+
}
71+
72+
@SuppressWarnings("unused")
73+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
74+
public static void stopSpan(
75+
@Advice.Return ProcessResult result,
76+
@Advice.Thrown Throwable throwable,
77+
@Advice.Local("otelRequest") PowerJobProcessRequest request,
78+
@Advice.Local("otelContext") Context context,
79+
@Advice.Local("otelScope") Scope scope) {
80+
if (scope == null) {
81+
return;
82+
}
83+
scope.close();
84+
instrumenter().end(context, request, result, throwable);
85+
}
86+
}
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
7+
8+
import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesGetter;
9+
import javax.annotation.Nullable;
10+
11+
class PowerJobCodeAttributesGetter implements CodeAttributesGetter<PowerJobProcessRequest> {
12+
13+
@Nullable
14+
@Override
15+
public Class<?> getCodeClass(PowerJobProcessRequest powerJobProcessRequest) {
16+
return powerJobProcessRequest.getDeclaringClass();
17+
}
18+
19+
@Nullable
20+
@Override
21+
public String getMethodName(PowerJobProcessRequest powerJobProcessRequest) {
22+
return powerJobProcessRequest.getMethodName();
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
7+
8+
import io.opentelemetry.api.common.AttributeKey;
9+
import io.opentelemetry.api.common.AttributesBuilder;
10+
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
12+
import javax.annotation.Nullable;
13+
import tech.powerjob.worker.core.processor.ProcessResult;
14+
15+
class PowerJobExperimentalAttributeExtractor
16+
implements AttributesExtractor<PowerJobProcessRequest, ProcessResult> {
17+
18+
private static final AttributeKey<Long> POWERJOB_JOB_ID =
19+
AttributeKey.longKey("scheduling.powerjob.job.id");
20+
private static final AttributeKey<String> POWERJOB_JOB_PARAM =
21+
AttributeKey.stringKey("scheduling.powerjob.job.param");
22+
private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_PARAM =
23+
AttributeKey.stringKey("scheduling.powerjob.job.instance.param");
24+
private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_TYPE =
25+
AttributeKey.stringKey("scheduling.powerjob.job.type");
26+
27+
@Override
28+
public void onStart(
29+
AttributesBuilder attributes,
30+
Context parentContext,
31+
PowerJobProcessRequest powerJobProcessRequest) {
32+
attributes.put(POWERJOB_JOB_ID, powerJobProcessRequest.getJobId());
33+
attributes.put(POWERJOB_JOB_PARAM, powerJobProcessRequest.getJobParams());
34+
attributes.put(POWERJOB_JOB_INSTANCE_PARAM, powerJobProcessRequest.getInstanceParams());
35+
attributes.put(POWERJOB_JOB_INSTANCE_TYPE, powerJobProcessRequest.getJobType());
36+
}
37+
38+
@Override
39+
public void onEnd(
40+
AttributesBuilder attributes,
41+
Context context,
42+
PowerJobProcessRequest powerJobProcessRequest,
43+
@Nullable ProcessResult unused,
44+
@Nullable Throwable error) {}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
7+
8+
import com.google.auto.service.AutoService;
9+
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
10+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
11+
import java.util.Collections;
12+
import java.util.List;
13+
14+
@AutoService(InstrumentationModule.class)
15+
public class PowerJobInstrumentationModule extends InstrumentationModule {
16+
public PowerJobInstrumentationModule() {
17+
super("powerjob", "powerjob-4.0");
18+
}
19+
20+
@Override
21+
public List<TypeInstrumentation> typeInstrumentations() {
22+
return Collections.singletonList(new BasicProcessorInstrumentation());
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
7+
8+
import java.util.Arrays;
9+
import java.util.List;
10+
import tech.powerjob.official.processors.impl.FileCleanupProcessor;
11+
import tech.powerjob.official.processors.impl.HttpProcessor;
12+
import tech.powerjob.official.processors.impl.script.PythonProcessor;
13+
import tech.powerjob.official.processors.impl.script.ShellProcessor;
14+
import tech.powerjob.official.processors.impl.sql.DynamicDatasourceSqlProcessor;
15+
import tech.powerjob.official.processors.impl.sql.SpringDatasourceSqlProcessor;
16+
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
17+
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
18+
import tech.powerjob.worker.core.processor.sdk.MapProcessor;
19+
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
20+
21+
public final class PowerJobProcessRequest {
22+
private final String methodName;
23+
private final Long jobId;
24+
private final String jobType;
25+
private final Class<?> declaringClass;
26+
private final String jobParams;
27+
private final String instanceParams;
28+
private static final List<Class<?>> KNOWN_PROCESSORS =
29+
Arrays.asList(
30+
FileCleanupProcessor.class,
31+
BroadcastProcessor.class,
32+
MapReduceProcessor.class,
33+
MapProcessor.class,
34+
ShellProcessor.class,
35+
PythonProcessor.class,
36+
HttpProcessor.class,
37+
SpringDatasourceSqlProcessor.class,
38+
DynamicDatasourceSqlProcessor.class);
39+
40+
private PowerJobProcessRequest(
41+
Long jobId,
42+
String methodName,
43+
Class<?> declaringClass,
44+
String jobParams,
45+
String instanceParams,
46+
String jobType) {
47+
this.jobId = jobId;
48+
this.methodName = methodName;
49+
this.jobType = jobType;
50+
this.declaringClass = declaringClass;
51+
this.jobParams = jobParams;
52+
this.instanceParams = instanceParams;
53+
}
54+
55+
public static PowerJobProcessRequest createRequest(
56+
Long jobId,
57+
BasicProcessor handler,
58+
String methodName,
59+
String jobParams,
60+
String instanceParams) {
61+
String jobType = "BasicProcessor";
62+
for (Class<?> processorClass : KNOWN_PROCESSORS) {
63+
if (processorClass.isInstance(handler)) {
64+
jobType = processorClass.getSimpleName();
65+
break;
66+
}
67+
}
68+
return new PowerJobProcessRequest(
69+
jobId, methodName, handler.getClass(), jobParams, instanceParams, jobType);
70+
}
71+
72+
public String getMethodName() {
73+
return methodName;
74+
}
75+
76+
public Long getJobId() {
77+
return jobId;
78+
}
79+
80+
public Class<?> getDeclaringClass() {
81+
return declaringClass;
82+
}
83+
84+
public String getJobParams() {
85+
return jobParams;
86+
}
87+
88+
public String getInstanceParams() {
89+
return instanceParams;
90+
}
91+
92+
public String getJobType() {
93+
return jobType;
94+
}
95+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
7+
8+
import io.opentelemetry.api.GlobalOpenTelemetry;
9+
import io.opentelemetry.api.common.AttributeKey;
10+
import io.opentelemetry.api.trace.StatusCode;
11+
import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesExtractor;
12+
import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeSpanNameExtractor;
13+
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
14+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
15+
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
16+
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
17+
import io.opentelemetry.instrumentation.api.instrumenter.SpanStatusExtractor;
18+
import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig;
19+
import tech.powerjob.worker.core.processor.ProcessResult;
20+
21+
public final class PowerJobSingletons {
22+
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.powerjob-4.0";
23+
24+
private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
25+
AgentInstrumentationConfig.get()
26+
.getBoolean("otel.instrumentation.powerjob.experimental-span-attributes", false);
27+
private static final Instrumenter<PowerJobProcessRequest, ProcessResult> INSTRUMENTER = create();
28+
29+
public static Instrumenter<PowerJobProcessRequest, ProcessResult> instrumenter() {
30+
return INSTRUMENTER;
31+
}
32+
33+
private static Instrumenter<PowerJobProcessRequest, ProcessResult> create() {
34+
PowerJobCodeAttributesGetter codeAttributesGetter = new PowerJobCodeAttributesGetter();
35+
SpanNameExtractor<PowerJobProcessRequest> spanNameExtractor =
36+
CodeSpanNameExtractor.create(codeAttributesGetter);
37+
38+
InstrumenterBuilder<PowerJobProcessRequest, ProcessResult> builder =
39+
Instrumenter.<PowerJobProcessRequest, ProcessResult>builder(
40+
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
41+
.addAttributesExtractor(CodeAttributesExtractor.create(codeAttributesGetter))
42+
.setSpanStatusExtractor(
43+
(spanStatusBuilder, powerJobProcessRequest, response, error) -> {
44+
if (response != null && !response.isSuccess()) {
45+
spanStatusBuilder.setStatus(StatusCode.ERROR);
46+
} else {
47+
SpanStatusExtractor.getDefault()
48+
.extract(spanStatusBuilder, powerJobProcessRequest, response, error);
49+
}
50+
});
51+
52+
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
53+
builder.addAttributesExtractor(
54+
AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "powerjob"));
55+
builder.addAttributesExtractor(new PowerJobExperimentalAttributeExtractor());
56+
}
57+
58+
return builder.buildInstrumenter();
59+
}
60+
61+
private PowerJobSingletons() {}
62+
}

0 commit comments

Comments
 (0)