Skip to content

Commit 3019fb8

Browse files
authored
Substitute REST path or body parameters in Workflow Steps (#525)
* Include params map in WorkflowRequest when provisioning Signed-off-by: Daniel Widdis <[email protected]> * Pass params to ProcessNode Signed-off-by: Daniel Widdis <[email protected]> * Pass params to WorkflowSteps Signed-off-by: Daniel Widdis <[email protected]> * Substitute params Signed-off-by: Daniel Widdis <[email protected]> * Add change log Signed-off-by: Daniel Widdis <[email protected]> * Improve param consuming checks, add coverage Signed-off-by: Daniel Widdis <[email protected]> * Allow specifying key-value pairs in body Signed-off-by: Daniel Widdis <[email protected]> * Update title in change log Signed-off-by: Daniel Widdis <[email protected]> * Refactor param and content map generation to a new method Signed-off-by: Daniel Widdis <[email protected]> --------- Signed-off-by: Daniel Widdis <[email protected]>
1 parent 24bf51a commit 3019fb8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+446
-90
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
1515
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
1616
### Features
1717
### Enhancements
18+
- Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525))
19+
1820
### Bug Fixes
1921
### Infrastructure
2022
### Documentation

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import java.io.IOException;
3030
import java.util.List;
3131
import java.util.Locale;
32+
import java.util.Map;
33+
import java.util.function.Function;
34+
import java.util.stream.Collectors;
3235

3336
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
3437
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
@@ -75,6 +78,19 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
7578
String workflowId = request.param(WORKFLOW_ID);
7679
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
7780
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
81+
final List<String> validCreateParams = List.of(WORKFLOW_ID, VALIDATION, PROVISION_WORKFLOW);
82+
// If provisioning, consume all other params and pass to provision transport action
83+
Map<String, String> params = provision
84+
? request.params()
85+
.keySet()
86+
.stream()
87+
.filter(k -> !validCreateParams.contains(k))
88+
.collect(Collectors.toMap(Function.identity(), request::param))
89+
: request.params()
90+
.entrySet()
91+
.stream()
92+
.filter(e -> !validCreateParams.contains(e.getKey()))
93+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
7894
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) {
7995
FlowFrameworkException ffe = new FlowFrameworkException(
8096
"This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
@@ -84,12 +100,24 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
84100
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
85101
);
86102
}
103+
if (!provision && !params.isEmpty()) {
104+
// Consume params and content so custom exception is processed
105+
params.keySet().stream().forEach(request::param);
106+
request.content();
107+
FlowFrameworkException ffe = new FlowFrameworkException(
108+
"Only the parameters " + validCreateParams + " are permitted unless the provision parameter is set to true.",
109+
RestStatus.BAD_REQUEST
110+
);
111+
return channel -> channel.sendResponse(
112+
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
113+
);
114+
}
87115
try {
88116
XContentParser parser = request.contentParser();
89117
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
90118
Template template = Template.parse(parser);
91119

92-
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, validation, provision);
120+
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, validation, provision, params);
93121

94122
return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
95123
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);

src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java

+34-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.core.rest.RestStatus;
1717
import org.opensearch.core.xcontent.ToXContent;
1818
import org.opensearch.core.xcontent.XContentBuilder;
19+
import org.opensearch.core.xcontent.XContentParser;
1920
import org.opensearch.flowframework.common.FlowFrameworkSettings;
2021
import org.opensearch.flowframework.exception.FlowFrameworkException;
2122
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
@@ -27,7 +28,11 @@
2728
import java.io.IOException;
2829
import java.util.List;
2930
import java.util.Locale;
31+
import java.util.Map;
32+
import java.util.function.Function;
33+
import java.util.stream.Collectors;
3034

35+
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
3136
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
3237
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
3338
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
@@ -69,23 +74,19 @@ public List<Route> routes() {
6974
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
7075
String workflowId = request.param(WORKFLOW_ID);
7176
try {
77+
Map<String, String> params = parseParamsAndContent(request);
7278
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
7379
throw new FlowFrameworkException(
7480
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
7581
RestStatus.FORBIDDEN
7682
);
7783
}
78-
// Validate content
79-
if (request.hasContent()) {
80-
// BaseRestHandler will give appropriate error message
81-
return channel -> channel.sendResponse(null);
82-
}
8384
// Validate params
8485
if (workflowId == null) {
8586
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
8687
}
8788
// Create request and provision
88-
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
89+
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, params);
8990
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
9091
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
9192
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
@@ -108,4 +109,31 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
108109
}
109110
}
110111

112+
private Map<String, String> parseParamsAndContent(RestRequest request) {
113+
// Get any other params from path
114+
Map<String, String> params = request.params()
115+
.keySet()
116+
.stream()
117+
.filter(k -> !WORKFLOW_ID.equals(k))
118+
.collect(Collectors.toMap(Function.identity(), request::param));
119+
// If body is included get any params from body
120+
if (request.hasContent()) {
121+
try (XContentParser parser = request.contentParser()) {
122+
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
123+
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
124+
String key = parser.currentName();
125+
if (params.containsKey(key)) {
126+
throw new FlowFrameworkException("Duplicate key " + key, RestStatus.BAD_REQUEST);
127+
}
128+
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
129+
throw new FlowFrameworkException("Request body fields must have string values", RestStatus.BAD_REQUEST);
130+
}
131+
params.put(key, parser.text());
132+
}
133+
} catch (IOException e) {
134+
throw new FlowFrameworkException("Request body parsing failed", RestStatus.BAD_REQUEST);
135+
}
136+
}
137+
return params;
138+
}
111139
}

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.transport.TransportService;
4040

4141
import java.util.Arrays;
42+
import java.util.Collections;
4243
import java.util.List;
4344
import java.util.Map;
4445

@@ -282,7 +283,7 @@ void checkMaxWorkflows(TimeValue requestTimeOut, Integer maxWorkflow, ActionList
282283

283284
private void validateWorkflows(Template template) throws Exception {
284285
for (Workflow workflow : template.workflows().values()) {
285-
List<ProcessNode> sortedNodes = workflowProcessSorter.sortProcessNodes(workflow, null);
286+
List<ProcessNode> sortedNodes = workflowProcessSorter.sortProcessNodes(workflow, null, Collections.emptyMap());
286287
workflowProcessSorter.validate(sortedNodes, pluginsService);
287288
}
288289
}

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

+2
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ private void executeDeprovisionSequence(
140140
deprovisionStepId,
141141
workflowStepFactory.createStep(deprovisionStep),
142142
Collections.emptyMap(),
143+
Collections.emptyMap(),
143144
new WorkflowData(Map.of(getResourceByWorkflowStep(stepName), resource.resourceId()), workflowId, deprovisionStepId),
144145
Collections.emptyList(),
145146
this.threadPool,
@@ -194,6 +195,7 @@ private void executeDeprovisionSequence(
194195
pn.id(),
195196
workflowStepFactory.createStep(pn.workflowStep().getName()),
196197
pn.previousNodeInputs(),
198+
pn.params(),
197199
pn.input(),
198200
pn.predecessors(),
199201
this.threadPool,

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,11 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
126126

127127
// Sort and validate graph
128128
Workflow provisionWorkflow = template.workflows().get(PROVISION_WORKFLOW);
129-
List<ProcessNode> provisionProcessSequence = workflowProcessSorter.sortProcessNodes(provisionWorkflow, workflowId);
129+
List<ProcessNode> provisionProcessSequence = workflowProcessSorter.sortProcessNodes(
130+
provisionWorkflow,
131+
workflowId,
132+
request.getParams()
133+
);
130134
workflowProcessSorter.validate(provisionProcessSequence, pluginsService);
131135

132136
flowFrameworkIndicesHandler.isWorkflowNotStarted(workflowId, workflowIsNotStarted -> {

src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java

+43-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.opensearch.flowframework.model.Template;
1717

1818
import java.io.IOException;
19+
import java.util.Collections;
20+
import java.util.Map;
1921

2022
/**
2123
* Transport Request to create, provision, and deprovision a workflow
@@ -43,12 +45,27 @@ public class WorkflowRequest extends ActionRequest {
4345
private boolean provision;
4446

4547
/**
46-
* Instantiates a new WorkflowRequest, set validation to false and set requestTimeout and maxWorkflows to null
48+
* Params map
49+
*/
50+
private Map<String, String> params;
51+
52+
/**
53+
* Instantiates a new WorkflowRequest, set validation to all, no provisioning
4754
* @param workflowId the documentId of the workflow
4855
* @param template the use case template which describes the workflow
4956
*/
5057
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template) {
51-
this(workflowId, template, new String[] { "all" }, false);
58+
this(workflowId, template, new String[] { "all" }, false, Collections.emptyMap());
59+
}
60+
61+
/**
62+
* Instantiates a new WorkflowRequest with params map, set validation to all, provisioning to true
63+
* @param workflowId the documentId of the workflow
64+
* @param template the use case template which describes the workflow
65+
* @param params The parameters from the REST path
66+
*/
67+
public WorkflowRequest(String workflowId, @Nullable Template template, Map<String, String> params) {
68+
this(workflowId, template, new String[] { "all" }, true, params);
5269
}
5370

5471
/**
@@ -57,12 +74,23 @@ public WorkflowRequest(@Nullable String workflowId, @Nullable Template template)
5774
* @param template the use case template which describes the workflow
5875
* @param validation flag to indicate if validation is necessary
5976
* @param provision flag to indicate if provision is necessary
77+
* @param params map of REST path params. If provision is false, must be an empty map.
6078
*/
61-
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, String[] validation, boolean provision) {
79+
public WorkflowRequest(
80+
@Nullable String workflowId,
81+
@Nullable Template template,
82+
String[] validation,
83+
boolean provision,
84+
Map<String, String> params
85+
) {
6286
this.workflowId = workflowId;
6387
this.template = template;
6488
this.validation = validation;
6589
this.provision = provision;
90+
if (!provision && !params.isEmpty()) {
91+
throw new IllegalArgumentException("Params may only be included when provisioning.");
92+
}
93+
this.params = params;
6694
}
6795

6896
/**
@@ -77,6 +105,7 @@ public WorkflowRequest(StreamInput in) throws IOException {
77105
this.template = templateJson == null ? null : Template.parse(templateJson);
78106
this.validation = in.readStringArray();
79107
this.provision = in.readBoolean();
108+
this.params = this.provision ? in.readMap(StreamInput::readString, StreamInput::readString) : Collections.emptyMap();
80109
}
81110

82111
/**
@@ -113,13 +142,24 @@ public boolean isProvision() {
113142
return this.provision;
114143
}
115144

145+
/**
146+
* Gets the params map
147+
* @return the params map
148+
*/
149+
public Map<String, String> getParams() {
150+
return Map.copyOf(this.params);
151+
}
152+
116153
@Override
117154
public void writeTo(StreamOutput out) throws IOException {
118155
super.writeTo(out);
119156
out.writeOptionalString(workflowId);
120157
out.writeOptionalString(template == null ? null : template.toJson());
121158
out.writeStringArray(validation);
122159
out.writeBoolean(provision);
160+
if (provision) {
161+
out.writeMap(params, StreamOutput::writeString, StreamOutput::writeString);
162+
}
123163
}
124164

125165
@Override

src/main/java/org/opensearch/flowframework/util/ParseUtils.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ public static Map<String, String> getStringToStringMap(Object map, String fieldN
248248
* @param currentNodeInputs Input params and content for this node, from workflow parsing
249249
* @param outputs WorkflowData content of previous steps
250250
* @param previousNodeInputs Input params for this node that come from previous steps
251+
* @param params Params that came from REST path
251252
* @return A map containing the requiredInputKeys with their corresponding values,
252253
* and optionalInputKeys with their corresponding values if present.
253254
* Throws a {@link FlowFrameworkException} if a required key is not present.
@@ -257,7 +258,8 @@ public static Map<String, Object> getInputsFromPreviousSteps(
257258
Set<String> optionalInputKeys,
258259
WorkflowData currentNodeInputs,
259260
Map<String, WorkflowData> outputs,
260-
Map<String, String> previousNodeInputs
261+
Map<String, String> previousNodeInputs,
262+
Map<String, String> params
261263
) {
262264
// Mutable set to ensure all required keys are used
263265
Set<String> requiredKeys = new HashSet<>(requiredInputKeys);
@@ -308,11 +310,11 @@ public static Map<String, Object> getInputsFromPreviousSteps(
308310
Map<String, Object> valueMap = (Map<String, Object>) value;
309311
value = valueMap.entrySet()
310312
.stream()
311-
.collect(Collectors.toMap(Map.Entry::getKey, e -> conditionallySubstitute(e.getValue(), outputs)));
313+
.collect(Collectors.toMap(Map.Entry::getKey, e -> conditionallySubstitute(e.getValue(), outputs, params)));
312314
} else if (value instanceof List) {
313-
value = ((List<?>) value).stream().map(v -> conditionallySubstitute(v, outputs)).collect(Collectors.toList());
315+
value = ((List<?>) value).stream().map(v -> conditionallySubstitute(v, outputs, params)).collect(Collectors.toList());
314316
} else {
315-
value = conditionallySubstitute(value, outputs);
317+
value = conditionallySubstitute(value, outputs, params);
316318
}
317319
// Add value to inputs and mark that a required key was present
318320
inputs.put(key, value);
@@ -336,15 +338,21 @@ public static Map<String, Object> getInputsFromPreviousSteps(
336338
return inputs;
337339
}
338340

339-
private static Object conditionallySubstitute(Object value, Map<String, WorkflowData> outputs) {
341+
private static Object conditionallySubstitute(Object value, Map<String, WorkflowData> outputs, Map<String, String> params) {
340342
if (value instanceof String) {
341343
Matcher m = SUBSTITUTION_PATTERN.matcher((String) value);
342344
if (m.matches()) {
345+
// Try matching a previous step+value pair
343346
WorkflowData data = outputs.get(m.group(1));
344347
if (data != null && data.getContent().containsKey(m.group(2))) {
345348
return data.getContent().get(m.group(2));
346349
}
347350
}
351+
// Replace all params if present
352+
for (Entry<String, String> e : params.entrySet()) {
353+
String regex = "\\$\\{\\{\\s*" + Pattern.quote(e.getKey()) + "\\s*\\}\\}";
354+
value = ((String) value).replaceAll(regex, e.getValue());
355+
}
348356
}
349357
return value;
350358
}

src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public PlainActionFuture<WorkflowData> execute(
7979
String currentNodeId,
8080
WorkflowData currentNodeInputs,
8181
Map<String, WorkflowData> outputs,
82-
Map<String, String> previousNodeInputs
82+
Map<String, String> previousNodeInputs,
83+
Map<String, String> params
8384
) {
8485

8586
PlainActionFuture<WorkflowData> registerLocalModelFuture = PlainActionFuture.newFuture();
@@ -90,7 +91,8 @@ public PlainActionFuture<WorkflowData> execute(
9091
getOptionalKeys(),
9192
currentNodeInputs,
9293
outputs,
93-
previousNodeInputs
94+
previousNodeInputs,
95+
params
9496
);
9597

9698
// Extract common fields of OS provided text-embedding, sparse encoding and custom models

src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ public PlainActionFuture<WorkflowData> execute(
7474
String currentNodeId,
7575
WorkflowData currentNodeInputs,
7676
Map<String, WorkflowData> outputs,
77-
Map<String, String> previousNodeInputs
77+
Map<String, String> previousNodeInputs,
78+
Map<String, String> params
7879
) {
7980
PlainActionFuture<WorkflowData> createConnectorFuture = PlainActionFuture.newFuture();
8081

@@ -138,7 +139,8 @@ public void onFailure(Exception e) {
138139
optionalKeys,
139140
currentNodeInputs,
140141
outputs,
141-
previousNodeInputs
142+
previousNodeInputs,
143+
params
142144
);
143145

144146
String name = (String) inputs.get(NAME_FIELD);

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public PlainActionFuture<WorkflowData> execute(
6363
String currentNodeId,
6464
WorkflowData currentNodeInputs,
6565
Map<String, WorkflowData> outputs,
66-
Map<String, String> previousNodeInputs
66+
Map<String, String> previousNodeInputs,
67+
Map<String, String> params
6768
) {
6869
PlainActionFuture<WorkflowData> createIndexFuture = PlainActionFuture.newFuture();
6970
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() {

0 commit comments

Comments
 (0)