Skip to content

Commit 3c3cc99

Browse files
Have MappedBatchPublisher take in a Set<K> keys
This is more symmetric with `MappedbatchLoader` and preserves efficiency; we do not need to emit a `Map.Entry` for duplicate keys (given the strong intention that this will be used to create a `Map`).
1 parent 4b9356e commit 3c3cc99

File tree

5 files changed

+23
-8
lines changed

5 files changed

+23
-8
lines changed

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Objec
536536
private CompletableFuture<List<V>> invokeMappedBatchPublisher(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
537537
CompletableFuture<List<V>> loadResult = new CompletableFuture<>();
538538
Subscriber<Map.Entry<K, V>> subscriber = ReactiveSupport.mappedBatchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration());
539-
539+
Set<K> setOfKeys = new LinkedHashSet<>(keys);
540540
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
541541
if (batchLoadFunction instanceof MappedBatchPublisherWithContext) {
542542
//noinspection unchecked
@@ -551,10 +551,10 @@ private CompletableFuture<List<V>> invokeMappedBatchPublisher(List<K> keys, List
551551
//noinspection unchecked
552552
MappedBatchPublisher<K, V> loadFunction = (MappedBatchPublisher<K, V>) batchLoadFunction;
553553
if (batchLoaderScheduler != null) {
554-
BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber);
554+
BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(setOfKeys, subscriber);
555555
batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, null);
556556
} else {
557-
loadFunction.load(keys, subscriber);
557+
loadFunction.load(setOfKeys, subscriber);
558558
}
559559
}
560560
return loadResult;

src/main/java/org/dataloader/MappedBatchPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import org.reactivestreams.Subscriber;
44

5-
import java.util.List;
65
import java.util.Map;
6+
import java.util.Set;
77

88
/**
99
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys.
@@ -26,5 +26,5 @@ public interface MappedBatchPublisher<K, V> {
2626
* @param keys the collection of keys to load
2727
* @param subscriber as values arrive you must call the subscriber for each value
2828
*/
29-
void load(List<K> keys, Subscriber<Map.Entry<K, V>> subscriber);
29+
void load(Set<K> keys, Subscriber<Map.Entry<K, V>> subscriber);
3030
}

src/test/java/org/dataloader/DataLoaderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,7 @@ public void should_work_with_duplicate_keys_when_caching_disabled(TestDataLoader
740740
assertThat(future1.get(), equalTo("A"));
741741
assertThat(future2.get(), equalTo("B"));
742742
assertThat(future3.get(), equalTo("A"));
743-
if (factory instanceof MappedDataLoaderFactory) {
743+
if (factory instanceof MappedDataLoaderFactory || factory instanceof MappedPublisherDataLoaderFactory) {
744744
assertThat(loadCalls, equalTo(singletonList(asList("A", "B"))));
745745
} else {
746746
assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A"))));

src/test/java/org/dataloader/fixtures/UserManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.dataloader.fixtures;
22

3+
import org.reactivestreams.Subscriber;
4+
import reactor.core.publisher.Flux;
5+
36
import java.util.HashMap;
47
import java.util.LinkedHashMap;
58
import java.util.List;
@@ -52,6 +55,14 @@ public List<User> loadUsersById(List<Long> userIds) {
5255
return userIds.stream().map(this::loadUserById).collect(Collectors.toList());
5356
}
5457

58+
public void publishUsersById(List<Long> userIds, Subscriber<? super User> userSubscriber) {
59+
Flux.fromIterable(loadUsersById(userIds)).subscribe(userSubscriber);
60+
}
61+
62+
public void publishUsersById(Set<Long> userIds, Subscriber<? super Map.Entry<Long, User>> userEntrySubscriber) {
63+
Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet()).subscribe(userEntrySubscriber);
64+
}
65+
5566
public Map<Long, User> loadMapOfUsersByIds(SecurityCtx callCtx, Set<Long> userIds) {
5667
Map<Long, User> map = new HashMap<>();
5768
userIds.forEach(userId -> {

src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010
import java.util.HashMap;
1111
import java.util.List;
1212
import java.util.Map;
13+
import java.util.Set;
14+
import java.util.stream.Collectors;
1315
import java.util.stream.Stream;
1416

17+
import static java.util.stream.Collectors.toList;
18+
import static java.util.stream.Collectors.toSet;
1519
import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader;
1620
import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry;
1721

@@ -69,7 +73,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions opti
6973
return newMappedPublisherDataLoader((keys, subscriber) -> {
7074
loadCalls.add(new ArrayList<>(keys));
7175

72-
List<K> nKeys = keys.subList(0, N);
76+
List<K> nKeys = keys.stream().limit(N).collect(toList());
7377
Flux<Map.Entry<K, K>> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k));
7478
subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
7579
.subscribe(subscriber);
@@ -81,7 +85,7 @@ public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions op
8185
return newMappedPublisherDataLoader((keys, subscriber) -> {
8286
loadCalls.add(new ArrayList<>(keys));
8387

84-
List<String> nKeys = keys.subList(0, N);
88+
List<String> nKeys = keys.stream().limit(N).collect(toList());
8589
Flux.fromIterable(nKeys).map(k -> Map.entry(k, k))
8690
.subscribe(subscriber);
8791
}, options);

0 commit comments

Comments
 (0)