Skip to content

Commit b15c866

Browse files
[Backport 2.x] Topological Sorting and Sequenced Execution (#48)
Topological Sorting and Sequenced Execution (#26) * Topological Sorting and Sequenced Execution * Add javadocs * Update demo to link to Workflow interface * Replace System.out with logging * Update with new interface signatures * Demo passing input data at parse-time * Demo passing data in between steps * Change execute arg to list and refactor demo classes to own package * Significantly simplify input/output data passing * Add tests * Fix javadocs and forbidden API issues * Address code review comments --------- (cherry picked from commit a574f47) Signed-off-by: Daniel Widdis <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 58db87b commit b15c866

21 files changed

+1133
-11
lines changed

.codecov.yml

+4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
codecov:
22
require_ci_to_pass: yes
33

4+
# ignore files in demo package
5+
ignore:
6+
- "src/main/java/demo"
7+
48
coverage:
59
precision: 2
610
round: down

build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ repositories {
105105
dependencies {
106106
implementation "org.opensearch:opensearch:${opensearch_version}"
107107
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
108+
implementation "com.google.code.gson:gson:2.10.1"
108109
compileOnly "com.google.guava:guava:32.1.2-jre"
109110
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
110111

formatter/formatting.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ allprojects {
3535

3636
trimTrailingWhitespace()
3737
endWithNewline()
38+
indentWithSpaces()
3839
}
3940
format("license", {
4041
licenseHeaderFile("${rootProject.file("formatter/license-header.txt")}", "package ");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
package demo;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
14+
import org.opensearch.flowframework.workflow.WorkflowData;
15+
import org.opensearch.flowframework.workflow.WorkflowStep;
16+
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.concurrent.CompletableFuture;
20+
21+
/**
22+
* Sample to show other devs how to pass data around. Will be deleted once other PRs are merged.
23+
*/
24+
public class CreateIndexWorkflowStep implements WorkflowStep {
25+
26+
private static final Logger logger = LogManager.getLogger(CreateIndexWorkflowStep.class);
27+
28+
private final String name;
29+
30+
/**
31+
* Instantiate this class.
32+
*/
33+
public CreateIndexWorkflowStep() {
34+
this.name = "CREATE_INDEX";
35+
}
36+
37+
@Override
38+
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
39+
CompletableFuture<WorkflowData> future = new CompletableFuture<>();
40+
// TODO we will be passing a thread pool to this object when it's instantiated
41+
// we should either add the generic executor from that pool to this call
42+
// or use executorservice.submit or any of various threading options
43+
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42
44+
CompletableFuture.runAsync(() -> {
45+
String inputIndex = null;
46+
boolean first = true;
47+
for (WorkflowData wfData : data) {
48+
logger.debug(
49+
"{} sent params: {}, content: {}",
50+
first ? "Initialization" : "Previous step",
51+
wfData.getParams(),
52+
wfData.getContent()
53+
);
54+
if (first) {
55+
Map<String, String> params = data.get(0).getParams();
56+
if (params.containsKey("index")) {
57+
inputIndex = params.get("index");
58+
}
59+
first = false;
60+
}
61+
}
62+
// do some work, simulating a REST API call
63+
try {
64+
Thread.sleep(2000);
65+
} catch (InterruptedException e) {}
66+
// Simulate response of created index
67+
CreateIndexResponse response = new CreateIndexResponse(true, true, inputIndex);
68+
future.complete(new WorkflowData() {
69+
@Override
70+
public Map<String, Object> getContent() {
71+
return Map.of("index", response.index());
72+
}
73+
});
74+
});
75+
76+
return future;
77+
}
78+
79+
@Override
80+
public String getName() {
81+
return name;
82+
}
83+
}

src/main/java/demo/DataDemo.java

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
package demo;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.SuppressForbidden;
14+
import org.opensearch.common.io.PathUtils;
15+
import org.opensearch.flowframework.template.ProcessNode;
16+
import org.opensearch.flowframework.template.TemplateParser;
17+
import org.opensearch.flowframework.workflow.WorkflowStep;
18+
19+
import java.io.IOException;
20+
import java.nio.charset.StandardCharsets;
21+
import java.nio.file.Files;
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Locale;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.stream.Collectors;
30+
31+
/**
32+
* Demo class exercising {@link TemplateParser}. This will be moved to a unit test.
33+
*/
34+
public class DataDemo {
35+
36+
private static final Logger logger = LogManager.getLogger(DataDemo.class);
37+
38+
// This is temporary. We need a factory class to generate these workflow steps
39+
// based on a field in the JSON.
40+
private static Map<String, WorkflowStep> workflowMap = new HashMap<>();
41+
static {
42+
workflowMap.put("create_index", new CreateIndexWorkflowStep());
43+
workflowMap.put("create_another_index", new CreateIndexWorkflowStep());
44+
}
45+
46+
/**
47+
* Demonstrate parsing a JSON graph.
48+
*
49+
* @param args unused
50+
*/
51+
@SuppressForbidden(reason = "just a demo class that will be deleted")
52+
public static void main(String[] args) {
53+
String path = "src/test/resources/template/datademo.json";
54+
String json;
55+
try {
56+
json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8);
57+
} catch (IOException e) {
58+
logger.error("Failed to read JSON at path {}", path);
59+
return;
60+
}
61+
62+
logger.info("Parsing graph to sequence...");
63+
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap);
64+
List<CompletableFuture<?>> futureList = new ArrayList<>();
65+
66+
for (ProcessNode n : processSequence) {
67+
Set<ProcessNode> predecessors = n.getPredecessors();
68+
logger.info(
69+
"Queueing process [{}].{}",
70+
n.id(),
71+
predecessors.isEmpty()
72+
? " Can start immediately!"
73+
: String.format(
74+
Locale.getDefault(),
75+
" Must wait for [%s] to complete first.",
76+
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
77+
)
78+
);
79+
futureList.add(n.execute());
80+
}
81+
futureList.forEach(CompletableFuture::join);
82+
logger.info("All done!");
83+
}
84+
85+
}

src/main/java/demo/Demo.java

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
package demo;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.SuppressForbidden;
14+
import org.opensearch.common.io.PathUtils;
15+
import org.opensearch.flowframework.template.ProcessNode;
16+
import org.opensearch.flowframework.template.TemplateParser;
17+
import org.opensearch.flowframework.workflow.WorkflowStep;
18+
19+
import java.io.IOException;
20+
import java.nio.charset.StandardCharsets;
21+
import java.nio.file.Files;
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Locale;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.stream.Collectors;
30+
31+
/**
32+
* Demo class exercising {@link TemplateParser}. This will be moved to a unit test.
33+
*/
34+
public class Demo {
35+
36+
private static final Logger logger = LogManager.getLogger(Demo.class);
37+
38+
// This is temporary. We need a factory class to generate these workflow steps
39+
// based on a field in the JSON.
40+
private static Map<String, WorkflowStep> workflowMap = new HashMap<>();
41+
static {
42+
workflowMap.put("fetch_model", new DemoWorkflowStep(3000));
43+
workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(3000));
44+
workflowMap.put("create_search_pipeline", new DemoWorkflowStep(5000));
45+
workflowMap.put("create_neural_search_index", new DemoWorkflowStep(2000));
46+
}
47+
48+
/**
49+
* Demonstrate parsing a JSON graph.
50+
*
51+
* @param args unused
52+
*/
53+
@SuppressForbidden(reason = "just a demo class that will be deleted")
54+
public static void main(String[] args) {
55+
String path = "src/test/resources/template/demo.json";
56+
String json;
57+
try {
58+
json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8);
59+
} catch (IOException e) {
60+
logger.error("Failed to read JSON at path {}", path);
61+
return;
62+
}
63+
64+
logger.info("Parsing graph to sequence...");
65+
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap);
66+
List<CompletableFuture<?>> futureList = new ArrayList<>();
67+
68+
for (ProcessNode n : processSequence) {
69+
Set<ProcessNode> predecessors = n.getPredecessors();
70+
logger.info(
71+
"Queueing process [{}].{}",
72+
n.id(),
73+
predecessors.isEmpty()
74+
? " Can start immediately!"
75+
: String.format(
76+
Locale.getDefault(),
77+
" Must wait for [%s] to complete first.",
78+
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
79+
)
80+
);
81+
// TODO need to handle this better, passing an argument when we start them all at the beginning is silly
82+
futureList.add(n.execute());
83+
}
84+
futureList.forEach(CompletableFuture::join);
85+
logger.info("All done!");
86+
}
87+
88+
}
+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
package demo;
10+
11+
import org.opensearch.flowframework.workflow.WorkflowData;
12+
import org.opensearch.flowframework.workflow.WorkflowStep;
13+
14+
import java.util.List;
15+
import java.util.concurrent.CompletableFuture;
16+
17+
/**
18+
* Demo workflowstep to show sequenced execution
19+
*/
20+
public class DemoWorkflowStep implements WorkflowStep {
21+
22+
private final long delay;
23+
private final String name;
24+
25+
/**
26+
* Instantiate a step with a delay.
27+
* @param delay milliseconds to take pretending to do work while really sleeping
28+
*/
29+
public DemoWorkflowStep(long delay) {
30+
this.delay = delay;
31+
this.name = "DEMO_DELAY_" + delay;
32+
}
33+
34+
@Override
35+
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
36+
CompletableFuture<WorkflowData> future = new CompletableFuture<>();
37+
CompletableFuture.runAsync(() -> {
38+
try {
39+
Thread.sleep(this.delay);
40+
future.complete(null);
41+
} catch (InterruptedException e) {
42+
future.completeExceptionally(e);
43+
}
44+
});
45+
return future;
46+
}
47+
48+
@Override
49+
public String getName() {
50+
return name;
51+
}
52+
}

src/main/java/demo/README.txt

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
2+
DO NOT DEPEND ON CLASSES IN THIS PACKAGE.
3+
4+
The contents of this folder are for demo/proof-of-concept use.
5+
6+
Feel free to look at the classes in this folder for potential "how could I" scenarios.
7+
8+
Tests will not be written against them.
9+
Documentation may be incomplete, wrong, or outdated.
10+
These are not for production use.
11+
They will be deleted without notice at some point, and altered without notice at other points.
12+
13+
DO NOT DEPEND ON CLASSES IN THIS PACKAGE.

0 commit comments

Comments
 (0)