diff --git a/CHANGELOG.md b/CHANGELOG.md index 702be3d6f45b0..1b145ff39f864 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix NullPointerException when restoring remote snapshot with missing shard size information ([#19684](https://github.com/opensearch-project/OpenSearch/pull/19684)) - Fix NPE of ScriptScoreQuery ([#19650](https://github.com/opensearch-project/OpenSearch/pull/19650)) - Fix ClassCastException in FlightClientChannel for requests larger than 16KB ([#20010](https://github.com/opensearch-project/OpenSearch/pull/20010)) +- Fix GRPC Bulk ([#19937](https://github.com/opensearch-project/OpenSearch/pull/19937)) ### Dependencies - Bump Apache Lucene from 10.3.1 to 10.3.2 ([#20026](https://github.com/opensearch-project/OpenSearch/pull/20026)) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java index 3b59a28590901..c2aa802762a54 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java @@ -232,7 +232,6 @@ public Map> getSecureAuxTransports( if (client == null || queryRegistry == null) { throw new RuntimeException("createComponents must be called first to initialize server provided resources."); } - return Collections.singletonMap(GRPC_SECURE_TRANSPORT_SETTING_KEY, () -> { List grpcServices = new ArrayList<>( List.of(new DocumentServiceImpl(client), new SearchServiceImpl(client, queryUtils)) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java index 07c16fcfd21d8..6a754483c9793 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java @@ -38,7 +38,7 @@ private FetchSourceContextProtoUtils() { * @return A FetchSourceContext object based on the request parameters, or null if no source parameters are provided */ public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.BulkRequest request) { - Boolean fetchSource = true; + Boolean fetchSource = null; String[] sourceExcludes = null; String[] sourceIncludes = null; diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java index b9997013f7700..57e021a8498e3 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java @@ -8,6 +8,8 @@ package org.opensearch.transport.grpc.proto.request.document.bulk; +import com.google.protobuf.ByteString; +import org.apache.lucene.util.BytesRef; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BulkRequestParser; import org.opensearch.action.delete.DeleteRequest; @@ -15,6 +17,7 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -83,17 +86,38 @@ private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) { } /** - * Detects the media type from the byte content, with fallback to JSON if detection fails. + * Converts a protobuf ByteString to OpenSearch BytesReference. + * + * This method uses ByteString.toByteArray() which delegates to protobuf's internal + * implementation. Protobuf optimizes this based on the ByteString's internal representation: + * - For ByteStrings created with UnsafeByteOperations.unsafeWrap(), it returns the wrapped array directly (zero-copy) + * - For other ByteStrings, it creates a copy + * + * @param byteString The protobuf ByteString to convert + * @return A BytesReference wrapping the ByteString data + */ + private static BytesReference byteStringToBytesReference(ByteString byteString) { + if (byteString == null || byteString.isEmpty()) { + return BytesArray.EMPTY; + } + // Let protobuf handle the conversion efficiently + // For ByteStrings created with UnsafeByteOperations.unsafeWrap(), this returns the backing array + return new BytesArray(byteString.toByteArray()); + } + + /** + * Detects the media type from BytesReference content, with fallback to JSON if detection fails. * This enables support for JSON, SMILE, and CBOR formats in gRPC bulk requests. * - * @param document The document content as bytes + * @param document The document content as BytesReference * @return The detected MediaType, or JSON if detection fails or document is empty */ - static MediaType detectMediaType(byte[] document) { - if (document == null || document.length == 0) { + static MediaType detectMediaType(BytesReference document) { + if (document == null || document.length() == 0) { return MediaTypeRegistry.JSON; } - MediaType detectedType = MediaTypeRegistry.mediaTypeFromBytes(document, 0, document.length); + BytesRef bytesRef = document.toBytesRef(); + MediaType detectedType = MediaTypeRegistry.mediaTypeFromBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length); return detectedType != null ? detectedType : MediaTypeRegistry.JSON; } @@ -143,7 +167,7 @@ public static DocWriteRequest[] getDocWriteRequests( case CREATE: docWriteRequest = buildCreateRequest( operationContainer.getCreate(), - bulkRequestBodyEntry.getObject().toByteArray(), + bulkRequestBodyEntry.getObject(), index, id, routing, @@ -158,7 +182,7 @@ public static DocWriteRequest[] getDocWriteRequests( case INDEX: docWriteRequest = buildIndexRequest( operationContainer.getIndex(), - bulkRequestBodyEntry.getObject().toByteArray(), + bulkRequestBodyEntry.getObject(), opType, index, id, @@ -172,9 +196,19 @@ public static DocWriteRequest[] getDocWriteRequests( ); break; case UPDATE: + // Extract the doc field from UpdateAction + // Use ByteString directly to avoid unnecessary byte array allocation + ByteString updateDocBytes = ByteString.EMPTY; + if (bulkRequestBodyEntry.hasUpdateAction() && bulkRequestBodyEntry.getUpdateAction().hasDoc()) { + updateDocBytes = bulkRequestBodyEntry.getUpdateAction().getDoc(); + } else if (bulkRequestBodyEntry.hasObject()) { + // Fallback to object field for backwards compatibility + // TODO: Remove this fallback once all clients use UpdateAction.doc + updateDocBytes = bulkRequestBodyEntry.getObject(); + } docWriteRequest = buildUpdateRequest( operationContainer.getUpdate(), - bulkRequestBodyEntry.getObject().toByteArray(), + updateDocBytes, bulkRequestBodyEntry, index, id, @@ -215,7 +249,7 @@ public static DocWriteRequest[] getDocWriteRequests( * Builds an IndexRequest with create flag set to true from a CreateOperation protobuf message. * * @param createOperation The create operation protobuf message - * @param document The document content as bytes + * @param documentBytes The document content as ByteString (zero-copy reference) * @param index The default index name * @param id The default document ID * @param routing The default routing value @@ -229,7 +263,7 @@ public static DocWriteRequest[] getDocWriteRequests( */ public static IndexRequest buildCreateRequest( WriteOperation createOperation, - byte[] document, + ByteString documentBytes, String index, String id, String routing, @@ -240,13 +274,17 @@ public static IndexRequest buildCreateRequest( long ifPrimaryTerm, boolean requireAlias ) { - index = createOperation.hasXIndex() ? createOperation.getXIndex() : index; + if (createOperation.hasXIndex()) { + index = createOperation.getXIndex(); + } + id = createOperation.hasXId() ? createOperation.getXId() : id; routing = createOperation.hasRouting() ? createOperation.getRouting() : routing; pipeline = createOperation.hasPipeline() ? createOperation.getPipeline() : pipeline; requireAlias = createOperation.hasRequireAlias() ? createOperation.getRequireAlias() : requireAlias; - MediaType mediaType = detectMediaType(document); + BytesReference documentRef = byteStringToBytesReference(documentBytes); + MediaType mediaType = detectMediaType(documentRef); IndexRequest indexRequest = new IndexRequest(index).id(id) .routing(routing) .version(version) @@ -255,7 +293,7 @@ public static IndexRequest buildCreateRequest( .setPipeline(pipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) - .source(document, mediaType) + .source(documentRef, mediaType) .setRequireAlias(requireAlias); return indexRequest; } @@ -264,7 +302,7 @@ public static IndexRequest buildCreateRequest( * Builds an IndexRequest from an IndexOperation protobuf message. * * @param indexOperation The index operation protobuf message - * @param document The document content as bytes + * @param documentBytes The document content as ByteString (zero-copy reference) * @param opType The default operation type * @param index The default index name * @param id The default document ID @@ -279,7 +317,7 @@ public static IndexRequest buildCreateRequest( */ public static IndexRequest buildIndexRequest( IndexOperation indexOperation, - byte[] document, + ByteString documentBytes, OpType opType, String index, String id, @@ -292,7 +330,11 @@ public static IndexRequest buildIndexRequest( boolean requireAlias ) { opType = indexOperation.hasOpType() ? indexOperation.getOpType() : opType; - index = indexOperation.hasXIndex() ? indexOperation.getXIndex() : index; + + if (indexOperation.hasXIndex()) { + index = indexOperation.getXIndex(); + } + id = indexOperation.hasXId() ? indexOperation.getXId() : id; routing = indexOperation.hasRouting() ? indexOperation.getRouting() : routing; version = indexOperation.hasVersion() ? indexOperation.getVersion() : version; @@ -304,7 +346,8 @@ public static IndexRequest buildIndexRequest( ifPrimaryTerm = indexOperation.hasIfPrimaryTerm() ? indexOperation.getIfPrimaryTerm() : ifPrimaryTerm; requireAlias = indexOperation.hasRequireAlias() ? indexOperation.getRequireAlias() : requireAlias; - MediaType mediaType = detectMediaType(document); + BytesReference documentRef = byteStringToBytesReference(documentBytes); + MediaType mediaType = detectMediaType(documentRef); IndexRequest indexRequest; if (opType == null) { indexRequest = new IndexRequest(index).id(id) @@ -314,7 +357,7 @@ public static IndexRequest buildIndexRequest( .setPipeline(pipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) - .source(document, mediaType) + .source(documentRef, mediaType) .setRequireAlias(requireAlias); } else { indexRequest = new IndexRequest(index).id(id) @@ -325,7 +368,7 @@ public static IndexRequest buildIndexRequest( .setPipeline(pipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) - .source(document, mediaType) + .source(documentRef, mediaType) .setRequireAlias(requireAlias); } return indexRequest; @@ -335,7 +378,7 @@ public static IndexRequest buildIndexRequest( * Builds an UpdateRequest from an UpdateOperation protobuf message. * * @param updateOperation The update operation protobuf message - * @param document The document content as bytes + * @param documentBytes The document content as ByteString (zero-copy reference) * @param bulkRequestBody The bulk request body containing additional update options * @param index The default index name * @param id The default document ID @@ -350,7 +393,7 @@ public static IndexRequest buildIndexRequest( */ public static UpdateRequest buildUpdateRequest( UpdateOperation updateOperation, - byte[] document, + ByteString documentBytes, BulkRequestBody bulkRequestBody, String index, String id, @@ -362,17 +405,18 @@ public static UpdateRequest buildUpdateRequest( long ifPrimaryTerm, boolean requireAlias ) { - index = updateOperation.hasXIndex() ? updateOperation.getXIndex() : index; + if (updateOperation.hasXIndex()) { + index = updateOperation.getXIndex(); + } + id = updateOperation.hasXId() ? updateOperation.getXId() : id; routing = updateOperation.hasRouting() ? updateOperation.getRouting() : routing; - fetchSourceContext = bulkRequestBody.hasUpdateAction() && bulkRequestBody.getUpdateAction().hasXSource() - ? FetchSourceContextProtoUtils.fromProto(bulkRequestBody.getUpdateAction().getXSource()) - : fetchSourceContext; retryOnConflict = updateOperation.hasRetryOnConflict() ? updateOperation.getRetryOnConflict() : retryOnConflict; ifSeqNo = updateOperation.hasIfSeqNo() ? updateOperation.getIfSeqNo() : ifSeqNo; ifPrimaryTerm = updateOperation.hasIfPrimaryTerm() ? updateOperation.getIfPrimaryTerm() : ifPrimaryTerm; requireAlias = updateOperation.hasRequireAlias() ? updateOperation.getRequireAlias() : requireAlias; + // Create UpdateRequest with operation-level fields UpdateRequest updateRequest = new UpdateRequest().index(index) .id(id) .routing(routing) @@ -382,77 +426,108 @@ public static UpdateRequest buildUpdateRequest( .setRequireAlias(requireAlias) .routing(routing); - updateRequest = fromProto(updateRequest, document, bulkRequestBody, updateOperation); + // Populate all document-level fields + updateRequest = fromProto(updateRequest, documentBytes, bulkRequestBody, ifSeqNo, ifPrimaryTerm); + // Apply fetchSourceContext default if (fetchSourceContext != null) { updateRequest.fetchSource(fetchSourceContext); } - // TODO: how is upsertRequest used? - // IndexRequest upsertRequest = updateRequest.upsertRequest(); - // if (upsertRequest != null) { - // upsertRequest.setPipeline(pipeline); - // } + + // Set pipeline on upsert request if it exists + IndexRequest upsertRequest = updateRequest.upsertRequest(); + if (upsertRequest != null) { + upsertRequest.setPipeline(pipeline); + } return updateRequest; } /** * Populates an UpdateRequest with values from protobuf messages. - * Similar to {@link UpdateRequest#fromXContent(XContentParser)} + * Equivalent to {@link UpdateRequest#fromXContent(XContentParser)} for REST API. * - * @param updateRequest The update request to populate - * @param document The document content as bytes + * @param updateRequest The update request to populate (may already have if_seq_no/if_primary_term set) + * @param documentBytes The document content as ByteString (zero-copy reference) * @param bulkRequestBody The bulk request body containing update options - * @param updateOperation The update operation protobuf message + * @param ifSeqNoFromOperation The sequence number + * @param ifPrimaryTermFromOperation The primary term * @return The populated UpdateRequest */ - public static UpdateRequest fromProto( + static UpdateRequest fromProto( UpdateRequest updateRequest, - byte[] document, + ByteString documentBytes, BulkRequestBody bulkRequestBody, - UpdateOperation updateOperation + long ifSeqNoFromOperation, + long ifPrimaryTermFromOperation ) { + // Start with operation metadata values + long ifSeqNo = ifSeqNoFromOperation; + long ifPrimaryTerm = ifPrimaryTermFromOperation; if (bulkRequestBody.hasUpdateAction()) { UpdateAction updateAction = bulkRequestBody.getUpdateAction(); + // 1. script if (updateAction.hasScript()) { Script script = ScriptProtoUtils.parseFromProtoRequest(updateAction.getScript()); updateRequest.script(script); } + // 2. scripted_upsert if (updateAction.hasScriptedUpsert()) { updateRequest.scriptedUpsert(updateAction.getScriptedUpsert()); } + // 3. upsert if (updateAction.hasUpsert()) { - byte[] upsertBytes = updateAction.getUpsert().toByteArray(); - MediaType upsertMediaType = detectMediaType(upsertBytes); - updateRequest.upsert(upsertBytes, upsertMediaType); + ByteString upsertBytes = updateAction.getUpsert(); + BytesReference upsertRef = byteStringToBytesReference(upsertBytes); + MediaType upsertMediaType = detectMediaType(upsertRef); + BytesRef bytesRef = upsertRef.toBytesRef(); + updateRequest.upsert(bytesRef.bytes, bytesRef.offset, bytesRef.length, upsertMediaType); } + } + + // 4. doc + // Only set doc if ByteString is non-empty (empty ByteString = field not provided in proto) + // This check is structural, not business validation: + // - ByteString.EMPTY = no doc field in proto → don't call doc() → keeps doc=null + // - Non-empty ByteString = doc field in proto → call doc() → sets doc!=null + // UpdateRequest.validate() then handles business rules: + // - If script!=null && doc!=null → validation error + // - If script==null && doc==null → validation error + if (documentBytes != null && !documentBytes.isEmpty()) { + BytesReference docRef = byteStringToBytesReference(documentBytes); + MediaType mediaType = detectMediaType(docRef); + BytesRef bytesRef = docRef.toBytesRef(); + updateRequest.doc(bytesRef.bytes, bytesRef.offset, bytesRef.length, mediaType); + } + + if (bulkRequestBody.hasUpdateAction()) { + UpdateAction updateAction = bulkRequestBody.getUpdateAction(); + // 5. doc_as_upsert if (updateAction.hasDocAsUpsert()) { updateRequest.docAsUpsert(updateAction.getDocAsUpsert()); } + // 6. detect_noop if (updateAction.hasDetectNoop()) { updateRequest.detectNoop(updateAction.getDetectNoop()); } + // 7. _source if (updateAction.hasXSource()) { updateRequest.fetchSource(FetchSourceContextProtoUtils.fromProto(updateAction.getXSource())); } - } - - MediaType mediaType = detectMediaType(document); - updateRequest.doc(document, mediaType); - if (updateOperation.hasIfSeqNo()) { - updateRequest.setIfSeqNo(updateOperation.getIfSeqNo()); + // 8 + 9. if_seq_no and if_primary_term are excluded from UpdateAction protobufs intentionally, as users can just provide them + // in UpdateOperation. } - if (updateOperation.hasIfPrimaryTerm()) { - updateRequest.setIfPrimaryTerm(updateOperation.getIfPrimaryTerm()); - } + // Set if_seq_no and if_primary_term + updateRequest.setIfSeqNo(ifSeqNo); + updateRequest.setIfPrimaryTerm(ifPrimaryTerm); return updateRequest; } @@ -480,7 +555,10 @@ public static DeleteRequest buildDeleteRequest( long ifSeqNo, long ifPrimaryTerm ) { - index = deleteOperation.hasXIndex() ? deleteOperation.getXIndex() : index; + if (deleteOperation.hasXIndex()) { + index = deleteOperation.getXIndex(); + } + id = deleteOperation.hasXId() ? deleteOperation.getXId() : id; routing = deleteOperation.hasRouting() ? deleteOperation.getRouting() : routing; version = deleteOperation.hasVersion() ? deleteOperation.getVersion() : version; diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java index 7f336e56e87a9..6d4ee623b066b 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java @@ -36,6 +36,12 @@ private BulkRequestProtoUtils() { * Similar to {@link RestBulkAction#prepareRequest(RestRequest, NodeClient)} * Please ensure to keep both implementations consistent. * + * Note: Unlike REST API, gRPC does not enforce the allowExplicitIndex security setting. + * In REST, this setting provides network-level security by allowing proxies to filter + * requests based on URL paths. In gRPC, both default_index and x_index are in the + * request body, making this check ineffective for network-level security. + * For gRPC security, use mTLS, gRPC interceptors, or service mesh policies instead. + * * @param request the request to execute * @return a future of the bulk action that was executed */ diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtilsTests.java index 0b8d170ea51e2..795fe2ac2a9de 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtilsTests.java @@ -109,12 +109,9 @@ public void testParseFromProtoRequestWithNoSourceParams() { FetchSourceContext context = FetchSourceContextProtoUtils.parseFromProtoRequest(request); // Verify the result - // The implementation returns a default FetchSourceContext with fetchSource=true - // and empty includes/excludes arrays when no source parameters are provided - assertNotNull("Context should not be null", context); - assertTrue("fetchSource should be true", context.fetchSource()); - assertArrayEquals("includes should be empty", Strings.EMPTY_ARRAY, context.includes()); - assertArrayEquals("excludes should be empty", Strings.EMPTY_ARRAY, context.excludes()); + // When no source parameters are provided, should return null to match REST API behavior + // This prevents the "get" field from being returned in update/upsert responses + assertNull("Context should be null when no source parameters provided", context); } public void testFromProtoWithFetch() { diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java index 11dfe757d9d52..bc3cc7217591f 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java @@ -9,11 +9,14 @@ package org.opensearch.transport.grpc.proto.request.document.bulk; import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.xcontent.MediaType; import org.opensearch.index.VersionType; import org.opensearch.index.seqno.SequenceNumbers; @@ -45,7 +48,7 @@ public void testBuildCreateRequest() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - document, + UnsafeByteOperations.unsafeWrap(document), "default-index", "default-id", "default-routing", @@ -87,7 +90,7 @@ public void testBuildIndexRequest() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - document, + UnsafeByteOperations.unsafeWrap(document), null, "default-index", "default-id", @@ -122,7 +125,7 @@ public void testBuildIndexRequestWithOpType() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - document, + UnsafeByteOperations.unsafeWrap(document), opType, "default-index", "default-id", @@ -188,13 +191,13 @@ public void testBuildUpdateRequest() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setDocAsUpsert(true).setDetectNoop(true).build()) .build(); UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -313,7 +316,7 @@ public void testBuildCreateRequestWithDefaults() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - document, + UnsafeByteOperations.unsafeWrap(document), "default-index", "default-id", "default-routing", @@ -340,7 +343,7 @@ public void testBuildCreateRequestWithPipeline() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - document, + UnsafeByteOperations.unsafeWrap(document), "default-index", "default-id", "default-routing", @@ -372,7 +375,7 @@ public void testBuildIndexRequestWithAllFields() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - document, + UnsafeByteOperations.unsafeWrap(document), OpType.OP_TYPE_INDEX, "default-index", "default-id", @@ -405,7 +408,7 @@ public void testBuildIndexRequestWithNullOpType() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - document, + UnsafeByteOperations.unsafeWrap(document), null, "default-index", "default-id", @@ -431,7 +434,7 @@ public void testBuildUpdateRequestWithScript() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction( org.opensearch.protobufs.UpdateAction.newBuilder() .setScript( @@ -454,7 +457,7 @@ public void testBuildUpdateRequestWithScript() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -480,13 +483,13 @@ public void testBuildUpdateRequestWithUpsert() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setUpsert(ByteString.copyFrom(upsertDoc)).build()) .build(); UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -510,13 +513,13 @@ public void testBuildUpdateRequestWithScriptedUpsert() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setScriptedUpsert(true).build()) .build(); UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -540,7 +543,7 @@ public void testBuildUpdateRequestWithFetchSource() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction( org.opensearch.protobufs.UpdateAction.newBuilder() .setXSource(org.opensearch.protobufs.SourceConfig.newBuilder().setFetch(true).build()) @@ -550,7 +553,7 @@ public void testBuildUpdateRequestWithFetchSource() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -574,12 +577,12 @@ public void testBuildUpdateRequestWithoutUpdateAction() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .build(); UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -705,7 +708,13 @@ public void testFromProtoWithAllUpdateActionFields() { UpdateOperation updateOperation = UpdateOperation.newBuilder().setIfSeqNo(123L).setIfPrimaryTerm(456L).build(); - UpdateRequest result = BulkRequestParserProtoUtils.fromProto(updateRequest, document, bulkRequestBody, updateOperation); + UpdateRequest result = BulkRequestParserProtoUtils.fromProto( + updateRequest, + UnsafeByteOperations.unsafeWrap(document), + bulkRequestBody, + 123L, + 456L + ); assertNotNull("Result should not be null", result); assertNotNull("Script should be set", result.script()); @@ -726,7 +735,7 @@ public void testBuildCreateRequestWithSmileContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - smileDocument, + UnsafeByteOperations.unsafeWrap(smileDocument), "default-index", "default-id", null, @@ -754,7 +763,7 @@ public void testBuildCreateRequestWithCborContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - cborDocument, + UnsafeByteOperations.unsafeWrap(cborDocument), "default-index", "default-id", null, @@ -782,7 +791,7 @@ public void testBuildIndexRequestWithSmileContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - smileDocument, + UnsafeByteOperations.unsafeWrap(smileDocument), null, "default-index", "default-id", @@ -810,7 +819,7 @@ public void testBuildIndexRequestWithCborContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - cborDocument, + UnsafeByteOperations.unsafeWrap(cborDocument), null, "default-index", "default-id", @@ -843,7 +852,13 @@ public void testUpdateRequestWithCborUpsert() throws Exception { UpdateOperation updateOperation = UpdateOperation.newBuilder().build(); - UpdateRequest result = BulkRequestParserProtoUtils.fromProto(updateRequest, document, bulkRequestBody, updateOperation); + UpdateRequest result = BulkRequestParserProtoUtils.fromProto( + updateRequest, + UnsafeByteOperations.unsafeWrap(document), + bulkRequestBody, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ); assertNotNull("Result should not be null", result); assertNotNull("Upsert should be set", result.upsertRequest()); @@ -856,7 +871,7 @@ public void testBuildCreateRequestWithEmptyDocument() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - emptyDocument, + UnsafeByteOperations.unsafeWrap(emptyDocument), "default-index", "default-id", null, @@ -882,7 +897,7 @@ public void testBuildCreateRequestWithJsonContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - jsonDocument, + UnsafeByteOperations.unsafeWrap(jsonDocument), "default-index", "default-id", null, @@ -910,7 +925,7 @@ public void testBuildCreateRequestWithYamlContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - yamlDocument, + UnsafeByteOperations.unsafeWrap(yamlDocument), "default-index", "default-id", null, @@ -938,7 +953,7 @@ public void testBuildIndexRequestWithJsonContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - jsonDocument, + UnsafeByteOperations.unsafeWrap(jsonDocument), null, "default-index", "default-id", @@ -966,7 +981,7 @@ public void testBuildIndexRequestWithYamlContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - yamlDocument, + UnsafeByteOperations.unsafeWrap(yamlDocument), null, "default-index", "default-id", @@ -1023,10 +1038,10 @@ private byte[] createYamlDocument() throws Exception { * Test detectMediaType with null or empty document */ public void testDetectMediaTypeNullOrEmpty() { - MediaType result = BulkRequestParserProtoUtils.detectMediaType(null); + MediaType result = BulkRequestParserProtoUtils.detectMediaType((BytesReference) null); assertEquals("application/json", result.mediaTypeWithoutParameters()); - result = BulkRequestParserProtoUtils.detectMediaType(new byte[0]); + result = BulkRequestParserProtoUtils.detectMediaType(new BytesArray(new byte[0])); assertEquals("application/json", result.mediaTypeWithoutParameters()); } @@ -1035,7 +1050,486 @@ public void testDetectMediaTypeNullOrEmpty() { */ public void testDetectMediaTypeUnrecognizable() { byte[] invalidBytes = new byte[] { (byte) 0xFF, (byte) 0xFE, (byte) 0xFD, (byte) 0xFC }; - MediaType result = BulkRequestParserProtoUtils.detectMediaType(invalidBytes); + MediaType result = BulkRequestParserProtoUtils.detectMediaType(new BytesArray(invalidBytes)); assertEquals("application/json", result.mediaTypeWithoutParameters()); } + + /** + * Test buildUpdateRequest with upsert request and pipeline + */ + public void testBuildUpdateRequestWithUpsertAndPipeline() { + UpdateOperation updateOperation = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + byte[] upsertDoc = "{\"upsert_field\":\"upsert_value\"}".getBytes(StandardCharsets.UTF_8); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) + .setObject(UnsafeByteOperations.unsafeWrap(document)) + .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setUpsert(ByteString.copyFrom(upsertDoc)).build()) + .build(); + + UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( + updateOperation, + UnsafeByteOperations.unsafeWrap(document), + bulkRequestBody, + "default-index", + "default-id", + null, + null, + 0, + "test-pipeline", // pipeline + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("UpdateRequest should not be null", updateRequest); + assertNotNull("Upsert request should be set", updateRequest.upsertRequest()); + assertEquals("Pipeline should be set on upsert request", "test-pipeline", updateRequest.upsertRequest().getPipeline()); + } + + /** + * Test fromProto with empty document bytes (ByteString.EMPTY) + */ + public void testFromProtoWithEmptyDocumentBytes() { + UpdateRequest updateRequest = new UpdateRequest("test-index", "test-id"); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setUpdateAction( + org.opensearch.protobufs.UpdateAction.newBuilder() + .setScript( + org.opensearch.protobufs.Script.newBuilder() + .setInline( + org.opensearch.protobufs.InlineScript.newBuilder() + .setSource("ctx._source.counter += 1") + .setLang( + org.opensearch.protobufs.ScriptLanguage.newBuilder() + .setBuiltin(org.opensearch.protobufs.BuiltinScriptLanguage.BUILTIN_SCRIPT_LANGUAGE_PAINLESS) + .build() + ) + .build() + ) + .build() + ) + .build() + ) + .build(); + + // Test with ByteString.EMPTY (no doc field) + UpdateRequest result = BulkRequestParserProtoUtils.fromProto( + updateRequest, + ByteString.EMPTY, + bulkRequestBody, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM + ); + + assertNotNull("Result should not be null", result); + assertNotNull("Script should be set", result.script()); + assertNull("Doc should not be set when ByteString is empty", result.doc()); + } + + /** + * Test fromProto with null document bytes + */ + public void testFromProtoWithNullDocumentBytes() { + UpdateRequest updateRequest = new UpdateRequest("test-index", "test-id"); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setUpdateAction( + org.opensearch.protobufs.UpdateAction.newBuilder() + .setScript( + org.opensearch.protobufs.Script.newBuilder() + .setInline( + org.opensearch.protobufs.InlineScript.newBuilder() + .setSource("ctx._source.counter += 1") + .setLang( + org.opensearch.protobufs.ScriptLanguage.newBuilder() + .setBuiltin(org.opensearch.protobufs.BuiltinScriptLanguage.BUILTIN_SCRIPT_LANGUAGE_PAINLESS) + .build() + ) + .build() + ) + .build() + ) + .build() + ) + .build(); + + // Test with null documentBytes + UpdateRequest result = BulkRequestParserProtoUtils.fromProto( + updateRequest, + null, + bulkRequestBody, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM + ); + + assertNotNull("Result should not be null", result); + assertNotNull("Script should be set", result.script()); + assertNull("Doc should not be set when documentBytes is null", result.doc()); + } + + /** + * Test getDocWriteRequests with update operation using UpdateAction.doc field + */ + public void testGetDocWriteRequestsWithUpdateActionDoc() { + UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + BulkRequestBody updateBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build()) + .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setDoc(UnsafeByteOperations.unsafeWrap(document)).build()) + .build(); + + BulkRequest request = BulkRequest.newBuilder().addBulkRequestBody(updateBody).build(); + + DocWriteRequest[] requests = BulkRequestParserProtoUtils.getDocWriteRequests(request, "default-index", null, null, null, false); + + assertNotNull("Requests should not be null", requests); + assertEquals("Should have 1 request", 1, requests.length); + assertTrue("Request should be an UpdateRequest", requests[0] instanceof UpdateRequest); + + UpdateRequest updateRequest = (UpdateRequest) requests[0]; + assertNotNull("Doc should be set from UpdateAction.doc", updateRequest.doc()); + } + + /** + * Test valueOrDefault for String with null value and non-null globalDefault + */ + public void testValueOrDefaultStringWithNullValue() { + UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + BulkRequestBody updateBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build()) + .setObject(UnsafeByteOperations.unsafeWrap(document)) + .build(); + + BulkRequest request = BulkRequest.newBuilder() + .addBulkRequestBody(updateBody) + .setRouting("global-routing") + .setPipeline("global-pipeline") + .build(); + + DocWriteRequest[] requests = BulkRequestParserProtoUtils.getDocWriteRequests( + request, + "default-index", + null, // defaultRouting is null, should use global routing + null, + null, // defaultPipeline is null, should use global pipeline + false + ); + + assertNotNull("Requests should not be null", requests); + assertEquals("Should have 1 request", 1, requests.length); + + UpdateRequest updateRequest = (UpdateRequest) requests[0]; + assertEquals("Routing should use global value", "global-routing", updateRequest.routing()); + } + + /** + * Test valueOrDefault for Boolean with null value and non-null globalDefault + */ + public void testValueOrDefaultBooleanWithNullValue() { + IndexOperation indexOp = IndexOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + BulkRequestBody indexBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setIndex(indexOp).build()) + .setObject(ByteString.copyFromUtf8("{\"field\":\"value\"}")) + .build(); + + BulkRequest request = BulkRequest.newBuilder().addBulkRequestBody(indexBody).setRequireAlias(true).build(); + + DocWriteRequest[] requests = BulkRequestParserProtoUtils.getDocWriteRequests( + request, + "default-index", + null, + null, + null, + null // defaultRequireAlias is null, should use global requireAlias + ); + + assertNotNull("Requests should not be null", requests); + assertEquals("Should have 1 request", 1, requests.length); + + IndexRequest indexRequest = (IndexRequest) requests[0]; + assertTrue("RequireAlias should use global value", indexRequest.isRequireAlias()); + } + + /** + * Test ByteString to BytesReference conversion with UnsafeByteOperations.unsafeWrap() + * This tests the zero-copy path where ByteString wraps a byte array + */ + public void testByteStringToBytesReferenceZeroCopy() { + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + ByteString byteString = UnsafeByteOperations.unsafeWrap(document); + + WriteOperation createOp = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( + createOp, + byteString, + "default-index", + "default-id", + "default-routing", + Versions.MATCH_ANY, + VersionType.INTERNAL, + "default-pipeline", + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertNotNull("Source should not be null", indexRequest.source()); + // Verify the content is correct + assertEquals("Source content should match", new String(document, StandardCharsets.UTF_8), indexRequest.source().utf8ToString()); + } + + /** + * Test ByteString to BytesReference conversion with ByteString.copyFrom() + * This tests the copy path where ByteString creates an internal copy + */ + public void testByteStringToBytesReferenceCopy() { + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + ByteString byteString = ByteString.copyFrom(document); + + WriteOperation createOp = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( + createOp, + byteString, + "default-index", + "default-id", + "default-routing", + Versions.MATCH_ANY, + VersionType.INTERNAL, + "default-pipeline", + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertNotNull("Source should not be null", indexRequest.source()); + assertEquals("Source content should match", new String(document, StandardCharsets.UTF_8), indexRequest.source().utf8ToString()); + } + + /** + * Test ByteString to BytesReference conversion with empty ByteString + */ + public void testByteStringToBytesReferenceEmpty() { + ByteString byteString = ByteString.EMPTY; + + WriteOperation createOp = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( + createOp, + byteString, + "default-index", + "default-id", + "default-routing", + Versions.MATCH_ANY, + VersionType.INTERNAL, + "default-pipeline", + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertNotNull("Source should not be null", indexRequest.source()); + assertEquals("Source should be empty", 0, indexRequest.source().length()); + } + + /** + * Test update request with doc field using UnsafeByteOperations.unsafeWrap() + */ + public void testUpdateRequestDocFieldZeroCopy() { + byte[] document = "{\"field\":\"updated_value\"}".getBytes(StandardCharsets.UTF_8); + ByteString docBytes = UnsafeByteOperations.unsafeWrap(document); + + UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + org.opensearch.protobufs.UpdateAction updateAction = org.opensearch.protobufs.UpdateAction.newBuilder().setDoc(docBytes).build(); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build()) + .setUpdateAction(updateAction) + .build(); + + UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( + updateOp, + docBytes, + bulkRequestBody, + "default-index", + "default-id", + "default-routing", + null, + 0, + "default-pipeline", + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("UpdateRequest should not be null", updateRequest); + assertNotNull("Doc should not be null", updateRequest.doc()); + assertEquals("Doc content should match", new String(document, StandardCharsets.UTF_8), updateRequest.doc().source().utf8ToString()); + } + + /** + * Test update request with upsert field using UnsafeByteOperations.unsafeWrap() + */ + public void testUpdateRequestUpsertFieldZeroCopy() { + byte[] docBytes = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + byte[] upsertBytes = "{\"field\":\"default_value\"}".getBytes(StandardCharsets.UTF_8); + + UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + org.opensearch.protobufs.UpdateAction updateAction = org.opensearch.protobufs.UpdateAction.newBuilder() + .setDoc(UnsafeByteOperations.unsafeWrap(docBytes)) + .setUpsert(UnsafeByteOperations.unsafeWrap(upsertBytes)) + .build(); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build()) + .setUpdateAction(updateAction) + .build(); + + UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( + updateOp, + UnsafeByteOperations.unsafeWrap(docBytes), + bulkRequestBody, + "default-index", + "default-id", + "default-routing", + null, + 0, + "default-pipeline", + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("UpdateRequest should not be null", updateRequest); + assertNotNull("Upsert should not be null", updateRequest.upsertRequest()); + assertEquals( + "Upsert content should match", + new String(upsertBytes, StandardCharsets.UTF_8), + updateRequest.upsertRequest().source().utf8ToString() + ); + } + + /** + * Test update request with both doc and upsert fields using ByteString.copyFrom() + */ + public void testUpdateRequestDocAndUpsertCopy() { + byte[] docBytes = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + byte[] upsertBytes = "{\"field\":\"default_value\"}".getBytes(StandardCharsets.UTF_8); + + UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + org.opensearch.protobufs.UpdateAction updateAction = org.opensearch.protobufs.UpdateAction.newBuilder() + .setDoc(ByteString.copyFrom(docBytes)) + .setUpsert(ByteString.copyFrom(upsertBytes)) + .build(); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build()) + .setUpdateAction(updateAction) + .build(); + + UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( + updateOp, + ByteString.copyFrom(docBytes), + bulkRequestBody, + "default-index", + "default-id", + "default-routing", + null, + 0, + "default-pipeline", + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("UpdateRequest should not be null", updateRequest); + assertNotNull("Doc should not be null", updateRequest.doc()); + assertNotNull("Upsert should not be null", updateRequest.upsertRequest()); + assertEquals("Doc content should match", new String(docBytes, StandardCharsets.UTF_8), updateRequest.doc().source().utf8ToString()); + assertEquals( + "Upsert content should match", + new String(upsertBytes, StandardCharsets.UTF_8), + updateRequest.upsertRequest().source().utf8ToString() + ); + } + + /** + * Test index request with large document using UnsafeByteOperations.unsafeWrap() + */ + public void testIndexRequestLargeDocumentZeroCopy() { + // Create a large document (> 1KB) + StringBuilder sb = new StringBuilder("{\"data\":\""); + for (int i = 0; i < 200; i++) { + sb.append("0123456789"); + } + sb.append("\"}"); + byte[] document = sb.toString().getBytes(StandardCharsets.UTF_8); + + IndexOperation indexOp = IndexOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( + indexOp, + UnsafeByteOperations.unsafeWrap(document), + null, + "default-index", + "default-id", + "default-routing", + Versions.MATCH_ANY, + VersionType.INTERNAL, + "default-pipeline", + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertNotNull("Source should not be null", indexRequest.source()); + assertEquals("Source length should match", document.length, indexRequest.source().length()); + assertEquals("Source content should match", new String(document, StandardCharsets.UTF_8), indexRequest.source().utf8ToString()); + } + + /** + * Test create request with UTF-8 encoded document using UnsafeByteOperations.unsafeWrap() + */ + public void testCreateRequestUtf8DocumentZeroCopy() { + String jsonWithUnicode = "{\"field\":\"Hello 世界 🌍\"}"; + byte[] document = jsonWithUnicode.getBytes(StandardCharsets.UTF_8); + + WriteOperation createOp = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( + createOp, + UnsafeByteOperations.unsafeWrap(document), + "default-index", + "default-id", + "default-routing", + Versions.MATCH_ANY, + VersionType.INTERNAL, + "default-pipeline", + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertNotNull("Source should not be null", indexRequest.source()); + assertEquals("Source content should match UTF-8", jsonWithUnicode, indexRequest.source().utf8ToString()); + } }