Commit d8a5a4b

Implement multi tenancy in Flow Framework (#980)
* Import SdkClient and inject it
# Conflicts:
#	build.gradle
# Conflicts:
#	build.gradle
* Pass sdkClient to IndicesHandler and EncryptorUtils classes
* Extract tenant id from REST header into RestAction
* Pass tenant id to transport actions in template
* Validate tenant id existence in workflow transport actions
* Pass SdkClient and tenant id to util used for access control checks
# Conflicts:
#	src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java
#	src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java
#	src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java
#	src/main/java/org/opensearch/flowframework/transport/GetWorkflowTransportAction.java
#	src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java
#	src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java
#	src/main/java/org/opensearch/flowframework/util/ParseUtils.java
* Perform tenant id validation checks for workflow APIs
* Migrate Update workflow get action to SdkClient
* Pass tenantId to IndicesHandler and use in EncryptorUtils
* Migrate EncryptorUtils getting master key from index
* Refactor fetching master key to permit reuse
* Refactor initializeMasterKey to use common code
* Migrate indexing new key to config
* Migrate template indexing to sdkClient
* Migrate template deletion to sdkClient
* Migrate get template to sdkClient
* Migrate provision template to sdkClient
* Migrate max workflow search to sdkClient
* Add tenantId to GetWorkflowStateRequest
* Migrate GetWorkflowStateRequest to multitenant client
* Migrate getProvisioningProgress to avoid repetition
* Migrate canDeleteWorkflowStateDoc to avoid repetition
* Migrate initial state document creation to metadata client
* Migrate state document deletion to metadata client
* Add Tenant aware Rest Tests for Workflows
* Fix javadocs
* Add publishToMavenLocal for more CI
* Fix some CI
* Enable tenant aware search
* Refactor state index update method using multitenant client
* Get metadata client artifacts from Maven Snapshot
* Update tests for new update async code
* Switch SdkClient to use default generic thread executor
* Migrate last updates to sdkClient
* Revert (most) changes to unit tests based on async client changes
* Pass tenant id when updating state during provisioning
* Integrate tenantId with synchronous provisioning
* Fix failing integ tests after rebase, code review updates
* Replace fakeTenantId placeholders with actual tenant id
* Use version catalog for commons-lang3 and httpcore dependencies
* Exclude transitive httpclient dependency from metadata and rest client
* Fix more test errors and tweak dependencies
* More code review comments and refactoring

---------

Signed-off-by: Daniel Widdis <[email protected]>
1 parent 1375ee6 commit d8a5a4b

84 files changed, +3377 -1097 lines changed

.github/workflows/CI.yml

+19
@@ -98,3 +98,22 @@ jobs:
       - name: Build and Run Tests
         run: |
           ./gradlew integTest -PnumNodes=3
+  integTenantAwareTest:
+    needs: [spotless, javadoc]
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ubuntu-latest, macos-latest, windows-latest]
+        java: [21]
+    name: Tenant Aware Integ Test JDK${{ matrix.java }}, ${{ matrix.os }}
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up JDK ${{ matrix.java }}
+        uses: actions/setup-java@v4
+        with:
+          java-version: ${{ matrix.java }}
+          distribution: temurin
+      - name: Build and Run Tests
+        run: |
+          ./gradlew integTest "-Dtests.rest.tenantaware=true"

.github/workflows/test_security.yml

-1
@@ -34,7 +34,6 @@ jobs:
     steps:
       - name: Run start commands
         run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
-
       - name: Checkout Flow Framework
         uses: actions/checkout@v4
       - name: Setup Java ${{ matrix.java }}

CHANGELOG.md

+1
@@ -18,6 +18,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
 
 ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.18...2.x)
 ### Features
+- Add multitenant remote metadata client ([#980](https://github.com/opensearch-project/flow-framework/pull/980))
 - Add synchronous execution option to workflow provisioning ([#990](https://github.com/opensearch-project/flow-framework/pull/990))
 
 ### Enhancements

build.gradle

+43-7
@@ -167,17 +167,18 @@ configurations {
 
 dependencies {
     implementation "org.opensearch:opensearch:${opensearch_version}"
-    implementation 'org.junit.jupiter:junit-jupiter:5.11.4'
     api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
-    api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
-    implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.17.0'
+    api(group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}") {
+        exclude group: "org.apache.httpcomponents.client5", module: "httpclient5"
+    }
+    implementation "org.apache.commons:commons-lang3:${versions.commonslang}"
     implementation "org.opensearch:common-utils:${common_utils_version}"
     implementation "com.amazonaws:aws-encryption-sdk-java:3.0.1"
     implementation "software.amazon.cryptography:aws-cryptographic-material-providers:1.8.0"
     implementation "org.dafny:DafnyRuntime:4.9.1"
     implementation "software.amazon.smithy.dafny:conversion:0.1.1"
     implementation 'org.bouncycastle:bcprov-jdk18on:1.80'
-    api "org.apache.httpcomponents.core5:httpcore5:5.3.2"
+    api "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}"
     implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
     implementation "org.glassfish:jakarta.json:2.0.1"
     implementation "org.eclipse:yasson:3.0.4"
@@ -188,7 +189,11 @@ dependencies {
     implementation "io.swagger.parser.v3:swagger-parser-core:${swaggerVersion}"
     implementation "io.swagger.parser.v3:swagger-parser:${swaggerVersion}"
     implementation "io.swagger.parser.v3:swagger-parser-v3:${swaggerVersion}"
-    // Declare Jackson dependencies for tests (from OpenSearch version catalog)
+    // Multi-tenant SDK Client
+    implementation ("org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}") {
+        exclude group: "org.apache.httpcomponents.client5", module: "httpclient5"
+    }
+    testImplementation 'org.junit.jupiter:junit-jupiter:5.11.4'
     testImplementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
     testImplementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
     testImplementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson_databind}"
@@ -202,7 +207,6 @@ dependencies {
     configurations.all {
         resolutionStrategy {
             force("com.google.guava:guava:33.4.0-jre") // CVE for 31.1, keep to force transitive dependencies
-            force("org.apache.httpcomponents.core5:httpcore5:5.3.2") // Dependency Jar Hell
         }
     }
 }
@@ -262,10 +266,19 @@ integTest {
     systemProperty('user', user)
     systemProperty('password', password)
 
+    // Only tenant aware test if set
+    if (System.getProperty("tests.rest.tenantaware") == "true") {
+        filter {
+            includeTestsMatching "org.opensearch.flowframework.*TenantAwareIT"
+        }
+        systemProperty "plugins.flow_framework.multi_tenancy_enabled", "true"
+    }
+
     // Only rest case can run with remote cluster
-    if (System.getProperty("tests.rest.cluster") != null) {
+    if (System.getProperty("tests.rest.cluster") != null && System.getProperty("tests.rest.tenantaware") == null) {
         filter {
             includeTestsMatching "org.opensearch.flowframework.rest.*IT"
+            excludeTestsMatching "org.opensearch.flowframework.rest.*TenantAwareIT"
         }
     }
 
@@ -288,11 +301,34 @@ integTest {
         filter {
             includeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkSecureRestApiIT"
             excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkRestApiIT"
+            excludeTestsMatching "org.opensearch.flowframework.rest.*TenantAwareIT"
         }
     }
 
     // doFirst delays this block until execution time
     doFirst {
+        if (System.getProperty("tests.rest.tenantaware") == "true") {
+            def ymlFile = file("$buildDir/testclusters/integTest-0/config/opensearch.yml")
+            if (ymlFile.exists()) {
+                ymlFile.withWriterAppend {
+                    writer ->
+                    writer.write("\n# Set multitenancy\n")
+                    writer.write("plugins.flow_framework.multi_tenancy_enabled: true\n")
+                }
+                // TODO this properly uses the remote client factory but needs a remote cluster set up
+                // TODO get the endpoint from a system property
+                if (System.getProperty("tests.rest.cluster") != null) {
+                    ymlFile.withWriterAppend { writer ->
+                        writer.write("\n# Use a remote cluster\n")
+                        writer.write("plugins.flow_framework.remote_metadata_type: RemoteOpenSearch\n")
+                        writer.write("plugins.flow_framework.remote_metadata_endpoint: https://127.0.0.1:9200\n")
+                    }
+                }
+            } else {
+                throw new GradleException("opensearch.yml not found at: $ymlFile")
+            }
+        }
+
         // Tell the test JVM if the cluster JVM is running under a debugger so that tests can
         // use longer timeouts for requests.
         def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null
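
The interaction between the two system properties in the `integTest` filter blocks is easy to misread, so here is a minimal Java sketch of the include rules. It models only the tenant-aware and remote-cluster blocks of the build.gradle diff. The class and method names are hypothetical and are not part of the build script or the plugin. The sketch leaves out the security filter block and the `excludeTestsMatching` rules.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of two integTest filter blocks; not part of the actual build. */
final class IntegTestFilterSketch {
    static final String TENANT_AWARE_TESTS = "org.opensearch.flowframework.*TenantAwareIT";
    static final String REST_TESTS = "org.opensearch.flowframework.rest.*IT";

    /**
     * Returns the include patterns the two modeled blocks would add.
     *
     * @param tenantAware value of -Dtests.rest.tenantaware, or null if unset
     * @param cluster     value of -Dtests.rest.cluster, or null if unset
     */
    static List<String> includePatterns(String tenantAware, String cluster) {
        List<String> includes = new ArrayList<>();
        // Groovy's == on strings is an equals() comparison
        if ("true".equals(tenantAware)) {
            includes.add(TENANT_AWARE_TESTS);
        }
        // The remote-cluster block requires the tenant-aware property to be absent
        if (cluster != null && tenantAware == null) {
            includes.add(REST_TESTS);
        }
        return includes;
    }
}
```

Under this reading, setting `tests.rest.tenantaware` to any value other than `true` while also setting `tests.rest.cluster` adds neither pattern, because the second check tests for absence of the property rather than for a value other than `true`.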

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

+54-7
@@ -65,6 +65,8 @@
 import org.opensearch.plugins.ActionPlugin;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.plugins.SystemIndexPlugin;
+import org.opensearch.remote.metadata.client.SdkClient;
+import org.opensearch.remote.metadata.client.impl.SdkClientFactory;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.rest.RestController;
 import org.opensearch.rest.RestHandler;
@@ -75,22 +77,36 @@
 import org.opensearch.watcher.ResourceWatcherService;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 
 import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX;
 import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
 import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
 import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
 import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
+import static org.opensearch.flowframework.common.CommonValue.TENANT_ID_FIELD;
 import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
 import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_ENDPOINT;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_REGION;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_SERVICE_NAME;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_TYPE;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
+import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.TENANT_ID_FIELD_KEY;
 
 /**
  * An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
@@ -121,9 +137,28 @@ public Collection<Object> createComponents(
         Settings settings = environment.settings();
         flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
         MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
-        EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, xContentRegistry);
+        SdkClient sdkClient = SdkClientFactory.createSdkClient(
+            client,
+            xContentRegistry,
+            // Here we assume remote metadata client is only used with tenant awareness.
+            // This may change in the future allowing more options for this map
+            FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED.get(settings)
+                ? Map.ofEntries(
+                    Map.entry(REMOTE_METADATA_TYPE_KEY, REMOTE_METADATA_TYPE.get(settings)),
+                    Map.entry(REMOTE_METADATA_ENDPOINT_KEY, REMOTE_METADATA_ENDPOINT.get(settings)),
+                    Map.entry(REMOTE_METADATA_REGION_KEY, REMOTE_METADATA_REGION.get(settings)),
+                    Map.entry(REMOTE_METADATA_SERVICE_NAME_KEY, REMOTE_METADATA_SERVICE_NAME.get(settings)),
+                    Map.entry(TENANT_AWARE_KEY, "true"),
+                    Map.entry(TENANT_ID_FIELD_KEY, TENANT_ID_FIELD)
+                )
+                : Collections.emptyMap(),
+            // TODO: Find a better thread pool or make one
+            client.threadPool().executor(ThreadPool.Names.GENERIC)
+        );
+        EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, sdkClient, xContentRegistry);
         FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(
             client,
+            sdkClient,
             clusterService,
             encryptorUtils,
             xContentRegistry
@@ -137,15 +172,22 @@ public Collection<Object> createComponents(
         );
         WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool, flowFrameworkSettings);
 
-        SearchHandler searchHandler = new SearchHandler(settings, clusterService, client, FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES);
+        SearchHandler searchHandler = new SearchHandler(
+            settings,
+            clusterService,
+            client,
+            sdkClient,
+            FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES
+        );
 
         return List.of(
             workflowStepFactory,
             workflowProcessSorter,
             encryptorUtils,
             flowFrameworkIndicesHandler,
             searchHandler,
-            flowFrameworkSettings
+            flowFrameworkSettings,
+            sdkClient
         );
     }
 
@@ -196,7 +238,12 @@ public List<Setting<?>> getSettings() {
             MAX_WORKFLOW_STEPS,
             WORKFLOW_REQUEST_TIMEOUT,
             TASK_REQUEST_RETRY_DURATION,
-            FILTER_BY_BACKEND_ROLES
+            FILTER_BY_BACKEND_ROLES,
+            FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED,
+            REMOTE_METADATA_TYPE,
+            REMOTE_METADATA_ENDPOINT,
+            REMOTE_METADATA_REGION,
+            REMOTE_METADATA_SERVICE_NAME
         );
     }
 
@@ -206,21 +253,21 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
             new ScalingExecutorBuilder(
                 WORKFLOW_THREAD_POOL,
                 1,
-                Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
+                Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
                 TimeValue.timeValueMinutes(1),
                 FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
             ),
             new ScalingExecutorBuilder(
                 PROVISION_WORKFLOW_THREAD_POOL,
                 1,
-                Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
+                Math.max(8, OpenSearchExecutors.allocatedProcessors(settings) - 1),
                 TimeValue.timeValueMinutes(5),
                 FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL
             ),
             new ScalingExecutorBuilder(
                 DEPROVISION_WORKFLOW_THREAD_POOL,
                 1,
-                Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
+                Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
                 TimeValue.timeValueMinutes(1),
                 FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
             )
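
The `createComponents` change passes `SdkClientFactory.createSdkClient` either a populated settings map or an empty one, depending on the multi-tenancy flag. The following standalone sketch isolates that branching with plain Java types. The class name, method name, and key strings are hypothetical placeholders: the real keys are the `*_KEY` constants imported from `org.opensearch.remote.metadata.common.CommonValue`, whose string values do not appear in this diff.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for the map built in createComponents; key strings are placeholders. */
final class SdkClientSettingsSketch {
    // Placeholder key names, NOT the real constant values from the remote metadata SDK
    static final String TYPE_KEY = "type";
    static final String ENDPOINT_KEY = "endpoint";
    static final String REGION_KEY = "region";
    static final String SERVICE_NAME_KEY = "service_name";
    static final String TENANT_AWARE_KEY = "tenant_aware";
    static final String TENANT_ID_FIELD_KEY = "tenant_id_field";

    /**
     * Mirrors the ternary in the diff: remote metadata settings are supplied only
     * when multi-tenancy is enabled; otherwise the map is empty.
     */
    static Map<String, String> metadataSettings(
        boolean multiTenancyEnabled,
        String type,
        String endpoint,
        String region,
        String serviceName
    ) {
        if (!multiTenancyEnabled) {
            return Collections.emptyMap();
        }
        // Map.entry rejects null keys and values, so every argument must be non-null here
        return Map.ofEntries(
            Map.entry(TYPE_KEY, type),
            Map.entry(ENDPOINT_KEY, endpoint),
            Map.entry(REGION_KEY, region),
            Map.entry(SERVICE_NAME_KEY, serviceName),
            Map.entry(TENANT_AWARE_KEY, "true"),
            Map.entry(TENANT_ID_FIELD_KEY, "tenant_id")
        );
    }
}
```

One consequence of using `Map.entry` is that a null setting value would throw a `NullPointerException` while the map is built. In this sketch, callers would therefore need to pass non-null values such as empty strings for unused settings. Whether the plugin's `Setting` defaults guarantee that is not shown in this diff.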

src/main/java/org/opensearch/flowframework/common/CommonValue.java

+11
@@ -8,6 +8,8 @@
  */
 package org.opensearch.flowframework.common;
 
+import org.opensearch.Version;
+
 /**
  * Representation of common values that are used across project
  */
@@ -82,6 +84,10 @@ private CommonValue() {}
     public static final String USE_CASE = "use_case";
     /** The param name for reprovisioning, used by the create workflow API */
     public static final String REPROVISION_WORKFLOW = "reprovision";
+    /** The REST header containing the tenant id */
+    public static final String TENANT_ID_HEADER = "x-tenant-id";
+    /** The field name containing the tenant id */
+    public static final String TENANT_ID_FIELD = "tenant_id";
 
     /*
      * Constants associated with plugin configuration
@@ -244,4 +250,9 @@ private CommonValue() {}
     public static final String ML_COMMONS_API_SPEC_YAML_URI =
         "https://raw.githubusercontent.com/opensearch-project/opensearch-api-specification/refs/heads/main/spec/namespaces/ml.yaml";
 
+    /*
+     * Constants associated with non-BWC features
+     */
+    /** Version 2.19.0 */
+    public static final Version VERSION_2_19_0 = Version.fromString("2.19.0");
 }
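
The commit message mentions extracting the tenant id from a REST header and validating that it exists, but the REST action code is not among the files shown here. As a rough illustration of that idea only, the sketch below reads the `x-tenant-id` header from a plain header map. The class, method, and error message are hypothetical, and the plugin's actual implementation may differ.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating tenant id extraction; not the plugin's actual code. */
final class TenantIdSketch {
    // Header name taken from CommonValue.TENANT_ID_HEADER in the diff
    static final String TENANT_ID_HEADER = "x-tenant-id";

    /**
     * Returns the tenant id when multi-tenancy is enabled, or null when it is disabled.
     * Throws IllegalArgumentException if multi-tenancy is enabled and the header is
     * missing or blank (an assumed validation policy).
     */
    static String extractTenantId(boolean multiTenancyEnabled, Map<String, List<String>> headers) {
        if (!multiTenancyEnabled) {
            return null;
        }
        for (Map.Entry<String, List<String>> header : headers.entrySet()) {
            // HTTP header names are case-insensitive
            if (TENANT_ID_HEADER.equalsIgnoreCase(header.getKey())) {
                List<String> values = header.getValue();
                if (values != null && !values.isEmpty()) {
                    String value = values.get(0);
                    if (value != null && !value.isBlank()) {
                        return value;
                    }
                }
            }
        }
        throw new IllegalArgumentException("Tenant ID header is missing or has no value");
    }
}
```

Returning null when multi-tenancy is disabled keeps single-tenant callers unaffected in this sketch. An `Optional<String>` return type would be an equally reasonable choice.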
