16
16
import org .opensearch .action .support .ActionFilters ;
17
17
import org .opensearch .action .support .HandledTransportAction ;
18
18
import org .opensearch .client .Client ;
19
+ import org .opensearch .cluster .service .ClusterService ;
19
20
import org .opensearch .common .inject .Inject ;
21
+ import org .opensearch .common .settings .Settings ;
20
22
import org .opensearch .common .unit .TimeValue ;
21
23
import org .opensearch .common .util .concurrent .ThreadContext ;
22
24
import org .opensearch .commons .authuser .User ;
23
25
import org .opensearch .core .action .ActionListener ;
24
26
import org .opensearch .core .rest .RestStatus ;
27
+ import org .opensearch .core .xcontent .NamedXContentRegistry ;
25
28
import org .opensearch .flowframework .common .CommonValue ;
26
29
import org .opensearch .flowframework .common .FlowFrameworkSettings ;
27
30
import org .opensearch .flowframework .exception .FlowFrameworkException ;
49
52
import static org .opensearch .flowframework .common .CommonValue .GLOBAL_CONTEXT_INDEX ;
50
53
import static org .opensearch .flowframework .common .CommonValue .PROVISIONING_PROGRESS_FIELD ;
51
54
import static org .opensearch .flowframework .common .CommonValue .STATE_FIELD ;
55
+ import static org .opensearch .flowframework .common .FlowFrameworkSettings .FILTER_BY_BACKEND_ROLES ;
56
+ import static org .opensearch .flowframework .util .ParseUtils .checkFilterByBackendRoles ;
52
57
import static org .opensearch .flowframework .util .ParseUtils .getUserContext ;
58
+ import static org .opensearch .flowframework .util .ParseUtils .getWorkflow ;
53
59
54
60
/**
55
61
* Transport Action to index or update a use case template within the Global Context
@@ -63,6 +69,9 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
63
69
private final Client client ;
64
70
private final FlowFrameworkSettings flowFrameworkSettings ;
65
71
private final PluginsService pluginsService ;
72
+ private volatile Boolean filterByEnabled ;
73
+ private final ClusterService clusterService ;
74
+ private final NamedXContentRegistry xContentRegistry ;
66
75
67
76
/**
68
77
* Instantiates a new CreateWorkflowTransportAction
@@ -73,6 +82,9 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
73
82
* @param flowFrameworkSettings Plugin settings
74
83
* @param client The client used to make the request to OS
75
84
* @param pluginsService The plugin service
85
+ * @param clusterService the cluster service
86
+ * @param xContentRegistry the named content registry
87
+ * @param settings the plugin settings
76
88
*/
77
89
@ Inject
78
90
public CreateWorkflowTransportAction (
@@ -82,20 +94,93 @@ public CreateWorkflowTransportAction(
82
94
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler ,
83
95
FlowFrameworkSettings flowFrameworkSettings ,
84
96
Client client ,
85
- PluginsService pluginsService
97
+ PluginsService pluginsService ,
98
+ ClusterService clusterService ,
99
+ NamedXContentRegistry xContentRegistry ,
100
+ Settings settings
86
101
) {
87
102
super (CreateWorkflowAction .NAME , transportService , actionFilters , WorkflowRequest ::new );
88
103
this .workflowProcessSorter = workflowProcessSorter ;
89
104
this .flowFrameworkIndicesHandler = flowFrameworkIndicesHandler ;
90
105
this .flowFrameworkSettings = flowFrameworkSettings ;
91
106
this .client = client ;
92
107
this .pluginsService = pluginsService ;
108
+ filterByEnabled = FILTER_BY_BACKEND_ROLES .get (settings );
109
+ this .clusterService = clusterService ;
110
+ clusterService .getClusterSettings ().addSettingsUpdateConsumer (FILTER_BY_BACKEND_ROLES , it -> filterByEnabled = it );
111
+ this .xContentRegistry = xContentRegistry ;
93
112
}
94
113
95
114
@ Override
96
115
protected void doExecute (Task task , WorkflowRequest request , ActionListener <WorkflowResponse > listener ) {
97
-
98
116
User user = getUserContext (client );
117
+ String workflowId = request .getWorkflowId ();
118
+ try {
119
+ resolveUserAndExecute (user , workflowId , listener , () -> createExecute (request , user , listener ));
120
+ } catch (Exception e ) {
121
+ logger .error ("Failed to create workflow" , e );
122
+ listener .onFailure (e );
123
+ }
124
+ }
125
+
126
+ /**
127
+ * Resolve user and execute the workflow function
128
+ * @param requestedUser the user making the request
129
+ * @param workflowId the workflow id
130
+ * @param listener the action listener
131
+ * @param function the workflow function to execute
132
+ */
133
+ private void resolveUserAndExecute (
134
+ User requestedUser ,
135
+ String workflowId ,
136
+ ActionListener <WorkflowResponse > listener ,
137
+ Runnable function
138
+ ) {
139
+ try {
140
+ // Check if user has backend roles
141
+ // When filter by is enabled, block users creating/updating workflows who do not have backend roles.
142
+ if (filterByEnabled == Boolean .TRUE ) {
143
+ try {
144
+ checkFilterByBackendRoles (requestedUser );
145
+ } catch (FlowFrameworkException e ) {
146
+ logger .error (e .getMessage (), e );
147
+ listener .onFailure (e );
148
+ return ;
149
+ }
150
+ }
151
+ if (workflowId != null ) {
152
+ // requestedUser == null means security is disabled or user is superadmin. In this case we don't need to
153
+ // check if request user have access to the workflow or not. But we still need to get current workflow for
154
+ // this case, so we can keep current workflow's user data.
155
+ boolean filterByBackendRole = requestedUser == null ? false : filterByEnabled ;
156
+ // Update workflow request, check if user has permissions to update the workflow
157
+ // Get workflow and verify backend roles
158
+ getWorkflow (requestedUser , workflowId , filterByBackendRole , listener , function , client , clusterService , xContentRegistry );
159
+ } else {
160
+ // Create Workflow. No need to get current workflow.
161
+ function .run ();
162
+ }
163
+ } catch (Exception e ) {
164
+ String errorMessage = "Failed to create or update workflow" ;
165
+ if (e instanceof FlowFrameworkException ) {
166
+ listener .onFailure (e );
167
+ } else {
168
+ listener .onFailure (new FlowFrameworkException (errorMessage , ExceptionsHelper .status (e )));
169
+ }
170
+ }
171
+ }
172
+
173
+ /**
174
+ * Execute the create or update request
175
+ * 1. Validate workflows if requested
176
+ * 2. Create or update global context index
177
+ * 3. Create or update state index
178
+ * 4. Create or update provisioning progress index
179
+ * @param request the workflow request
180
+ * @param user the user making the request
181
+ * @param listener the action listener
182
+ */
183
+ private void createExecute (WorkflowRequest request , User user , ActionListener <WorkflowResponse > listener ) {
99
184
Instant creationTime = Instant .now ();
100
185
Template templateWithUser = new Template (
101
186
request .getTemplate ().name (),
0 commit comments