Skip to content

Commit 8393bd8

Browse files
committed
1 parent aae0b59 commit 8393bd8

30 files changed

+1028
-103
lines changed

checkstyle/checkstyle.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,12 @@
223223
<property name="substitutionClassName" value="org.apache.ignite.internal.thread.IgniteScheduledThreadPoolExecutor"/>
224224
</module>
225225

226+
<module name="org.apache.ignite.tools.checkstyle.ClassUsageRestrictionRule">
227+
<property name="className" value="java.util.concurrent.CompletableFuture"/>
228+
<property name="factoryMethods" value="allOf, anyOf, supplyAsync, runAsync, completedFuture"/>
229+
<property name="substitutionClassName" value="org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture"/>
230+
</module>
231+
226232
<module name="SuppressionXpathSingleFilter">
227233
<property name="checks" value="org.apache.ignite.tools.checkstyle.ClassUsageRestrictionRule"/>
228234
<property name="files" value="DumpReader\.java|CacheLoadOnlyStoreAdapter\.java|[\\/]io[\\/]opencensus[\\/]|[\\/]jdbc[\\/]thin[\\/]|[\\/]test[\\/]|[\\/]tests[\\/]|[\\/]internal[\\/]client[\\/]|[\\/]org[\\/]apache[\\/]ignite[\\/]internal[\\/]thread[\\/]"/>

modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/IgniteCompletableFuture.java

Lines changed: 407 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.thread.context.function;
19+
20+
import java.util.function.BiConsumer;
21+
import org.apache.ignite.internal.thread.context.Scope;
22+
import org.apache.ignite.internal.thread.context.ThreadContext;
23+
import org.apache.ignite.internal.thread.context.ThreadContextAwareWrapper;
24+
import org.apache.ignite.internal.thread.context.ThreadContextSnapshot;
25+
import org.jetbrains.annotations.NotNull;
26+
27+
/** */
28+
public class ThreadContextAwareBiConsumer<T, U> extends ThreadContextAwareWrapper<BiConsumer<T, U>> implements BiConsumer<T, U> {
29+
/** */
30+
public ThreadContextAwareBiConsumer(BiConsumer<T, U> delegate, ThreadContextSnapshot snapshot) {
31+
super(delegate, snapshot);
32+
}
33+
34+
/** {@inheritDoc} */
35+
@Override public void accept(T t, U u) {
36+
try (Scope ignored = ThreadContext.withSnapshot(snapshot)) {
37+
delegate.accept(t, u);
38+
}
39+
}
40+
41+
/** {@inheritDoc} */
42+
@NotNull @Override public BiConsumer<T, U> andThen(@NotNull BiConsumer<? super T, ? super U> after) {
43+
return BiConsumer.super.andThen(wrap(after));
44+
}
45+
46+
/** */
47+
public static <T, U> BiConsumer<T, U> wrap(BiConsumer<T, U> delegate) {
48+
return wrap(delegate, ThreadContextAwareBiConsumer::new);
49+
}
50+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.thread.context.function;
19+
20+
import java.util.function.BiFunction;
21+
import java.util.function.Function;
22+
import org.apache.ignite.internal.thread.context.Scope;
23+
import org.apache.ignite.internal.thread.context.ThreadContext;
24+
import org.apache.ignite.internal.thread.context.ThreadContextAwareWrapper;
25+
import org.apache.ignite.internal.thread.context.ThreadContextSnapshot;
26+
import org.jetbrains.annotations.NotNull;
27+
28+
/** */
29+
public class ThreadContextAwareBiFunction<T, U, R> extends ThreadContextAwareWrapper<BiFunction<T, U, R>> implements BiFunction<T, U, R> {
30+
/** */
31+
public ThreadContextAwareBiFunction(BiFunction<T, U, R> delegate, ThreadContextSnapshot snapshot) {
32+
super(delegate, snapshot);
33+
}
34+
35+
/** {@inheritDoc} */
36+
@Override public R apply(T t, U u) {
37+
try (Scope ignored = ThreadContext.withSnapshot(snapshot)) {
38+
return delegate.apply(t, u);
39+
}
40+
}
41+
42+
/** {@inheritDoc} */
43+
@NotNull @Override public <V> BiFunction<T, U, V> andThen(@NotNull Function<? super R, ? extends V> after) {
44+
return BiFunction.super.andThen(ThreadContextAwareFunction.wrap(after));
45+
}
46+
47+
/** */
48+
public static <T, U, R> BiFunction<T, U, R> wrap(BiFunction<T, U, R> delegate) {
49+
return wrap(delegate, ThreadContextAwareBiFunction::new);
50+
}
51+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.thread.context.function;
19+
20+
import org.apache.ignite.internal.thread.context.Scope;
21+
import org.apache.ignite.internal.thread.context.ThreadContext;
22+
import org.apache.ignite.internal.thread.context.ThreadContextAwareWrapper;
23+
import org.apache.ignite.internal.thread.context.ThreadContextSnapshot;
24+
import org.apache.ignite.lang.IgniteClosure;
25+
26+
/** */
27+
public class ThreadContextAwareClosure<E, R> extends ThreadContextAwareWrapper<IgniteClosure<E, R>> implements IgniteClosure<E, R> {
28+
/** */
29+
private static final long serialVersionUID = 0L;
30+
31+
/** */
32+
public ThreadContextAwareClosure(IgniteClosure<E, R> delegate, ThreadContextSnapshot snapshot) {
33+
super(delegate, snapshot);
34+
}
35+
36+
/** {@inheritDoc} */
37+
@Override public R apply(E e) {
38+
try (Scope ignored = ThreadContext.withSnapshot(snapshot)) {
39+
return delegate.apply(e);
40+
}
41+
}
42+
43+
/** */
44+
public static <E, R> IgniteClosure<E, R> wrap(IgniteClosure<E, R> delegate) {
45+
return wrap(delegate, ThreadContextAwareClosure::new);
46+
}
47+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.thread.context.function;
19+
20+
import java.util.function.Consumer;
21+
import org.apache.ignite.internal.thread.context.Scope;
22+
import org.apache.ignite.internal.thread.context.ThreadContext;
23+
import org.apache.ignite.internal.thread.context.ThreadContextAwareWrapper;
24+
import org.apache.ignite.internal.thread.context.ThreadContextSnapshot;
25+
import org.jetbrains.annotations.NotNull;
26+
27+
/** */
28+
public class ThreadContextAwareConsumer<T> extends ThreadContextAwareWrapper<Consumer<T>> implements Consumer<T> {
29+
/** */
30+
public ThreadContextAwareConsumer(Consumer<T> delegate, ThreadContextSnapshot snapshot) {
31+
super(delegate, snapshot);
32+
}
33+
34+
/** {@inheritDoc} */
35+
@NotNull @Override public Consumer<T> andThen(@NotNull Consumer<? super T> after) {
36+
return Consumer.super.andThen(wrap(after));
37+
}
38+
39+
/** {@inheritDoc} */
40+
@Override public void accept(T t) {
41+
try (Scope ignored = ThreadContext.withSnapshot(snapshot)) {
42+
delegate.accept(t);
43+
}
44+
}
45+
46+
/** */
47+
public static <T> Consumer<T> wrap(Consumer<T> delegate) {
48+
return wrap(delegate, ThreadContextAwareConsumer::new);
49+
}
50+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.thread.context.function;
19+
20+
import java.util.function.Function;
21+
import org.apache.ignite.internal.thread.context.Scope;
22+
import org.apache.ignite.internal.thread.context.ThreadContext;
23+
import org.apache.ignite.internal.thread.context.ThreadContextAwareWrapper;
24+
import org.apache.ignite.internal.thread.context.ThreadContextSnapshot;
25+
import org.jetbrains.annotations.NotNull;
26+
27+
/** */
28+
public class ThreadContextAwareFunction<T, R> extends ThreadContextAwareWrapper<Function<T, R>> implements Function<T, R> {
29+
/** */
30+
private ThreadContextAwareFunction(Function<T, R> delegate, ThreadContextSnapshot snapshot) {
31+
super(delegate, snapshot);
32+
}
33+
34+
/** {@inheritDoc} */
35+
@Override public R apply(T t) {
36+
try (Scope ignored = ThreadContext.withSnapshot(snapshot)) {
37+
return delegate.apply(t);
38+
}
39+
}
40+
41+
/** {@inheritDoc} */
42+
@NotNull @Override public <V> Function<V, R> compose(@NotNull Function<? super V, ? extends T> before) {
43+
return Function.super.compose(wrap(before));
44+
}
45+
46+
/** {@inheritDoc} */
47+
@NotNull @Override public <V> Function<T, V> andThen(@NotNull Function<? super R, ? extends V> after) {
48+
return Function.super.andThen(wrap(after));
49+
}
50+
51+
/** */
52+
public static <T, R> Function<T, R> wrap(Function<T, R> delegate) {
53+
return wrap(delegate, ThreadContextAwareFunction::new);
54+
}
55+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.thread.context.function;
19+
20+
import org.apache.ignite.internal.thread.context.Scope;
21+
import org.apache.ignite.internal.thread.context.ThreadContext;
22+
import org.apache.ignite.internal.thread.context.ThreadContextAwareWrapper;
23+
import org.apache.ignite.internal.thread.context.ThreadContextSnapshot;
24+
import org.apache.ignite.lang.IgniteInClosure;
25+
26+
/** */
27+
public class ThreadContextAwareInClosure<E> extends ThreadContextAwareWrapper<IgniteInClosure<E>> implements IgniteInClosure<E> {
28+
/** */
29+
private static final long serialVersionUID = 0L;
30+
31+
/** */
32+
public ThreadContextAwareInClosure(IgniteInClosure<E> delegate, ThreadContextSnapshot snapshot) {
33+
super(delegate, snapshot);
34+
}
35+
36+
/** {@inheritDoc} */
37+
@Override public void apply(E e) {
38+
try (Scope ignored = ThreadContext.withSnapshot(snapshot)) {
39+
delegate.apply(e);
40+
}
41+
}
42+
43+
/** */
44+
public static <E> IgniteInClosure<E> wrap(IgniteInClosure<E> delefate) {
45+
return wrap(delefate, ThreadContextAwareInClosure::new);
46+
}
47+
48+
}

modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.util.Random;
5454
import java.util.Set;
5555
import java.util.UUID;
56-
import java.util.concurrent.CompletableFuture;
5756
import java.util.concurrent.ConcurrentHashMap;
5857
import java.util.concurrent.ConcurrentSkipListMap;
5958
import java.util.concurrent.ExecutionException;
@@ -120,6 +119,7 @@
120119
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionClientContext;
121120
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
122121
import org.apache.ignite.internal.thread.IgniteThreadFactory;
122+
import org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
123123
import org.apache.ignite.internal.util.HostAndPortRange;
124124
import org.apache.ignite.internal.util.future.GridFutureAdapter;
125125
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -2587,7 +2587,7 @@ public boolean handleResult(JdbcBinaryTypeGetResult res) {
25872587
*/
25882588
private abstract class BlockingJdbcChannel {
25892589
/** Request ID -> Jdbc result map. */
2590-
private Map<Long, CompletableFuture<JdbcResult>> results = new ConcurrentHashMap<>();
2590+
private Map<Long, IgniteCompletableFuture<JdbcResult>> results = new ConcurrentHashMap<>();
25912591

25922592
/**
25932593
* Do request in blocking style. It just call
@@ -2602,9 +2602,9 @@ <R extends JdbcResult> R doRequest(JdbcRequest req) throws SQLException, Interru
26022602
R res;
26032603

26042604
if (isStream()) {
2605-
CompletableFuture<JdbcResult> resFut = new CompletableFuture<>();
2605+
IgniteCompletableFuture<JdbcResult> resFut = new IgniteCompletableFuture<>();
26062606

2607-
CompletableFuture<JdbcResult> oldFut = results.put(req.requestId(), resFut);
2607+
IgniteCompletableFuture<JdbcResult> oldFut = results.put(req.requestId(), resFut);
26082608

26092609
assert oldFut == null : "Another request with the same id is waiting for result.";
26102610

@@ -2627,7 +2627,7 @@ <R extends JdbcResult> R doRequest(JdbcRequest req) throws SQLException, Interru
26272627
boolean handleResult(long reqId, JdbcResult res) {
26282628
boolean handled = false;
26292629

2630-
CompletableFuture<JdbcResult> fut = results.remove(reqId);
2630+
IgniteCompletableFuture<JdbcResult> fut = results.remove(reqId);
26312631

26322632
if (fut != null) {
26332633
fut.complete(res);

0 commit comments

Comments
 (0)