Skip to content

Commit 4af6137

Browse files
committed
Convert agentic query translator processor to system-generated processor
Signed-off-by: Owais <[email protected]>
1 parent caf333d commit 4af6137

File tree

6 files changed

+204
-161
lines changed

6 files changed

+204
-161
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414
- [Semantic Field] Support the sparse two phase processor for the semantic field.
1515
- [Stats] Add stats for agentic query and agentic query translator processor.
1616
- [Agentic Search] Adds validations and logging for agentic query
17+
- [Agentic Search] Convert agentic query translator processor to system-generated processor
1718

1819
### Bug Fixes
1920

src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
110110
import org.opensearch.search.pipeline.SearchRequestProcessor;
111111
import org.opensearch.search.pipeline.SearchResponseProcessor;
112+
import org.opensearch.search.pipeline.SystemGeneratedProcessor;
112113
import org.opensearch.search.query.QueryPhaseSearcher;
113114
import org.opensearch.threadpool.ExecutorBuilder;
114115
import org.opensearch.threadpool.FixedExecutorBuilder;
@@ -326,7 +327,15 @@ public Map<String, org.opensearch.search.pipeline.Processor.Factory<SearchReques
326327
NeuralQueryEnricherProcessor.TYPE,
327328
new NeuralQueryEnricherProcessor.Factory(),
328329
NeuralSparseTwoPhaseProcessor.TYPE,
329-
new NeuralSparseTwoPhaseProcessor.Factory(),
330+
new NeuralSparseTwoPhaseProcessor.Factory()
331+
);
332+
}
333+
334+
@Override
335+
public Map<String, SystemGeneratedProcessor.SystemGeneratedFactory<SearchRequestProcessor>> getSystemGeneratedRequestProcessors(
336+
Parameters parameters
337+
) {
338+
return Map.of(
330339
AgenticQueryTranslatorProcessor.TYPE,
331340
new AgenticQueryTranslatorProcessor.Factory(clientAccessor, xContentRegistry, settingsAccessor)
332341
);

src/main/java/org/opensearch/neuralsearch/processor/AgenticQueryTranslatorProcessor.java

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package org.opensearch.neuralsearch.processor;
66

77
import com.google.gson.Gson;
8+
import lombok.AllArgsConstructor;
89
import lombok.extern.log4j.Log4j2;
910
import org.opensearch.action.search.SearchRequest;
1011
import org.opensearch.common.xcontent.XContentType;
@@ -20,7 +21,8 @@
2021
import org.opensearch.neuralsearch.stats.events.EventStatsManager;
2122
import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil;
2223
import org.opensearch.search.builder.SearchSourceBuilder;
23-
import org.opensearch.search.pipeline.AbstractProcessor;
24+
import org.opensearch.search.pipeline.SystemGeneratedProcessor;
25+
import org.opensearch.search.pipeline.ProcessorGenerationContext;
2426
import org.opensearch.search.pipeline.Processor;
2527
import org.opensearch.search.pipeline.SearchRequestProcessor;
2628
import org.opensearch.search.pipeline.PipelineProcessingContext;
@@ -31,29 +33,29 @@
3133
import java.util.Locale;
3234
import java.util.Map;
3335

34-
import static org.opensearch.ingest.ConfigurationUtils.readStringProperty;
35-
3636
@Log4j2
37-
public class AgenticQueryTranslatorProcessor extends AbstractProcessor implements SearchRequestProcessor {
37+
public class AgenticQueryTranslatorProcessor implements SearchRequestProcessor, SystemGeneratedProcessor {
3838

3939
public static final String TYPE = "agentic_query_translator";
4040
private static final int MAX_AGENT_RESPONSE_SIZE = 10_000;
4141
private final MLCommonsClientAccessor mlClient;
42-
private final String agentId;
4342
private final NamedXContentRegistry xContentRegistry;
44-
private static final Gson gson = new Gson();;
43+
private final String tag;
44+
private final String description;
45+
private final boolean ignoreFailure;
46+
private static final Gson gson = new Gson();
4547

4648
AgenticQueryTranslatorProcessor(
4749
String tag,
4850
String description,
4951
boolean ignoreFailure,
5052
MLCommonsClientAccessor mlClient,
51-
String agentId,
5253
NamedXContentRegistry xContentRegistry
5354
) {
54-
super(tag, description, ignoreFailure);
55+
this.tag = tag;
56+
this.description = description;
57+
this.ignoreFailure = ignoreFailure;
5558
this.mlClient = mlClient;
56-
this.agentId = agentId;
5759
this.xContentRegistry = xContentRegistry;
5860
}
5961

@@ -80,12 +82,7 @@ public void processRequestAsync(
8082

8183
// Validate that agentic query is used alone without other search features
8284
if (hasOtherSearchFeatures(sourceBuilder)) {
83-
String errorMessage = String.format(
84-
Locale.ROOT,
85-
"Agentic search blocked - Invalid usage with other search features - Agent ID: [%s], Query: [%s]",
86-
agentId,
87-
agenticQuery.getQueryText()
88-
);
85+
String errorMessage = "Agentic search blocked - Invalid usage with other search features";
8986
requestListener.onFailure(new IllegalArgumentException(errorMessage));
9087
return;
9188
}
@@ -109,6 +106,7 @@ private void executeAgentAsync(
109106
ActionListener<SearchRequest> requestListener
110107
) {
111108
Map<String, String> parameters = new HashMap<>();
109+
String agentId = agenticQuery.getAgentId();
112110
parameters.put("query_text", agenticQuery.getQueryText());
113111

114112
// Get index mapping from the search request
@@ -131,22 +129,14 @@ private void executeAgentAsync(
131129

132130
// Validate response size to prevent memory exhaustion
133131
if (agentResponse == null) {
134-
String errorMessage = String.format(
135-
Locale.ROOT,
136-
"Agentic search failed - Null response from agent - Agent ID: [%s], Query: [%s]",
137-
agentId,
138-
agenticQuery.getQueryText()
139-
);
140-
throw new IllegalArgumentException(errorMessage);
132+
throw new IllegalArgumentException("Agentic search failed - Null response from agent");
141133
}
142134

143135
if (agentResponse.length() > MAX_AGENT_RESPONSE_SIZE) {
144136
String errorMessage = String.format(
145137
Locale.ROOT,
146-
"Agentic search blocked - Response size exceeded limit - Agent ID: [%s], Size: [%d], Query: [%s]. Maximum allowed size is %d characters.",
147-
agentId,
138+
"Agentic search blocked - Response size exceeded limit. Size: [%d], Maximum allowed size is %d characters.",
148139
agentResponse.length(),
149-
agenticQuery.getQueryText(),
150140
MAX_AGENT_RESPONSE_SIZE
151141
);
152142
throw new IllegalArgumentException(errorMessage);
@@ -161,22 +151,11 @@ private void executeAgentAsync(
161151

162152
requestListener.onResponse(request);
163153
} catch (IOException e) {
164-
String errorMessage = String.format(
165-
Locale.ROOT,
166-
"Agentic search failed - Parse error - Agent ID: [%s], Error: [%s]",
167-
agentId,
168-
e.getMessage()
169-
);
154+
String errorMessage = String.format(Locale.ROOT, "Agentic search failed - Parse error: [%s]", e.getMessage());
170155
requestListener.onFailure(new IOException(errorMessage, e));
171156
}
172157
}, e -> {
173-
String errorMessage = String.format(
174-
Locale.ROOT,
175-
"Agentic search failed - Agent execution error - Agent ID: [%s], Query: [%s], Error: [%s]",
176-
agentId,
177-
agenticQuery.getQueryText(),
178-
e.getMessage()
179-
);
158+
String errorMessage = String.format(Locale.ROOT, "Agentic search failed - Agent execution error: [%s]", e.getMessage());
180159
requestListener.onFailure(new RuntimeException(errorMessage, e));
181160
}));
182161
}
@@ -191,19 +170,44 @@ public String getType() {
191170
return TYPE;
192171
}
193172

194-
public static class Factory implements Processor.Factory<SearchRequestProcessor> {
173+
@Override
174+
public String getTag() {
175+
return this.tag;
176+
}
177+
178+
@Override
179+
public String getDescription() {
180+
return this.description;
181+
}
182+
183+
@Override
184+
public boolean isIgnoreFailure() {
185+
return this.ignoreFailure;
186+
}
187+
188+
@Override
189+
public ExecutionStage getExecutionStage() {
190+
// Execute before user-defined processors as agentic query would be replaced by the new DSL
191+
return ExecutionStage.PRE_USER_DEFINED;
192+
}
193+
194+
@AllArgsConstructor
195+
public static class Factory implements SystemGeneratedProcessor.SystemGeneratedFactory<SearchRequestProcessor> {
195196
private final MLCommonsClientAccessor mlClient;
196197
private final NamedXContentRegistry xContentRegistry;
197198
private final NeuralSearchSettingsAccessor settingsAccessor;
198199

199-
public Factory(
200-
MLCommonsClientAccessor mlClient,
201-
NamedXContentRegistry xContentRegistry,
202-
NeuralSearchSettingsAccessor settingsAccessor
203-
) {
204-
this.mlClient = mlClient;
205-
this.xContentRegistry = xContentRegistry;
206-
this.settingsAccessor = settingsAccessor;
200+
@Override
201+
public boolean shouldGenerate(ProcessorGenerationContext context) {
202+
SearchRequest searchRequest = context.searchRequest();
203+
if (searchRequest == null || searchRequest.source() == null) {
204+
return false;
205+
}
206+
207+
boolean hasAgenticQuery = searchRequest.source().query() instanceof AgenticSearchQueryBuilder;
208+
log.debug("Query type: {}, hasAgenticQuery: {}", searchRequest.source().query().getClass().getSimpleName(), hasAgenticQuery);
209+
210+
return hasAgenticQuery;
207211
}
208212

209213
@Override
@@ -221,11 +225,7 @@ public AgenticQueryTranslatorProcessor create(
221225
"Agentic search is currently disabled. Enable it using the 'plugins.neural_search.agentic_search_enabled' setting."
222226
);
223227
}
224-
String agentId = readStringProperty(TYPE, tag, config, "agent_id");
225-
if (agentId == null || agentId.trim().isEmpty()) {
226-
throw new IllegalArgumentException("agent_id is required for agentic_query_translator processor");
227-
}
228-
return new AgenticQueryTranslatorProcessor(tag, description, ignoreFailure, mlClient, agentId, xContentRegistry);
228+
return new AgenticQueryTranslatorProcessor(tag, description, ignoreFailure, mlClient, xContentRegistry);
229229
}
230230
}
231231
}

src/main/java/org/opensearch/neuralsearch/query/AgenticSearchQueryBuilder.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public final class AgenticSearchQueryBuilder extends AbstractQueryBuilder<Agenti
4949
public static final String NAME = "agentic";
5050
public static final ParseField QUERY_TEXT_FIELD = new ParseField("query_text");
5151
public static final ParseField QUERY_FIELDS = new ParseField("query_fields");
52+
public static final ParseField AGENT_ID_FIELD = new ParseField("agent_id");
5253

5354
// Regex patterns for sanitizing query text
5455
private static final String SYSTEM_INSTRUCTION_PATTERN = "(?i)\\b(system|instruction|prompt)\\s*:";
@@ -57,6 +58,7 @@ public final class AgenticSearchQueryBuilder extends AbstractQueryBuilder<Agenti
5758
private static final int MAX_QUERY_LENGTH = 1000;
5859
public String queryText;
5960
public List<String> queryFields;
61+
public String agentId;
6062

6163
// setting accessor to retrieve agentic search feature flag
6264
private static NeuralSearchSettingsAccessor SETTINGS_ACCESSOR;
@@ -69,6 +71,7 @@ public AgenticSearchQueryBuilder(StreamInput in) throws IOException {
6971
super(in);
7072
this.queryText = in.readString();
7173
this.queryFields = in.readOptionalStringList();
74+
this.agentId = in.readOptionalString();
7275
}
7376

7477
public String getQueryText() {
@@ -79,6 +82,10 @@ public List<String> getQueryFields() {
7982
return queryFields;
8083
}
8184

85+
public String getAgentId() {
86+
return agentId;
87+
}
88+
8289
@Override
8390
protected void doWriteTo(StreamOutput out) throws IOException {
8491
// feature flag check
@@ -89,6 +96,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {
8996
}
9097
out.writeString(this.queryText);
9198
out.writeOptionalStringCollection(this.queryFields);
99+
out.writeOptionalString(this.agentId);
92100
}
93101

94102
@Override
@@ -106,6 +114,9 @@ protected void doXContent(XContentBuilder xContentBuilder, Params params) throws
106114
if (Objects.nonNull(queryFields) && !queryFields.isEmpty()) {
107115
xContentBuilder.field(QUERY_FIELDS.getPreferredName(), queryFields);
108116
}
117+
if (Objects.nonNull(agentId)) {
118+
xContentBuilder.field(AGENT_ID_FIELD.getPreferredName(), agentId);
119+
}
109120
xContentBuilder.endObject();
110121
}
111122

@@ -115,6 +126,7 @@ protected void doXContent(XContentBuilder xContentBuilder, Params params) throws
115126
* {
116127
* "agentic": {
117128
* "query_text": "string",
129+
* "agent_id": "string",
118130
* "query_fields": ["string", "string"..]
119131
* }
120132
* }
@@ -133,6 +145,8 @@ public static AgenticSearchQueryBuilder fromXContent(XContentParser parser) thro
133145
} else if (token.isValue()) {
134146
if (QUERY_TEXT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
135147
agenticSearchQueryBuilder.queryText = parser.text();
148+
} else if (AGENT_ID_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
149+
agenticSearchQueryBuilder.agentId = parser.text();
136150
} else {
137151
throw new ParsingException(parser.getTokenLocation(), "Unknown field [" + currentFieldName + "]");
138152
}
@@ -157,6 +171,9 @@ public static AgenticSearchQueryBuilder fromXContent(XContentParser parser) thro
157171
throw new ParsingException(parser.getTokenLocation(), "[" + QUERY_TEXT_FIELD.getPreferredName() + "] is required");
158172
}
159173

174+
if (agenticSearchQueryBuilder.agentId == null || agenticSearchQueryBuilder.agentId.trim().isEmpty()) {
175+
throw new ParsingException(parser.getTokenLocation(), "[" + AGENT_ID_FIELD.getPreferredName() + "] is required");
176+
}
160177
// Sanitize query text to prevent prompt injection
161178
agenticSearchQueryBuilder.queryText = sanitizeQueryText(agenticSearchQueryBuilder.queryText);
162179

@@ -171,8 +188,15 @@ protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws
171188

172189
@Override
173190
protected Query doToQuery(QueryShardContext context) throws IOException {
191+
// This should not be reached if the system-generated processor is working correctly
192+
if (agentId == null || agentId.trim().isEmpty()) {
193+
throw new IllegalStateException(
194+
"Agentic search query requires an agent_id. Provide agent_id in the query or ensure the agentic_query_translator processor is configured."
195+
);
196+
}
174197
throw new IllegalStateException(
175-
"Agentic search query must be used as top-level query, not nested inside other queries. Should be used with agentic_query_translator search processor"
198+
"Agentic search query must be processed by the agentic_query_translator system processor before query execution. "
199+
+ "Ensure the neural search plugin is properly installed and the agentic search feature is enabled."
176200
);
177201
}
178202

@@ -183,12 +207,13 @@ protected boolean doEquals(AgenticSearchQueryBuilder obj) {
183207
EqualsBuilder equalsBuilder = new EqualsBuilder();
184208
equalsBuilder.append(queryText, obj.queryText);
185209
equalsBuilder.append(queryFields, obj.queryFields);
210+
equalsBuilder.append(agentId, obj.agentId);
186211
return equalsBuilder.isEquals();
187212
}
188213

189214
@Override
190215
protected int doHashCode() {
191-
return new HashCodeBuilder().append(queryText).append(queryFields).toHashCode();
216+
return new HashCodeBuilder().append(queryText).append(queryFields).append(agentId).toHashCode();
192217
}
193218

194219
@Override

0 commit comments

Comments
 (0)