25
25
import io .opentelemetry .context .Context ;
26
26
import io .opentelemetry .context .Scope ;
27
27
import io .opentelemetry .instrumentation .api .annotation .support .async .AsyncOperationEndStrategies ;
28
+ import io .opentelemetry .instrumentation .api .internal .GuardedBy ;
28
29
import io .reactivex .rxjava3 .core .Completable ;
29
30
import io .reactivex .rxjava3 .core .CompletableObserver ;
30
31
import io .reactivex .rxjava3 .core .Flowable ;
40
41
import io .reactivex .rxjava3 .parallel .ParallelFlowable ;
41
42
import io .reactivex .rxjava3 .plugins .RxJavaPlugins ;
42
43
import javax .annotation .Nullable ;
43
- import org .checkerframework .checker .lock .qual .GuardedBy ;
44
44
import org .reactivestreams .Subscriber ;
45
45
46
46
/**
@@ -158,6 +158,7 @@ public void disable() {
158
158
}
159
159
}
160
160
161
+ @ GuardedBy ("TracingAssembly.class" )
161
162
@ SuppressWarnings ({"rawtypes" , "unchecked" })
162
163
private static void enableParallel () {
163
164
oldOnParallelAssembly = RxJavaPlugins .getOnParallelAssembly ();
@@ -167,6 +168,7 @@ private static void enableParallel() {
167
168
parallelFlowable -> new TracingParallelFlowable (parallelFlowable , Context .current ())));
168
169
}
169
170
171
+ @ GuardedBy ("TracingAssembly.class" )
170
172
private static void enableCompletable () {
171
173
oldOnCompletableSubscribe = RxJavaPlugins .getOnCompletableSubscribe ();
172
174
RxJavaPlugins .setOnCompletableSubscribe (
@@ -180,6 +182,7 @@ private static void enableCompletable() {
180
182
}));
181
183
}
182
184
185
+ @ GuardedBy ("TracingAssembly.class" )
183
186
@ SuppressWarnings ({"rawtypes" , "unchecked" })
184
187
private static void enableFlowable () {
185
188
oldOnFlowableSubscribe = RxJavaPlugins .getOnFlowableSubscribe ();
@@ -199,6 +202,7 @@ private static void enableFlowable() {
199
202
}));
200
203
}
201
204
205
+ @ GuardedBy ("TracingAssembly.class" )
202
206
@ SuppressWarnings ({"rawtypes" , "unchecked" })
203
207
private static void enableObservable () {
204
208
oldOnObservableSubscribe = RxJavaPlugins .getOnObservableSubscribe ();
@@ -213,6 +217,7 @@ private static void enableObservable() {
213
217
}));
214
218
}
215
219
220
+ @ GuardedBy ("TracingAssembly.class" )
216
221
@ SuppressWarnings ({"rawtypes" , "unchecked" })
217
222
private static void enableSingle () {
218
223
oldOnSingleSubscribe = RxJavaPlugins .getOnSingleSubscribe ();
@@ -227,6 +232,7 @@ private static void enableSingle() {
227
232
}));
228
233
}
229
234
235
+ @ GuardedBy ("TracingAssembly.class" )
230
236
@ SuppressWarnings ({"rawtypes" , "unchecked" })
231
237
private static void enableMaybe () {
232
238
oldOnMaybeSubscribe = RxJavaPlugins .getOnMaybeSubscribe ();
@@ -254,31 +260,37 @@ private static void enableWithSpanStrategy(boolean captureExperimentalSpanAttrib
254
260
AsyncOperationEndStrategies .instance ().registerStrategy (asyncOperationEndStrategy );
255
261
}
256
262
263
+ @ GuardedBy ("TracingAssembly.class" )
257
264
private static void disableParallel () {
258
265
RxJavaPlugins .setOnParallelAssembly (oldOnParallelAssembly );
259
266
oldOnParallelAssembly = null ;
260
267
}
261
268
269
+ @ GuardedBy ("TracingAssembly.class" )
262
270
private static void disableObservable () {
263
271
RxJavaPlugins .setOnObservableSubscribe (oldOnObservableSubscribe );
264
272
oldOnObservableSubscribe = null ;
265
273
}
266
274
275
+ @ GuardedBy ("TracingAssembly.class" )
267
276
private static void disableCompletable () {
268
277
RxJavaPlugins .setOnCompletableSubscribe (oldOnCompletableSubscribe );
269
278
oldOnCompletableSubscribe = null ;
270
279
}
271
280
281
+ @ GuardedBy ("TracingAssembly.class" )
272
282
private static void disableFlowable () {
273
283
RxJavaPlugins .setOnFlowableSubscribe (oldOnFlowableSubscribe );
274
284
oldOnFlowableSubscribe = null ;
275
285
}
276
286
287
+ @ GuardedBy ("TracingAssembly.class" )
277
288
private static void disableSingle () {
278
289
RxJavaPlugins .setOnSingleSubscribe (oldOnSingleSubscribe );
279
290
oldOnSingleSubscribe = null ;
280
291
}
281
292
293
+ @ GuardedBy ("TracingAssembly.class" )
282
294
@ SuppressWarnings ({"rawtypes" , "unchecked" })
283
295
private static void disableMaybe () {
284
296
RxJavaPlugins .setOnMaybeSubscribe (
0 commit comments