Skip to content
This repository was archived by the owner on Mar 11, 2022. It is now read-only.

Commit c428252

Browse files
jjrodrigtomblench
authored andcommitted
Support for doc_ids filter in Pull filtered replication. (#582)
* Support for doc_ids filter in Pull filtered replication. Fixes #578
1 parent b509c42 commit c428252

File tree

11 files changed

+250
-10
lines changed

11 files changed

+250
-10
lines changed

CHANGES.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Unreleased
22
- [IMPROVED] Forced a TLS1.2 `SSLSocketFactory` where possible on Android API versions < 20 (it is
33
already enabled by default on newer API levels).
4+
- [NEW] Added API for specifying a list of document IDs in the filtered pull replicator
45

56
# 2.2.0 (2018-02-14)
67
- [NEW] Added API for specifying a mango selector in the filtered pull replicator

cloudant-sync-datastore-core/src/main/java/com/cloudant/sync/internal/mazha/CouchClient.java

+26-7
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ private Map<String, Object> getParametrizedChangeFeedOptions(Object since, Integ
446446

447447
public ChangesResult changes(Object since, Integer limit) {
448448
Map<String, Object> options = getParametrizedChangeFeedOptions(since, limit);
449-
return this.changes(options);
449+
return this.changesRequestWithGet(options);
450450
}
451451

452452
public ChangesResult changes(PullFilter filter, Object since, Integer limit) {
@@ -461,22 +461,41 @@ public ChangesResult changes(PullFilter filter, Object since, Integer limit) {
461461
}
462462
}
463463
}
464-
return this.changes(options);
464+
return this.changesRequestWithGet(options);
465465
}
466466

467467
public ChangesResult changes(String selector, Object since, Integer limit) {
468-
Misc.checkNotNullOrEmpty(selector,null);
468+
Misc.checkNotNullOrEmpty(selector, null);
469+
469470
Map<String, Object> options = getParametrizedChangeFeedOptions(since, limit);
470471
options.put("filter", "_selector");
472+
473+
return changesRequestWithPost(selector, options);
474+
}
475+
476+
public ChangesResult changes(List<String> docIds, Object since, Integer limit) {
477+
Misc.checkState((docIds != null && !docIds.isEmpty()), null);
478+
479+
Map<String, Object> options = getParametrizedChangeFeedOptions(since, limit);
480+
options.put("filter", "_doc_ids");
481+
482+
Map<String, Object> docIdsMap = new HashMap<String, Object>();
483+
docIdsMap.put("doc_ids", docIds);
484+
String docsIdsDoc = JSONUtils.serializeAsString(docIdsMap);
485+
486+
return changesRequestWithPost(docsIdsDoc, options);
487+
}
488+
489+
private ChangesResult changesRequestWithGet(final Map<String, Object> options) {
471490
URI changesFeedUri = uriHelper.changesUri(options);
472-
HttpConnection connection = Http.POST(changesFeedUri, "application/json");
473-
connection.setRequestBody(selector);
491+
HttpConnection connection = Http.GET(changesFeedUri);
474492
return executeToJsonObjectWithRetry(connection, ChangesResult.class);
475493
}
476494

477-
public ChangesResult changes(final Map<String, Object> options) {
495+
private ChangesResult changesRequestWithPost( String body, final Map<String, Object> options) {
478496
URI changesFeedUri = uriHelper.changesUri(options);
479-
HttpConnection connection = Http.GET(changesFeedUri);
497+
HttpConnection connection = Http.POST(changesFeedUri, "application/json");
498+
connection.setRequestBody(body);
480499
return executeToJsonObjectWithRetry(connection, ChangesResult.class);
481500
}
482501

cloudant-sync-datastore-core/src/main/java/com/cloudant/sync/internal/replication/CouchClientWrapper.java

+9
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,15 @@ public ChangesResult changes(String selector, Object lastSequence, int limit) {
150150
}
151151
}
152152

153+
@Override
154+
public ChangesResult changes(List<String> docIds, Object lastSequence, int limit) {
155+
if (docIds == null || docIds.isEmpty()) {
156+
return couchClient.changes(lastSequence, limit);
157+
} else {
158+
return couchClient.changes(docIds, lastSequence, limit);
159+
}
160+
}
161+
153162
@Override
154163
public Iterable<DocumentRevsList> bulkGetRevisions(List<BulkGetRequest> requests,
155164
boolean pullAttachmentsInline) {

cloudant-sync-datastore-core/src/main/java/com/cloudant/sync/internal/replication/CouchDB.java

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ interface CouchDB {
5050
ChangesResult changes(Object lastSequence, int limit);
5151
ChangesResult changes(PullFilter filter, Object lastSequence, int limit);
5252
ChangesResult changes(String selector, Object lastSequence, int limit);
53+
ChangesResult changes(List<String> docIds, Object lastSequence, int limit);
5354
List<DocumentRevs> getRevisions(String documentId,
5455
Collection<String> revisionIds,
5556
Collection<String> attsSince,

cloudant-sync-datastore-core/src/main/java/com/cloudant/sync/internal/replication/PullStrategy.java

+18
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.net.URI;
4343
import java.nio.charset.Charset;
4444
import java.util.ArrayList;
45+
import java.util.Collections;
4546
import java.util.HashMap;
4647
import java.util.HashSet;
4748
import java.util.List;
@@ -81,6 +82,7 @@ private static class State {
8182

8283
PullFilter filter;
8384
String selector;
85+
List<String> docIds;
8486

8587
DatastoreWrapper targetDb;
8688

@@ -101,10 +103,15 @@ public PullStrategy(URI source,
101103
Database target,
102104
PullFilter filter,
103105
String selector,
106+
List<String> docIds,
104107
List<HttpConnectionRequestInterceptor> requestInterceptors,
105108
List<HttpConnectionResponseInterceptor> responseInterceptors) {
106109
this.filter = filter;
107110
this.selector = selector;
111+
this.docIds = docIds;
112+
if (docIds != null && !docIds.isEmpty()) {
113+
Collections.sort(docIds);
114+
}
108115
this.sourceDb = new CouchClientWrapper(new CouchClient(source, requestInterceptors,
109116
responseInterceptors));
110117
this.targetDb = new DatastoreWrapper((DatabaseImpl) target);
@@ -114,6 +121,10 @@ public PullStrategy(URI source,
114121
filter.getName());
115122
} else if (selector != null) {
116123
replicatorName = String.format("%s <-- %s (%s)", target.getPath(), source, selector);
124+
} else if (docIds != null && !docIds.isEmpty()) {
125+
String concatenatedIds = Misc.join(",",docIds);
126+
replicatorName = String.format("%s <-- %s (%s)", target.getPath(), source,
127+
concatenatedIds);
117128
} else {
118129
replicatorName = String.format("%s <-- %s ", target.getPath(), source);
119130
}
@@ -444,6 +455,8 @@ public String getReplicationId() throws DocumentStoreException {
444455
dict.put("filter", this.filter.toQueryString());
445456
} else if (selector != null) {
446457
dict.put("selector", this.selector);
458+
} else if (docIds != null && !docIds.isEmpty()) {
459+
dict.put("docIds", Misc.join(",",docIds));
447460
}
448461
// get raw SHA-1 of dictionary
449462
try {
@@ -467,6 +480,11 @@ private ChangesResultWrapper nextBatch() throws DocumentStoreException {
467480
this.selector,
468481
lastCheckpoint,
469482
this.changeLimitPerBatch);
483+
} else if (this.docIds != null && !this.docIds.isEmpty()) {
484+
changeFeeds = this.sourceDb.changes(
485+
this.docIds,
486+
lastCheckpoint,
487+
this.changeLimitPerBatch);
470488
} else {
471489
changeFeeds = this.sourceDb.changes(
472490
this.filter,

cloudant-sync-datastore-core/src/main/java/com/cloudant/sync/replication/PullFilter.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ public PullFilter(String filterName) {
7474
}
7575

7676
/**
77-
* Constructs a filter object for a function that requires no
78-
* parameters.
77+
* Constructs a filter object for a function that requires parameters.
7978
*
8079
*
8180
*
@@ -86,7 +85,7 @@ public PullFilter(String filterName) {
8685
* and the name of the filter function, separated by a slash. For example,
8786
* {@code filterDoc/filterFunctionName}
8887
*
89-
* @param parameters Any parameters required for the function. Can be {@code null}. The contents
88+
* @param parameters Any parameters required for the function. Must not be {@code null}. The contents
9089
* of {@code properties} are expanded to {@code key=value} pairs when
9190
* constructing the {@code _changes} feed call for the remote database.
9291
* Integer values should be added as String objects.

cloudant-sync-datastore-core/src/main/java/com/cloudant/sync/replication/ReplicatorBuilder.java

+23
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ public static class Pull extends ReplicatorBuilder<URI, DocumentStore, Pull> {
259259

260260
private String pullPullSelector = null;
261261

262+
private List<String> pullDocIds = null;
263+
262264
private int changeLimitPerBatch = 1000;
263265

264266
private int insertBatchSize = 100;
@@ -272,6 +274,10 @@ public Replicator build() {
272274
"Source and target cannot be null");
273275
Misc.checkState(this.pullPullFilter == null || this.pullPullSelector == null,
274276
"Filter and selector cannot be defined at the same time");
277+
if (this.pullPullFilter != null || this.pullPullSelector != null) {
278+
Misc.checkState(this.pullDocIds == null || this.pullDocIds.isEmpty(),
279+
"Doc Ids cannot be provided at the same time than selecto or filter");
280+
}
275281

276282
// add cookie interceptor and remove creds from URI if required
277283
super.source = super.addAuthInterceptorIfRequired(super.source);
@@ -280,6 +286,7 @@ public Replicator build() {
280286
super.target.database(),
281287
pullPullFilter,
282288
pullPullSelector,
289+
pullDocIds,
283290
super.requestInterceptors,
284291
super.responseInterceptors);
285292

@@ -321,6 +328,22 @@ public Pull selector(String pullPullSelector) {
321328
return this;
322329
}
323330

331+
/**
332+
* <p>Sets the list of doc IDs to use as filtering criteria when a pull replication calls the
333+
* source database's {@code _changes} feed.
334+
* </p>
335+
* Note: Doc IDs filtering are supported only when replicating against a CouchDB 2.x compliant database
336+
*
337+
* @see
338+
* <a target="_blank" href="https://console.bluemix.net/docs/services/Cloudant/api/database.html#get-changes">See doc_ids filtering</a>
339+
*
340+
* @param docIds - List of document Ids
341+
* @return This instance of {@link ReplicatorBuilder}
342+
*/
343+
public Pull docIds(List<String> docIds) {
344+
this.pullDocIds = docIds;
345+
return this;
346+
}
324347
/**
325348
* Sets the number of changes to fetch from the _changes feed per batch
326349
*

cloudant-sync-datastore-core/src/test/java/com/cloudant/sync/internal/replication/PullReplicatorTest.java

+37
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.net.URI;
3131
import java.net.URISyntaxException;
32+
import java.util.Arrays;
3233
import java.util.HashMap;
3334
import java.util.Map;
3435

@@ -124,6 +125,19 @@ public void testPullReplicationCreatedSuccessfullyWithSelector() throws Exceptio
124125
Assert.assertNotNull(replicator);
125126
}
126127

128+
@Test
129+
public void testPullReplicationCreatedSuccessfullyWithDocIds() throws Exception {
130+
131+
Replicator replicator = ReplicatorBuilder.pull()
132+
.from(this.source)
133+
.to(this.documentStore)
134+
.docIds(Arrays.asList("id1","id2"))
135+
.build();
136+
137+
Assert.assertNotNull(replicator);
138+
}
139+
140+
127141
@Test(expected = IllegalStateException.class)
128142
public void testPullReplicationSelectorAndFilterIncompatible() throws Exception {
129143

@@ -136,6 +150,29 @@ public void testPullReplicationSelectorAndFilterIncompatible() throws Exception
136150

137151
}
138152

153+
@Test(expected = IllegalStateException.class)
154+
public void testPullReplicationSelectorAndDocIdIncompatible() throws Exception {
155+
156+
Replicator replicator = ReplicatorBuilder.pull()
157+
.from(this.source)
158+
.to(this.documentStore)
159+
.selector("{\"selector\":{\"class\":\"a_class\"}}")
160+
.docIds(Arrays.asList("id1"))
161+
.build();
162+
163+
}
164+
165+
@Test(expected = IllegalStateException.class)
166+
public void testPullReplicationFilterAndDocIdIncompatible() throws Exception {
167+
168+
Replicator replicator = ReplicatorBuilder.pull()
169+
.from(this.source)
170+
.to(this.documentStore)
171+
.filter(new PullFilter("a_filter"))
172+
.docIds(Arrays.asList("id1"))
173+
.build();
174+
}
175+
139176
@Test
140177
public void testPullReplicationCreatedSuccessfullyWithFilter() throws Exception {
141178

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright © 2018 IBM Corp. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
5+
* except in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the
10+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
11+
* either express or implied. See the License for the specific language governing permissions
12+
* and limitations under the License.
13+
*/
14+
15+
package com.cloudant.sync.internal.replication;
16+
17+
import com.cloudant.sync.internal.mazha.AnimalDb;
18+
import com.cloudant.sync.internal.mazha.ClientTestUtils;
19+
20+
import org.junit.Assert;
21+
import org.junit.Test;
22+
23+
import java.util.Arrays;
24+
import java.util.List;
25+
26+
public class PullStrategyDocIdTest extends ReplicationTestBase {
27+
28+
// we use this utility method rather than ReplicationTestBase.pull() because some
29+
// methods want to make assertions on the BasicPullStrategy after running the replication
30+
private void pull(PullStrategy replicator, int expectedDocs) throws Exception {
31+
TestStrategyListener listener = new TestStrategyListener();
32+
replicator.getEventBus().register(listener);
33+
replicator.run();
34+
listener.assertReplicationCompletedOrThrow();
35+
Assert.assertEquals(expectedDocs, listener.documentsReplicated);
36+
}
37+
38+
@Test
39+
public void pull_filterDocIdsFromAnimalDb_twoDocShouldBePulled() throws Exception {
40+
org.junit.Assume.assumeTrue(ClientTestUtils.isCouchDBV2(remoteDb.couchClient.getRootUri()));
41+
List<String> docIds = Arrays.asList("snipe","kookaburra");
42+
PullStrategy replicator = super.getPullStrategy(docIds);
43+
44+
Assert.assertEquals(0, datastore.getDocumentCount());
45+
46+
AnimalDb.populate(remoteDb.couchClient);
47+
this.pull(replicator, 2);
48+
49+
Assert.assertEquals(2, datastore.getDocumentCount());
50+
51+
for (String bird : docIds) {
52+
Assert.assertTrue(datastore.contains(bird));
53+
}
54+
}
55+
56+
@Test
57+
public void
58+
pull_filterSelectorMammalFromAnimalDbUsingParameterizedFilter_eightDocShouldBePulled()
59+
throws Exception {
60+
org.junit.Assume.assumeTrue(ClientTestUtils.isCouchDBV2(remoteDb.couchClient.getRootUri()));
61+
List<String> docIds = Arrays.asList("aardvark", "badger", "elephant", "giraffe", "lemur", "llama",
62+
"panda", "zebra");
63+
PullStrategy replicator = super.getPullStrategy(docIds);
64+
65+
Assert.assertEquals(0, datastore.getDocumentCount());
66+
67+
AnimalDb.populate(remoteDb.couchClient);
68+
this.pull(replicator, 8);
69+
70+
Assert.assertEquals(8, datastore.getDocumentCount());
71+
72+
for (String mammal : docIds) {
73+
Assert.assertTrue(datastore.contains(mammal));
74+
}
75+
}
76+
77+
}

cloudant-sync-datastore-core/src/test/java/com/cloudant/sync/internal/replication/ReplicationTestBase.java

+20
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,22 @@ protected ReplicatorBuilder.Pull getPullBuilder(String selector) {
202202
return pull;
203203
}
204204

205+
protected ReplicatorBuilder.Pull getPullBuilder(List<String> docIds) {
206+
ReplicatorBuilder.Pull pull = ReplicatorBuilder.pull().
207+
from(this.couchConfig.getRootUri()).
208+
to(this.documentStore)
209+
.docIds(docIds)
210+
.addRequestInterceptors(couchConfig.getRequestInterceptors(false))
211+
.addResponseInterceptors(couchConfig.getResponseInterceptors(false));
212+
if (couchConfig.getUsername() != null && couchConfig.getPassword() != null) {
213+
214+
pull.username(couchConfig.getUsername())
215+
.password(couchConfig.getPassword());
216+
}
217+
218+
return pull;
219+
}
220+
205221
protected PushStrategy getPushStrategy() {
206222
return (PushStrategy)((ReplicatorImpl)this.getPushBuilder().build()).strategy;
207223
}
@@ -218,6 +234,10 @@ protected PullStrategy getPullStrategy(String selector) {
218234
return (PullStrategy)((ReplicatorImpl)this.getPullBuilder(selector).build()).strategy;
219235
}
220236

237+
protected PullStrategy getPullStrategy(List<String> docIds) {
238+
return (PullStrategy)((ReplicatorImpl)this.getPullBuilder(docIds).build()).strategy;
239+
}
240+
221241
protected PushResult push() throws Exception {
222242
TestStrategyListener listener = new TestStrategyListener();
223243
PushStrategy replicator = this.getPushStrategy();

0 commit comments

Comments
 (0)