diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a3cb39c09a0b..b05277f2bd4c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Use Lucene `pack` method for `half_float` and `usigned_long` when using `ApproximatePointRangeQuery`. - Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233)) - Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568)) +- Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005)) ### Changed - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350)) diff --git a/modules/transport-grpc/spi/README.md b/modules/transport-grpc/spi/README.md index 9a3609abbe528..0778e6f9c3577 100644 --- a/modules/transport-grpc/spi/README.md +++ b/modules/transport-grpc/spi/README.md @@ -8,6 +8,7 @@ The `transport-grpc-spi` module enables plugin developers to: - Implement custom query converters for gRPC transport - Extend gRPC protocol buffer handling - Register custom query types that can be processed via gRPC +- Register gRPC interceptors with explicit ordering ## Key Components @@ -26,6 +27,26 @@ public interface QueryBuilderProtoConverter { Interface for accessing the query converter registry. This provides a clean abstraction for plugins that need to convert nested queries without exposing internal implementation details. +### GrpcInterceptorProvider + +Interface for providing gRPC interceptors that can intercept and process all incoming gRPC requests. This enables plugins to implement cross-cutting concerns like authentication, authorization, logging, and metrics collection. + +```java +public interface GrpcInterceptorProvider { + List getOrderedGrpcInterceptors(); + + /** + * Nested interface for ordered gRPC interceptors. + * Interceptors are executed in order based on their order() value, + * with lower values executing first (e.g., 10 before 20). + */ + interface OrderedGrpcInterceptor { + int order(); // Lower values execute first + ServerInterceptor getInterceptor(); + } +} +``` + ## Usage for Plugin Developers ### 1. Add Dependency @@ -36,10 +57,47 @@ Add the SPI dependency to your plugin's `build.gradle`: dependencies { compileOnly 'org.opensearch.plugin:transport-grpc-spi:${opensearch.version}' compileOnly 'org.opensearch:protobufs:${protobufs.version}' + compileOnly 'io.grpc:grpc-api:${versions.grpc}' } ``` -### 2. Implement Custom Query Converter +### 2. Declare Extension in build.gradle + +In your `build.gradle`, declare that your plugin extends `transport-grpc`. This automatically adds the `extended.plugins=transport-grpc` entry to the auto-generated `plugin-descriptor.properties` file: : + +```groovy +opensearchplugin { + name 'your-plugin-name' + description 'Your plugin description' + classname 'org.opensearch.yourplugin.YourPlugin' + extendedPlugins = ['transport-grpc'] // Declare extension here +} +``` +**Real-world examples:** +- [OpenSearch Reporting Plugin](https://github.com/opensearch-project/reporting/blob/main/build.gradle#L92) +- [OpenSearch k-NN Plugin](https://github.com/opensearch-project/k-NN/blob/main/build.gradle#L319) + +### 3. Create SPI Registration File(s) + +Create a service file denoting your plugin's implementation of a service interface. + +For QueryBuilderProtoConverter implementations: +`src/main/resources/META-INF/services/org.opensearch.transport.grpc.spi.QueryBuilderProtoConverter`: + +``` +org.opensearch.mypackage.MyCustomQueryConverter +``` + +For `GrpcInterceptorProvider` implementations: `src/main/resources/META-INF/services/org.opensearch.transport.grpc.spi.GrpcInterceptorProvider`: + +``` +org.opensearch.mypackage.SampleInterceptorProvider +``` + + + +## QueryBuilderProtoConverter +### 1. Implement Custom Query Converter ```java public class MyCustomQueryConverter implements QueryBuilderProtoConverter { @@ -58,7 +116,7 @@ public class MyCustomQueryConverter implements QueryBuilderProtoConverter { } ``` -### 3. Register Your Converter +### 2. Register Your Converter In your plugin's main class, return the converter from createComponents: @@ -80,23 +138,7 @@ public class MyPlugin extends Plugin { } ``` -**Step 3b: Create SPI Registration File** - -Create a file at `src/main/resources/META-INF/services/org.opensearch.transport.grpc.spi.QueryBuilderProtoConverter`: - -``` -org.opensearch.mypackage.MyCustomQueryConverter -``` - -**Step 3c: Declare Extension in Plugin Descriptor** - -In your `plugin-descriptor.properties`, declare that your plugin extends transport-grpc: - -```properties -extended.plugins=transport-grpc -``` - -### 4. Accessing the Registry (For Complex Queries) +### 3. Accessing the Registry (For Complex Queries) If your converter needs to handle nested queries (like k-NN's filter clause), you'll need access to the registry to convert other query types. The transport-grpc plugin will inject the registry into your converter. @@ -188,7 +230,7 @@ public class KNNQueryBuilderProtoConverter implements QueryBuilderProtoConverter ./gradlew :modules:transport-grpc:spi:test ``` -### Testing Your Custom Converter +### 4. Testing Your Custom Converter ```java @Test @@ -275,3 +317,69 @@ org.opensearch.knn.grpc.proto.request.search.query.KNNQueryBuilderProtoConverter **Why k-NN needs the registry:** The k-NN query's `filter` field is a `QueryContainer` protobuf type that can contain any query type (MatchAll, Term, Terms, etc.). The k-NN converter needs access to the registry to convert these nested queries to their corresponding QueryBuilder objects. + + +## gRPC Interceptors + +### Overview + +Intercept incoming gRPC requests for authentication, authorization, logging, metrics, rate limiting,etc + +### Basic Usage + +**1. Implement Provider:** +```java +public class SampleInterceptorProvider implements GrpcInterceptorProvider { + @Override + public List getOrderedGrpcInterceptors() { + return Arrays.asList( + // First interceptor (order = 5, runs first) + new GrpcInterceptorProvider.OrderedGrpcInterceptor() { + @Override + public int order() { return 5; } // Lower = higher priority + + @Override + public ServerInterceptor getInterceptor() { + return (call, headers, next) -> { + String methodName = call.getMethodDescriptor().getFullMethodName(); + System.out.println("First interceptor - Method: " + methodName); + return next.startCall(call, headers); + }; + } + }, + + // Second interceptor (order = 10, runs after first) + new GrpcInterceptorProvider.OrderedGrpcInterceptor() { + @Override + public int order() { return 10; } + + @Override + public ServerInterceptor getInterceptor() { + return (call, headers, next) -> { + System.out.println("Second interceptor - Processing request"); + return next.startCall(call, headers); + }; + } + } + ); + } +} +``` + +### Understanding Interceptor Ordering + +#### How Order Values Work + +Interceptors are executed based on their `order()` value. Lower order values execute first. The interceptor chain processes requests from lowest to highest order value. + +``` +Request → [order=5] → [order=10] → [order=100] → Service Handler +``` + +#### Duplicate Order Values + +Each interceptor must have a unique order value. If duplicate order values are detected, OpenSearch will fail to start with an `IllegalArgumentException`. +``` +IllegalArgumentException: Multiple gRPC interceptors have the same order value: 10. +Each interceptor must have a unique order value. +``` diff --git a/modules/transport-grpc/spi/build.gradle b/modules/transport-grpc/spi/build.gradle index 82ccdd824a696..991fbabb8fe2f 100644 --- a/modules/transport-grpc/spi/build.gradle +++ b/modules/transport-grpc/spi/build.gradle @@ -15,8 +15,28 @@ base { } dependencies { - api project(":server") - api "org.opensearch:protobufs:${versions.opensearchprotobufs}" + implementation project(":server") + implementation "org.opensearch:protobufs:${versions.opensearchprotobufs}" + implementation "io.grpc:grpc-api:${versions.grpc}" testImplementation project(":test:framework") } + +thirdPartyAudit { + ignoreMissingClasses( + 'com.google.common.base.Joiner', + 'com.google.common.base.MoreObjects', + 'com.google.common.base.MoreObjects$ToStringHelper', + 'com.google.common.base.Objects', + 'com.google.common.base.Preconditions', + 'com.google.common.base.Strings', + 'com.google.common.base.Throwables', + 'com.google.common.collect.ImmutableList', + 'com.google.common.collect.ImmutableMap', + 'com.google.common.collect.Maps', + 'com.google.common.collect.Sets', + 'com.google.common.io.BaseEncoding', + 'com.google.common.io.ByteStreams', + 'com.google.common.util.concurrent.ListenableFuture' + ) +} diff --git a/modules/transport-grpc/spi/licenses/grpc-api-1.75.0.jar.sha1 b/modules/transport-grpc/spi/licenses/grpc-api-1.75.0.jar.sha1 new file mode 100644 index 0000000000000..cedd356c2200c --- /dev/null +++ b/modules/transport-grpc/spi/licenses/grpc-api-1.75.0.jar.sha1 @@ -0,0 +1 @@ +18ddd409fb9bc0209d216854ca584d027e68210b \ No newline at end of file diff --git a/modules/transport-grpc/spi/licenses/grpc-api-LICENSE.txt b/modules/transport-grpc/spi/licenses/grpc-api-LICENSE.txt new file mode 100644 index 0000000000000..6b0b1270ff0ca --- /dev/null +++ b/modules/transport-grpc/spi/licenses/grpc-api-LICENSE.txt @@ -0,0 +1,203 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/modules/transport-grpc/spi/licenses/grpc-api-NOTICE.txt b/modules/transport-grpc/spi/licenses/grpc-api-NOTICE.txt new file mode 100644 index 0000000000000..b79586a1a160e --- /dev/null +++ b/modules/transport-grpc/spi/licenses/grpc-api-NOTICE.txt @@ -0,0 +1,63 @@ +Copyright 2014 The gRPC Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +----------------------------------------------------------------------- + +This product contains a modified portion of 'OkHttp', an open source +HTTP & SPDY client for Android and Java applications, which can be obtained +at: + + * LICENSE: + * okhttp/third_party/okhttp/LICENSE (Apache License 2.0) + * HOMEPAGE: + * https://github.com/square/okhttp + * LOCATION_IN_GRPC: + * okhttp/third_party/okhttp + +This product contains a modified portion of 'Envoy', an open source +cloud-native high-performance edge/middle/service proxy, which can be +obtained at: + + * LICENSE: + * xds/third_party/envoy/LICENSE (Apache License 2.0) + * NOTICE: + * xds/third_party/envoy/NOTICE + * HOMEPAGE: + * https://www.envoyproxy.io + * LOCATION_IN_GRPC: + * xds/third_party/envoy + +This product contains a modified portion of 'protoc-gen-validate (PGV)', +an open source protoc plugin to generate polyglot message validators, +which can be obtained at: + + * LICENSE: + * xds/third_party/protoc-gen-validate/LICENSE (Apache License 2.0) + * NOTICE: + * xds/third_party/protoc-gen-validate/NOTICE + * HOMEPAGE: + * https://github.com/envoyproxy/protoc-gen-validate + * LOCATION_IN_GRPC: + * xds/third_party/protoc-gen-validate + +This product contains a modified portion of 'udpa', +an open source universal data plane API, which can be obtained at: + + * LICENSE: + * xds/third_party/udpa/LICENSE (Apache License 2.0) + * HOMEPAGE: + * https://github.com/cncf/udpa + * LOCATION_IN_GRPC: + * xds/third_party/udpa + diff --git a/modules/transport-grpc/spi/src/main/java/org/opensearch/transport/grpc/spi/GrpcInterceptorProvider.java b/modules/transport-grpc/spi/src/main/java/org/opensearch/transport/grpc/spi/GrpcInterceptorProvider.java new file mode 100644 index 0000000000000..d111ca6fa71f5 --- /dev/null +++ b/modules/transport-grpc/spi/src/main/java/org/opensearch/transport/grpc/spi/GrpcInterceptorProvider.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.transport.grpc.spi; + +import java.util.List; + +import io.grpc.ServerInterceptor; + +/** + * SPI interface for providing gRPC interceptors. + * Plugins can implement this interface to provide custom gRPC interceptors + * that will be executed in the order specified by their OrderedGrpcInterceptor. + */ +public interface GrpcInterceptorProvider { + + /** + * Returns a list of ordered gRPC interceptors. + * Each interceptor must have a unique order value. + * + * @return List of ordered gRPC interceptors + */ + List getOrderedGrpcInterceptors(); + + /** + * Provides a gRPC interceptor with an order value for execution priority. + * Interceptors with lower order values are applied earlier. + */ + interface OrderedGrpcInterceptor { + /** + * Defines the order in which the interceptor should be applied. + * Lower values indicate higher priority. + * Must be unique across all interceptors - no two interceptors should have the same order. + * + * @return the order value + */ + int order(); + + /** + * Returns the actual gRPC ServerInterceptor instance. + * + * @return the server interceptor + */ + ServerInterceptor getInterceptor(); + } +} diff --git a/modules/transport-grpc/spi/src/test/java/org/opensearch/transport/grpc/spi/GrpcInterceptorProviderTests.java b/modules/transport-grpc/spi/src/test/java/org/opensearch/transport/grpc/spi/GrpcInterceptorProviderTests.java new file mode 100644 index 0000000000000..103fa610c3d0d --- /dev/null +++ b/modules/transport-grpc/spi/src/test/java/org/opensearch/transport/grpc/spi/GrpcInterceptorProviderTests.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc.spi; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.List; + +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; + +public class GrpcInterceptorProviderTests extends OpenSearchTestCase { + + public void testBasicProviderImplementation() { + TestGrpcInterceptorProvider provider = new TestGrpcInterceptorProvider(10); + + List interceptors = provider.getOrderedGrpcInterceptors(); + assertNotNull(interceptors); + assertEquals(1, interceptors.size()); + assertEquals(10, interceptors.get(0).order()); + } + + public void testProviderReturnsEmptyList() { + GrpcInterceptorProvider provider = new GrpcInterceptorProvider() { + @Override + public List getOrderedGrpcInterceptors() { + return Collections.emptyList(); + } + }; + + List interceptors = provider.getOrderedGrpcInterceptors(); + assertNotNull(interceptors); + assertTrue(interceptors.isEmpty()); + } + + private static class TestGrpcInterceptorProvider implements GrpcInterceptorProvider { + private final int order; + + TestGrpcInterceptorProvider(int order) { + this.order = order; + } + + @Override + public List getOrderedGrpcInterceptors() { + return Collections.singletonList(createTestInterceptor(order, "test-interceptor")); + } + } + + /** + * Creates a test OrderedGrpcInterceptor + */ + private static GrpcInterceptorProvider.OrderedGrpcInterceptor createTestInterceptor(int order, String name) { + return new GrpcInterceptorProvider.OrderedGrpcInterceptor() { + @Override + public int order() { + return order; + } + + @Override + public ServerInterceptor getInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + return next.startCall(call, headers); + } + }; + } + }; + } +} diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java index 95886eb6b91bb..47db838ef4fb4 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java @@ -32,10 +32,13 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.AuxTransport; import org.opensearch.transport.client.Client; +import org.opensearch.transport.grpc.interceptor.GrpcInterceptorChain; import org.opensearch.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils; import org.opensearch.transport.grpc.proto.request.search.query.QueryBuilderProtoConverterRegistryImpl; import org.opensearch.transport.grpc.services.DocumentServiceImpl; import org.opensearch.transport.grpc.services.SearchServiceImpl; +import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider; +import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider.OrderedGrpcInterceptor; import org.opensearch.transport.grpc.spi.QueryBuilderProtoConverter; import org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport; import org.opensearch.watcher.ResourceWatcherService; @@ -43,6 +46,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -79,6 +84,7 @@ public final class GrpcPlugin extends Plugin implements NetworkPlugin, Extensibl private final List queryConverters = new ArrayList<>(); private QueryBuilderProtoConverterRegistryImpl queryRegistry; private AbstractQueryBuilderProtoUtils queryUtils; + private GrpcInterceptorChain serverInterceptor = new GrpcInterceptorChain(); /** * Creates a new GrpcPlugin instance. @@ -109,6 +115,42 @@ public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) { } else { logger.info("No QueryBuilderProtoConverter extensions found from other plugins"); } + List providers = loader.loadExtensions(GrpcInterceptorProvider.class); + if (providers != null) { + List orderedList = new ArrayList<>(); + for (GrpcInterceptorProvider provider : providers) { + orderedList.addAll(provider.getOrderedGrpcInterceptors()); + } + + // Validate that no two interceptors have the same order + Map> orderMap = new HashMap<>(); + for (OrderedGrpcInterceptor interceptor : orderedList) { + int order = interceptor.order(); + orderMap.computeIfAbsent(order, k -> new ArrayList<>()).add(interceptor); + } + + // Check for duplicates and throw exception if found + for (Map.Entry> entry : orderMap.entrySet()) { + if (entry.getValue().size() > 1) { + throw new IllegalArgumentException( + "Multiple gRPC interceptors have the same order value: " + + entry.getKey() + + ". Each interceptor must have a unique order value." + ); + } + } + + // Sort by order and create a chain - similar to OpenSearch's ActionFilter pattern + orderedList.sort(Comparator.comparingInt(OrderedGrpcInterceptor::order)); + + if (!orderedList.isEmpty()) { + // Create a single chain interceptor that manages the execution + // This ensures proper ordering and exception handling + serverInterceptor.addInterceptors(orderedList); + + logger.info("Loaded {} gRPC interceptors into chain", orderedList.size()); + } + } } /** @@ -169,7 +211,7 @@ public Map> getAuxTransports( ); return Collections.singletonMap( GRPC_TRANSPORT_SETTING_KEY, - () -> new Netty4GrpcServerTransport(settings, grpcServices, networkService, threadPool) + () -> new Netty4GrpcServerTransport(settings, grpcServices, networkService, threadPool, serverInterceptor) ); } @@ -217,7 +259,8 @@ public Map> getSecureAuxTransports( grpcServices, networkService, threadPool, - secureAuxTransportSettingsProvider + secureAuxTransportSettingsProvider, + serverInterceptor ) ); } diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java index 4812680a7bff9..de2fb0079c652 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java @@ -24,6 +24,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.AuxTransport; import org.opensearch.transport.BindTransportException; +import org.opensearch.transport.grpc.interceptor.GrpcInterceptorChain; import java.io.IOException; import java.net.InetAddress; @@ -41,6 +42,7 @@ import io.grpc.BindableService; import io.grpc.Server; +import io.grpc.ServerInterceptor; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; @@ -209,6 +211,7 @@ public class Netty4GrpcServerTransport extends AuxTransport { private final int executorThreads; private final long maxInboundMessageSize; private final long maxConcurrentConnectionCalls; + private final ServerInterceptor serverInterceptor; private final TimeValue maxConnectionAge; private final TimeValue maxConnectionIdle; private final TimeValue keepAliveTimeout; @@ -226,16 +229,19 @@ public class Netty4GrpcServerTransport extends AuxTransport { * @param services the gRPC compatible services to be registered with the server. * @param networkService the bind/publish addresses. * @param threadPool the thread pool for gRPC request processing. + * @param serverInterceptor the gRPC server interceptor to be applied. */ public Netty4GrpcServerTransport( Settings settings, List services, NetworkService networkService, - ThreadPool threadPool + ThreadPool threadPool, + ServerInterceptor serverInterceptor ) { logger.debug("Initializing Netty4GrpcServerTransport with settings = {}", settings); this.settings = Objects.requireNonNull(settings); this.services = Objects.requireNonNull(services); + this.serverInterceptor = Objects.requireNonNull(serverInterceptor); this.networkService = Objects.requireNonNull(networkService); this.threadPool = Objects.requireNonNull(threadPool); final List grpcBindHost = SETTING_GRPC_BIND_HOST.get(settings); @@ -256,6 +262,22 @@ public Netty4GrpcServerTransport( this.portSettingKey = SETTING_GRPC_PORT.getKey(); } + /** + * Creates a new Netty4GrpcServerTransport instance. + * @param settings the configured settings. + * @param services the gRPC compatible services to be registered with the server. + * @param networkService the bind/publish addresses. + * @param threadPool the thread pool for gRPC request processing. + */ + public Netty4GrpcServerTransport( + Settings settings, + List services, + NetworkService networkService, + ThreadPool threadPool + ) { + this(settings, services, networkService, threadPool, new GrpcInterceptorChain()); + } + /** * Returns the setting key used to identify this transport type. * @@ -424,7 +446,8 @@ private TransportAddress bindAddress(InetAddress hostAddress, PortsRange portRan .keepAliveTimeout(keepAliveTimeout.duration(), keepAliveTimeout.timeUnit()) .channelType(NioServerSocketChannel.class) .addService(new HealthStatusManager().getHealthService()) - .addService(ProtoReflectionService.newInstance()); + .addService(ProtoReflectionService.newInstance()) + .intercept(serverInterceptor); for (UnaryOperator op : serverBuilderConfigs) { op.apply(serverBuilder); diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/interceptor/GrpcInterceptorChain.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/interceptor/GrpcInterceptorChain.java new file mode 100644 index 0000000000000..1e5f075cab302 --- /dev/null +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/interceptor/GrpcInterceptorChain.java @@ -0,0 +1,114 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc.interceptor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider.OrderedGrpcInterceptor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; + +/** + * Simple gRPC interceptor chain that executes OrderedGrpcInterceptors in order and handles exceptions + */ +public class GrpcInterceptorChain implements ServerInterceptor { + + private static final Logger logger = LogManager.getLogger(GrpcInterceptorChain.class); + + private static final ServerCall.Listener EMPTY_LISTENER = new ServerCall.Listener<>() { + }; + + private final List interceptors = new ArrayList<>(); + + /** + * Constructs an empty GrpcInterceptorChain. + */ + public GrpcInterceptorChain() {} + + /** + * Constructs a GrpcInterceptorChain with the provided list of ordered interceptors. + * @param interceptors List of OrderedGrpcInterceptor instances to be applied in order + */ + public GrpcInterceptorChain(List interceptors) { + this.interceptors.addAll(Objects.requireNonNull(interceptors)); + } + + /** + * Adds interceptors to the chain. + * @param interceptors List of OrderedGrpcInterceptor instances to be added + */ + public void addInterceptors(List interceptors) { + this.interceptors.addAll(Objects.requireNonNull(interceptors)); + } + + /** + * Intercepts a gRPC call, executing the chain of interceptors in order. + * @param call object to receive response messages + * @param headers which can contain extra call metadata + * @param next next processor in the interceptor chain + * @return a listener for processing incoming request messages + */ + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + ServerCallHandler currentHandler = next; + + for (int i = interceptors.size() - 1; i >= 0; i--) { + final OrderedGrpcInterceptor interceptor = interceptors.get(i); + final ServerCallHandler nextHandler = currentHandler; + final int index = i; + + currentHandler = new ServerCallHandler() { + @Override + public ServerCall.Listener startCall(ServerCall call, Metadata headers) { + try { + return interceptor.getInterceptor().interceptCall(call, headers, nextHandler); + } catch (StatusRuntimeException sre) { + logger.error( + "Interceptor at index [{}] failed with status [{}]: {}", + index, + sre.getStatus().getCode(), + sre.getMessage() + ); + call.close(sre.getStatus(), headers); + return emptyListener(); + } catch (Exception e) { + // Unexpected exception - wrap in INTERNAL for safety + logger.error("Interceptor at index [{}] failed unexpectedly: {}", index, e.getMessage()); + call.close(Status.INTERNAL.withDescription("Interceptor failure: " + e.getMessage()), headers); + return emptyListener(); + } + } + }; + } + return currentHandler.startCall(call, headers); + } + + /** + * Returns a reusable empty listener to minimize object allocation on interceptor failures. + * @param the request type + * @return an empty ServerCall.Listener + */ + @SuppressWarnings("unchecked") + private static ServerCall.Listener emptyListener() { + return (ServerCall.Listener) EMPTY_LISTENER; + } +} diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/interceptor/package-info.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/interceptor/package-info.java new file mode 100644 index 0000000000000..b81a3d4e54ba0 --- /dev/null +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/interceptor/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Contains interfaces and classes related to gRPC interceptors + * used for transport-level request handling in OpenSearch. + */ +package org.opensearch.transport.grpc.interceptor; diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransport.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransport.java index 24e5ea49cf654..63564a57d126d 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransport.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransport.java @@ -26,6 +26,7 @@ import java.util.Optional; import io.grpc.BindableService; +import io.grpc.ServerInterceptor; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig; import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolNames; @@ -76,15 +77,17 @@ public Collection cipherSuites() { * @param networkService the bind/publish addresses. * @param threadPool the thread pool for managing gRPC executor and monitoring. * @param secureTransportSettingsProvider TLS configuration settings. + * @param serverInterceptor the gRPC server interceptor to be registered with the server. */ public SecureNetty4GrpcServerTransport( Settings settings, List services, NetworkService networkService, ThreadPool threadPool, - SecureAuxTransportSettingsProvider secureTransportSettingsProvider + SecureAuxTransportSettingsProvider secureTransportSettingsProvider, + ServerInterceptor serverInterceptor ) { - super(settings, services, networkService, threadPool); + super(settings, services, networkService, threadPool, serverInterceptor); this.port = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.get(settings); this.portSettingKey = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.getKey(); try { diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java index 7bb4a1dde85d1..3c99787275f8c 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java @@ -20,16 +20,27 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.AuxTransport; import org.opensearch.transport.client.Client; +import org.opensearch.transport.grpc.interceptor.GrpcInterceptorChain; +import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider; +import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider.OrderedGrpcInterceptor; import org.opensearch.transport.grpc.spi.QueryBuilderProtoConverter; import org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport; import org.junit.Before; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -50,6 +61,7 @@ import static org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport.GRPC_SECURE_TRANSPORT_SETTING_KEY; import static org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT; import static org.opensearch.transport.grpc.ssl.SecureSettingsHelpers.getServerClientAuthNone; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; public class GrpcPluginTests extends OpenSearchTestCase { @@ -339,4 +351,543 @@ public void testCreateComponentsWithExternalConverters() { // 2. In updateRegistryOnAllConverters() to ensure all converters have the complete registry Mockito.verify(mockConverter, Mockito.times(2)).setRegistry(Mockito.any()); } + + // Test cases for gRPC interceptor functionality + + public void testLoadExtensionsWithGrpcInterceptors() { + testInterceptorLoading(List.of(1, 2), null); + } + + public void testLoadExtensionsWithGrpcInterceptorsOrdering() { + testInterceptorLoading(List.of(3, 1, 2), null); // Out of order - should be sorted + } + + public void testLoadExtensionsWithDuplicateGrpcInterceptorOrder() { + testInterceptorLoading(List.of(1, 1), IllegalArgumentException.class); + } + + public void testLoadExtensionsWithMultipleProvidersAndDuplicateOrder() { + testInterceptorLoadingWithMultipleProviders(List.of(List.of(5), List.of(5)), IllegalArgumentException.class); + } + + public void testLoadExtensionsWithNullGrpcInterceptorProviders() { + testInterceptorLoading(null, null); + } + + public void testLoadExtensionsWithEmptyGrpcInterceptorList() { + testInterceptorLoading(List.of(), null); + } + + public void testLoadExtensionsWithSameExplicitOrderInterceptors() { + testInterceptorLoading(List.of(5, 5), IllegalArgumentException.class); + } + + // Test cases for interceptor chain failure handling + + public void testInterceptorExceptionDuringRequestPhase() { + testInterceptorLoading(List.of(1, 2), null); // Runtime failures don't affect loading + } + + // Test cases for actual request processing with interceptors + + public void testInterceptorRequestProcessingWithSuccess() { + testRequestProcessing( + List.of(new TestRequestInterceptor(1, true, "Auth check passed")), + true, + "Request should succeed when interceptor succeeds" + ); + } + + public void testInterceptorRequestProcessingWithFailure() { + testRequestProcessing( + List.of(new TestRequestInterceptor(1, false, "Auth failed")), + false, + "Request should fail when interceptor fails" + ); + } + + public void testMultipleInterceptorsRequestProcessing() { + testRequestProcessing( + List.of( + new TestRequestInterceptor(1, true, "Auth passed"), + new TestRequestInterceptor(2, true, "Rate limit passed"), + new TestRequestInterceptor(3, true, "Validation passed") + ), + true, + "Request should succeed when all interceptors succeed" + ); + } + + public void testMultipleInterceptorsWithEarlyFailure() { + testRequestProcessing( + List.of( + new TestRequestInterceptor(1, true, "Auth passed"), + new TestRequestInterceptor(2, false, "Rate limit exceeded"), // Fails here + new TestRequestInterceptor(3, true, "Validation passed") // Should not execute + ), + false, + "Request should fail when any interceptor fails" + ); + } + + public void testInterceptorExceptionHandling() { + testRequestProcessingWithException( + new TestExceptionThrowingInterceptor(1, "Security check failed"), + SecurityException.class, + "Security check failed" + ); + } + + public void testInterceptorChainExceptionPropagation() { + // Test that exceptions in interceptor chain stop processing + @SuppressWarnings("unchecked") + ServerCall mockCall = Mockito.mock(ServerCall.class); + @SuppressWarnings("unchecked") + ServerCallHandler mockHandler = Mockito.mock(ServerCallHandler.class); + Metadata headers = new Metadata(); + + TestExceptionThrowingInterceptor throwingInterceptor = new TestExceptionThrowingInterceptor(2, "Rate limit service down"); + + Exception thrown = expectThrows( + RuntimeException.class, + () -> { throwingInterceptor.interceptCall(mockCall, headers, mockHandler); } + ); + + assertEquals("Rate limit service down", thrown.getMessage()); + } + + // Generic helper methods for cleaner, more maintainable tests + + /** + * Generic interceptor loading test helper + * @param orders List of interceptor orders (null for no interceptors) + * @param expectedException Expected exception class (null if no exception expected) + */ + private void testInterceptorLoading(List orders, Class expectedException) { + GrpcPlugin plugin = new GrpcPlugin(); + ExtensiblePlugin.ExtensionLoader mockLoader = createMockLoader(orders); + + if (expectedException != null) { + expectThrows(expectedException, () -> plugin.loadExtensions(mockLoader)); + } else { + assertDoesNotThrow(() -> plugin.loadExtensions(mockLoader)); + } + } + + /** + * Test interceptor loading with multiple providers + */ + private void testInterceptorLoadingWithMultipleProviders( + List> providerOrders, + Class expectedException + ) { + GrpcPlugin plugin = new GrpcPlugin(); + ExtensiblePlugin.ExtensionLoader mockLoader = createMockLoaderWithMultipleProviders(providerOrders); + + if (expectedException != null) { + expectThrows(expectedException, () -> plugin.loadExtensions(mockLoader)); + } else { + assertDoesNotThrow(() -> plugin.loadExtensions(mockLoader)); + } + } + + /** + * Creates a mock extension loader with interceptors of given orders + */ + private ExtensiblePlugin.ExtensionLoader createMockLoader(List orders) { + ExtensiblePlugin.ExtensionLoader mockLoader = Mockito.mock(ExtensiblePlugin.ExtensionLoader.class); + when(mockLoader.loadExtensions(QueryBuilderProtoConverter.class)).thenReturn(null); + + if (orders == null) { + when(mockLoader.loadExtensions(GrpcInterceptorProvider.class)).thenReturn(null); + } else if (orders.isEmpty()) { + GrpcInterceptorProvider mockProvider = Mockito.mock(GrpcInterceptorProvider.class); + when(mockProvider.getOrderedGrpcInterceptors()).thenReturn(new ArrayList<>()); + when(mockLoader.loadExtensions(GrpcInterceptorProvider.class)).thenReturn(List.of(mockProvider)); + } else { + List interceptors = orders.stream().map(order -> createMockInterceptor(order)).toList(); + + GrpcInterceptorProvider mockProvider = Mockito.mock(GrpcInterceptorProvider.class); + when(mockProvider.getOrderedGrpcInterceptors()).thenReturn(interceptors); + when(mockLoader.loadExtensions(GrpcInterceptorProvider.class)).thenReturn(List.of(mockProvider)); + } + + return mockLoader; + } + + /** + * Creates a mock extension loader with multiple providers + */ + private ExtensiblePlugin.ExtensionLoader createMockLoaderWithMultipleProviders(List> providerOrders) { + ExtensiblePlugin.ExtensionLoader mockLoader = Mockito.mock(ExtensiblePlugin.ExtensionLoader.class); + when(mockLoader.loadExtensions(QueryBuilderProtoConverter.class)).thenReturn(null); + + List providers = providerOrders.stream().map(orders -> { + List interceptors = orders.stream().map(this::createMockInterceptor).toList(); + GrpcInterceptorProvider provider = Mockito.mock(GrpcInterceptorProvider.class); + when(provider.getOrderedGrpcInterceptors()).thenReturn(interceptors); + return provider; + }).toList(); + + when(mockLoader.loadExtensions(GrpcInterceptorProvider.class)).thenReturn(providers); + return mockLoader; + } + + /** + * Test actual request processing with interceptors + */ + private void testRequestProcessing(List interceptors, boolean shouldSucceed, String description) { + // Create mock server call and handler + @SuppressWarnings("unchecked") + ServerCall mockCall = Mockito.mock(ServerCall.class); + @SuppressWarnings("unchecked") + ServerCallHandler mockHandler = Mockito.mock(ServerCallHandler.class); + @SuppressWarnings("unchecked") + ServerCall.Listener mockListener = Mockito.mock(ServerCall.Listener.class); + Metadata headers = new Metadata(); + + when(mockHandler.startCall(Mockito.any(), Mockito.any())).thenReturn(mockListener); + + // Track if call.close() was called (indicates failure) + AtomicBoolean callClosed = new AtomicBoolean(false); + doAnswer(invocation -> { + callClosed.set(true); + return null; + }).when(mockCall).close(Mockito.any(Status.class), Mockito.any(Metadata.class)); + + // Execute the interceptors in chain (simulating real gRPC behavior) + ServerCallHandler currentHandler = mockHandler; + + for (TestRequestInterceptor interceptor : interceptors) { + final ServerCallHandler nextHandler = currentHandler; + ServerCall.Listener result = interceptor.interceptCall(mockCall, headers, nextHandler); + + // Check if the call was closed (indicating failure) + if (callClosed.get()) { + // A failure occurred - this is expected for failure test cases + if (!shouldSucceed) { + // This is expected - test passes + return; + } else { + fail("Unexpected failure in success test case"); + } + } + + // If we reach here, the interceptor succeeded + assertNotNull(result); + + // Create a new handler for the next interceptor that continues the chain + currentHandler = new ServerCallHandler() { + @Override + public ServerCall.Listener startCall(ServerCall call, Metadata headers) { + return result; + } + }; + } + + // If we reach here and shouldSucceed is false, the test failed to fail as expected + if (!shouldSucceed) { + fail("Expected failure but all interceptors succeeded"); + } + + // For success case, ensure call was not closed + assertFalse("Call should not be closed for success case", callClosed.get()); + } + + /** + * Test request processing with exception throwing interceptor + */ + private void testRequestProcessingWithException( + TestExceptionThrowingInterceptor interceptor, + Class expectedExceptionType, + String expectedMessage + ) { + @SuppressWarnings("unchecked") + ServerCall mockCall = Mockito.mock(ServerCall.class); + @SuppressWarnings("unchecked") + ServerCallHandler mockHandler = Mockito.mock(ServerCallHandler.class); + Metadata headers = new Metadata(); + + Exception thrown = expectThrows(Exception.class, () -> { interceptor.interceptCall(mockCall, headers, mockHandler); }); + + assertTrue(expectedExceptionType.isInstance(thrown)); + assertTrue(thrown.getMessage().contains(expectedMessage)); + } + + /** + * Creates a mock interceptor with given order + */ + private OrderedGrpcInterceptor createMockInterceptor(int order) { + OrderedGrpcInterceptor mock = Mockito.mock(OrderedGrpcInterceptor.class); + when(mock.order()).thenReturn(order); + when(mock.getInterceptor()).thenReturn(Mockito.mock(ServerInterceptor.class)); + return mock; + } + + private void assertDoesNotThrow(Runnable runnable) { + try { + runnable.run(); + } catch (Exception e) { + fail("Expected no exception, but got: " + e.getMessage()); + } + } + + /** + * Test interceptor that simulates request processing with success/failure + */ + private static class TestRequestInterceptor implements ServerInterceptor { + private final int order; + private final boolean shouldSucceed; + private final String message; + + public TestRequestInterceptor(int order, boolean shouldSucceed, String message) { + this.order = order; + this.shouldSucceed = shouldSucceed; + this.message = message; + } + + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + + // Simulate request processing logic + if (!shouldSucceed) { + // Fail the request with proper gRPC status + call.close(Status.PERMISSION_DENIED.withDescription(message), new Metadata()); + return new ServerCall.Listener() { + }; // Empty listener for failed call + } + + // Success case - continue to next interceptor/handler + return next.startCall(call, headers); + } + + public int getOrder() { + return order; + } + } + + /** + * Test interceptor that throws exceptions during request processing + */ + private static class TestExceptionThrowingInterceptor implements ServerInterceptor { + private final int order; + private final String message; + + public TestExceptionThrowingInterceptor(int order, String message) { + this.order = order; + this.message = message; + } + + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + + // Simulate different types of exceptions that can occur during request processing + if (message.contains("Security")) { + throw new SecurityException(message); + } else if (message.contains("Rate limit")) { + throw new RuntimeException(message); + } else { + throw new IllegalStateException(message); + } + } + + public int getOrder() { + return order; + } + } + + // =========================================== + // Test cases for GrpcInterceptorChain + // =========================================== + + public void testGrpcInterceptorChainWithSuccessfulInterceptors() { + List interceptors = List.of(createTestInterceptor(10, false), createTestInterceptor(20, false)); + + testGrpcInterceptorChain(interceptors, true, null); + } + + public void testGrpcInterceptorChainWithFailingInterceptor() { + List interceptors = List.of( + createTestInterceptor(10, false), // Success + createTestInterceptor(20, true), // Failure + createTestInterceptor(30, false) // Should not execute + ); + + testGrpcInterceptorChain(interceptors, false, "Test failure"); + } + + public void testGrpcInterceptorChainEmptyList() { + List interceptors = List.of(); + testGrpcInterceptorChain(interceptors, true, null); + } + + public void testGrpcInterceptorChainOrdering() { + List executionOrder = new ArrayList<>(); + + List interceptors = List.of( + createTestInterceptorWithCallback(30, executionOrder, "Third"), + createTestInterceptorWithCallback(10, executionOrder, "First"), + createTestInterceptorWithCallback(20, executionOrder, "Second") + ); + + // Sort them as GrpcPlugin would + interceptors = new ArrayList<>(interceptors); + interceptors.sort(Comparator.comparingInt(OrderedGrpcInterceptor::order)); + + testGrpcInterceptorChain(interceptors, true, null); + + // Verify execution order + assertEquals(3, executionOrder.size()); + assertEquals("First", executionOrder.get(0)); + assertEquals("Second", executionOrder.get(1)); + assertEquals("Third", executionOrder.get(2)); + } + + public void testGrpcInterceptorChainIntegrationWithPlugin() { + // Test that GrpcPlugin correctly creates and uses GrpcInterceptorChain + GrpcInterceptorProvider mockProvider = Mockito.mock(GrpcInterceptorProvider.class); + List interceptors = List.of( + createTestInterceptor(10, false), + createTestInterceptor(20, false), + createTestInterceptor(30, false) + ); + when(mockProvider.getOrderedGrpcInterceptors()).thenReturn(interceptors); + + ExtensiblePlugin.ExtensionLoader mockLoader = Mockito.mock(ExtensiblePlugin.ExtensionLoader.class); + when(mockLoader.loadExtensions(QueryBuilderProtoConverter.class)).thenReturn(null); + when(mockLoader.loadExtensions(GrpcInterceptorProvider.class)).thenReturn(List.of(mockProvider)); + + GrpcPlugin plugin = new GrpcPlugin(); + + // Should not throw exception and should create chain + assertDoesNotThrow(() -> plugin.loadExtensions(mockLoader)); + } + + public void testGrpcInterceptorChainWithDuplicateOrders() { + // Test that plugin validation catches duplicate orders + GrpcInterceptorProvider mockProvider = Mockito.mock(GrpcInterceptorProvider.class); + List interceptors = List.of( + createTestInterceptor(10, false), + createTestInterceptor(10, false) // Duplicate order + ); + when(mockProvider.getOrderedGrpcInterceptors()).thenReturn(interceptors); + + ExtensiblePlugin.ExtensionLoader mockLoader = Mockito.mock(ExtensiblePlugin.ExtensionLoader.class); + when(mockLoader.loadExtensions(QueryBuilderProtoConverter.class)).thenReturn(null); + when(mockLoader.loadExtensions(GrpcInterceptorProvider.class)).thenReturn(List.of(mockProvider)); + + GrpcPlugin plugin = new GrpcPlugin(); + + // Should throw exception due to duplicate orders + expectThrows(IllegalArgumentException.class, () -> plugin.loadExtensions(mockLoader)); + } + + /** + * Helper method to test GrpcInterceptorChain behavior + */ + private void testGrpcInterceptorChain(List interceptors, boolean shouldSucceed, String expectedErrorMessage) { + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + + @SuppressWarnings("unchecked") + ServerCall mockCall = Mockito.mock(ServerCall.class); + @SuppressWarnings("unchecked") + ServerCallHandler mockHandler = Mockito.mock(ServerCallHandler.class); + @SuppressWarnings("unchecked") + ServerCall.Listener mockListener = Mockito.mock(ServerCall.Listener.class); + Metadata headers = new Metadata(); + + when(mockHandler.startCall(Mockito.any(), Mockito.any())).thenReturn(mockListener); + + // Track if call.close() was called + AtomicBoolean callClosed = new AtomicBoolean(false); + AtomicReference closedStatus = new AtomicReference<>(); + doAnswer(invocation -> { + callClosed.set(true); + closedStatus.set(invocation.getArgument(0)); + return null; + }).when(mockCall).close(Mockito.any(Status.class), Mockito.any(Metadata.class)); + + // Execute the chain + ServerCall.Listener result = chain.interceptCall(mockCall, headers, mockHandler); + + if (shouldSucceed) { + assertNotNull("Should return a listener for successful chain", result); + assertFalse("Call should not be closed for successful chain", callClosed.get()); + } else { + // For failure cases, the call should be closed + assertTrue("Call should be closed for failed chain", callClosed.get()); + if (expectedErrorMessage != null) { + assertNotNull("Should have closed status", closedStatus.get()); + assertTrue( + "Error message should contain expected text", + closedStatus.get().getDescription().contains(expectedErrorMessage) + ); + } + } + } + + /** + * Creates a test interceptor that can succeed or fail + */ + private OrderedGrpcInterceptor createTestInterceptor(int order, boolean shouldFail) { + return new OrderedGrpcInterceptor() { + @Override + public int order() { + return order; + } + + @Override + public ServerInterceptor getInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + if (shouldFail) { + throw new RuntimeException("Test failure"); + } + return next.startCall(call, headers); + } + }; + } + }; + } + + /** + * Creates a test interceptor that tracks execution order + */ + private OrderedGrpcInterceptor createTestInterceptorWithCallback(int order, List executionOrder, String name) { + return new OrderedGrpcInterceptor() { + @Override + public int order() { + return order; + } + + @Override + public ServerInterceptor getInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + executionOrder.add(name); + return next.startCall(call, headers); + } + }; + } + }; + } + } diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/interceptor/GrpcInterceptorChainTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/interceptor/GrpcInterceptorChainTests.java new file mode 100644 index 0000000000000..540dfb34dfdc1 --- /dev/null +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/interceptor/GrpcInterceptorChainTests.java @@ -0,0 +1,381 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc.interceptor; + +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider.OrderedGrpcInterceptor; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class GrpcInterceptorChainTests extends OpenSearchTestCase { + + @Mock + private ServerCall mockCall; + + @Mock + private ServerCallHandler mockHandler; + + @Mock + private ServerCall.Listener mockListener; + + private Metadata headers; + + @Before + public void setUp() throws Exception { + super.setUp(); + MockitoAnnotations.openMocks(this); + when(mockHandler.startCall(any(), any())).thenReturn(mockListener); + headers = new Metadata(); + } + + public void testEmptyChain() { + GrpcInterceptorChain chain = new GrpcInterceptorChain(Collections.emptyList()); + ServerCall.Listener result = chain.interceptCall(mockCall, headers, mockHandler); + + assertNotNull(result); + assertEquals(mockListener, result); + verify(mockHandler).startCall(mockCall, headers); + } + + public void testSingleSuccessfulInterceptor() { + List interceptors = Arrays.asList(createTestInterceptor(10, false, null)); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + ServerCall.Listener result = chain.interceptCall(mockCall, headers, mockHandler); + + assertNotNull(result); + verify(mockCall, never()).close(any(Status.class), any(Metadata.class)); + } + + public void testMultipleSuccessfulInterceptors() { + List interceptors = Arrays.asList( + createTestInterceptor(10, false, null), + createTestInterceptor(20, false, null), + createTestInterceptor(30, false, null) + ); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + ServerCall.Listener result = chain.interceptCall(mockCall, headers, mockHandler); + + assertNotNull(result); + verify(mockCall, never()).close(any(Status.class), any(Metadata.class)); + } + + // FAILURE SCENARIO TESTS + public void testFirstInterceptorFails() { + List interceptors = Arrays.asList( + createTestInterceptor(10, true, "First failure"), + createTestInterceptor(20, false, null), + createTestInterceptor(30, false, null) + ); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + chain.interceptCall(mockCall, headers, mockHandler); + + verify(mockCall).close( + argThat(status -> status.getCode() == Status.Code.INTERNAL && status.getDescription().contains("First failure")), + eq(headers) + ); + } + + public void testMiddleInterceptorFails() { + List interceptors = Arrays.asList( + createTestInterceptor(10, false, null), + createTestInterceptor(20, true, "Middle failure"), + createTestInterceptor(30, false, null) + ); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + chain.interceptCall(mockCall, headers, mockHandler); + + verify(mockCall).close( + argThat(status -> status.getCode() == Status.Code.INTERNAL && status.getDescription().contains("Middle failure")), + eq(headers) + ); + } + + public void testLastInterceptorFails() { + List interceptors = Arrays.asList( + createTestInterceptor(10, false, null), + createTestInterceptor(20, false, null), + createTestInterceptor(30, true, "Last failure") + ); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + chain.interceptCall(mockCall, headers, mockHandler); + + verify(mockCall).close( + argThat(status -> status.getCode() == Status.Code.INTERNAL && status.getDescription().contains("Last failure")), + eq(headers) + ); + } + + public void testInterceptorThrowsStatusRuntimeExceptionPermissionDenied() { + List interceptors = Arrays.asList( + createTestInterceptor(10, false, null), + createStatusRuntimeExceptionInterceptor(20, Status.PERMISSION_DENIED.withDescription("Unauthorized access")), + createTestInterceptor(30, false, null) + ); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + ServerCall.Listener result = chain.interceptCall(mockCall, headers, mockHandler); + + assertNotNull(result); + verify(mockCall).close( + argThat(status -> status.getCode() == Status.Code.PERMISSION_DENIED && status.getDescription().equals("Unauthorized access")), + eq(headers) + ); + } + + public void testInterceptorThrowsStatusRuntimeExceptionUnauthenticated() { + List interceptors = Arrays.asList( + createStatusRuntimeExceptionInterceptor(10, Status.UNAUTHENTICATED.withDescription("Invalid token")), + createTestInterceptor(20, false, null) + ); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + ServerCall.Listener result = chain.interceptCall(mockCall, headers, mockHandler); + + assertNotNull(result); + verify(mockCall).close( + argThat(status -> status.getCode() == Status.Code.UNAUTHENTICATED && status.getDescription().equals("Invalid token")), + eq(headers) + ); + } + + public void testInterceptorThrowsStatusRuntimeExceptionResourceExhausted() { + List interceptors = Arrays.asList( + createTestInterceptor(10, false, null), + createTestInterceptor(20, false, null), + createStatusRuntimeExceptionInterceptor(30, Status.RESOURCE_EXHAUSTED.withDescription("Rate limit exceeded")) + ); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + ServerCall.Listener result = chain.interceptCall(mockCall, headers, mockHandler); + + assertNotNull(result); + verify(mockCall).close( + argThat(status -> status.getCode() == Status.Code.RESOURCE_EXHAUSTED && status.getDescription().equals("Rate limit exceeded")), + eq(headers) + ); + } + + public void testInterceptorOrdering() { + List executionOrder = new ArrayList<>(); + + List interceptors = Arrays.asList( + createOrderTrackingInterceptor(30, executionOrder), + createOrderTrackingInterceptor(10, executionOrder), + createOrderTrackingInterceptor(20, executionOrder) + ); + + // Sort as GrpcPlugin would + interceptors.sort((a, b) -> Integer.compare(a.order(), b.order())); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + chain.interceptCall(mockCall, headers, mockHandler); + + // Verify execution order + assertEquals(Arrays.asList(10, 20, 30), executionOrder); + } + + public void testNullInterceptorList() { + // Constructor should throw NullPointerException for null interceptors + expectThrows(NullPointerException.class, () -> { new GrpcInterceptorChain(null); }); + } + + public void testChainIntegrationWithRealScenario() { + // Simulate a real-world scenario: Auth -> Logging -> Metrics + List executionLog = new ArrayList<>(); + + List interceptors = Arrays.asList( + createLoggingInterceptor(10, "AUTH", executionLog), + createLoggingInterceptor(20, "LOGGING", executionLog), + createLoggingInterceptor(30, "METRICS", executionLog) + ); + + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + chain.interceptCall(mockCall, headers, mockHandler); + + assertEquals(Arrays.asList("AUTH", "LOGGING", "METRICS"), executionLog); + } + + /** + * Generic test method that can be extended for different scenarios + */ + public void testChainWithPattern(List interceptors, boolean expectSuccess, String expectedErrorMessage) { + GrpcInterceptorChain chain = new GrpcInterceptorChain(interceptors); + + if (expectSuccess) { + ServerCall.Listener result = chain.interceptCall(mockCall, headers, mockHandler); + assertNotNull(result); + verify(mockCall, never()).close(any(Status.class), any(Metadata.class)); + } else { + chain.interceptCall(mockCall, headers, mockHandler); + verify(mockCall).close( + argThat(status -> status.getCode() == Status.Code.INTERNAL && status.getDescription().contains(expectedErrorMessage)), + eq(headers) + ); + } + } + + /** + * Test method that can easily be extended with new interceptor combinations + */ + @SuppressWarnings("unchecked") + public void testVariousInterceptorCombinations() { + // Success scenario + testChainWithPattern(Arrays.asList(createTestInterceptor(10, false, null), createTestInterceptor(20, false, null)), true, null); + + // Reset mocks for next test + reset(mockCall); + + // Failure scenario + testChainWithPattern( + Arrays.asList(createTestInterceptor(10, false, null), createTestInterceptor(20, true, "Test failure")), + false, + "Test failure" + ); + } + + // ============================================= + // HELPER METHODS - EASILY EXTENSIBLE + // ============================================= + + /** + * Creates a test interceptor with specified behavior - easily configurable + */ + private OrderedGrpcInterceptor createTestInterceptor(int order, boolean shouldFail, String errorMessage) { + return new OrderedGrpcInterceptor() { + @Override + public int order() { + return order; + } + + @Override + public ServerInterceptor getInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + if (shouldFail) { + throw new RuntimeException(errorMessage); + } + return next.startCall(call, headers); + } + }; + } + }; + } + + /** + * Creates an interceptor that tracks execution order - useful for ordering tests + */ + private OrderedGrpcInterceptor createOrderTrackingInterceptor(int order, List executionOrder) { + return new OrderedGrpcInterceptor() { + @Override + public int order() { + return order; + } + + @Override + public ServerInterceptor getInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + executionOrder.add(order); + return next.startCall(call, headers); + } + }; + } + }; + } + + /** + * Creates a logging interceptor - useful for integration tests + */ + private OrderedGrpcInterceptor createLoggingInterceptor(int order, String name, List log) { + return new OrderedGrpcInterceptor() { + @Override + public int order() { + return order; + } + + @Override + public ServerInterceptor getInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + log.add(name); + return next.startCall(call, headers); + } + }; + } + }; + } + + /** + * Creates an interceptor that throws StatusRuntimeException with specific gRPC status + * This is used to test the StatusRuntimeException handling path (lines 71-79) + */ + private OrderedGrpcInterceptor createStatusRuntimeExceptionInterceptor(int order, Status status) { + return new OrderedGrpcInterceptor() { + @Override + public int order() { + return order; + } + + @Override + public ServerInterceptor getInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next + ) { + throw new StatusRuntimeException(status); + } + }; + } + }; + } +} diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java index a7ac7c8cf80f1..cf35ad449c234 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java @@ -15,6 +15,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.grpc.interceptor.GrpcInterceptorChain; import org.junit.After; import org.junit.Before; @@ -23,6 +24,7 @@ import java.util.List; import io.grpc.BindableService; +import io.grpc.ServerInterceptor; import io.grpc.StatusRuntimeException; import io.grpc.health.v1.HealthCheckResponse; @@ -36,6 +38,8 @@ public class SecureNetty4GrpcServerTransportTests extends OpenSearchTestCase { private ThreadPool threadPool; private final List services = new ArrayList<>(); + private ServerInterceptor serverInterceptor; + static Settings createSettings() { return Settings.builder().put(SecureNetty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), getPortRange()).build(); } @@ -48,6 +52,7 @@ public void setup() { Settings settings = Settings.builder().put("node.name", "test-node").put("grpc.netty.executor_count", 4).build(); ExecutorBuilder grpcExecutorBuilder = new FixedExecutorBuilder(settings, "grpc", 4, 1000, "thread_pool.grpc"); threadPool = new ThreadPool(settings, grpcExecutorBuilder); + serverInterceptor = new GrpcInterceptorChain(Collections.emptyList()); } @After @@ -65,7 +70,8 @@ public void testGrpcSecureTransportStartStop() { services, networkService, threadPool, - getServerClientAuthNone() + getServerClientAuthNone(), + serverInterceptor ) ) { transport.start(); @@ -84,7 +90,8 @@ public void testGrpcInsecureAuthTLS() { services, networkService, threadPool, - getServerClientAuthNone() + getServerClientAuthNone(), + serverInterceptor ) ) { transport.start(); @@ -110,7 +117,8 @@ public void testGrpcOptionalAuthTLS() { services, networkService, threadPool, - getServerClientAuthOptional() + getServerClientAuthOptional(), + serverInterceptor ) ) { transport.start(); @@ -141,7 +149,8 @@ public void testGrpcRequiredAuthTLS() { services, networkService, threadPool, - getServerClientAuthRequired() + getServerClientAuthRequired(), + serverInterceptor ) ) { transport.start();