Skip to content

Commit e88bc67

Browse files
committed
initial create and start client
Signed-off-by: Amit Galitzky <[email protected]>
1 parent a6c1916 commit e88bc67

21 files changed

+629
-52
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ dependencies {
192192
testImplementation 'org.reflections:reflections:0.10.2'
193193

194194
testImplementation "org.opensearch.test:framework:${opensearch_version}"
195+
196+
zipArchive("org.opensearch.plugin:opensearch-ml-plugin:${opensearch_build}")
197+
195198
}
196199

197200
apply plugin: 'java'

src/main/java/org/opensearch/ad/client/AnomalyDetectionClient.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,13 @@
99
import org.opensearch.action.search.SearchResponse;
1010
import org.opensearch.action.support.PlainActionFuture;
1111
import org.opensearch.ad.transport.GetAnomalyDetectorResponse;
12+
import org.opensearch.ad.transport.IndexAnomalyDetectorRequest;
13+
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
1214
import org.opensearch.common.action.ActionFuture;
1315
import org.opensearch.core.action.ActionListener;
1416
import org.opensearch.timeseries.transport.GetConfigRequest;
17+
import org.opensearch.timeseries.transport.JobRequest;
18+
import org.opensearch.timeseries.transport.JobResponse;
1519
import org.opensearch.timeseries.transport.SuggestConfigParamRequest;
1620
import org.opensearch.timeseries.transport.SuggestConfigParamResponse;
1721
import org.opensearch.timeseries.transport.ValidateConfigRequest;
@@ -110,4 +114,40 @@ default ActionFuture<SuggestConfigParamResponse> suggestAnomalyDetector(SuggestC
110114
* @param listener a listener to be notified of the result
111115
*/
112116
void suggestAnomalyDetector(SuggestConfigParamRequest suggestRequest, ActionListener<SuggestConfigParamResponse> listener);
117+
118+
/**
119+
* Create anomaly detector - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#create-detector
120+
* @param createRequest request to create the detector
121+
* @return ActionFuture of IndexAnomalyDetectorResponse
122+
*/
123+
default ActionFuture<IndexAnomalyDetectorResponse> createAnomalyDetector(IndexAnomalyDetectorRequest createRequest) {
124+
PlainActionFuture<IndexAnomalyDetectorResponse> actionFuture = PlainActionFuture.newFuture();
125+
createAnomalyDetector(createRequest, actionFuture);
126+
return actionFuture;
127+
}
128+
129+
/**
130+
* Create anomaly detector - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#create-detector
131+
* @param createRequest request to create the detector
132+
* @param listener a listener to be notified of the result
133+
*/
134+
void createAnomalyDetector(IndexAnomalyDetectorRequest createRequest, ActionListener<IndexAnomalyDetectorResponse> listener);
135+
136+
/**
137+
* Start anomaly detector - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#start-detector
138+
* @param startRequest request to start the detector
139+
* @return ActionFuture of JobResponse
140+
*/
141+
default ActionFuture<JobResponse> startAnomalyDetector(JobRequest startRequest) {
142+
PlainActionFuture<JobResponse> actionFuture = PlainActionFuture.newFuture();
143+
startAnomalyDetector(startRequest, actionFuture);
144+
return actionFuture;
145+
}
146+
147+
/**
148+
* Start anomaly detector - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#start-detector
149+
* @param startRequest request to start the detector
150+
* @param listener a listener to be notified of the result
151+
*/
152+
void startAnomalyDetector(JobRequest startRequest, ActionListener<JobResponse> listener);
113153
}

src/main/java/org/opensearch/ad/client/AnomalyDetectionNodeClient.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@
99

1010
import org.opensearch.action.search.SearchRequest;
1111
import org.opensearch.action.search.SearchResponse;
12+
import org.opensearch.ad.transport.AnomalyDetectorJobAction;
1213
import org.opensearch.ad.transport.GetAnomalyDetectorAction;
1314
import org.opensearch.ad.transport.GetAnomalyDetectorResponse;
15+
import org.opensearch.ad.transport.IndexAnomalyDetectorAction;
16+
import org.opensearch.ad.transport.IndexAnomalyDetectorRequest;
17+
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
1418
import org.opensearch.ad.transport.SearchAnomalyDetectorAction;
1519
import org.opensearch.ad.transport.SearchAnomalyResultAction;
1620
import org.opensearch.ad.transport.SuggestAnomalyDetectorParamAction;
@@ -19,6 +23,8 @@
1923
import org.opensearch.core.action.ActionResponse;
2024
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
2125
import org.opensearch.timeseries.transport.GetConfigRequest;
26+
import org.opensearch.timeseries.transport.JobRequest;
27+
import org.opensearch.timeseries.transport.JobResponse;
2228
import org.opensearch.timeseries.transport.SuggestConfigParamRequest;
2329
import org.opensearch.timeseries.transport.SuggestConfigParamResponse;
2430
import org.opensearch.timeseries.transport.ValidateConfigRequest;
@@ -63,6 +69,16 @@ public void suggestAnomalyDetector(SuggestConfigParamRequest suggestRequest, Act
6369
this.client.execute(SuggestAnomalyDetectorParamAction.INSTANCE, suggestRequest, suggestConfigResponseActionListener(listener));
6470
}
6571

72+
@Override
73+
public void createAnomalyDetector(IndexAnomalyDetectorRequest createRequest, ActionListener<IndexAnomalyDetectorResponse> listener) {
74+
this.client.execute(IndexAnomalyDetectorAction.INSTANCE, createRequest, indexAnomalyDetectorResponseActionListener(listener));
75+
}
76+
77+
@Override
78+
public void startAnomalyDetector(JobRequest startRequest, ActionListener<JobResponse> listener) {
79+
this.client.execute(AnomalyDetectorJobAction.INSTANCE, startRequest, jobResponseActionListener(listener));
80+
}
81+
6682
// We need to wrap AD-specific response type listeners around an internal listener, and re-generate the response from a generic
6783
// ActionResponse. This is needed to prevent classloader issues and ClassCastExceptions when executed by other plugins.
6884
// Additionally, we need to inject the configured NamedWriteableRegistry so NamedWriteables (present in sub-fields of
@@ -107,6 +123,30 @@ private ActionListener<SuggestConfigParamResponse> suggestConfigResponseActionLi
107123
return actionListener;
108124
}
109125

126+
private ActionListener<IndexAnomalyDetectorResponse> indexAnomalyDetectorResponseActionListener(
127+
ActionListener<IndexAnomalyDetectorResponse> listener
128+
) {
129+
ActionListener<IndexAnomalyDetectorResponse> internalListener = ActionListener.wrap(indexAnomalyDetectorResponse -> {
130+
listener.onResponse(indexAnomalyDetectorResponse);
131+
}, listener::onFailure);
132+
ActionListener<IndexAnomalyDetectorResponse> actionListener = wrapActionListener(internalListener, actionResponse -> {
133+
IndexAnomalyDetectorResponse response = IndexAnomalyDetectorResponse
134+
.fromActionResponse(actionResponse, this.namedWriteableRegistry);
135+
return response;
136+
});
137+
return actionListener;
138+
}
139+
140+
private ActionListener<JobResponse> jobResponseActionListener(ActionListener<JobResponse> listener) {
141+
ActionListener<JobResponse> internalListener = ActionListener
142+
.wrap(jobResponse -> { listener.onResponse(jobResponse); }, listener::onFailure);
143+
ActionListener<JobResponse> actionListener = wrapActionListener(internalListener, actionResponse -> {
144+
JobResponse response = JobResponse.fromActionResponse(actionResponse, this.namedWriteableRegistry);
145+
return response;
146+
});
147+
return actionListener;
148+
}
149+
110150
private <T extends ActionResponse> ActionListener<T> wrapActionListener(
111151
final ActionListener<T> listener,
112152
final Function<ActionResponse, T> recreate

src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.opensearch.cluster.service.ClusterService;
3333
import org.opensearch.common.inject.Inject;
3434
import org.opensearch.common.settings.Settings;
35+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
3536
import org.opensearch.core.xcontent.NamedXContentRegistry;
3637
import org.opensearch.timeseries.transport.BaseJobTransportAction;
3738
import org.opensearch.transport.TransportService;
@@ -47,7 +48,8 @@ public AnomalyDetectorJobTransportAction(
4748
ClusterService clusterService,
4849
Settings settings,
4950
NamedXContentRegistry xContentRegistry,
50-
ADIndexJobActionHandler adIndexJobActionHandler
51+
ADIndexJobActionHandler adIndexJobActionHandler,
52+
NamedWriteableRegistry namedWriteableRegistry
5153
) {
5254
super(
5355
transportService,
@@ -64,7 +66,8 @@ public AnomalyDetectorJobTransportAction(
6466
AnomalyDetector.class,
6567
adIndexJobActionHandler,
6668
Clock.systemUTC(), // inject cannot find clock due to OS limitation
67-
AnomalyDetector.class
69+
AnomalyDetector.class,
70+
namedWriteableRegistry
6871
);
6972
}
7073
}

src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorRequest.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.opensearch.common.unit.TimeValue;
2424
import org.opensearch.core.common.io.stream.StreamInput;
2525
import org.opensearch.core.common.io.stream.StreamOutput;
26+
import org.opensearch.index.seqno.SequenceNumbers;
2627
import org.opensearch.rest.RestRequest;
2728

2829
public class IndexAnomalyDetectorRequest extends ActionRequest implements DocRequest {
@@ -50,10 +51,10 @@ public IndexAnomalyDetectorRequest(StreamInput in) throws IOException {
5051
detector = new AnomalyDetector(in);
5152
method = in.readEnum(RestRequest.Method.class);
5253
requestTimeout = in.readTimeValue();
53-
maxSingleEntityAnomalyDetectors = in.readInt();
54-
maxMultiEntityAnomalyDetectors = in.readInt();
55-
maxAnomalyFeatures = in.readInt();
56-
maxCategoricalFields = in.readInt();
54+
maxSingleEntityAnomalyDetectors = in.readOptionalInt();
55+
maxMultiEntityAnomalyDetectors = in.readOptionalInt();
56+
maxAnomalyFeatures = in.readOptionalInt();
57+
maxCategoricalFields = in.readOptionalInt();
5758
}
5859

5960
public IndexAnomalyDetectorRequest(
@@ -83,6 +84,22 @@ public IndexAnomalyDetectorRequest(
8384
this.maxCategoricalFields = maxCategoricalFields;
8485
}
8586

87+
public IndexAnomalyDetectorRequest(String detectorID, AnomalyDetector detector, RestRequest.Method method) {
88+
this(
89+
detectorID,
90+
SequenceNumbers.UNASSIGNED_SEQ_NO,
91+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
92+
WriteRequest.RefreshPolicy.IMMEDIATE,
93+
detector,
94+
method,
95+
TimeValue.timeValueSeconds(60),
96+
null,
97+
null,
98+
null,
99+
null
100+
);
101+
}
102+
86103
public String getDetectorID() {
87104
return detectorID;
88105
}
@@ -137,10 +154,10 @@ public void writeTo(StreamOutput out) throws IOException {
137154
detector.writeTo(out);
138155
out.writeEnum(method);
139156
out.writeTimeValue(requestTimeout);
140-
out.writeInt(maxSingleEntityAnomalyDetectors);
141-
out.writeInt(maxMultiEntityAnomalyDetectors);
142-
out.writeInt(maxAnomalyFeatures);
143-
out.writeInt(maxCategoricalFields);
157+
out.writeOptionalInt(maxSingleEntityAnomalyDetectors);
158+
out.writeOptionalInt(maxMultiEntityAnomalyDetectors);
159+
out.writeOptionalInt(maxAnomalyFeatures);
160+
out.writeOptionalInt(maxCategoricalFields);
144161
}
145162

146163
@Override
@@ -162,4 +179,32 @@ public String index() {
162179
public String id() {
163180
return detectorID;
164181
}
182+
183+
public static IndexAnomalyDetectorRequest fromActionRequest(
184+
final ActionRequest actionRequest,
185+
org.opensearch.core.common.io.stream.NamedWriteableRegistry namedWriteableRegistry
186+
) {
187+
if (actionRequest instanceof IndexAnomalyDetectorRequest) {
188+
return (IndexAnomalyDetectorRequest) actionRequest;
189+
}
190+
191+
try (
192+
java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
193+
org.opensearch.core.common.io.stream.OutputStreamStreamOutput osso =
194+
new org.opensearch.core.common.io.stream.OutputStreamStreamOutput(baos)
195+
) {
196+
actionRequest.writeTo(osso);
197+
try (
198+
org.opensearch.core.common.io.stream.StreamInput input = new org.opensearch.core.common.io.stream.InputStreamStreamInput(
199+
new java.io.ByteArrayInputStream(baos.toByteArray())
200+
);
201+
org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput namedInput =
202+
new org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput(input, namedWriteableRegistry)
203+
) {
204+
return new IndexAnomalyDetectorRequest(namedInput);
205+
}
206+
} catch (java.io.IOException e) {
207+
throw new IllegalArgumentException("failed to parse ActionRequest into IndexAnomalyDetectorRequest", e);
208+
}
209+
}
165210
}

src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorResponse.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,17 @@
1111

1212
package org.opensearch.ad.transport;
1313

14+
import java.io.ByteArrayInputStream;
15+
import java.io.ByteArrayOutputStream;
1416
import java.io.IOException;
17+
import java.io.UncheckedIOException;
1518

1619
import org.opensearch.ad.model.AnomalyDetector;
1720
import org.opensearch.core.action.ActionResponse;
21+
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
22+
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
23+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
24+
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
1825
import org.opensearch.core.common.io.stream.StreamInput;
1926
import org.opensearch.core.common.io.stream.StreamOutput;
2027
import org.opensearch.core.rest.RestStatus;
@@ -81,4 +88,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
8188
.field(RestHandlerUtils._PRIMARY_TERM, primaryTerm)
8289
.endObject();
8390
}
91+
92+
public static IndexAnomalyDetectorResponse fromActionResponse(
93+
ActionResponse actionResponse,
94+
NamedWriteableRegistry namedWriteableRegistry
95+
) {
96+
if (actionResponse instanceof IndexAnomalyDetectorResponse) {
97+
return (IndexAnomalyDetectorResponse) actionResponse;
98+
}
99+
100+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
101+
actionResponse.writeTo(osso);
102+
try (
103+
StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()));
104+
NamedWriteableAwareStreamInput namedWriteableAwareInput = new NamedWriteableAwareStreamInput(input, namedWriteableRegistry)
105+
) {
106+
return new IndexAnomalyDetectorResponse(namedWriteableAwareInput);
107+
}
108+
} catch (IOException e) {
109+
throw new UncheckedIOException("failed to parse ActionResponse into IndexAnomalyDetectorResponse", e);
110+
}
111+
}
84112
}

0 commit comments

Comments
 (0)