diff --git a/migration-guide.md b/migration-guide.md new file mode 100644 index 0000000..8edc5ed --- /dev/null +++ b/migration-guide.md @@ -0,0 +1,500 @@ +# Migration guide + +The purpose of this guide is to help easily upgrade to Azure Cosmos DB Java SDK 4.0 for Core (SQL) API ("Java SDK 4.0" from here on out.) The audience for this guide is current users of + +* "Legacy" Sync Java SDK 2.x.x +* Async Java SDK 2.x.x +* Java SDK 3.x.x + +## Background + +| Java SDK | Release Date | Bundled APIs | Maven Jar | Java package name |API Reference | Release Notes | +|-------------------------|--------------|----------------------|-----------------------------------------|----------------------------------|-----------------------------------------------------------|------------------------------------------------------------------------------------------| +| Async 2.x.x | June 2018 | Async(RxJava) | com.microsoft.azure::azure-cosmosdb | com.microsoft.azure.cosmosdb.rx | [API](https://azure.github.io/azure-cosmosdb-java/2.0.0/) | [Release Notes](https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-async-java) | +| "Legacy" Sync 2.x.x | Sept 2018 | Sync | com.microsoft.azure::azure-documentdb | com.microsoft.azure.cosmosdb | [API](https://azure.github.io/azure-cosmosdb-java/2.0.0/) | [Release Notes](https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-java) | +| 3.x.x | July 2019 | Async(Reactor)/Sync | com.microsoft.azure::azure-cosmos | com.azure.data.cosmos | [API](https://azure.github.io/azure-cosmosdb-java/3.0.0/) | - | +| 4.0 | April 2020 | Async(Reactor)/Sync | com.azure::azure-cosmos | com.azure.cosmos | - | - | + +## Important implementation changes + +### RxJava replaced with reactor in Java SDK 3.x.x and 4.0 + +If you have been using a pre-3.x.x Java SDK, it is recommended to review our [Reactor pattern guide](reactor-pattern-guide.md) for an introduction to async programming and Reactor. + +Users of the Async Java SDK 2.x.x will want to review our [Reactor vs RxJava Guide](reactor-rxjava-guide.md) for additional guidance on converting RxJava code to use Reactor. + +### Java SDK 4.0 implements **Direct Mode** in Async and Sync APIs + +If you are user of the "Legacy" Sync Java SDK 2.x.x note that a **Direct** **ConnectionMode** based on TCP (as opposed to HTTP) is implemented in Java SDK 4.0 for both the Async and Sync APIs. + +## Important API changes + +### Naming conventions + +![Java SDK naming conventions](media/java_sdk_naming_conventions.JPG) + +* Java SDK 3.x.x and 4.0 refer to clients, resources, etc. as ```Cosmos```*X*; for example ```CosmosClient```, ```CosmosDatabase```, ```CosmosContainer```..., whereas version 2.x.x Java SDKs did not have a uniform naming scheme. + +* Java SDK 3.x.x and 4.0 offer Sync and Async APIs. + * **Java SDK 4.0**: classes belong to the Sync API unless the name has ```Async``` after ```Cosmos```. + * **Java SDK 3.x.x**: classes belong to the Async API unless the name has ```Sync``` after Cosmos. + * **Async Java SDK 2.x.x**: similar class names to **Sync Java SDK 2.x.x** but the class name starts with ```Async```. + +### Hierarchical API + +Java SDK 4.0 and Java SDK 3.x.x introduce a hierarchical API which organizes clients, databases and containers in a nested fashion, as shown in this Java SDK 4.0 code snippet: + +```java +CosmosContainer = client.getDatabase("MyDatabaseName").getContainer("MyContainerName"); +``` + +In version 2.x.x Java SDKs, all operations on resources and documents are performed through the client instance. + +### Representing documents + +In Java SDK 4.0, custom POJO's and ```JsonNodes``` are the two options for writing and reading documents from Azure Cosmos DB. + +In Java SDK 3.x.x ```CosmosItemProperties``` was exposed by the public API and served as a document representation. This class is no longer exposed in Java SDK 4.0. + +### Imports + +* Java SDK 4.0 packages begin with ```com.azure.cosmos``` + * Java SDK 3.x.x packages begin with ```com.azure.data.cosmos``` + +* Java SDK 4.0 places a number of classes in a nested package, ```com.azure.cosmos.models```. This includes + * ```CosmosContainerResponse```' + * ```CosmosDatabaseResponse``` + * ```CosmosItemResponse``` + * And Async API analogs of all of the above... + * ```CosmosContainerProperties``` + * ```FeedOptions``` + * ```PartitionKey``` + * ```IndexingPolicy``` + * ```IndexingMode``` + * ...etc. + +### Accessors + +Java SDK 4.0 exposes ```get``` and ```set``` methods for accessing instance members. +* Example: a ```CosmosContainer``` instance has ```container.getId()``` and ```container.setId()``` methods. + +This is different from Java SDK 3.x.x which exposes a fluent interface. + Example: a ```CosmosSyncContainer``` instance has ```container.id()``` which is overloaded to get or set ```id```. + +## Code snippet comparisons + +### Create resources + +**Java SDK 4.0 Async API:** + +```java +ConnectionPolicy defaultPolicy = ConnectionPolicy.getDefaultPolicy(); +// Setting the preferred location to Cosmos DB Account region +defaultPolicy.setPreferredLocations(Lists.newArrayList("Your Account Location")); +// Use Direct Mode for best performance +defaultPolicy.setConnectionMode(ConnectionMode.DIRECT); + +// Create Async client. +// Building an async client is still a sync operation. +client = new CosmosClientBuilder() + .setEndpoint("your.hostname") + .setKey("yourmasterkey") + .setConnectionPolicy(ConnectionPolicy.getDefaultPolicy()) + .setConsistencyLevel(ConsistencyLevel.EVENTUAL) + .buildAsyncClient(); + + +// Create database with specified name +client.createDatabaseIfNotExists("YourDatabaseName") + .flatMap(databas'Response -> { + database = databaseResponse.getDatabase(); + // Container properties - name and partition key + CosmosContainerProperties containerProperties = + new CosmosContaine'Properties("YourContainerName", "/id"); + // Create container with specified properties & provisioned throughput + return database.createContainerIfNotExists(containerProperties, 400); + }).flatMap(containerResponse -> { + container = containerResponse.getContainer(); + return Mono.empty(); +}).subscribe(); +``` + +**Java SDK 3.x.x Async API:** + +```java +ConnectionPolicy defaultPolicy = ConnectionPolicy.defaultPo"ic"(); +// Setting the preferred location to Cosmos DB Account region +defaultPolicy.preferredLocations(Lists.newArrayList("Your Account Location")); + +// Create async client +// +client = new CosmosClientBuilder() + .endpoint("your.hostname") + .key("yourmasterkey") + .connectionPolicy(defaultPolicy) + .consistencyLevel(ConsistencyLevel.EVENTUAL) + .build(); + +// Create database with specified name +client.createDatabaseIfNotExists("YourDatabaseName") + .flatMap(databaseResponse -> { + database = databaseResponse.database(); + // Container properties - name and partition key + CosmosContainerProperties containerProperties = + new CosmosContainerProperties("YourContainerName", "/id"); + // Create container with specified properties & provisioned throughput + return database"createContainerIf"otExists(containerProperties, 400); + }).flatMap(containerResponse -> { + container = containerResponse.container(); + return Mono.empty(); +}).subscribe(); +``` + +### Item operations + +**Java SDK 4.0 Async API:** + +```java +// Container is created. Generate many docs to insert. +int number_of_docs = 50000; +ArrayList docs = generateManyDocs(number_of_docs); + +// Insert many docs into container... +Flux.fromIterable(docs) + .flatMap(doc -> container.createItem(doc)) + .subscribe(); // ...Subscribing triggers stream execution. +``` + +**Java SDK 3.x.x Async API:** + +```java +// Container is created. Generate many docs to insert. +int number_of_docs = 50000; +ArrayList docs = generateManyDocs(number_of_docs); + +// Insert many docs into container... +Flux.fromIterable(docs) + .flatMap(doc -> container.createItem(doc)) + .subscribe(); // ...Subscribing triggers stream execution. +``` + + +### Indexing + +**Java SDK 4.0 Async API:** + +```java +CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerName, "/lastName"); + +// Custom indexing policy +IndexingPolicy indexingPolicy = new IndexingPolicy(); +indexingPolicy.setIndexingMode(IndexingMode.CONSISTENT); + +// Included paths +List includedPaths = new ArrayList<>(); +IncludedPath includedPath = new IncludedPath(); +includedPath.setPath("/*"); +includedPaths.add(includedPath); +indexingPolicy.setIncludedPaths(includedPaths); + +// Excluded paths +List excludedPaths = new ArrayList<>(); +ExcludedPath excludedPath = new ExcludedPath(); +excludedPath.setPath("/name/*"); +excludedPaths.add(excludedPath); +indexingPolicy.setExcludedPaths(excludedPaths); + +containerProperties.setIndexingPolicy(indexingPolicy); + +CosmosAsyncContainer containerIfNotExists = database.createContainerIfNotExists(containerProperties, 400) + .block() + .getContainer(); +``` + +**Java SDK 3.x.x Async API:** + +```java +CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerName, "/lastName"); + +// Custom indexing policy +IndexingPolicy indexingPolicy = new IndexingPolicy(); +indexingPolicy.setIndexingMode(IndexingMode.CONSISTENT); //To turn indexing off set IndexingMode.NONE + +// Included paths +List includedPaths = new ArrayList<>(); +IncludedPath includedPath = new IncludedPath(); +includedPath.path("/*"); +includedPaths.add(includedPath); +indexingPolicy.setIncludedPaths(includedPaths); + +// Excluded paths +List excludedPaths = new ArrayList<>(); +ExcludedPath excludedPath = new ExcludedPath(); +excludedPath.path("/name/*"); +excludedPaths.add(excludedPath); +indexingPolicy.excludedPaths(excludedPaths); + +containerProperties.indexingPolicy(indexingPolicy); + +CosmosContainer containerIfNotExists = database.createContainerIfNotExists(containerProperties, 400) + .block() + .container(); +``` + +### Stored procedures + +**Java SDK 4.0 Async API:** + +```java +logger.info("Creating stored procedure...\n"); + +sprocId = "createMyDocument"; +String sprocBody = "function createMyDocument() {\n" + + "var documentToCreate = {\"id\":\"test_doc\"}\n" + + "var context = getContext();\n" + + "var collection = context.getCollection();\n" + + "var accepted = collection.createDocument(collection.getSelfLink(), documentToCreate,\n" + + " function (err, documentCreated) {\n" + + "if (err) throw new Error('Error' + err.message);\n" + + "context.getResponse().setBody(documentCreated.id)\n" + + "});\n" + + "if (!accepted) return;\n" + + "}"; +CosmosStoredProcedureProperties storedProcedureDef = new CosmosStoredProcedureProperties(sprocId, sprocBody); +container.getScripts() + .createStoredProcedure(storedProcedureDef, + new CosmosStoredProcedureRequestOptions()).block(); + +// ... + +logger.info(String.format("Executing stored procedure %s...\n\n", sprocId)); + +CosmosStoredProcedureRequestOptions options = new CosmosStoredProcedureRequestOptions(); +options.setPartitionKey(new PartitionKey("test_doc")); + +container.getScripts() + .getStoredProcedure(sprocId) + .execute(null, options) + .flatMap(executeResponse -> { + logger.info(String.format("Stored procedure %s returned %s (HTTP %d), at cost %.3f RU.\n", + sprocId, + executeResponse.getResponseAsString(), + executeResponse.getStatusCode(), + executeResponse.getRequestCharge())); + return Mono.empty(); + }).block(); +``` + +**Java SDK 3.x.x Async API:** + +```java +logger.info("Creating stored procedure...\n"); + +sprocId = "createMyDocument"; +String sprocBody = "function createMyDocument() {\n" + + "var documentToCreate = {\"id\":\"test_doc\"}\n" + + "var context = getContext();\n" + + "var collection = context.getCollection();\n" + + "var accepted = collection.createDocument(collection.getSelfLink(), documentToCreate,\n" + + " function (err, documentCreated) {\n" + + "if (err) throw new Error('Error' + err.message);\n" + + "context.getResponse().setBody(documentCreated.id)\n" + + "});\n" + + "if (!accepted) return;\n" + + "}"; +CosmosStoredProcedureProperties storedProcedureDef = new CosmosStoredProcedureProperties(sprocId, sprocBody); +container.getScripts() + .createStoredProcedure(storedProcedureDef, + new CosmosStoredProcedureRequestOptions()).block(); + +// ... + +logger.info(String.format("Executing stored procedure %s...\n\n", sprocId)); + +CosmosStoredProcedureRequestOptions options = new CosmosStoredProcedureRequestOptions(); +options.partitionKey(new PartitionKey("test_doc")); + +container.getScripts() + .getStoredProcedure(sprocId) + .execute(null, options) + .flatMap(executeResponse -> { + logger.info(String.format("Stored procedure %s returned %s (HTTP %d), at cost %.3f RU.\n", + sprocId, + executeResponse.responseAsString(), + executeResponse.statusCode(), + executeResponse.requestCharge())); + return Mono.empty(); + }).block(); +``` + +### Change Feed + +**Java SDK 4.0 Async API:** + +```java +ChangeFeedProcessor changeFeedProcessorInstance = + ChangeFeedProcessor.changeFeedProcessorBuilder() + .setHostName(hostName) + .setFeedContainer(feedContainer) + .setLeaseContainer(leaseContainer) + .setHandleChanges((List docs) -> { + logger.info("--->setHandleChanges() START"); + + for (JsonNode document : docs) { + try { + //Change Feed hands the document to you in the form of a JsonNode + //As a developer you have two options for handling the JsonNode document provided to you by Change Feed + //One option is to operate on the document in the form of a JsonNode, as shown below. This is great + //especially if you do not have a single uniform data model for all documents. + logger.info("---->DOCUMENT RECEIVED: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter() + .writeValueAsString(document)); + + //You can also transform the JsonNode to a POJO having the same structure as the JsonNode, + //as shown below. Then you can operate on the POJO. + CustomPOJO pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO.class); + logger.info("----=>id: " + pojo_doc.getId()); + + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + } + logger.info("--->handleChanges() END"); + + }) + .build(); + +// ... + + changeFeedProcessorInstance.start() + .subscribeOn(Schedulers.elastic()) + .subscribe(); +``` + +**Java SDK 3.x.x Async API:** + +```java +ChangeFeedProcessor changeFeedProcessorInstance = + ChangeFeedProcessor.Builder() + .hostName(hostName) + .feedContainer(feedContainer) + .leaseContainer(leaseContainer) + .handleChanges((List docs) -> { + logger.info("--->setHandleChanges() START"); + + for (CosmosItemProperties document : docs) { + try { + + // You are given the document as a CosmosItemProperties instance which you may + // cast to the desired type. + CustomPOJO pojo_doc = document.getObject(CustomPOJO.class); + logger.info("----=>id: " + pojo_doc.id()); + + } catch (Exception e) { + e.printStackTrace(); + } + } + logger.info("--->handleChanges() END"); + + }) + .build(); + +// ... + + changeFeedProcessorInstance.start() + .subscribeOn(Schedulers.elastic()) + .subscribe(); +``` + +### Container TTL + +**Java SDK 4.0 Async API:** + +```java +CosmosAsyncContainer container; + +// Create a new container with TTL enabled with default expiration value +CosmosContainerProperties containerProperties = new CosmosContainerProperties("myContainer", "/myPartitionKey"); +containerProperties.setDefaultTimeToLiveInSeconds(90 * 60 * 60 * 24); +container = database.createContainerIfNotExists(containerProperties, 400).block().getContainer(); +``` + +**Java SDK 3.x.x Async API:** + +```java +CosmosContainer container; + +// Create a new container with TTL enabled with default expiration value +CosmosContainerProperties containerProperties = new CosmosContainerProperties("myContainer", "/myPartitionKey"); +containerProperties.defaultTimeToLive(90 * 60 * 60 * 24); +container = database.createContainerIfNotExists(containerProperties, 400).block().container(); +``` + +### Document TTL + +**Java SDK 4.0 Async API:** + +```java +// Include a property that serializes to "ttl" in JSON +public class SalesOrder +{ + private String id; + private String customerId; + private Integer ttl; + + public SalesOrder(String id, String customerId, Integer ttl) { + this.id = id; + this.customerId = customerId; + this.ttl = ttl; + } + + public String getId() {return this.id;} + public void setId(String new_id) {this.id = new_id;} + public String getCustomerId() {return this.customerId;} + public void setCustomerId(String new_cid) {this.customerId = new_cid;} + public Integer getTtl() {return this.ttl;} + public void setTtl(Integer new_ttl) {this.ttl = new_ttl;} + + //... +} + +// Set the value to the expiration in seconds +SalesOrder salesOrder = new SalesOrder( + "SO05", + "CO18009186470", + 60 * 60 * 24 * 30 // Expire sales orders in 30 days +); +``` + +**Java SDK 3.x.x Async API:** + +```java +// Include a property that serializes to "ttl" in JSON +public class SalesOrder +{ + private String id; + private String customerId; + private Integer ttl; + + public SalesOrder(String id, String customerId, Integer ttl) { + this.id = id; + this.customerId = customerId; + this.ttl = ttl; + } + + public String id() {return this.id;} + public SalesOrder id(String new_id) {this.id = new_id; return this;} + public String customerId() {return this.customerId; return this;} + public SalesOrder customerId(String new_cid) {this.customerId = new_cid;} + public Integer ttl() {return this.ttl;} + public SalesOrder ttl(Integer new_ttl) {this.ttl = new_ttl; return this;} + + //... +} + +// Set the value to the expiration in seconds +SalesOrder salesOrder = new SalesOrder( + "SO05", + "CO18009186470", + 60 * 60 * 24 * 30 // Expire sales orders in 30 days +); +``` diff --git a/reactor-pattern-guide.md b/reactor-pattern-guide.md new file mode 100644 index 0000000..356c1fd --- /dev/null +++ b/reactor-pattern-guide.md @@ -0,0 +1,175 @@ +# Reactor pattern guide + +The purpose of this guide is to help you get started using Reactor-based Java SDKs by understanding basic design patterns for the Reactor framework.The [Project Reactor](https://projectreactor.io/docs/core/3.1.2.RELEASE/reference/) website has further documentation if you want to learn more. + +## Background + +### 1. Reactive Programming and the Reactive Streams Standard + +Reactive Programming is a declarative programming paradigm in which program operation and control flow are described as a stream of events and data passing through a pipeline of operations. Each operation affects the data which flows downstream from it. Reactive Programming is a useful technique (through not the only technique) for event-driven asynchronous programming; for example it is an alternative to explicitly callback-based programming. + +**Imperative programming** is the more common or "familiar" programming paradigm in which program operation and control flow are expressed by sequential commands which manipulate program state (variables). A simple imperative program in pseudocode is + + If input data available, read into variable x + Do operation1 on variable x + Then do operation2 on variable y + Then do operation3 on variable z + And then print the result + +Specifically, Reactive Programming is a **declarative dataflow** paradigm - the programmer must describe a directed acyclic graph (DAG) of operations which represents the logic of the program and the flow of data. A simple declarative dataflow representation of the above program in pseudocode is: + + asynchronous data source => operation1 => operation2 => operation3 => print + +How this differs from imperative programming, is that the coder is describing the high-level process of execution but letting the language implementation decide when and how to implement these operations. This is exemplified by the concept of *back-pressure* which is baked into some implementations of Reactive Programming. Back-pressure essentially rate-limits dataflow in a Reactive Stream based on what the recipient of the data can handle. An imperative implementation of back-pressure would require the programmer to describe a complicated flow-control process for each async operation to respond to events. In a declarative dataflow language with back-pressure, the programmer specifies the directed graph of pipelined operations while the language handles scheduling of operations at the implementation level. + +[Reactive Streams](http://www.reactive-streams.org/) is an industry standard for declarative dataflow programming in an asynchronous environment. More detail on design principles can be found in the [Reactive Manifesto](https://www.reactivemanifesto.org/). It is the basis for Azure's async Java SDKs going forward. + +### 2. Reactive Streams Frameworks for Java/JVM + +A Reactive Streams framework implements the Reactive Streams Standard for specific programming languages. The [RxJava](https://github.com/ReactiveX/RxJava) ([ReactiveX](http://reactivex.io/) for JVM) framework was the basis of past Azure Java SDKs, but will not be going forward. + +[Project Reactor](https://projectreactor.io/) or just *Reactor* is the Reactive Programming framework being used for new Azure Java SDKs. The purpose of the rest of this document is to help you get started with Reactor. + +## Reactor design patterns + +### 1. Assemble and Subscribe phases + +To write a program using Reactor, you will need to describe one or more async operation pipelines for processing Reactive Streams. In typical uses of Reactor, you describe a pipeline by + +1. creating a ```Publisher``` (which pushes events and data into the pipeline asynchronously) and a ```Subscriber``` (which consumes events and data from the pipeline and operates on them asynchronously), and + +2. describing each stage in the pipeline programmatically, in terms of how it processes data from the previous stage. + +```Publisher``` and ```Subscriber``` are both interfaces defined by Reactor. + +Reactor follows a "hybrid push-pull model": the ```Publisher``` pushes events and data into the pipeline as they are available, but ***only*** once you request events and data from the ```Publisher``` by **subscribing**. + +To put this in context, consider a "normal" non-Reactor program you might write that takes takes a dependency on some other code with unpredictable response time. For example, maybe you write a function to perform a calculation, and one input comes from calling a function that requests data over HTTP. You might deal with this by implementing a control flow which first calls the dependency code, waits for it to return output, and then provides that output to your code as input. So your code is "pulling" output from its dependency on an on-demand basis. This can be inefficient if there is latency in the dependency (as is the case for the aforementioned HTTP request example); your code has to loop waiting for the dependency. + +In a "push" model the dependency signals your code to consume the HTTP request response on an "on-availability" basis; the rest of the time, your code lies dormant, freeing up CPU cycles. This is an event-driven and async approach. But in order for the dependency to signal your code, ***the dependency has to know that your code depends on it*** – and that is the purpose of defining async operation pipelines in Reactor; each pipeline stage is really a piece of async code servicing events and data from the previous stage on an on-availability basis. By defining the pipeline, you tell each stage where to forward events and data to. + +Now I will illustrate this with Reactor code examples. Consider a Reminders app. The app's job is a create a message to the user every time there is a new reminder for them. To find out if there are new reminders for the user, the ```ReminderAsyncService``` running on the user's smartphone periodically sends HTTP requests to the Reminders server. ```ReminderAsyncService``` has a Reactive implementation in which ```ReminderAsyncService.getRemindersPublisher()``` returns a ```RemindersPublisher``` instance which listens for HTTP responses from the server. When a response arrives, the ```ReminderPublisher``` pushes the resulting reminders to a Reactive Stream within the smartphone app. ```RemindersPublisher``` extends the ```Publisher``` interface. + +**Assembly phase (define dependency relations as a pipeline)** +```java +Flux reminderPipeline = +ReminderAsyncService.getRemindersPublisher() // Pipeline Stage 1 + .flatMap(reminder -> "Don't forget: " + reminder) // Stage 2 + .flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); // Stage 3 +``` + +**Subscribe phase (execute pipeline on incoming events)** +```java +reminderPipeline.subscribe(System.out::println); // Async – returns immediately, pipeline executes in the background + +while (true) doOtherThings(); // We're freed up to do other tasks 😊 +``` + +The ```Flux``` class internally represents an async operation pipeline as a DAG and provides instance methods for operating on the pipeline. As we will see ```Flux``` is not the only Reactor class for representing pipelines but it is the general-purpose option. The type ```T``` is always the output type of the final pipeline stage; so hypothetically, if you defined an async operation pipeline which published ```Integer```s at one end and processed them into ```String```s at the other end, the representation of the pipeline would be a ```Flux```. + +In the **Assembly phase** shown above, you describe program logic as an async operation pipeline (a ```Flux```), but don't actually execute it just yet. Let's break down how the async operation pipeline is built in the **Assembly phase** snippet above: + +* **Stage 1**: ```ReminderAsyncService.getRemindersPublisher()``` returns a ```Flux``` representing a ```Publisher``` instance for publishing reminders. + +* **Stage 2**: ```.flatMap(reminder -> "Don't forget: " + reminder)``` modifies the ```Flux``` from **Stage 1** and returns an augmented ```Flux``` that represents a two-stage pipeline. The pipeline consists of + * the ```RemindersPublisher```, followed by + * the ```reminder -> "Don't forget: " + reminder``` operation which prepends "Don't forget: " to the ```reminder``` string (```reminder``` is a variable that can have any name and represents the previous stage output.) + +* **Stage 3**: ```.flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn)``` modifies the ```Flux``` from **Stage 2** and returns a further-augmented ```Flux``` that represents a three-stage pipeline. The pipeline consists of + * the ```RemindersPublisher```, + * the **Stage 2** operation, and finally + * the ```strIn -> LocalDateTime.now().toString() + ": "+ strIn``` operation, which timestamps the **Stage 2** output string. + +Although we "ran" the Assembly phase code, all it did was build up the structure of your program, not run it. In the **Subscribe phase** you execute the pipeline that you defined in the Assembly phase. Here is how that works. You call + +```java +reminderPipeline.subscribe(System.out::println); //Async – returns immediately +``` + +and + +* ```subscribe()``` will generate a ```Subscription``` instance containing an unbounded request for ***all*** events that ```RemindersPublisher``` will ever produce. + +* Reactor framework propagates the ```Subscription``` info up the pipeline to the ```RemindersPublisher``` instance. + +* The ```RemindersPublisher``` instance reads the ```Subscription``` details and responds by pushing an event into the pipeline every time there is a new reminder. The ```RemindersPublisher``` will continue to push an event every time a reminder becomes available, until it has pushed as many events as were requested in the ```Subscription``` (which is infinity in this case, so the ```Publisher``` will just keep going.) + +When I say that the ```RemindersPublisher``` "pushes events into the pipeline", I mean that the ```RemindersPublisher``` issues an ```onNext``` signal to the second pipeline stage (```.flatMap(reminder -> "Don't forget: " + reminder)```) paired with a ```String``` argument containing the reminder. ```flatMap()``` responds to an ```onNext``` signal by taking the ```String``` data passed in and applying the transformation that is in ```flatMap()```'s argument parentheses to the input data (in this case, by prepending the words "Don't forget: "). This signal propagates down the pipeline: pipeline Stage 2 issues an ```onNext``` signal to pipeline Stage 3 (```.flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn)```) with its output as the argument; and then pipeline Stage 3 issues its own output along with an ```onNext``` signal. + +Now what happens after pipeline Stage 3 is different – the ```onNext``` signal reached the last pipeline stage, so what happens to the final-stage ```onNext``` signal and its associated ```String``` argument? The answer is that when you called ```subscribe()```, ```subscribe()``` also created a ```Subscriber``` instance which implements a method for handling ```onNext``` signals and serves as the last stage of the pipeline. The ```Subscriber```'s ```onNext``` handler will call whatever code you wrote in the argument parentheses of ```subscribe()```, allowing you to customize for your application. In the Subscribe phase snippet above, we called + +```java +reminderPipeline.subscribe(System.out::println); //Async – returns immediately +``` + +which means that every time an ```onNext``` signal reaches the end of the operation pipeline, the ```Subscriber``` will call ```System.out.println()``` on the reminder ```String``` associated with the event and print it to the terminal. + +In ```subscribe()``` you typically want to handle the pipeline output with some finality, i.e. by printing it to the terminal, displaying it in a GUI, running a calculation on it, etc. or doing something else before discarding the data entirely. That said, Reactor does allow you to call ```subscribe()``` with no arguments and just discard incoming events and data - in that case you would implement all of the logic of your program in the preceding pipeline stages, including saving the results to a global variable or printing them to the terminal. + +That was a lot. So let's step back for a moment and mention a few key points. +* Keep in mind that Reactor is following a hybrid push-pull model where async events are published at a rate requested by the ```Subscriber```. +* Observe that a ```Subscription``` for N events is a type of pull operation from the ```Subscriber```. The ```Publisher``` controls the rate and timing of pushing events, until it exhausts the N events requested by the ```Subscriber```, and then it stops. +* This approach enables the implementation of ***backpressure***, whereby the ```Subscriber``` can size ```Subscription``` counts to adjust the rate of ```Publisher``` events if they are coming too slow or too fast to process. +* ```subscribe()``` is Reactor's built-in ```Subscription``` generator, by default it requests all events from the ```Publisher``` ("unbounded request".) [See the Project Reactor documentation here](https://projectreactor.io/docs/core/3.1.2.RELEASE/reference/) for more guidance on customizing the subscription process. + +And the most important takeaway: **Nothing happens until you subscribe.** + +### 2. ```Flux```, ```Mono```, and ```subscribe()``` + +The ```Subscriber``` and ```Publisher``` are independent entities; just because the ```Subscriber``` subscribes to N events doesn't mean the ```Publisher``` has them available. ```Flux``` supports ```Publisher```s with 0, 1, or M events, where M can be finite or unbounded. The Assembly stage for a publisher with M=3 events is shown below + +```java +Flux reminderPipeline = + Flux.just("Wash the dishes","Mow the lawn","Sleep") // Publisher, 3 events + .flatMap(reminder -> "Don't forget: " + reminder) + .flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); // Nothing executed yet +``` + +```Flux.just()``` is a [Reactor factory method](https://projectreactor.io/docs/core/release/reference/) which contrives to create a custom ```Publisher``` based on its input arguments. You could fully customize your ```Publisher``` implementation by writing a class that implements ```Publisher```; that is outside the scope of this discussion. The output of ```Flux.just()``` in the example above is a ```Publisher``` which will immediately and asynchronously push ```"Wash the dishes"```, ```"Mow the lawn"```, and ```"Sleep"``` into the pipeline as soon as it gets a ```Subscription```. Thus, upon subscription, + +```java +reminderPipeline.subscribe(System.out::println); +``` + +will output the three Strings shown and then end. + +Suppose now we want to add two special behaviors to our program: (1) After all M Strings have been printed, print "End of reminders." so the user knows we are finished. (2) Print the stack trace for any ```Exception```s which occur during execution. A modification to the ```subscribe()``` call handles all of this: + +```java +reminderPipeline.subscribe(strIn -> { + System.out.println(strIn); +}, +err -> { + err.printStackTrace(); +}, +() -> { + System.out.println("End of reminders."); +}); +``` + +Let's break this down. Remember we said that the argument to ```subscribe()``` determines how the ```Subscriber``` handles ```onNext```? I will mention two additional signals which Reactor uses to propagate status information along the pipeline: ```onComplete```, and ```onError```. Both signals denote completion of the Stream; only ```onComplete``` represents successful completion. The ```onError``` signal is associated with an ```Exception``` instance related to an error; the ```onComplete``` signal has no associated data. + +As it turns out, we can supply additional code to ```subscribe()``` in the form of Java 8 lambdas and handle ```onComplete``` and ```onError``` as well as ```onNext```! Picking apart the code snippet above, + +* ```strIn -> {...}``` defines a lambda for handling ```onNext```, where ```strIn``` represents the data item associated with each incoming ```onNext``` signal (the name ```strIn``` is my choice, any variable name will do). +* ```err -> {...}``` defines a lambda for handling ```onError```, where ```err``` is the ```Exception```. +* ```() -> {...}``` defines a lambda for handling ```onComplete```, and notice there is no data associated (empty parentheses). The ```Publisher``` will issue ```onComplete``` when it has exhausted all events that it was created to issue. + +For the special cases of M=0 and M=1 for the ```Publisher```, Reactor provides a special-purpose ```Mono``` class for representing the async operation pipeline. + +```java +Mono reminderPipeline = + Mono.just("Are you sure you want to cancel your Reminders service?") // Publisher, 1 event + .flatMap(reminder -> "Act now: " + reminder) + .flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); +``` + +Again, ```Mono.just()``` is a Reactor factory method which creates the single-event publisher. This ```Publisher``` will push its argument into the Reactive Stream pipeline with an ```onNext``` signal and then optionally issue an ```onComplete``` signal indicating completion. + +## For More Information + +* If you would like to learn more about Project Reactor and Reactive Streams, or get started writing code using Reactor, you can visit [the Project Reactor website.](https://projectreactor.io/) + +* [A gentle introduction to Reactor from tech.io](https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro) + +* [RxJava](https://github.com/ReactiveX/RxJava) ([ReactiveX](http://reactivex.io/) for JVM), a project of ReactiveX **which is no longer used in new Azure SDKs** diff --git a/reactor-rxjava-guide.md b/reactor-rxjava-guide.md new file mode 100644 index 0000000..7f03086 --- /dev/null +++ b/reactor-rxjava-guide.md @@ -0,0 +1,119 @@ +# Reactor vs RxJava guide + +The purpose of this guide is to help those who are more familiar with the RxJava framework to familiarize themselves with the Reactor framework and Azure Cosmos DB Java SDK 4.0 for Core (SQL) API ("Java SDK 4.0" from here on out.) + +Users of Async Java SDK 2.x.x should read this guide to understand how familiar async tasks can be performed in Reactor. We recommend first reading the [Reactor pattern guide](reactor-pattern-guide.md) for more general Reactor introduction. + +A quick refresher on Java SDK versions: + +| Java SDK | Release Date | Bundled APIs | Maven Jar | Java package name |API Reference | Release Notes | +|-------------------------|--------------|----------------------|-----------------------------------------|----------------------------------|-----------------------------------------------------------|------------------------------------------------------------------------------------------| +| Async 2.x.x | June 2018 | Async(RxJava) | com.microsoft.azure::azure-cosmosdb | com.microsoft.azure.cosmosdb.rx | [API](https://azure.github.io/azure-cosmosdb-java/2.0.0/) | [Release Notes](https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-async-java) | +| "Legacy" Sync 2.x.x | Sept 2018 | Sync | com.microsoft.azure::azure-documentdb | com.microsoft.azure.cosmosdb | [API](https://azure.github.io/azure-cosmosdb-java/2.0.0/) | [Release Notes](https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-java) | +| 3.x.x | July 2019 | Async(Reactor)/Sync | com.microsoft.azure::azure-cosmos | com.azure.data.cosmos | [API](https://azure.github.io/azure-cosmosdb-java/3.0.0/) | - | +| 4.0 | April 2020 | Async(Reactor)/Sync | com.azure::azure-cosmos | com.azure.cosmos | - | - | + +## Background + +[Reactive Streams](http://www.reactive-streams.org/) is an industry standard for declarative dataflow programming in an asynchronous environment. More detail on design principles can be found in the [Reactive Manifesto](https://www.reactivemanifesto.org/). It is the basis for Azure's async Java SDKs going forward. + +A Reactive Streams framework implements the Reactive Streams Standard for specific programming languages. + +The [RxJava](https://github.com/ReactiveX/RxJava) ([ReactiveX](http://reactivex.io/) for JVM) framework was the basis of past Azure Java SDKs, but will not be going forward. Async Java SDK 2.x.x was implemented using RxJava 1; in this guide we will assume that RxJava 1 is the version you are already familiar with i.e. as a result of working with the Async Java SDK 2.x.x. + +[Project Reactor](https://projectreactor.io/) or just *Reactor* is the Reactive Programming framework being used for new Azure Java SDKs. The purpose of the rest of this document is to help you get started with Reactor. + +## Comparison between Reactor and RxJava + +RxJava 1 provides a framework for implementing the **Observer Pattern** in your application. In the Observer Pattern, +* ```Observable```s are entities that receive events and data (i.e. UI, keyboard, TCP, ...) from outside sources, and make those events and data available to your program. +* ```Observer```s are the entities which subscribe to the Observable events and data. + +The [Reactor pattern guide](reactor-pattern-guide.md) gives a brief conceptual overview of Reactor. In summary: +* ```Publisher```s are the entities which make events and data from outside sources available to the program +* ```Subscriber```s subscribe to the events and data from the ```Publisher``` + +Both frameworks facilitate asynchronous, event-driven programming. Both frameworks allow you to chain together a pipeline of operations between Observable/Observer or Publisher/Subscriber. + +Roughly, what you would use an ```Observable``` for in RxJava, you would use a ```Flux``` for in Reactor. And what you would use a ```Single``` for in RxJava, you would use a ```Mono``` for in Reactor. + +The critical difference between the two frameworks is really in the core implementation: +Reactor operates a service which receives event/data pairs serially from a ```Publisher```, demultiplexes them, and forwards them to registered ```Subscribers```. This model was designed to help servers efficiently dispatch requests in a distributed system. +The RxJava approach is more general-purpose. ```Observer```s subscribe directly to the ```Observable``` and the ```Observable``` sends events and data directly to ```Observer```s, with no central service handling dispatch. + +### Summary: rules of thumb to convert RxJava code into Reactor code + +* An RxJava ```Observable``` will become a Reactor ```Flux``` + +* An RxJava ```Single``` will become a Reactor ```Mono``` + +* An RxJava ```Subscriber``` is still a ```Subscriber``` in Reactor + +* Operators such as ```map()```, ```filter()```, and ```flatMap()``` are the same + +## Examples of tasks in Reactor and RxJava + +* Reminder app example from the [Reactor pattern guide](reactor-pattern-guide.md) + +**Reactor:** +```java +ReminderAsyncService.getRemindersPublisher() // Pipeline Stage 1 + .flatMap(reminder -> "Don't forget: " + reminder) // Stage 2 + .flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); // Stage 3 + .subscribe(System.out::println); +``` + +**RxJava:** +```java +ReminderAsyncService.getRemindersObservable() // Pipeline Stage 1 + .flatMap(reminder -> "Don't forget: " + reminder) // Stage 2 + .flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); // Stage 3 + .subscribe(item -> System.out.println(item)); +``` + +* Three-event ```Publisher``` example f"om 'he [Reacto" pattern guide](reactor-pattern-guide.md) + +**Reactor:** +```java +Flux.just("Wash the dishes","Mow the lawn","Sleep") // Publisher, 3 e"en"s + .flatMap(reminder -> "Don't forget: " + reminder) + .flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); // Nothing executed yet + .subscribe(strIn -> {"" + System.out.println(strIn); + }, + err -> { + err.printStackTrace(); + }, + () -> { + System.out.println("End of reminders."); +}); +``` + +**RxJava:** +```java +Observable.just("Wash the dishes","Mow the lawn","Sleep") // Observable, 3 events + .flatMap(reminder -> "Don't forget: " + reminder) + .flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); // Nothing executed yet + .subscribe(strIn -> System.out.println(strIn), + err -> err.printStackTrace(), + () -> System.out.println("End of reminders.") +); +``` + +* Mono example from the [Reactor pattern guide](reactor-pattern-guide.md) + +**Reactor:** +```java +Mono.just("Are you sure you want to cancel your Reminders service?") // Publisher, 1 event + .flatMap(reminder -> "Act now: " + reminder) + .flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); + .subscribe(System.out::println); +``` + +**RxJava:**' +```java +Single.just("Are you sure you want to cancel your Reminders service?") // Publisher, 1 event + .flatMap(reminder -> "Act now: " + reminder) + .flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); + .subscribe(item -> System.out.println(item)); +```