Skip to content

Commit a5b2bba

Browse files
committed
POC- Orchestration of DLs
1 parent 4ec5fe8 commit a5b2bba

File tree

6 files changed

+363
-2
lines changed

6 files changed

+363
-2
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package org.dataloader.orchestration;
2+
3+
import org.dataloader.DataLoader;
4+
import org.dataloader.impl.Assertions;
5+
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.function.Function;
10+
11+
public class Orchestrator<K, V> {
12+
13+
private final Tracker tracker;
14+
private final DataLoader<K, V> startingDL;
15+
private final List<Step<?, ?>> steps = new ArrayList<>();
16+
17+
/**
18+
* This will create a new {@link Orchestrator} that can allow multiple calls to multiple data-loader's
19+
* to be orchestrated so they all run optimally.
20+
*
21+
* @param dataLoader the data loader to start with
22+
* @param <K> the key type
23+
* @param <V> the value type
24+
* @return a new {@link Orchestrator}
25+
*/
26+
public static <K, V> Orchestrator<K, V> orchestrate(DataLoader<K, V> dataLoader) {
27+
return new Orchestrator<>(new Tracker(), dataLoader);
28+
}
29+
30+
public Tracker getTracker() {
31+
return tracker;
32+
}
33+
34+
private Orchestrator(Tracker tracker, DataLoader<K, V> dataLoader) {
35+
this.tracker = tracker;
36+
this.startingDL = dataLoader;
37+
}
38+
39+
40+
public Step<K, V> load(K key) {
41+
return load(key, null);
42+
}
43+
44+
public Step<K, V> load(K key, Object keyContext) {
45+
return Step.loadImpl(this, castAs(startingDL), key, keyContext);
46+
}
47+
48+
static <T> T castAs(Object o) {
49+
//noinspection unchecked
50+
return (T) o;
51+
}
52+
53+
54+
<KT, VT> void record(Step<KT, VT> step) {
55+
steps.add(step);
56+
tracker.incrementStepCount();
57+
}
58+
59+
/**
60+
* This is the callback point saying to start the DataLoader loading process.
61+
* <p>
62+
* The type of object returned here depends on the value type of the last Step. We cant be truly generic
63+
* here and must be case.
64+
*
65+
* @param <VT> the value type
66+
* @return the final composed value
67+
*/
68+
<VT> CompletableFuture<VT> execute() {
69+
Assertions.assertState(!steps.isEmpty(), () -> "How can the steps to run be empty??");
70+
int index = 0;
71+
Step<?, ?> firstStep = steps.get(index);
72+
73+
CompletableFuture<Object> currentCF = castAs(firstStep.codeToRun().apply(null)); // first load uses variable capture
74+
whenComplete(index, firstStep, currentCF);
75+
76+
for (index++; index < steps.size(); index++) {
77+
Step<?, ?> nextStep = steps.get(index);
78+
Function<Object, CompletableFuture<?>> codeToRun = castAs(nextStep.codeToRun());
79+
CompletableFuture<Object> nextCF = currentCF.thenCompose(value -> castAs(codeToRun.apply(value)));
80+
currentCF = nextCF;
81+
82+
// side effect when this step is complete
83+
whenComplete(index, nextStep, nextCF);
84+
}
85+
return castAs(currentCF);
86+
87+
}
88+
89+
private void whenComplete(int index, Step<?, ?> step, CompletableFuture<Object> cf) {
90+
cf.whenComplete((v, throwable) -> {
91+
getTracker().loadCallComplete(step.dataLoader());
92+
// replace with instrumentation code
93+
if (throwable != null) {
94+
// TODO - should we be cancelling future steps here - no need for dispatch tracking if they will never run
95+
System.out.println("A throwable has been thrown on step " + index + ": " + throwable.getMessage());
96+
throwable.printStackTrace(System.out);
97+
} else {
98+
System.out.println("step " + index + " returned : " + v);
99+
}
100+
});
101+
}
102+
103+
104+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package org.dataloader.orchestration;
2+
3+
import org.dataloader.DataLoader;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.function.Function;
7+
8+
import static org.dataloader.orchestration.Orchestrator.castAs;
9+
10+
public class Step<K, V> {
11+
private final Orchestrator<?, ?> orchestrator;
12+
private final DataLoader<Object, Object> dl;
13+
private final Function<K, CompletableFuture<V>> codeToRun;
14+
15+
Step(Orchestrator<?, ?> orchestrator, DataLoader<?, ?> dataLoader, Function<K, CompletableFuture<V>> codeToRun) {
16+
this.orchestrator = orchestrator;
17+
this.dl = castAs(dataLoader);
18+
this.codeToRun = codeToRun;
19+
}
20+
21+
DataLoader<Object, Object> dataLoader() {
22+
return dl;
23+
}
24+
25+
public Function<K, CompletableFuture<V>> codeToRun() {
26+
return codeToRun;
27+
}
28+
29+
public <KT, VT> With<KT, VT> with(DataLoader<KT, VT> dataLoader) {
30+
return new With<>(orchestrator, dataLoader);
31+
}
32+
33+
public Step<K, V> load(K key, Object keyContext) {
34+
return loadImpl(orchestrator, dl, key, keyContext);
35+
}
36+
37+
public Step<V, V> thenLoad(Function<V, K> codeToRun) {
38+
return thenLoadImpl(orchestrator, dl, codeToRun);
39+
}
40+
41+
static <K, V> Step<K, V> loadImpl(Orchestrator<?, ?> orchestrator, DataLoader<Object, Object> dl, K key, Object keyContext) {
42+
Function<K, CompletableFuture<V>> codeToRun = k -> {
43+
CompletableFuture<V> cf = castAs(dl.load(key, keyContext));
44+
orchestrator.getTracker().loadCall(dl);
45+
return cf;
46+
};
47+
Step<K, V> step = new Step<>(orchestrator, dl, codeToRun);
48+
orchestrator.record(step);
49+
return step;
50+
}
51+
52+
static <K, V> Step<V, V> thenLoadImpl(Orchestrator<?, ?> orchestrator, DataLoader<Object, Object> dl, Function<V, K> codeToRun) {
53+
Function<V, CompletableFuture<V>> actualCodeToRun = v -> {
54+
K key = codeToRun.apply(v);
55+
CompletableFuture<V> cf = castAs(dl.load(key));
56+
orchestrator.getTracker().loadCall(dl);
57+
return cf;
58+
};
59+
Step<V, V> step = new Step<>(orchestrator, dl, actualCodeToRun);
60+
orchestrator.record(step);
61+
return step;
62+
}
63+
64+
public CompletableFuture<V> toCompletableFuture() {
65+
return orchestrator.execute();
66+
}
67+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package org.dataloader.orchestration;
2+
3+
import org.dataloader.DataLoader;
4+
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
/**
10+
* This needs HEAPS more work - heaps more - I am not sure if its just counts of call backs or what.
11+
* <p>
12+
* This is just POC stuff for now
13+
*/
14+
public class Tracker {
15+
private final AtomicInteger stepCount = new AtomicInteger();
16+
private final Map<DataLoader<?,?>, AtomicInteger> counters = new HashMap<>();
17+
18+
public int getOutstandingLoadCount(DataLoader<?,?> dl) {
19+
synchronized (this) {
20+
return getDLCounter(dl).intValue();
21+
}
22+
}
23+
24+
public int getOutstandingLoadCount() {
25+
int count = 0;
26+
synchronized (this) {
27+
for (AtomicInteger atomicInteger : counters.values()) {
28+
count += atomicInteger.get();
29+
}
30+
}
31+
return count;
32+
}
33+
34+
public int getStepCount() {
35+
return stepCount.get();
36+
}
37+
38+
void incrementStepCount() {
39+
this.stepCount.incrementAndGet();
40+
}
41+
42+
void loadCall(DataLoader<?,?> dl) {
43+
synchronized (this) {
44+
getDLCounter(dl).incrementAndGet();
45+
}
46+
}
47+
48+
void loadCallComplete(DataLoader<?,?> dl) {
49+
synchronized (this) {
50+
getDLCounter(dl).decrementAndGet();
51+
}
52+
}
53+
54+
private AtomicInteger getDLCounter(DataLoader<?, ?> dl) {
55+
return counters.computeIfAbsent(dl, key -> new AtomicInteger());
56+
}
57+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.dataloader.orchestration;
2+
3+
import org.dataloader.DataLoader;
4+
5+
import java.util.function.Function;
6+
7+
import static org.dataloader.orchestration.Orchestrator.castAs;
8+
import static org.dataloader.orchestration.Step.loadImpl;
9+
10+
/**
11+
* A transitional step that allows a new step to be started with a new data loader in play
12+
*
13+
* @param <K> the key type
14+
* @param <V> the value type
15+
*/
16+
public class With<K, V> {
17+
private final Orchestrator<?, ?> orchestrator;
18+
private final DataLoader<K, V> dl;
19+
20+
public With(Orchestrator<?, ?> orchestrator, DataLoader<K, V> dl) {
21+
this.orchestrator = orchestrator;
22+
this.dl = dl;
23+
}
24+
25+
public Step<K, V> load(K key) {
26+
return load(key, null);
27+
}
28+
29+
public Step<K, V> load(K key, Object keyContext) {
30+
return loadImpl(orchestrator, castAs(dl), key,keyContext);
31+
}
32+
33+
public Step<V, V> thenLoad(Function<V, K> codeToRun) {
34+
return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun);
35+
}
36+
}

src/test/java/org/dataloader/fixtures/TestKit.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import java.util.ArrayList;
1212
import java.util.Arrays;
1313
import java.util.Collection;
14-
import java.util.LinkedHashSet;
1514
import java.util.HashMap;
15+
import java.util.LinkedHashSet;
1616
import java.util.List;
1717
import java.util.Map;
1818
import java.util.Set;
@@ -60,6 +60,26 @@ public static <K, V> BatchLoader<K, V> keysAsValues(List<List<K>> loadCalls) {
6060
};
6161
}
6262

63+
public static BatchLoader<String, String> upperCaseBatchLoader() {
64+
return keys -> CompletableFuture.completedFuture(keys.stream().map(String::toUpperCase).collect(toList()));
65+
}
66+
67+
public static BatchLoader<String, String> lowerCaseBatchLoader() {
68+
return keys -> CompletableFuture.completedFuture(keys.stream().map(String::toLowerCase).collect(toList()));
69+
}
70+
71+
public static BatchLoader<String, String> reverseBatchLoader() {
72+
return keys -> CompletableFuture.completedFuture(keys.stream().map(TestKit::reverse).collect(toList()));
73+
}
74+
75+
public static String reverse(String s) {
76+
StringBuilder sb = new StringBuilder();
77+
for (int i = s.length() - 1; i >= 0; i--) {
78+
sb.append(s.charAt(i));
79+
}
80+
return sb.toString();
81+
}
82+
6383
public static <K, V> DataLoader<K, V> idLoader() {
6484
return idLoader(null, new ArrayList<>());
6585
}
@@ -104,7 +124,7 @@ public static <T> Set<T> asSet(Collection<T> elements) {
104124

105125
public static boolean areAllDone(CompletableFuture<?>... cfs) {
106126
for (CompletableFuture<?> cf : cfs) {
107-
if (! cf.isDone()) {
127+
if (!cf.isDone()) {
108128
return false;
109129
}
110130
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package org.dataloader.orchestration;
2+
3+
import org.dataloader.DataLoader;
4+
import org.dataloader.DataLoaderOptions;
5+
import org.dataloader.DataLoaderRegistry;
6+
import org.junit.jupiter.api.Test;
7+
8+
import java.util.concurrent.CompletableFuture;
9+
10+
import static org.awaitility.Awaitility.await;
11+
import static org.dataloader.DataLoaderFactory.newDataLoader;
12+
import static org.dataloader.fixtures.TestKit.lowerCaseBatchLoader;
13+
import static org.dataloader.fixtures.TestKit.reverseBatchLoader;
14+
import static org.dataloader.fixtures.TestKit.upperCaseBatchLoader;
15+
import static org.hamcrest.MatcherAssert.assertThat;
16+
import static org.hamcrest.Matchers.equalTo;
17+
18+
class OrchestratorTest {
19+
20+
DataLoaderOptions cachingAndBatchingOptions = DataLoaderOptions.newOptions().setBatchingEnabled(true).setCachingEnabled(true);
21+
22+
DataLoader<String, String> dlUpper = newDataLoader(upperCaseBatchLoader(), cachingAndBatchingOptions);
23+
DataLoader<String, String> dlLower = newDataLoader(lowerCaseBatchLoader(), cachingAndBatchingOptions);
24+
DataLoader<String, String> dlReverse = newDataLoader(reverseBatchLoader(), cachingAndBatchingOptions);
25+
26+
@Test
27+
void canOrchestrate() {
28+
29+
DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
30+
.register("upper", dlUpper)
31+
.register("lower", dlLower)
32+
.register("reverse", dlReverse)
33+
.build();
34+
35+
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper);
36+
Step<String, String> step1 = orchestrator.load("aBc", null);
37+
With<String, String> with1 = step1.with(dlLower);
38+
Step<String, String> step2 = with1.thenLoad(key -> key);
39+
With<String, String> with2 = step2.with(dlReverse);
40+
Step<String, String> step3 = with2.thenLoad(key -> key);
41+
CompletableFuture<String> cf = step3.toCompletableFuture();
42+
43+
// because all the dls are dispatched in "perfect order" here they all end up dispatching
44+
// at JUST the right time. A change in order would be different
45+
registry.dispatchAll();
46+
47+
await().until(cf::isDone);
48+
49+
assertThat(cf.join(), equalTo("cba"));
50+
}
51+
52+
@Test
53+
void canOrchestrateWhenNotInPerfectOrder() {
54+
55+
DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
56+
.register("reverse", dlReverse)
57+
.register("lower", dlLower)
58+
.register("upper", dlUpper)
59+
.build();
60+
61+
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper);
62+
CompletableFuture<String> cf = orchestrator.load("aBc", null)
63+
.with(dlLower).thenLoad(key1 -> key1)
64+
.with(dlReverse).thenLoad(key -> key)
65+
.toCompletableFuture();
66+
67+
registry.dispatchAll();
68+
69+
assertThat(cf.isDone(), equalTo(false));
70+
71+
assertThat(orchestrator.getTracker().getOutstandingLoadCount(),equalTo(2));
72+
73+
await().until(cf::isDone);
74+
75+
assertThat(cf.join(), equalTo("cba"));
76+
}
77+
}

0 commit comments

Comments
 (0)