diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java index 69084fad..72dc9a8c 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java @@ -28,7 +28,6 @@ * to contain above placeholder. */ public class IncrementAnIndexPaginationIterator extends BaseHttpPaginationIterator { - private static final Logger LOG = LoggerFactory.getLogger(IncrementAnIndexPaginationIterator.class); public static final String PAGINATION_INDEX_PLACEHOLDER_REGEX = "\\{pagination.index\\}"; private final Long indexIncrement; @@ -46,22 +45,30 @@ public IncrementAnIndexPaginationIterator(BaseHttpSourceConfig config, Paginatio this.index = config.getStartIndex() - this.indexIncrement; } - this.nextPageUrl = getNextPageUrl(); + this.nextPageUrl = getNextPageUrl(null); } - private String getNextPageUrl() { + private String getNextPageUrl(BasePage page) { index += indexIncrement; - if (maxIndex != null && index > maxIndex) { - return null; + if (maxIndex != null) { + // If the index is greater than max index, we stop the pagination + if (index > maxIndex) { + return null; + } } else { - return config.getUrl().replaceAll(PAGINATION_INDEX_PLACEHOLDER_REGEX, index.toString()); + // If the page received is empty, we stop the pagination + if (page != null && page.next().getRecord() == null) { + return null; + } } + + return config.getUrl().replaceAll(PAGINATION_INDEX_PLACEHOLDER_REGEX, index.toString()); } @Override protected String getNextPageUrl(HttpResponse response, BasePage page) { - return getNextPageUrl(); + return getNextPageUrl(page); } @Override diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/page/JsonPage.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/page/JsonPage.java index b89d5afe..4e505614 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/page/JsonPage.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/page/JsonPage.java @@ -31,9 +31,6 @@ import java.util.List; import java.util.Map; -/** - * Returns elements from json one by one by given json path. - */ class JsonPage extends BasePage { private final String insideElementJsonPathPart; private final Iterator iterator; @@ -43,9 +40,11 @@ class JsonPage extends BasePage { private final BaseHttpSourceConfig config; private final List optionalFields; + // TODO : handle case where the result json object is null JsonPage(BaseHttpSourceConfig config, HttpResponse httpResponse) { super(httpResponse); this.config = config; + String body = httpResponse.getBody(); this.json = JSONUtil.toJsonElement(httpResponse.getBody()); this.schema = config.getSchema(); this.optionalFields = getOptionalFields(); @@ -53,7 +52,7 @@ class JsonPage extends BasePage { JsonElement jsonElement = json; if (json.isJsonObject()) { JSONUtil.JsonQueryResponse queryResponse = - JSONUtil.getJsonElementByPath(json.getAsJsonObject(), config.getResultPath(), optionalFields); + JSONUtil.getJsonElementByPath(json.getAsJsonObject(), config.getResultPath(), optionalFields); this.insideElementJsonPathPart = queryResponse.getUnretrievedPath(); jsonElement = queryResponse.get(); } else { @@ -62,11 +61,11 @@ class JsonPage extends BasePage { if (jsonElement.isJsonArray()) { this.iterator = jsonElement.getAsJsonArray().iterator(); - } else if (jsonElement.isJsonObject()) { + } else if (jsonElement.isJsonObject() || jsonElement.isJsonNull()) { this.iterator = Collections.singleton(jsonElement).iterator(); } else { throw new IllegalArgumentException(String.format("Element found by '%s' json path is expected to be an object " + - "or an array. Primitive found", config.getResultPath())); + "or an array. Primitive found", config.getResultPath())); } this.fieldsMapping = config.getFullFieldsMapping(); @@ -79,35 +78,35 @@ public boolean hasNext() { /** * Converts a next element from json into a json object which is defined by fieldsMapping. - * + *

* Example next element: - * { - * "id":"19124", - * "key":"NETTY-13", - * "fields":{ - * "issuetype":{ - * "self":"https://issues.cask.co/rest/api/2/issuetype/4", - * "name":"Improvement", - * "subtask":false - * }, - * "fixVersions":[ - * - * ], - * "description":"Test description for NETTY-13", - * "project":{ - * "id":"10301", - * "key":"NETTY", - * "name":"Netty-HTTP", - * "projectCategory":{ - * "id":"10002", - * "name":"Infrastructure" - * } - * } - * } - * } - * + * { + * "id":"19124", + * "key":"NETTY-13", + * "fields":{ + * "issuetype":{ + * "self":"https://issues.cask.co/rest/api/2/issuetype/4", + * "name":"Improvement", + * "subtask":false + * }, + * "fixVersions":[ + *

+ * ], + * "description":"Test description for NETTY-13", + * "project":{ + * "id":"10301", + * "key":"NETTY", + * "name":"Netty-HTTP", + * "projectCategory":{ + * "id":"10002", + * "name":"Infrastructure" + * } + * } + * } + * } + *

* The mapping is: - * + *

* | Field Name | Field Path | * | --------------- |:-----------------------------------------:| * | type | /fields/issuetype/name | @@ -115,20 +114,20 @@ public boolean hasNext() { * | projectCategory | /fields/project/projectCategory/name | * | isSubtask | /fields/issuetype/subtask | * | fixVersions | /fields/fixVersions | - * + *

* The result returned by function is: - * + *

* { - * "key":"NETTY-13", - * "type":"Improvement", - * "isSubtask":false, - * "description":"Test description for NETTY-13", - * "projectCategory":"Infrastructure", - * "fixVersions":[ - * - * ] + * "key":"NETTY-13", + * "type":"Improvement", + * "isSubtask":false, + * "description":"Test description for NETTY-13", + * "projectCategory":"Infrastructure", + * "fixVersions":[ + *

+ * ] * } - * + *

* Note: * This also supports "insideElementJsonPath". Example would be the following: if path is * '/bookstore/items/bookPublisherDetails'. The array which elements are retrieved from is /bookstore/items @@ -139,36 +138,47 @@ public boolean hasNext() { */ @Override public PageEntry next() { - JsonObject currentJsonObject = iterator.next().getAsJsonObject(); - - JsonObject resultJson = new JsonObject(); - int numPartiallyRetrieved = 0; - for (Map.Entry entry : fieldsMapping.entrySet()) { - String schemaFieldName = entry.getKey(); - String fieldPath = insideElementJsonPathPart + "/" + StringUtils.stripStart(entry.getValue(), "/"); - - JSONUtil.JsonQueryResponse queryResponse = - JSONUtil.getJsonElementByPath(currentJsonObject, fieldPath, optionalFields); - - if (!queryResponse.isFullyRetrieved()) { - numPartiallyRetrieved++; - } - - resultJson.add(schemaFieldName, queryResponse.get()); - } - - String jsonString = resultJson.toString(); - try { - StructuredRecord record = StructuredRecordStringConverter.fromJsonString(jsonString, schema); - if (numPartiallyRetrieved > 0) { - InvalidEntry error = - new InvalidEntry<>(1, "Couldn't find all required fields in the record", record); - return new PageEntry(error, config.getErrorHandling()); + if (iterator.hasNext()) { + JsonElement jsonElement = iterator.next(); + if (jsonElement.isJsonNull()) { + return new PageEntry(null); + } else { + JsonObject currentJsonObject = jsonElement.getAsJsonObject(); + + JsonObject resultJson = new JsonObject(); + int numPartiallyRetrieved = 0; + for (Map.Entry entry : fieldsMapping.entrySet()) { + String schemaFieldName = entry.getKey(); + String fieldPath = insideElementJsonPathPart + "/" + StringUtils.stripStart(entry.getValue(), "/"); + + JSONUtil.JsonQueryResponse queryResponse = + JSONUtil.getJsonElementByPath(currentJsonObject, fieldPath, optionalFields); + + if (!queryResponse.isFullyRetrieved()) { + numPartiallyRetrieved++; + } + + if (!queryResponse.getRetrievedPath().equals("/")) { + resultJson.add(schemaFieldName, queryResponse.get()); + } + + } + + String jsonString = resultJson.toString(); + try { + StructuredRecord record = StructuredRecordStringConverter.fromJsonString(jsonString, schema); + if (numPartiallyRetrieved > 0) { + InvalidEntry error = + new InvalidEntry<>(1, "Couldn't find all required fields in the record", record); + return new PageEntry(error, config.getErrorHandling()); + } + return new PageEntry(record); + } catch (Throwable e) { + return new PageEntry(InvalidEntryCreator.buildStringError(jsonString, e), config.getErrorHandling()); + } } - return new PageEntry(record); - } catch (Throwable e) { - return new PageEntry(InvalidEntryCreator.buildStringError(jsonString, e), config.getErrorHandling()); } + return new PageEntry(null); } private List getOptionalFields() { @@ -196,7 +206,7 @@ private List getOptionalFields() { public String getPrimitiveByPath(String path) { if (json.isJsonObject()) { JSONUtil.JsonQueryResponse queryResponse = JSONUtil.getJsonElementByPath(json.getAsJsonObject(), - path, optionalFields); + path, optionalFields); if (queryResponse.isFullyRetrieved()) { return queryResponse.getAsJsonPrimitive().getAsString(); }