Skip to content

Commit cd9ddb9

Browse files
committed
adding state index initial
Signed-off-by: Amit Galitzky <[email protected]>
1 parent 63ef780 commit cd9ddb9

23 files changed

+1373
-574
lines changed

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ buildscript {
5656
opensearch_group = "org.opensearch"
5757
opensearch_no_snapshot = opensearch_build.replace("-SNAPSHOT","")
5858
System.setProperty('tests.security.manager', 'false')
59+
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
5960
}
6061

6162
repositories {
@@ -135,6 +136,7 @@ dependencies {
135136
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
136137
implementation "com.google.guava:guava:32.1.3-jre"
137138
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
139+
implementation "org.opensearch:common-utils:${common_utils_version}"
138140

139141
configurations.all {
140142
resolutionStrategy {

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,13 @@
2424
import org.opensearch.core.xcontent.NamedXContentRegistry;
2525
import org.opensearch.env.Environment;
2626
import org.opensearch.env.NodeEnvironment;
27-
import org.opensearch.flowframework.indices.GlobalContextHandler;
27+
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
2828
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
2929
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
3030
import org.opensearch.flowframework.transport.CreateWorkflowAction;
3131
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
3232
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
3333
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
34-
import org.opensearch.flowframework.workflow.CreateIndexStep;
3534
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
3635
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
3736
import org.opensearch.plugins.ActionPlugin;
@@ -79,10 +78,9 @@ public Collection<Object> createComponents(
7978
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client);
8079
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);
8180

82-
// TODO : Refactor, move system index creation/associated methods outside of the CreateIndexStep
83-
GlobalContextHandler globalContextHandler = new GlobalContextHandler(client, new CreateIndexStep(clusterService, client));
81+
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService);
8482

85-
return ImmutableList.of(workflowStepFactory, workflowProcessSorter, globalContextHandler);
83+
return ImmutableList.of(workflowStepFactory, workflowProcessSorter, flowFrameworkIndicesHandler);
8684
}
8785

8886
@Override

src/main/java/org/opensearch/flowframework/common/CommonValue.java

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ private CommonValue() {}
2727
public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "mappings/global-context.json";
2828
/** Global Context index mapping version */
2929
public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1;
30+
/** Workflow State Index Name */
31+
public static final String WORKFLOW_STATE_INDEX = ".plugins-workflow-state";
32+
/** Workflow State index mapping file path */
33+
public static final String WORKFLOW_STATE_INDEX_MAPPING = "mappings/workflow-state.json";
34+
/** Workflow State index mapping version */
35+
public static final Integer WORKFLOW_STATE_INDEX_VERSION = 1;
3036

3137
/** The transport action name prefix */
3238
public static final String TRANSPORT_ACION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/";

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
1616
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_VERSION;
17+
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
18+
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX_VERSION;
1719

1820
/**
1921
* An enumeration of Flow Framework indices
@@ -24,8 +26,13 @@ public enum FlowFrameworkIndex {
2426
*/
2527
GLOBAL_CONTEXT(
2628
GLOBAL_CONTEXT_INDEX,
27-
ThrowingSupplierWrapper.throwingSupplierWrapper(GlobalContextHandler::getGlobalContextMappings),
29+
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getGlobalContextMappings),
2830
GLOBAL_CONTEXT_INDEX_VERSION
31+
),
32+
WORKFLOW_STATE(
33+
WORKFLOW_STATE_INDEX,
34+
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getGlobalContextMappings),
35+
WORKFLOW_STATE_INDEX_VERSION
2936
);
3037

3138
private final String indexName;

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

+432
Large diffs are not rendered by default.

src/main/java/org/opensearch/flowframework/indices/GlobalContextHandler.java

-151
This file was deleted.

src/main/java/org/opensearch/flowframework/model/PipelineProcessor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
import java.util.Map;
1818

1919
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
20-
import static org.opensearch.flowframework.common.TemplateUtil.buildStringToStringMap;
21-
import static org.opensearch.flowframework.common.TemplateUtil.parseStringToStringMap;
20+
import static org.opensearch.flowframework.util.ParseUtils.buildStringToStringMap;
21+
import static org.opensearch.flowframework.util.ParseUtils.parseStringToStringMap;
2222

2323
/**
2424
* This represents a processor associated with search and ingest pipelines in the {@link Template}.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
package org.opensearch.flowframework.model;
10+
11+
public enum ProvisioningProgress {
12+
IN_PROGRESS,
13+
DONE,
14+
NOT_STARTED
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
package org.opensearch.flowframework.model;
10+
11+
public enum State {
12+
NOT_STARTED,
13+
PROVISIONING,
14+
FAILED,
15+
READY
16+
}

src/main/java/org/opensearch/flowframework/model/Template.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import java.util.Map.Entry;
2626

2727
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
28-
import static org.opensearch.flowframework.common.TemplateUtil.jsonToParser;
29-
import static org.opensearch.flowframework.common.TemplateUtil.parseStringToStringMap;
28+
import static org.opensearch.flowframework.util.ParseUtils.jsonToParser;
29+
import static org.opensearch.flowframework.util.ParseUtils.parseStringToStringMap;
3030

3131
/**
3232
* The Template is the central data structure which configures workflows. This object is used to parse JSON communicated via REST API.

src/main/java/org/opensearch/flowframework/model/WorkflowNode.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import java.util.Objects;
2525

2626
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
27-
import static org.opensearch.flowframework.common.TemplateUtil.buildStringToStringMap;
28-
import static org.opensearch.flowframework.common.TemplateUtil.parseStringToStringMap;
27+
import static org.opensearch.flowframework.util.ParseUtils.buildStringToStringMap;
28+
import static org.opensearch.flowframework.util.ParseUtils.parseStringToStringMap;
2929

3030
/**
3131
* This represents a process node (step) in a workflow graph in the {@link Template}.

0 commit comments

Comments
 (0)