Skip to content
This repository was archived by the owner on Mar 11, 2022. It is now read-only.

Commit 6efd50b

Browse files
jjrodrigtomblench
authored andcommitted
Add support for mango selectors in Pull replicator (#566)
Include support for replication filtering with selectors
1 parent 1920323 commit 6efd50b

17 files changed

+541
-155
lines changed

CHANGES.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Unreleased
2+
- [NEW] Added API for specifying a mango selector in the filtered pull replicator
23
- [IMPROVED] Improved efficiency of sub-query when picking winning
34
revisions. This improves performance when inserting revisions,
45
including during pull replication.

cloudant-sync-datastore-core/build.gradle

+12-1
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,22 @@ test {
5353
'com.cloudant.common.SystemTest', \
5454
'com.cloudant.common.RequireRunningCouchDB', \
5555
'com.cloudant.common.PerformanceTest', \
56-
'com.cloudant.common.RequireRunningProxy'
56+
'com.cloudant.common.RequireRunningProxy', \
57+
'com.cloudant.common.RequireCloudant'
5758
}
5859
}
5960

6061
task integrationTest(type: Test, dependsOn: testClasses) {
62+
useJUnit {
63+
excludeCategories \
64+
'com.cloudant.common.SystemTest', \
65+
'com.cloudant.common.PerformanceTest', \
66+
'com.cloudant.common.RequireRunningProxy', \
67+
'com.cloudant.common.RequireCloudant'
68+
}
69+
}
70+
71+
task integrationTestCloudant(type: Test, dependsOn: testClasses) {
6172
useJUnit {
6273
excludeCategories \
6374
'com.cloudant.common.SystemTest', \

cloudant-sync-datastore-core/src/main/java/com/cloudant/sync/internal/mazha/CouchClient.java

+73-42
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.cloudant.sync.internal.documentstore.MultipartAttachmentWriter;
3131
import com.cloudant.sync.internal.util.JSONUtils;
3232
import com.cloudant.sync.internal.util.Misc;
33+
import com.cloudant.sync.replication.PullFilter;
3334
import com.fasterxml.jackson.core.type.TypeReference;
3435

3536
import org.apache.commons.io.IOUtils;
@@ -51,7 +52,7 @@
5152
import java.util.logging.Level;
5253
import java.util.logging.Logger;
5354

54-
public class CouchClient {
55+
public class CouchClient {
5556

5657
private CouchURIHelper uriHelper;
5758
private List<HttpConnectionRequestInterceptor> requestInterceptors;
@@ -65,11 +66,11 @@ public CouchClient(URI rootUri,
6566
this.requestInterceptors = new ArrayList<HttpConnectionRequestInterceptor>();
6667
this.responseInterceptors = new ArrayList<HttpConnectionResponseInterceptor>();
6768

68-
if(requestInterceptors != null) {
69+
if (requestInterceptors != null) {
6970
this.requestInterceptors.addAll(requestInterceptors);
7071
}
7172

72-
if(responseInterceptors != null) {
73+
if (responseInterceptors != null) {
7374
this.responseInterceptors.addAll(responseInterceptors);
7475
}
7576
}
@@ -82,16 +83,14 @@ public URI getRootUri() {
8283
// - stream non-null and exception null: the call was successful, result in stream
8384
// - stream null and exception non-null: the call was unsuccessful, details in exception
8485
// - fatal: set to true when exception non-null, indicates call should not be retried
85-
private static class ExecuteResult
86-
{
86+
private static class ExecuteResult {
8787
private ExecuteResult(InputStream stream,
8888
InputStream errorStream,
8989
int responseCode,
9090
String responseMessage,
91-
Throwable cause)
92-
{
91+
Throwable cause) {
9392
boolean needsCouchException = false;
94-
switch(responseCode / 100) {
93+
switch (responseCode / 100) {
9594
case 1:
9695
case 2:
9796
// 1xx and 2xx are OK
@@ -103,7 +102,8 @@ private ExecuteResult(InputStream stream,
103102
break;
104103
case 3:
105104
// 3xx redirection
106-
throw new CouchException("Unexpected redirection (3xx) code encountered", responseCode);
105+
throw new CouchException("Unexpected redirection (3xx) code encountered",
106+
responseCode);
107107
case 4:
108108
// 4xx errors normally mean we are not authenticated so we shouldn't retry
109109
this.fatal = true;
@@ -135,7 +135,8 @@ private ExecuteResult(InputStream stream,
135135
ce.setReason(json.get("reason"));
136136
this.exception = ce;
137137
} catch (Exception e) {
138-
CouchException ce = new CouchException("Error deserializing server response", cause,
138+
CouchException ce = new CouchException("Error deserializing server response",
139+
cause,
139140
responseCode);
140141
this.exception = ce;
141142
}
@@ -152,7 +153,8 @@ private ExecuteResult(InputStream stream,
152153
// - if there's a couch error returned as json, un-marshall and throw
153154
// - anything else, just throw the IOException back, use the cause part of the exception?
154155

155-
// it needs to catch eg FileNotFoundException and rethrow to emulate the previous exception handling behaviour
156+
// it needs to catch eg FileNotFoundException and rethrow to emulate the previous exception
157+
// handling behaviour
156158
private ExecuteResult execute(HttpConnection connection) {
157159

158160
InputStream inputStream = null; // input stream - response from server on success
@@ -248,7 +250,7 @@ private <T> T executeToJsonObjectWithRetry(final HttpConnection connection,
248250
}
249251

250252
private <T> T executeWithRetry(final HttpConnection connection,
251-
InputStreamProcessor<T> processor) throws
253+
InputStreamProcessor<T> processor) throws
252254
CouchException {
253255
// all CouchClient requests want to receive application/json responses
254256
connection.requestProperties.put("Accept", "application/json");
@@ -284,33 +286,53 @@ private Map<String, Object> getDefaultChangeFeedOptions() {
284286
return options;
285287
}
286288

287-
public ChangesResult changes(Object since, Integer limit) {
288-
return this.changes(null, null, since, limit);
289-
}
290-
291-
public ChangesResult changes(String filterName, Map<String, String> filterParameters, Object since, Integer limit) {
289+
private Map<String, Object> getParametrizedChangeFeedOptions(Object since, Integer limit) {
292290
Map<String, Object> options = getDefaultChangeFeedOptions();
293-
if(filterName != null) {
294-
options.put("filter", filterName);
295-
if(filterParameters != null) {
296-
options.putAll(filterParameters);
297-
}
298-
}
299-
if(since != null) {
291+
if (since != null) {
300292
options.put("since", since);
301293
}
302294
if (limit != null) {
303295
options.put("limit", limit);
304296
}
305297
// seq_interval: improve performance and reduce load on the remote database
306-
if(limit != null) {
298+
if (limit != null) {
307299
options.put("seq_interval", limit);
308300
} else {
309301
options.put("seq_interval", 1000);
310302
}
303+
return options;
304+
}
305+
306+
public ChangesResult changes(Object since, Integer limit) {
307+
Map<String, Object> options = getParametrizedChangeFeedOptions(since, limit);
311308
return this.changes(options);
312309
}
313310

311+
public ChangesResult changes(PullFilter filter, Object since, Integer limit) {
312+
Map<String, Object> options = getParametrizedChangeFeedOptions(since, limit);
313+
if (filter != null) {
314+
String filterName = filter.getName();
315+
Map filterParameters = filter.getParameters();
316+
if (filterName != null) {
317+
options.put("filter", filterName);
318+
if (filterParameters != null) {
319+
options.putAll(filterParameters);
320+
}
321+
}
322+
}
323+
return this.changes(options);
324+
}
325+
326+
public ChangesResult changes(String selector, Object since, Integer limit) {
327+
Misc.checkNotNullOrEmpty(selector,null);
328+
Map<String, Object> options = getParametrizedChangeFeedOptions(since, limit);
329+
options.put("filter", "_selector");
330+
URI changesFeedUri = uriHelper.changesUri(options);
331+
HttpConnection connection = Http.POST(changesFeedUri, "application/json");
332+
connection.setRequestBody(selector);
333+
return executeToJsonObjectWithRetry(connection, ChangesResult.class);
334+
}
335+
314336
public ChangesResult changes(final Map<String, Object> options) {
315337
URI changesFeedUri = uriHelper.changesUri(options);
316338
HttpConnection connection = Http.GET(changesFeedUri);
@@ -349,7 +371,8 @@ public Response create(Object document) {
349371
}
350372
}
351373

352-
public <T> T processAttachmentStream(String id, String rev, String attachmentName, final boolean acceptGzip, InputStreamProcessor<T> processor) {
374+
public <T> T processAttachmentStream(String id, String rev, String attachmentName, final
375+
boolean acceptGzip, InputStreamProcessor<T> processor) {
353376
Misc.checkNotNullOrEmpty(id, "id");
354377
Misc.checkNotNullOrEmpty(rev, "rev");
355378
Map<String, Object> queries = new HashMap<String, Object>();
@@ -362,7 +385,8 @@ public <T> T processAttachmentStream(String id, String rev, String attachmentNam
362385
return executeWithRetry(connection, processor);
363386
}
364387

365-
public void putAttachmentStream(String id, String rev, String attachmentName, String contentType, byte[] attachmentData) {
388+
public void putAttachmentStream(String id, String rev, String attachmentName, String
389+
contentType, byte[] attachmentData) {
366390
Misc.checkNotNullOrEmpty(id, "id");
367391
Misc.checkNotNullOrEmpty(rev, "rev");
368392
Map<String, Object> queries = new HashMap<String, Object>();
@@ -387,18 +411,18 @@ public void putAttachmentStream(String id, String rev, String attachmentName, St
387411
* "2-65ddd7d56da84f25af544e84a3267ccf" ]
388412
* }
389413
*/
390-
public Map<String,Object> getDocConflictRevs(String id) {
414+
public Map<String, Object> getDocConflictRevs(String id) {
391415
Map<String, Object> options = new HashMap<String, Object>();
392416
options.put("conflicts", true);
393417
return this.getDocument(id, options, JSONUtils.STRING_MAP_TYPE_DEF);
394418
}
395419

396420
/**
397-
* Convenience method to get document with revision history for a given list of open revisions. It does that by
421+
* Convenience method to get document with revision history for a given list of open
422+
* revisions. It does that by
398423
* adding "open_revs=["rev1", "rev2"]" option to the GET request.
399424
*
400425
* It must return a list because that is how CouchDB return its results.
401-
*
402426
*/
403427
public List<OpenRevision> getDocWithOpenRevisions(String id, Collection<String> revisions,
404428
Collection<String> attsSince,
@@ -433,7 +457,8 @@ public List<OpenRevision> getDocWithOpenRevisions(String id, Collection<String>
433457
* Each time the iterator is advanced, a DocumentRevsList is returned, which represents the
434458
* leaf nodes and their ancestries for a given document ID.
435459
* </p>
436-
* @param request A request for 1 or more (ID,rev) pairs.
460+
*
461+
* @param request A request for 1 or more (ID,rev) pairs.
437462
* @param pullAttachmentsInline If true, retrieve attachments as inline base64
438463
* @return An iterator representing the result of calling the _bulk_docs endpoint.
439464
*/
@@ -458,7 +483,8 @@ public Iterable<DocumentRevsList> bulkReadDocsWithOpenRevisions(List<BulkGetRequ
458483
// deserialise response
459484
BulkGetResponse response = executeToJsonObjectWithRetry(connection, BulkGetResponse.class);
460485

461-
Map<String,ArrayList<DocumentRevs>> revsMap = new HashMap<String,ArrayList<DocumentRevs>>();
486+
Map<String, ArrayList<DocumentRevs>> revsMap = new HashMap<String,
487+
ArrayList<DocumentRevs>>();
462488

463489
// merge results back in, so there is one list of DocumentRevs per ID
464490
for (BulkGetResponse.Result result : response.results) {
@@ -486,7 +512,7 @@ public Map<String, Object> getDocument(String id) {
486512
return this.getDocument(id, new HashMap<String, Object>(), JSONUtils.STRING_MAP_TYPE_DEF);
487513
}
488514

489-
public <T> T getDocument(String id, final Class<T> type) {
515+
public <T> T getDocument(String id, final Class<T> type) {
490516
return this.getDocument(id, new HashMap<String, Object>(),
491517
new CouchClientTypeReference<T>(type));
492518
}
@@ -527,10 +553,10 @@ public <T> T getDocument(String id, String rev, final Class<T> type) {
527553
}
528554

529555
/**
530-
* Get document along with its revision history, and the result is converted to a <code>DocumentRevs</code> object.
556+
* Get document along with its revision history, and the result is converted to a
557+
* <code>DocumentRevs</code> object.
531558
*
532559
* @see <code>DocumentRevs</code>
533-
*
534560
*/
535561
public DocumentRevs getDocRevisions(String id, String rev) {
536562
return getDocRevisions(id, rev,
@@ -579,7 +605,7 @@ public String getDocumentRev(String id) {
579605

580606
String rev = head.getConnection().getHeaderField("ETag");
581607
// Remove enclosing "" before returning
582-
return rev.substring(1, rev.length()-1);
608+
return rev.substring(1, rev.length() - 1);
583609
}
584610

585611
public Response delete(String id, String rev) {
@@ -608,7 +634,8 @@ private List<Response> bulkCreateDocs(String payload) {
608634
URI uri = this.uriHelper.bulkDocsUri();
609635
HttpConnection connection = Http.POST(uri, "application/json");
610636
connection.setRequestBody(payload);
611-
return executeToJsonObjectWithRetry(connection, new CouchClientTypeReference<List<Response>>());
637+
return executeToJsonObjectWithRetry(connection, new
638+
CouchClientTypeReference<List<Response>>());
612639
}
613640

614641
/**
@@ -639,8 +666,8 @@ public List<Response> bulkCreateSerializedDocs(List<String> serializedDocs) {
639666
private String generateBulkSerializedDocsPayload(List<String> serializedDocs) {
640667
String newEditsVal = "\"new_edits\": false, ";
641668
StringBuilder sb = new StringBuilder("[");
642-
for(String doc : serializedDocs) {
643-
if(sb.length() > 1) {
669+
for (String doc : serializedDocs) {
670+
if (sb.length() > 1) {
644671
sb.append(", ");
645672
}
646673
sb.append(doc);
@@ -671,7 +698,9 @@ private String generateBulkSerializedDocsPayload(List<String> serializedDocs) {
671698
* If the ID has no missing revision, it should not appear in the Map's key set. If all IDs
672699
* do not have missing revisions, the returned Map should be empty map, but never null.
673700
*
674-
* @see <a target="_blank" href="http://wiki.apache.org/couchdb/HttpPostRevsDiff">HttpPostRevsDiff documentation</a>
701+
* @see
702+
* <a target="_blank" href="http://wiki.apache.org/couchdb/HttpPostRevsDiff">HttpPostRevsDiff
703+
* documentation</a>
675704
*/
676705
public Map<String, MissingRevisions> revsDiff(Map<String, Set<String>> revisions) {
677706
Misc.checkNotNull(revisions, "Input revisions");
@@ -735,7 +764,8 @@ private static class CouchClientTypeReference<T> extends TypeReference<T> {
735764
CouchClientTypeReference() {
736765
this(null);
737766
}
738-
CouchClientTypeReference(Class<T> type){
767+
768+
CouchClientTypeReference(Class<T> type) {
739769
this.type = type;
740770
}
741771

@@ -753,6 +783,7 @@ public Type getType() {
753783
private static final class TypeInputStreamProcessor<T> implements InputStreamProcessor<T> {
754784

755785
private final TypeReference<T> typeReference;
786+
756787
TypeInputStreamProcessor(TypeReference<T> typeReference) {
757788
this.typeReference = typeReference;
758789
}
@@ -774,4 +805,4 @@ public Void processStream(InputStream stream) {
774805
return null;
775806
}
776807
}
777-
}
808+
}

0 commit comments

Comments
 (0)