Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add S3Repository.LEGACY_MD5_CHECKSUM_CALCULATION to list of repository-s3 settings ([#19788](https://github.com/opensearch-project/OpenSearch/pull/19788))
- Fix NullPointerException when restoring remote snapshot with missing shard size information ([#19684](https://github.com/opensearch-project/OpenSearch/pull/19684))
- Fix NPE of ScriptScoreQuery ([#19650](https://github.com/opensearch-project/OpenSearch/pull/19650))
- Fix GRPC Bulk ([#19937](https://github.com/opensearch-project/OpenSearch/pull/19937))

### Dependencies
- Update to Gradle 9.2 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575)) ([#19856](https://github.com/opensearch-project/OpenSearch/pull/19856))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(

return Collections.singletonMap(GRPC_TRANSPORT_SETTING_KEY, () -> {
List<BindableService> grpcServices = new ArrayList<>(
List.of(new DocumentServiceImpl(client), new SearchServiceImpl(client, queryUtils))
List.of(new DocumentServiceImpl(client, settings), new SearchServiceImpl(client, queryUtils))
);
for (GrpcServiceFactory serviceFac : servicesFactory) {
List<BindableService> pluginServices = serviceFac.initClient(client)
Expand Down Expand Up @@ -259,10 +259,9 @@ public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
if (client == null || queryRegistry == null) {
throw new RuntimeException("createComponents must be called first to initialize server provided resources.");
}

return Collections.singletonMap(GRPC_SECURE_TRANSPORT_SETTING_KEY, () -> {
List<BindableService> grpcServices = new ArrayList<>(
List.of(new DocumentServiceImpl(client), new SearchServiceImpl(client, queryUtils))
List.of(new DocumentServiceImpl(client, settings), new SearchServiceImpl(client, queryUtils))
);
for (GrpcServiceFactory serviceFac : servicesFactory) {
List<BindableService> pluginServices = serviceFac.initClient(client)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private FetchSourceContextProtoUtils() {
* @return A FetchSourceContext object based on the request parameters, or null if no source parameters are provided
*/
public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.BulkRequest request) {
Boolean fetchSource = true;
Boolean fetchSource = null;
String[] sourceExcludes = null;
String[] sourceIncludes = null;

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.transport.grpc.proto.request.document.bulk;

import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.protobufs.BulkRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.document.RestBulkAction;
Expand Down Expand Up @@ -37,9 +38,10 @@ private BulkRequestProtoUtils() {
* Please ensure to keep both implementations consistent.
*
* @param request the request to execute
* @param settings node settings for security and configuration
* @return a future of the bulk action that was executed
*/
public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest request) {
public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest request, Settings settings) {
org.opensearch.action.bulk.BulkRequest bulkRequest = Requests.bulkRequest();

String defaultIndex = request.hasIndex() ? request.getIndex() : null;
Expand All @@ -60,6 +62,9 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest

bulkRequest.setRefreshPolicy(RefreshProtoUtils.getRefreshPolicy(request.getRefresh()));

// Read the allowExplicitIndex setting (matches REST BulkAction line 74)
boolean allowExplicitIndex = RestBulkAction.MULTI_ALLOW_EXPLICIT_INDEX.get(settings);

// Note: batch_size is deprecated in OS 3.x. Add batch_size parameter when backporting to OS 2.x
/*
if (request.hasBatchSize()){
Expand All @@ -75,7 +80,8 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias
defaultRequireAlias,
allowExplicitIndex
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Settings;
import org.opensearch.protobufs.services.DocumentServiceGrpc;
import org.opensearch.transport.client.Client;
import org.opensearch.transport.grpc.listeners.BulkRequestActionListener;
Expand All @@ -25,14 +26,17 @@
public class DocumentServiceImpl extends DocumentServiceGrpc.DocumentServiceImplBase {
private static final Logger logger = LogManager.getLogger(DocumentServiceImpl.class);
private final Client client;
private final Settings settings;

/**
* Creates a new DocumentServiceImpl.
*
* @param client Client for executing actions on the local node
* @param settings Node settings for security and configuration
*/
public DocumentServiceImpl(Client client) {
public DocumentServiceImpl(Client client, Settings settings) {
this.client = client;
this.settings = settings;
}

/**
Expand All @@ -44,7 +48,7 @@ public DocumentServiceImpl(Client client) {
@Override
public void bulk(org.opensearch.protobufs.BulkRequest request, StreamObserver<org.opensearch.protobufs.BulkResponse> responseObserver) {
try {
org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request);
org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, settings);
BulkRequestActionListener listener = new BulkRequestActionListener(responseObserver);
client.bulk(bulkRequest, listener);
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,9 @@ public void testParseFromProtoRequestWithNoSourceParams() {
FetchSourceContext context = FetchSourceContextProtoUtils.parseFromProtoRequest(request);

// Verify the result
// The implementation returns a default FetchSourceContext with fetchSource=true
// and empty includes/excludes arrays when no source parameters are provided
assertNotNull("Context should not be null", context);
assertTrue("fetchSource should be true", context.fetchSource());
assertArrayEquals("includes should be empty", Strings.EMPTY_ARRAY, context.includes());
assertArrayEquals("excludes should be empty", Strings.EMPTY_ARRAY, context.excludes());
// When no source parameters are provided, should return null to match REST API behavior
// This prevents the "get" field from being returned in update/upsert responses
assertNull("Context should be null when no source parameters provided", context);
}

public void testFromProtoWithFetch() {
Expand Down
Loading
Loading