8
8
9
9
package org .opensearch .extensions .rest ;
10
10
11
+ import org .opensearch .Version ;
11
12
import org .opensearch .client .node .NodeClient ;
13
+ import org .opensearch .common .Strings ;
14
+ import org .opensearch .common .collect .Tuple ;
15
+ import org .opensearch .common .settings .Settings ;
16
+ import org .opensearch .common .xcontent .XContentHelper ;
17
+ import org .opensearch .core .xcontent .MediaType ;
12
18
import org .opensearch .core .xcontent .XContentBuilder ;
13
- import org .opensearch .core .xcontent .XContentParser ;
14
19
import org .opensearch .extensions .ExtensionDependency ;
15
20
import org .opensearch .extensions .ExtensionScopedSettings ;
16
21
import org .opensearch .extensions .ExtensionsManager ;
23
28
24
29
import java .io .IOException ;
25
30
import java .util .ArrayList ;
26
- import java .util .Collections ;
31
+ import java .util .Arrays ;
32
+ import java .util .Collection ;
33
+ import java .util .HashMap ;
27
34
import java .util .List ;
35
+ import java .util .Map ;
36
+ import java .util .Set ;
28
37
import java .util .concurrent .CompletionException ;
29
38
import java .util .concurrent .TimeoutException ;
39
+ import java .util .stream .Collectors ;
30
40
31
- import static org .opensearch .common .xcontent .XContentParserUtils .ensureExpectedToken ;
32
41
import static org .opensearch .rest .RestRequest .Method .POST ;
33
42
34
43
/**
@@ -62,36 +71,79 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
62
71
String openSearchVersion = null ;
63
72
String minimumCompatibleVersion = null ;
64
73
List <ExtensionDependency > dependencies = new ArrayList <>();
74
+ Set <String > additionalSettingsKeys = extensionsManager .getAdditionalSettings ()
75
+ .stream ()
76
+ .map (s -> s .getKey ())
77
+ .collect (Collectors .toSet ());
65
78
66
- try (XContentParser parser = request .contentParser ()) {
67
- parser .nextToken ();
68
- ensureExpectedToken (XContentParser .Token .START_OBJECT , parser .currentToken (), parser );
69
- while (parser .nextToken () != XContentParser .Token .END_OBJECT ) {
70
- String currentFieldName = parser .currentName ();
71
- parser .nextToken ();
72
- if ("name" .equals (currentFieldName )) {
73
- name = parser .text ();
74
- } else if ("uniqueId" .equals (currentFieldName )) {
75
- uniqueId = parser .text ();
76
- } else if ("hostAddress" .equals (currentFieldName )) {
77
- hostAddress = parser .text ();
78
- } else if ("port" .equals (currentFieldName )) {
79
- port = parser .text ();
80
- } else if ("version" .equals (currentFieldName )) {
81
- version = parser .text ();
82
- } else if ("opensearchVersion" .equals (currentFieldName )) {
83
- openSearchVersion = parser .text ();
84
- } else if ("minimumCompatibleVersion" .equals (currentFieldName )) {
85
- minimumCompatibleVersion = parser .text ();
86
- } else if ("dependencies" .equals (currentFieldName )) {
87
- ensureExpectedToken (XContentParser .Token .START_ARRAY , parser .currentToken (), parser );
88
- while (parser .nextToken () != XContentParser .Token .END_ARRAY ) {
89
- dependencies .add (ExtensionDependency .parse (parser ));
79
+ Tuple <? extends MediaType , Map <String , Object >> unreadExtensionTuple = XContentHelper .convertToMap (
80
+ request .content (),
81
+ false ,
82
+ request .getXContentType ().xContent ().mediaType ()
83
+ );
84
+ Map <String , Object > extensionMap = unreadExtensionTuple .v2 ();
85
+
86
+ ExtensionScopedSettings extAdditionalSettings = new ExtensionScopedSettings (extensionsManager .getAdditionalSettings ());
87
+
88
+ try {
89
+ // checking to see whether any required fields are missing from extension initialization request or not
90
+ String [] requiredFields = {
91
+ "name" ,
92
+ "uniqueId" ,
93
+ "hostAddress" ,
94
+ "port" ,
95
+ "version" ,
96
+ "opensearchVersion" ,
97
+ "minimumCompatibleVersion" };
98
+ List <String > missingFields = Arrays .stream (requiredFields )
99
+ .filter (field -> !extensionMap .containsKey (field ))
100
+ .collect (Collectors .toList ());
101
+ if (!missingFields .isEmpty ()) {
102
+ throw new IOException ("Extension is missing these required fields : " + missingFields );
103
+ }
104
+
105
+ // Parse extension dependencies
106
+ List <ExtensionDependency > extensionDependencyList = new ArrayList <ExtensionDependency >();
107
+ if (extensionMap .get ("dependencies" ) != null ) {
108
+ List <HashMap <String , ?>> extensionDependencies = new ArrayList <>(
109
+ (Collection <HashMap <String , ?>>) extensionMap .get ("dependencies" )
110
+ );
111
+ for (HashMap <String , ?> dependency : extensionDependencies ) {
112
+ if (Strings .isNullOrEmpty ((String ) dependency .get ("uniqueId" ))) {
113
+ throw new IOException ("Required field [uniqueId] is missing in the request for the dependent extension" );
114
+ } else if (dependency .get ("version" ) == null ) {
115
+ throw new IOException ("Required field [version] is missing in the request for the dependent extension" );
90
116
}
117
+ extensionDependencyList .add (
118
+ new ExtensionDependency (
119
+ dependency .get ("uniqueId" ).toString (),
120
+ Version .fromString (dependency .get ("version" ).toString ())
121
+ )
122
+ );
91
123
}
92
124
}
125
+
126
+ Map <String , ?> additionalSettingsMap = extensionMap .entrySet ()
127
+ .stream ()
128
+ .filter (kv -> additionalSettingsKeys .contains (kv .getKey ()))
129
+ .collect (Collectors .toMap (map -> map .getKey (), map -> map .getValue ()));
130
+
131
+ Settings .Builder output = Settings .builder ();
132
+ output .loadFromMap (additionalSettingsMap );
133
+ extAdditionalSettings .applySettings (output .build ());
134
+
135
+ // Create extension read from initialization request
136
+ name = extensionMap .get ("name" ).toString ();
137
+ uniqueId = extensionMap .get ("uniqueId" ).toString ();
138
+ hostAddress = extensionMap .get ("hostAddress" ).toString ();
139
+ port = extensionMap .get ("port" ).toString ();
140
+ version = extensionMap .get ("version" ).toString ();
141
+ openSearchVersion = extensionMap .get ("opensearchVersion" ).toString ();
142
+ minimumCompatibleVersion = extensionMap .get ("minimumCompatibleVersion" ).toString ();
143
+ dependencies = extensionDependencyList ;
93
144
} catch (IOException e ) {
94
- throw new IOException ("Missing attribute" , e );
145
+ logger .warn ("loading extension has been failed because of exception : " + e .getMessage ());
146
+ return channel -> channel .sendResponse (new BytesRestResponse (RestStatus .INTERNAL_SERVER_ERROR , e .getMessage ()));
95
147
}
96
148
97
149
Extension extension = new Extension (
@@ -103,8 +155,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
103
155
openSearchVersion ,
104
156
minimumCompatibleVersion ,
105
157
dependencies ,
106
- // TODO add this to the API (https://github.com/opensearch-project/OpenSearch/issues/8032)
107
- new ExtensionScopedSettings (Collections .emptySet ())
158
+ extAdditionalSettings
108
159
);
109
160
try {
110
161
extensionsManager .loadExtension (extension );
0 commit comments