Skip to content

Add a proof-of-concept for "Observer-like" batch loading #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/main/java/org/dataloader/BatchObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.dataloader;

/**
* A interface intended as a delegate for other Observer-like classes used in other libraries, to be invoked by the calling
* {@link ObserverBatchLoader}.
* <p>
* Some examples include:
* <ul>
* <li>Project Reactor's <a href="https://www.reactive-streams.org/reactive-streams-1.0.4-javadoc/org/reactivestreams/Subscriber.html">{@code Subscriber}</a>
* <li>gRPC's <a href="https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html">{@code StreamObserver}</a>
* <li>RX Java's <a href="https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html">{@code Flowable}</a>
* </ul>
* @param <V> the value type of the {@link ObserverBatchLoader}
*/
public interface BatchObserver<V> {

/**
* To be called by the {@link ObserverBatchLoader} to load a new value.
*/
void onNext(V value);

/**
* To be called by the {@link ObserverBatchLoader} to indicate all values have been successfully processed.
* This {@link BatchObserver} should not have any method invoked after this is called.
*/
void onCompleted();

/**
* To be called by the {@link ObserverBatchLoader} to indicate an unrecoverable error has been encountered.
* This {@link BatchObserver} should not have any method invoked after this is called.
*/
void onError(Throwable e);
}
Copy link
Member

@bbakerman bbakerman May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is almost exactly java.util.concurrent.Flow.Subscriber but without the subscription part. I guess one can say that the data loader code IS the subscriber

But I think if this was backed under the covers but reactive code publishing "new items" then something has to subscribe to them - should we expose that and use a Java native class?

Maybe not - because we have other variations of this "observer" such as your MappedBatchedObserver which is the same but its onNext is K+V

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ps if we did use Flow we would have to move to Java 11 - right now data loader is still java 8 - not a big deal but still

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other graphql-java libs are on Java 11, is there anything blocking dataloader getting upgraded to Java 11 too?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No blocking reason to go to 11 - just that we havent

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also - maybe all these new classes should be in their own package - we have 6 new classes that are all variations of what is now called Observer should I think that should be in their observer package

Also.... I hate the name Observer as stated elsewhere

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's easier to do a Java 11 upgrade ahead of this PR I can help out

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: #150

267 changes: 260 additions & 7 deletions src/main/java/org/dataloader/DataLoaderHelper.java

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions src/main/java/org/dataloader/MappedBatchObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.dataloader;

/**
* A interface intended as a delegate for other Observer-like classes used in other libraries, to be invoked by the calling
* {@link MappedObserverBatchLoader}.
* <p>
* Some examples include:
* <ul>
* <li>Project Reactor's <a href="https://www.reactive-streams.org/reactive-streams-1.0.4-javadoc/org/reactivestreams/Subscriber.html">{@code Subscriber}</a>
* <li>gRPC's <a href="https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html">{@code StreamObserver}</a>
* <li>RX Java's <a href="https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html">{@code Flowable}</a>
* </ul>
* @param <K> the key type of the calling {@link MappedObserverBatchLoader}.
* @param <V> the value type of the calling {@link MappedObserverBatchLoader}.
*/
public interface MappedBatchObserver<K, V> {

/**
* To be called by the {@link MappedObserverBatchLoader} to process a new key/value pair.
*/
void onNext(K key, V value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same class as BatchObserver<K, V> except onNext has a key and value

How about making it

public interface MappedBatchObserver<K, V> extends BatchObserver<K, V>  {

    void onNext(java.util.Map.Entry<K,V> nextItem);

?? (if that compiles)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This maybe reduce the amount of code you need to handle the two AND also maps on reactive streams Publisher better which is only ever 1 value

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I've elected to deprecate both in favour of:

  • Publisher<V>
  • Publisher<Map.Entry<K, V>>


/**
* To be called by the {@link MappedObserverBatchLoader} to indicate all values have been successfully processed.
* This {@link MappedBatchObserver} should not have any method invoked after this method is called.
*/
void onCompleted();

/**
* To be called by the {@link MappedObserverBatchLoader} to indicate an unrecoverable error has been encountered.
* This {@link MappedBatchObserver} should not have any method invoked after this method is called.
*/
void onError(Throwable e);
}
17 changes: 17 additions & 0 deletions src/main/java/org/dataloader/MappedObserverBatchLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.dataloader;

import java.util.List;

/**
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys.
* <p>
* The function will call the provided {@link MappedBatchObserver} to process the key/value pairs it has retrieved to allow
* the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available
* (rather than when all values have been retrieved).
*
* @param <K> type parameter indicating the type of keys to use for data load requests.
* @param <V> type parameter indicating the type of values returned
*/
public interface MappedObserverBatchLoader<K, V> {
void load(List<K> keys, MappedBatchObserver<K, V> observer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.dataloader;

import java.util.List;

/**
* A {@link MappedObserverBatchLoader} with a {@link BatchLoaderEnvironment} provided as an extra parameter to {@link #load}.
*/
public interface MappedObserverBatchLoaderWithContext<K, V> {
void load(List<K> keys, MappedBatchObserver<K, V> observer, BatchLoaderEnvironment environment);
}
19 changes: 19 additions & 0 deletions src/main/java/org/dataloader/ObserverBatchLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.dataloader;

import java.util.List;

/**
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys.
* <p>
* The function will call the provided {@link BatchObserver} to process the values it has retrieved to allow
* the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available
* (rather than when all values have been retrieved).
* <p>
* It is required that values be returned in the same order as the keys provided.
*
* @param <K> type parameter indicating the type of keys to use for data load requests.
* @param <V> type parameter indicating the type of values returned
*/
public interface ObserverBatchLoader<K, V> {
void load(List<K> keys, BatchObserver<V> observer);
}
10 changes: 10 additions & 0 deletions src/main/java/org/dataloader/ObserverBatchLoaderWithContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.dataloader;

import java.util.List;

/**
* An {@link ObserverBatchLoader} with a {@link BatchLoaderEnvironment} provided as an extra parameter to {@link #load}.
*/
public interface ObserverBatchLoaderWithContext<K, V> {
void load(List<K> keys, BatchObserver<V> observer, BatchLoaderEnvironment environment);
}
21 changes: 21 additions & 0 deletions src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderOptions;
import org.dataloader.MappedBatchLoader;
import org.dataloader.MappedObserverBatchLoader;
import org.dataloader.ObserverBatchLoader;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -42,6 +44,13 @@ interface ScheduledMappedBatchLoaderCall<K, V> {
CompletionStage<Map<K, V>> invoke();
}

/**
* This represents a callback that will invoke a {@link ObserverBatchLoader} or {@link MappedObserverBatchLoader} function under the covers
*/
interface ScheduledObserverBatchLoaderCall {
void invoke();
}

/**
* This is called to schedule a {@link BatchLoader} call.
*
Expand Down Expand Up @@ -71,4 +80,16 @@ interface ScheduledMappedBatchLoaderCall<K, V> {
* @return a promise to the values that come from the {@link BatchLoader}
*/
<K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall<K, V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment);

/**
* This is called to schedule a {@link ObserverBatchLoader} call.
*
* @param scheduledCall the callback that needs to be invoked to allow the {@link ObserverBatchLoader} to proceed.
* @param keys this is the list of keys that will be passed to the {@link ObserverBatchLoader}.
* This is provided only for informative reasons and, you can't change the keys that are used
* @param environment this is the {@link BatchLoaderEnvironment} in place,
* which can be null if it's a simple {@link ObserverBatchLoader} call
* @param <K> the key type
*/
<K> void scheduleObserverBatchLoader(ScheduledObserverBatchLoaderCall scheduledCall, List<K> keys, BatchLoaderEnvironment environment);
}
6 changes: 6 additions & 0 deletions src/test/java/ReadmeExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ public <K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMapp
return scheduledCall.invoke();
}).thenCompose(Function.identity());
}

@Override
public <K> void scheduleObserverBatchLoader(ScheduledObserverBatchLoaderCall scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
snooze(10);
scheduledCall.invoke();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.dataloader;

import org.junit.Test;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Arrays.asList;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static org.awaitility.Awaitility.await;
import static org.dataloader.DataLoaderFactory.mkDataLoader;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

public class DataLoaderMappedObserverBatchLoaderTest {

@Test
public void should_Build_a_really_really_simple_data_loader() {
AtomicBoolean success = new AtomicBoolean();
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions());

CompletionStage<Integer> future1 = identityLoader.load(1);

future1.thenAccept(value -> {
assertThat(value, equalTo(1));
success.set(true);
});
identityLoader.dispatch();
await().untilAtomic(success, is(true));
}

@Test
public void should_Support_loading_multiple_keys_in_one_call() {
AtomicBoolean success = new AtomicBoolean();
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions());

CompletionStage<List<Integer>> futureAll = identityLoader.loadMany(asList(1, 2));
futureAll.thenAccept(promisedValues -> {
assertThat(promisedValues.size(), is(2));
success.set(true);
});
identityLoader.dispatch();
await().untilAtomic(success, is(true));
assertThat(futureAll.toCompletableFuture().join(), equalTo(asList(1, 2)));
}

@Test
public void simple_dataloader() {
DataLoader<String, String> loader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions());

loader.load("A");
loader.load("B");
loader.loadMany(asList("C", "D"));

List<String> results = loader.dispatchAndJoin();

assertThat(results.size(), equalTo(4));
assertThat(results, equalTo(asList("A", "B", "C", "D")));
}

@Test
public void should_observer_batch_multiple_requests() throws ExecutionException, InterruptedException {
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), new DataLoaderOptions());

CompletableFuture<Integer> future1 = identityLoader.load(1);
CompletableFuture<Integer> future2 = identityLoader.load(2);
identityLoader.dispatch();

await().until(() -> future1.isDone() && future2.isDone());
assertThat(future1.get(), equalTo(1));
assertThat(future2.get(), equalTo(2));
}

// A simple wrapper class intended as a proof external libraries can leverage this.
private static class Publisher<K, V> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can put say project reactor as a test dependency into this code base.

That way you can test with real Publisher / Subscriber s

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - though I spent more time than I'd like to admit trying to figure out why my Subscriber wouldn't run (until I triggered subscriber.request(...)).


private final MappedBatchObserver<K, V> delegate;
private Publisher(MappedBatchObserver<K, V> delegate) { this.delegate = delegate; }
void onNext(Map.Entry<K, V> entry) { delegate.onNext(entry.getKey(), entry.getValue()); }
void onCompleted() { delegate.onCompleted(); }
void onError(Throwable e) { delegate.onError(e); }
// Mock 'subscribe' methods to simulate what would happen in the real thing.
void subscribe(Map<K, V> valueByKey) {
valueByKey.entrySet().forEach(this::onNext);
this.onCompleted();
}
void subscribe(Map<K, V> valueByKey, Throwable e) {
valueByKey.entrySet().forEach(this::onNext);
this.onError(e);
}
}

private static <K> MappedObserverBatchLoader<K, K> keysAsValues() {
return (keys, observer) -> {
Publisher<K, K> publisher = new Publisher<>(observer);
Map<K, K> valueByKey = keys.stream().collect(toMap(identity(), identity()));
publisher.subscribe(valueByKey);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this idea of subscription as a key aspect to be addressed.

Imagine some one had a reactive real Publisher of events. At what point is a subscription made?

Do people have to write this own wrappers so that they can move between a Reactor stream of items and this ObserverBatchLoader / MappedObserverBatchLoader say?

This could be tedious. In theory this library COULD become dependent on reactive streams like graphql-java is

    def reactiveStreamsVersion = '1.0.3'
    api 'org.reactivestreams:reactive-streams:' + reactiveStreamsVersion

This would allow you to have interfaces that can take a official Publisher of events and you could write the Subscription interfacing code that made your actual ObserverBatchLoader / MappedObserverBatchLoader

I think if we allowed this PR then I would have this extra dependency - not to a implementation like Reactor - but to the raw streams api

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imagine adator code like

return (keys, observer) -> {
     Publisher reactiveImplPublisher = createReactivePublisherFromKeys(keys);
     ReactiveAdapater.subscribeTo(reactiveImplPublisher, observer);
}

This would allow you to call put to a real reactive system and then adapt the publisher back to this observer via this helper code we supply. Rather than writing a subscription youself that ends up fowarding everything to the observer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, these now are based on reactive-stream's Subscribers.

};
}
}
108 changes: 108 additions & 0 deletions src/test/java/org/dataloader/DataLoaderObserverBatchLoaderTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.dataloader;

import org.junit.Test;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Arrays.asList;
import static org.awaitility.Awaitility.await;
import static org.dataloader.DataLoaderFactory.mkDataLoader;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

public class DataLoaderObserverBatchLoaderTest {

@Test
public void should_Build_a_really_really_simple_data_loader() {
AtomicBoolean success = new AtomicBoolean();
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions());

CompletionStage<Integer> future1 = identityLoader.load(1);

future1.thenAccept(value -> {
assertThat(value, equalTo(1));
success.set(true);
});
identityLoader.dispatch();
await().untilAtomic(success, is(true));
}

@Test
public void should_Support_loading_multiple_keys_in_one_call() {
AtomicBoolean success = new AtomicBoolean();
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions());

CompletionStage<List<Integer>> futureAll = identityLoader.loadMany(asList(1, 2));
futureAll.thenAccept(promisedValues -> {
assertThat(promisedValues.size(), is(2));
success.set(true);
});
identityLoader.dispatch();
await().untilAtomic(success, is(true));
assertThat(futureAll.toCompletableFuture().join(), equalTo(asList(1, 2)));
}

@Test
public void simple_dataloader() {
DataLoader<String, String> loader = mkDataLoader(keysAsValues(), DataLoaderOptions.newOptions());

loader.load("A");
loader.load("B");
loader.loadMany(asList("C", "D"));

List<String> results = loader.dispatchAndJoin();

assertThat(results.size(), equalTo(4));
assertThat(results, equalTo(asList("A", "B", "C", "D")));
}

@Test
public void should_observer_batch_multiple_requests() throws ExecutionException, InterruptedException {
DataLoader<Integer, Integer> identityLoader = mkDataLoader(keysAsValues(), new DataLoaderOptions());

CompletableFuture<Integer> future1 = identityLoader.load(1);
CompletableFuture<Integer> future2 = identityLoader.load(2);
identityLoader.dispatch();

await().until(() -> future1.isDone() && future2.isDone());
assertThat(future1.get(), equalTo(1));
assertThat(future2.get(), equalTo(2));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need tests to show how you may ask for N keys bit only some of them complete. We should be able to see that dl.load(k1) has completed but dl.load(k2) is not completed say

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting to indicate that I have not yet completed this but I have not forgotten this.


// A simple wrapper class intended as a proof external libraries can leverage this.
private static class Publisher<V> {
private final BatchObserver<V> delegate;
private Publisher(BatchObserver<V> delegate) { this.delegate = delegate; }
void onNext(V value) { delegate.onNext(value); }
void onCompleted() { delegate.onCompleted(); }
void onError(Throwable e) { delegate.onError(e); }
// Mock 'subscribe' methods to simulate what would happen in the real thing.
void subscribe(List<V> values) {
values.forEach(this::onNext);
this.onCompleted();
}
void subscribe(List<V> values, Throwable e) {
values.forEach(this::onNext);
this.onError(e);
}
}

private static <K> ObserverBatchLoader<K, K> keysAsValues() {
return (keys, observer) -> {
Publisher<K> publisher = new Publisher<>(observer);
publisher.subscribe(keys);
};
}

private static <K, V> ObserverBatchLoader<K, V> keysWithValuesAndException(List<V> values, Throwable e) {
return (keys, observer) -> {
Publisher<V> publisher = new Publisher<>(observer);
publisher.subscribe(values, e);
};
}
}
Loading
Loading