Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/test_security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jobs:
strategy:
matrix:
java: [21]
resource_sharing_flag: [ "", "-Dresource_sharing.enabled=true" ]

name: Run Security Integration Tests on Linux
runs-on: ubuntu-latest
Expand All @@ -45,4 +46,8 @@ jobs:
# switching the user, as OpenSearch cluster can only be started as root/Administrator on linux-deb/linux-rpm/windows-zip.
run: |
chown -R 1000:1000 `pwd`
su `id -un 1000` -c "whoami && java -version && ./gradlew integTest -Dsecurity.enabled=true"
su `id -un 1000` -c "whoami && java -version && ./gradlew integTest \
-Dsecurity.enabled=true \
-Dhttps=true \
${{ matrix.resource_sharing_flag }} \
--tests '*IT'"
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 3.3](https://github.com/opensearch-project/flow-framework/compare/3.2...HEAD)
### Features
- Onboards flow-framework plugin to resource-sharing and access control framework ([#1251](https://github.com/opensearch-project/flow-framework/pull/1251))

### Enhancements
### Bug Fixes
- Pre-create ML Commons indices for Tenant Aware tests ([#1217](https://github.com/opensearch-project/flow-framework/pull/1217))
Expand Down
20 changes: 19 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ opensearchplugin {
classname = "${projectPath}.${pathToPlugin}.${pluginClassName}"
licenseFile = rootProject.file('LICENSE')
noticeFile = rootProject.file('NOTICE')
extendedPlugins = ['opensearch-security;optional=true']
}

dependencyLicenses.enabled = false
Expand Down Expand Up @@ -195,6 +196,9 @@ configurations {
}

dependencies {
// For resource access control
compileOnly group: 'org.opensearch', name:'opensearch-security-spi', version:"3.4.0-SNAPSHOT"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fetch the version from the variable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, once main branch has the version bumped I'll revert it.

i explicitly set it in last commit: 69009e6


implementation("org.opensearch:opensearch:${opensearch_version}")
api("org.opensearch:opensearch-ml-client:${opensearch_build}")
// json, jsonpath, and commons-text are required by MLClient but must be provided by calling plugins
Expand Down Expand Up @@ -252,6 +256,8 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter:${junitJupiterVersion}")
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson_databind}")

testImplementation 'org.awaitility:awaitility:4.3.0'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constant for the version

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack.


// ZipArchive dependencies used for integration tests
zipArchive("org.opensearch.plugin:opensearch-job-scheduler:${opensearch_build}")
zipArchive("org.opensearch.plugin:opensearch-ml-plugin:${opensearch_build}")
Expand Down Expand Up @@ -347,6 +353,8 @@ integTest {
systemProperty('user', user)
systemProperty('password', password)

systemProperty "resource_sharing.enabled", System.getProperty("resource_sharing.enabled")

// Only tenant aware test if set
if (System.getProperty("tests.rest.tenantaware") == "true") {
filter {
Expand Down Expand Up @@ -506,6 +514,10 @@ testClusters.integTest {
'".plugins-flow-framework-state"' +
']'
)
if (System.getProperty("resource_sharing.enabled") == "true") {
setting("plugins.security.experimental.resource_sharing.enabled", "true")
setting("plugins.security.experimental.resource_sharing.protected_types", "[\"workflow\", \"workflow_state\"]")
}
setSecure(true)
}

Expand Down Expand Up @@ -538,7 +550,13 @@ testClusters.integTest {
if (System.getProperty("opensearch.debug") != null) {
def debugPort = 5005
nodes.forEach { node ->
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}")
// server=n,suspend=y -> node tries to connect to a debugger and hence test runs fails with
// Exec output and error:
// | Output for ./bin/opensearch-plugin:ERROR: transport error 202: connect failed: Connection refused
// | ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
// | JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports initialized [src/jdk.jdwp.agent/share/native/libjdwp/debugInit.c:700].
// So instead, we listen to a debugger by saying server=y and suspend=n
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the comments?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, let's keep it to understand what these changes mean.

node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:${debugPort}")
debugPort += 1
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.flowframework.transport.GetWorkflowStepAction;
import org.opensearch.flowframework.transport.GetWorkflowStepTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.PluginClient;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.ReprovisionWorkflowAction;
Expand Down Expand Up @@ -120,6 +121,8 @@ public class FlowFrameworkPlugin extends Plugin implements ActionPlugin, SystemI

private FlowFrameworkSettings flowFrameworkSettings;

private PluginClient pluginClient;

/**
* Instantiate this plugin.
*/
Expand All @@ -143,8 +146,11 @@ public Collection<Object> createComponents(
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
boolean multiTenancyEnabled = FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED.get(settings);

this.pluginClient = new PluginClient(client);

SdkClient sdkClient = SdkClientFactory.createSdkClient(
client,
pluginClient,
xContentRegistry,
// Here we assume remote metadata client is only used with tenant awareness.
// This may change in the future allowing more options for this map
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework;

import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.flowframework.util.ResourceSharingClientAccessor;
import org.opensearch.security.spi.resources.ResourceProvider;
import org.opensearch.security.spi.resources.ResourceSharingExtension;
import org.opensearch.security.spi.resources.client.ResourceSharingClient;

import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;

public class FlowFrameworkResourceSharingExtension implements ResourceSharingExtension {
@Override
public Set<ResourceProvider> getResourceProviders() {
return Set.of(
new ResourceProvider(CommonValue.WORKFLOW_RESOURCE_TYPE, GLOBAL_CONTEXT_INDEX),
new ResourceProvider(CommonValue.WORKFLOW_STATE_RESOURCE_TYPE, WORKFLOW_STATE_INDEX)
);
}

@Override
public void assignResourceSharingClient(ResourceSharingClient resourceSharingClient) {
ResourceSharingClientAccessor.getInstance().setResourceSharingClient(resourceSharingClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,10 @@ private CommonValue() {}
*/
/** Version 2.19.0 */
public static final Version VERSION_2_19_0 = Version.fromString("2.19.0");

/*
* Constants associated with resource-sharing
*/
public static final String WORKFLOW_STATE_RESOURCE_TYPE = "workflow_state";
public static final String WORKFLOW_RESOURCE_TYPE = "workflow";
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
Expand Down Expand Up @@ -63,6 +64,7 @@
import static org.opensearch.flowframework.util.ParseUtils.checkFilterByBackendRoles;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.getWorkflow;
import static org.opensearch.flowframework.util.ParseUtils.verifyResourceAccessAndProcessRequest;

/**
* Transport Action to index or update a use case template within the Global Context
Expand Down Expand Up @@ -131,13 +133,17 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
User user = getUserContext(client);
String workflowId = request.getWorkflowId();
try {
resolveUserAndExecute(
user,
workflowId,
tenantId,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> createExecute(request, user, tenantId, listener)
verifyResourceAccessAndProcessRequest(
CommonValue.WORKFLOW_RESOURCE_TYPE,
() -> createExecute(request, user, tenantId, listener),
() -> resolveUserAndExecute(
user,
workflowId,
tenantId,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> createExecute(request, user, tenantId, listener)
)
);
} catch (Exception e) {
logger.error("Failed to create workflow", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
Expand All @@ -39,6 +40,7 @@
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute;
import static org.opensearch.flowframework.util.ParseUtils.verifyResourceAccessAndProcessRequest;

/**
* Transport action to retrieve a use case template within the Global Context
Expand Down Expand Up @@ -104,21 +106,24 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele

ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();

resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
clearStatus,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
verifyResourceAccessAndProcessRequest(
CommonValue.WORKFLOW_RESOURCE_TYPE,
() -> executeDeleteRequest(request, tenantId, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
() -> resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
clearStatus,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeDeleteRequest(request, tenantId, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
)
);

} else {
String errorMessage = "There are no templates in the global context";
logger.error(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
Expand Down Expand Up @@ -64,6 +65,7 @@
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute;
import static org.opensearch.flowframework.util.ParseUtils.verifyResourceAccessAndProcessRequest;

/**
* Transport Action to deprovision a workflow from a stored use case template
Expand Down Expand Up @@ -142,21 +144,24 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work

// Stash thread context to interact with system index
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
true,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
verifyResourceAccessAndProcessRequest(
CommonValue.WORKFLOW_RESOURCE_TYPE,
() -> executeDeprovisionRequest(request, tenantId, listener, context, user),
client,
sdkClient,
clusterService,
xContentRegistry
() -> resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
true,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeDeprovisionRequest(request, tenantId, listener, context, user),
client,
sdkClient,
clusterService,
xContentRegistry
)
);

} catch (Exception e) {
String errorMessage = "Failed to retrieve template from global context.";
logger.error(errorMessage, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
Expand All @@ -33,6 +34,7 @@

import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute;
import static org.opensearch.flowframework.util.ParseUtils.verifyResourceAccessAndProcessRequest;

//TODO: Currently we only get the workflow status but we should change to be able to get the
// full template as well
Expand Down Expand Up @@ -98,21 +100,24 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe

try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {

resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
true,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
verifyResourceAccessAndProcessRequest(
CommonValue.WORKFLOW_STATE_RESOURCE_TYPE,
() -> executeGetWorkflowStateRequest(request, tenantId, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
() -> resolveUserAndExecute(
user,
workflowId,
tenantId,
filterByEnabled,
true,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeGetWorkflowStateRequest(request, tenantId, listener, context),
client,
sdkClient,
clusterService,
xContentRegistry
)
);

} catch (Exception e) {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage("Failed to get workflow: {}", workflowId)
.getFormattedMessage();
Expand Down
Loading
Loading