Skip to content

Commit 3923f34

Browse files
authoredFeb 23, 2024··
Add an HttpHost Workflow Step (#530)
* Add an HttpHost Workflow Step Signed-off-by: Daniel Widdis <widdis@gmail.com> * Rebase, add TODOs Signed-off-by: Daniel Widdis <widdis@gmail.com> --------- Signed-off-by: Daniel Widdis <widdis@gmail.com>
1 parent 3019fb8 commit 3923f34

File tree

7 files changed

+271
-61
lines changed

7 files changed

+271
-61
lines changed
 

‎CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
1414

1515
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
1616
### Features
17+
- Add HttpHost WorkflowStep ([#530](https://github.com/opensearch-project/flow-framework/pull/530))
18+
1719
### Enhancements
1820
- Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525))
1921

‎DEVELOPER_GUIDE.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ snapshots/
105105
### Adding Workflow Steps
106106

107107
To add functionality to workflows, add new Workflow Steps to the [`org.opensearch.flowframework.workflow`](https://github.com/opensearch-project/flow-framework/tree/main/src/main/java/org/opensearch/flowframework/workflow) package.
108-
1. Implement the [Workflow](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java) interface. See existing steps for examples for input, output, and API execution.
108+
1. Implement the [WorkflowStep](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java) interface. See existing steps for examples for input, output, and API execution.
109109
2. Choose a unique name for the step which is not used by other steps. This will align with the `step_type` field in the templates and should be descriptive of what the step does.
110110
3. Add a constructor and call it from the [WorkflowStepFactory](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java).
111111
4. Add an entry to the [WorkflowStepFactory](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java) enum specifying required inputs, outputs, required plugins, and optionally a different timeout than the default.

‎src/main/java/org/opensearch/flowframework/common/CommonValue.java

+8
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,14 @@ private CommonValue() {}
158158
public static final String CREATED_TIME = "created_time";
159159
/** The last updated time field for an agent */
160160
public static final String LAST_UPDATED_TIME_FIELD = "last_updated_time";
161+
/** HttpHost */
162+
public static final String HTTP_HOST_FIELD = "http_host";
163+
/** Http scheme */
164+
public static final String SCHEME_FIELD = "scheme";
165+
/** Http hostname */
166+
public static final String HOSTNAME_FIELD = "hostname";
167+
/** Http port */
168+
public static final String PORT_FIELD = "port";
161169

162170
/*
163171
* Constants associated with resource provisioning / state
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
package org.opensearch.flowframework.workflow;
10+
11+
import org.apache.hc.core5.http.HttpHost;
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.opensearch.action.support.PlainActionFuture;
15+
import org.opensearch.core.rest.RestStatus;
16+
import org.opensearch.flowframework.exception.FlowFrameworkException;
17+
import org.opensearch.flowframework.util.ParseUtils;
18+
19+
import java.util.Collections;
20+
import java.util.Locale;
21+
import java.util.Map;
22+
import java.util.Set;
23+
24+
import static org.opensearch.flowframework.common.CommonValue.HOSTNAME_FIELD;
25+
import static org.opensearch.flowframework.common.CommonValue.HTTP_HOST_FIELD;
26+
import static org.opensearch.flowframework.common.CommonValue.PORT_FIELD;
27+
import static org.opensearch.flowframework.common.CommonValue.SCHEME_FIELD;
28+
29+
/**
30+
* Step to register parameters for an HTTP Connection to a Host
31+
*/
32+
public class HttpHostStep implements WorkflowStep {
33+
34+
private static final Logger logger = LogManager.getLogger(HttpHostStep.class);
35+
PlainActionFuture<WorkflowData> hostFuture = PlainActionFuture.newFuture();
36+
static final String NAME = "http_host";
37+
38+
@Override
39+
public PlainActionFuture<WorkflowData> execute(
40+
String currentNodeId,
41+
WorkflowData currentNodeInputs,
42+
Map<String, WorkflowData> outputs,
43+
Map<String, String> previousNodeInputs,
44+
Map<String, String> params
45+
) {
46+
Set<String> requiredKeys = Set.of(SCHEME_FIELD, HOSTNAME_FIELD, PORT_FIELD);
47+
// TODO Possibly add credentials fields here
48+
// See ML Commons MLConnectorInput class and its usage
49+
Set<String> optionalKeys = Collections.emptySet();
50+
51+
try {
52+
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
53+
requiredKeys,
54+
optionalKeys,
55+
currentNodeInputs,
56+
outputs,
57+
previousNodeInputs,
58+
params
59+
);
60+
61+
String scheme = validScheme(inputs.get(SCHEME_FIELD));
62+
String hostname = validHostName(inputs.get(HOSTNAME_FIELD));
63+
int port = validPort(inputs.get(PORT_FIELD));
64+
65+
HttpHost httpHost = new HttpHost(scheme, hostname, port);
66+
67+
hostFuture.onResponse(
68+
new WorkflowData(
69+
Map.ofEntries(Map.entry(HTTP_HOST_FIELD, httpHost)),
70+
currentNodeInputs.getWorkflowId(),
71+
currentNodeInputs.getNodeId()
72+
)
73+
);
74+
75+
logger.info("Http Host registered successfully {}", httpHost);
76+
77+
} catch (FlowFrameworkException e) {
78+
hostFuture.onFailure(e);
79+
}
80+
return hostFuture;
81+
}
82+
83+
private String validScheme(Object o) {
84+
String scheme = o.toString().toLowerCase(Locale.ROOT);
85+
if ("http".equals(scheme) || "https".equals(scheme)) {
86+
return scheme;
87+
}
88+
throw new FlowFrameworkException("http_host scheme must be http or https", RestStatus.BAD_REQUEST);
89+
}
90+
91+
private String validHostName(Object o) {
92+
// TODO Add validation:
93+
// Prevent use of localhost or private IP address ranges
94+
// See ML Commons MLHttpClientFactory.java methods for examples
95+
// Possibly consider an allowlist of addresses
96+
return o.toString();
97+
}
98+
99+
private int validPort(Object o) {
100+
try {
101+
int port = Integer.parseInt(o.toString());
102+
if ((port & 0xffff0000) != 0) {
103+
throw new FlowFrameworkException("http_host port number must be between 0 and 65535", RestStatus.BAD_REQUEST);
104+
}
105+
return port;
106+
} catch (NumberFormatException e) {
107+
throw new FlowFrameworkException("http_host port must be a number between 0 and 65535", RestStatus.BAD_REQUEST);
108+
}
109+
}
110+
111+
@Override
112+
public String getName() {
113+
return NAME;
114+
}
115+
}

‎src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,19 @@
3333
import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION;
3434
import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE;
3535
import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME;
36+
import static org.opensearch.flowframework.common.CommonValue.HOSTNAME_FIELD;
37+
import static org.opensearch.flowframework.common.CommonValue.HTTP_HOST_FIELD;
3638
import static org.opensearch.flowframework.common.CommonValue.MODEL_CONTENT_HASH_VALUE;
3739
import static org.opensearch.flowframework.common.CommonValue.MODEL_FORMAT;
3840
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_STATUS;
3941
import static org.opensearch.flowframework.common.CommonValue.MODEL_TYPE;
4042
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
4143
import static org.opensearch.flowframework.common.CommonValue.OPENSEARCH_ML;
4244
import static org.opensearch.flowframework.common.CommonValue.PARAMETERS_FIELD;
45+
import static org.opensearch.flowframework.common.CommonValue.PORT_FIELD;
4346
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
4447
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
48+
import static org.opensearch.flowframework.common.CommonValue.SCHEME_FIELD;
4549
import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
4650
import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD;
4751
import static org.opensearch.flowframework.common.CommonValue.TYPE;
@@ -100,6 +104,7 @@ public WorkflowStepFactory(
100104
stepMap.put(ToolStep.NAME, ToolStep::new);
101105
stepMap.put(RegisterAgentStep.NAME, () -> new RegisterAgentStep(mlClient, flowFrameworkIndicesHandler));
102106
stepMap.put(DeleteAgentStep.NAME, () -> new DeleteAgentStep(mlClient));
107+
stepMap.put(HttpHostStep.NAME, HttpHostStep::new);
103108
}
104109

105110
/**
@@ -194,7 +199,16 @@ public enum WorkflowSteps {
194199
DELETE_AGENT(DeleteAgentStep.NAME, List.of(AGENT_ID), List.of(AGENT_ID), List.of(OPENSEARCH_ML), null),
195200

196201
/** Create Tool Step */
197-
CREATE_TOOL(ToolStep.NAME, List.of(TYPE), List.of(TOOLS_FIELD), List.of(OPENSEARCH_ML), null);
202+
CREATE_TOOL(ToolStep.NAME, List.of(TYPE), List.of(TOOLS_FIELD), List.of(OPENSEARCH_ML), null),
203+
204+
/** Http Host Step */
205+
HTTP_HOST(
206+
HttpHostStep.NAME,
207+
List.of(SCHEME_FIELD, HOSTNAME_FIELD, PORT_FIELD),
208+
List.of(HTTP_HOST_FIELD),
209+
Collections.emptyList(),
210+
null
211+
);
198212

199213
private final String workflowStepName;
200214
private final List<String> inputs;

‎src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java

+9-59
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@
1111
import org.opensearch.flowframework.common.FlowFrameworkSettings;
1212
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
1313
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
14+
import org.opensearch.flowframework.workflow.WorkflowStepFactory.WorkflowSteps;
1415
import org.opensearch.ml.client.MachineLearningNodeClient;
1516
import org.opensearch.test.OpenSearchTestCase;
1617
import org.opensearch.threadpool.ThreadPool;
1718

1819
import java.io.IOException;
1920
import java.util.ArrayList;
20-
import java.util.HashMap;
21+
import java.util.Arrays;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.stream.Collectors;
2325

2426
import static org.mockito.Mockito.mock;
2527
import static org.mockito.Mockito.when;
@@ -37,67 +39,12 @@ public void setUp() throws Exception {
3739
}
3840

3941
public void testParseWorkflowValidator() throws IOException {
40-
Map<String, WorkflowStepValidator> workflowStepValidators = new HashMap<>();
41-
workflowStepValidators.put(
42-
WorkflowStepFactory.WorkflowSteps.CREATE_CONNECTOR.getWorkflowStepName(),
43-
WorkflowStepFactory.WorkflowSteps.CREATE_CONNECTOR.getWorkflowStepValidator()
44-
);
45-
workflowStepValidators.put(
46-
WorkflowStepFactory.WorkflowSteps.DELETE_MODEL.getWorkflowStepName(),
47-
WorkflowStepFactory.WorkflowSteps.DELETE_MODEL.getWorkflowStepValidator()
48-
);
49-
workflowStepValidators.put(
50-
WorkflowStepFactory.WorkflowSteps.DEPLOY_MODEL.getWorkflowStepName(),
51-
WorkflowStepFactory.WorkflowSteps.DEPLOY_MODEL.getWorkflowStepValidator()
52-
);
53-
workflowStepValidators.put(
54-
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_CUSTOM_MODEL.getWorkflowStepName(),
55-
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_CUSTOM_MODEL.getWorkflowStepValidator()
56-
);
57-
workflowStepValidators.put(
58-
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_PRETRAINED_MODEL.getWorkflowStepName(),
59-
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_PRETRAINED_MODEL.getWorkflowStepValidator()
60-
);
61-
workflowStepValidators.put(
62-
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_SPARSE_ENCODING_MODEL.getWorkflowStepName(),
63-
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_SPARSE_ENCODING_MODEL.getWorkflowStepValidator()
64-
);
65-
workflowStepValidators.put(
66-
WorkflowStepFactory.WorkflowSteps.REGISTER_REMOTE_MODEL.getWorkflowStepName(),
67-
WorkflowStepFactory.WorkflowSteps.REGISTER_REMOTE_MODEL.getWorkflowStepValidator()
68-
);
69-
workflowStepValidators.put(
70-
WorkflowStepFactory.WorkflowSteps.REGISTER_MODEL_GROUP.getWorkflowStepName(),
71-
WorkflowStepFactory.WorkflowSteps.REGISTER_MODEL_GROUP.getWorkflowStepValidator()
72-
);
73-
workflowStepValidators.put(
74-
WorkflowStepFactory.WorkflowSteps.REGISTER_AGENT.getWorkflowStepName(),
75-
WorkflowStepFactory.WorkflowSteps.REGISTER_AGENT.getWorkflowStepValidator()
76-
);
77-
workflowStepValidators.put(
78-
WorkflowStepFactory.WorkflowSteps.CREATE_TOOL.getWorkflowStepName(),
79-
WorkflowStepFactory.WorkflowSteps.CREATE_TOOL.getWorkflowStepValidator()
80-
);
81-
workflowStepValidators.put(
82-
WorkflowStepFactory.WorkflowSteps.UNDEPLOY_MODEL.getWorkflowStepName(),
83-
WorkflowStepFactory.WorkflowSteps.UNDEPLOY_MODEL.getWorkflowStepValidator()
84-
);
85-
workflowStepValidators.put(
86-
WorkflowStepFactory.WorkflowSteps.DELETE_CONNECTOR.getWorkflowStepName(),
87-
WorkflowStepFactory.WorkflowSteps.DELETE_CONNECTOR.getWorkflowStepValidator()
88-
);
89-
workflowStepValidators.put(
90-
WorkflowStepFactory.WorkflowSteps.DELETE_AGENT.getWorkflowStepName(),
91-
WorkflowStepFactory.WorkflowSteps.DELETE_AGENT.getWorkflowStepValidator()
92-
);
93-
workflowStepValidators.put(
94-
WorkflowStepFactory.WorkflowSteps.NOOP.getWorkflowStepName(),
95-
WorkflowStepFactory.WorkflowSteps.NOOP.getWorkflowStepValidator()
96-
);
42+
Map<String, WorkflowStepValidator> workflowStepValidators = Arrays.stream(WorkflowSteps.values())
43+
.collect(Collectors.toMap(WorkflowSteps::getWorkflowStepName, WorkflowSteps::getWorkflowStepValidator));
9744

9845
WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);
9946

100-
assertEquals(14, validator.getWorkflowStepValidators().size());
47+
assertEquals(15, validator.getWorkflowStepValidators().size());
10148

10249
assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
10350
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
@@ -155,6 +102,9 @@ public void testParseWorkflowValidator() throws IOException {
155102
assertEquals(0, validator.getWorkflowStepValidators().get("noop").getInputs().size());
156103
assertEquals(0, validator.getWorkflowStepValidators().get("noop").getOutputs().size());
157104

105+
assertTrue(validator.getWorkflowStepValidators().keySet().contains("http_host"));
106+
assertEquals(3, validator.getWorkflowStepValidators().get("http_host").getInputs().size());
107+
assertEquals(1, validator.getWorkflowStepValidators().get("http_host").getOutputs().size());
158108
}
159109

160110
public void testWorkflowStepFactoryHasValidators() throws IOException {

0 commit comments

Comments
 (0)
Please sign in to comment.