Skip to content

Commit 1ff7d59

Browse files
authored
Merge branch 'main' into test-integ-branch
Signed-off-by: Rajat Gupta <[email protected]>
2 parents 8d5e2fc + f6d6aa6 commit 1ff7d59

File tree

45 files changed

+1476
-66
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1476
-66
lines changed

CHANGELOG-3.0.md

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2222
- Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))
2323
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))
2424
- Added integ tests for systemd configs ([#17410](https://github.com/opensearch-project/OpenSearch/pull/17410))
25+
- Add filter function for AbstractQueryBuilder, BoolQueryBuilder, ConstantScoreQueryBuilder([#17409](https://github.com/opensearch-project/OpenSearch/pull/17409))
26+
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))
27+
2528

2629
### Dependencies
2730
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.cluster.metadata.IndexMetadata;
1717
import org.opensearch.common.settings.Settings;
1818
import org.opensearch.index.query.RangeQueryBuilder;
19+
import org.opensearch.indices.pollingingest.PollingIngestStats;
1920
import org.opensearch.plugins.PluginInfo;
2021
import org.opensearch.test.OpenSearchIntegTestCase;
2122
import org.junit.Assert;
@@ -75,6 +76,11 @@ public void testKafkaIngestion() {
7576
refresh("test");
7677
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
7778
assertThat(response.getHits().getTotalHits().value(), is(1L));
79+
PollingIngestStats stats = client().admin().indices().prepareStats("test").get().getIndex("test").getShards()[0]
80+
.getPollingIngestStats();
81+
assertNotNull(stats);
82+
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2L));
83+
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(2L));
7884
});
7985
}
8086

server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ private void assertOnGoingRecoveryState(
250250
assertThat(state.getStage(), not(equalTo(Stage.DONE)));
251251
}
252252

253-
private void slowDownRecovery(ByteSizeValue shardSize) {
253+
public void slowDownRecovery(ByteSizeValue shardSize) {
254254
long chunkSize = Math.max(1, shardSize.getBytes() / 10);
255255
assertTrue(
256256
client().admin()
@@ -528,7 +528,7 @@ public void testRerouteRecovery() throws Exception {
528528
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
529529
indicesService = internalCluster().getInstance(IndicesService.class, nodeB);
530530
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
531-
}, TimeValue.timeValueSeconds(10), TimeValue.timeValueMillis(500));
531+
}, TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(500));
532532

533533
logger.info("--> request recoveries");
534534
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java

+24
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010

1111
import org.opensearch.cluster.metadata.IndexMetadata;
1212
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.core.common.unit.ByteSizeUnit;
14+
import org.opensearch.core.common.unit.ByteSizeValue;
1315
import org.opensearch.index.IndexModule;
1416
import org.opensearch.index.IndexSettings;
1517
import org.opensearch.indices.recovery.IndexRecoveryIT;
18+
import org.opensearch.indices.recovery.RecoverySettings;
1619
import org.opensearch.indices.replication.common.ReplicationType;
1720
import org.opensearch.test.OpenSearchIntegTestCase;
1821
import org.hamcrest.Matcher;
@@ -22,6 +25,7 @@
2225

2326
import java.nio.file.Path;
2427

28+
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
2529
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
2630

2731
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -54,6 +58,26 @@ public Settings indexSettings() {
5458
.build();
5559
}
5660

61+
@Override
62+
public void slowDownRecovery(ByteSizeValue shardSize) {
63+
logger.info("--> shardSize: " + shardSize);
64+
long chunkSize = Math.max(1, shardSize.getBytes() / 50);
65+
assertTrue(
66+
client().admin()
67+
.cluster()
68+
.prepareUpdateSettings()
69+
.setTransientSettings(
70+
Settings.builder()
71+
// one chunk per sec..
72+
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
73+
// small chunks
74+
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
75+
)
76+
.get()
77+
.isAcknowledged()
78+
);
79+
}
80+
5781
@After
5882
public void teardown() {
5983
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.opensearch.index.seqno.SeqNoStats;
5757
import org.opensearch.index.shard.IndexShard;
5858
import org.opensearch.indices.IndicesService;
59+
import org.opensearch.indices.pollingingest.PollingIngestStats;
5960
import org.opensearch.node.NodeService;
6061
import org.opensearch.threadpool.ThreadPool;
6162
import org.opensearch.transport.TransportRequest;
@@ -210,15 +211,18 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
210211
CommitStats commitStats;
211212
SeqNoStats seqNoStats;
212213
RetentionLeaseStats retentionLeaseStats;
214+
PollingIngestStats pollingIngestStats;
213215
try {
214216
commitStats = indexShard.commitStats();
215217
seqNoStats = indexShard.seqNoStats();
216218
retentionLeaseStats = indexShard.getRetentionLeaseStats();
219+
pollingIngestStats = indexShard.pollingIngestStats();
217220
} catch (final AlreadyClosedException e) {
218221
// shard is closed - no stats is fine
219222
commitStats = null;
220223
seqNoStats = null;
221224
retentionLeaseStats = null;
225+
pollingIngestStats = null;
222226
}
223227
shardsStats.add(
224228
new ShardStats(
@@ -227,7 +231,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
227231
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, commonStatsFlags),
228232
commitStats,
229233
seqNoStats,
230-
retentionLeaseStats
234+
retentionLeaseStats,
235+
pollingIngestStats
231236
)
232237
);
233238
}

server/src/main/java/org/opensearch/action/admin/indices/stats/ShardStats.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.action.admin.indices.stats;
3434

35+
import org.opensearch.Version;
3536
import org.opensearch.cluster.routing.ShardRouting;
3637
import org.opensearch.common.Nullable;
3738
import org.opensearch.common.annotation.PublicApi;
@@ -44,6 +45,7 @@
4445
import org.opensearch.index.seqno.RetentionLeaseStats;
4546
import org.opensearch.index.seqno.SeqNoStats;
4647
import org.opensearch.index.shard.ShardPath;
48+
import org.opensearch.indices.pollingingest.PollingIngestStats;
4749

4850
import java.io.IOException;
4951

@@ -65,6 +67,9 @@ public class ShardStats implements Writeable, ToXContentFragment {
6567
@Nullable
6668
private RetentionLeaseStats retentionLeaseStats;
6769

70+
@Nullable
71+
private PollingIngestStats pollingIngestStats;
72+
6873
/**
6974
* Gets the current retention lease stats.
7075
*
@@ -87,6 +92,9 @@ public ShardStats(StreamInput in) throws IOException {
8792
isCustomDataPath = in.readBoolean();
8893
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
8994
retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new);
95+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
96+
pollingIngestStats = in.readOptionalWriteable(PollingIngestStats::new);
97+
}
9098
}
9199

92100
public ShardStats(
@@ -95,7 +103,8 @@ public ShardStats(
95103
final CommonStats commonStats,
96104
final CommitStats commitStats,
97105
final SeqNoStats seqNoStats,
98-
final RetentionLeaseStats retentionLeaseStats
106+
final RetentionLeaseStats retentionLeaseStats,
107+
final PollingIngestStats pollingIngestStats
99108
) {
100109
this.shardRouting = routing;
101110
this.dataPath = shardPath.getRootDataPath().toString();
@@ -105,6 +114,7 @@ public ShardStats(
105114
this.commonStats = commonStats;
106115
this.seqNoStats = seqNoStats;
107116
this.retentionLeaseStats = retentionLeaseStats;
117+
this.pollingIngestStats = pollingIngestStats;
108118
}
109119

110120
/**
@@ -128,6 +138,11 @@ public SeqNoStats getSeqNoStats() {
128138
return this.seqNoStats;
129139
}
130140

141+
@Nullable
142+
public PollingIngestStats getPollingIngestStats() {
143+
return this.pollingIngestStats;
144+
}
145+
131146
public String getDataPath() {
132147
return dataPath;
133148
}
@@ -150,6 +165,9 @@ public void writeTo(StreamOutput out) throws IOException {
150165
out.writeBoolean(isCustomDataPath);
151166
out.writeOptionalWriteable(seqNoStats);
152167
out.writeOptionalWriteable(retentionLeaseStats);
168+
if (out.getVersion().onOrAfter((Version.V_3_0_0))) {
169+
out.writeOptionalWriteable(pollingIngestStats);
170+
}
153171
}
154172

155173
@Override
@@ -171,6 +189,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
171189
if (retentionLeaseStats != null) {
172190
retentionLeaseStats.toXContent(builder, params);
173191
}
192+
if (pollingIngestStats != null) {
193+
pollingIngestStats.toXContent(builder, params);
194+
}
174195
builder.startObject(Fields.SHARD_PATH);
175196
builder.field(Fields.STATE_PATH, statePath);
176197
builder.field(Fields.DATA_PATH, dataPath);

server/src/main/java/org/opensearch/action/admin/indices/stats/TransportIndicesStatsAction.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.opensearch.index.shard.IndexShard;
5353
import org.opensearch.index.shard.ShardNotFoundException;
5454
import org.opensearch.indices.IndicesService;
55+
import org.opensearch.indices.pollingingest.PollingIngestStats;
5556
import org.opensearch.threadpool.ThreadPool;
5657
import org.opensearch.transport.TransportService;
5758

@@ -141,16 +142,27 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh
141142
CommitStats commitStats;
142143
SeqNoStats seqNoStats;
143144
RetentionLeaseStats retentionLeaseStats;
145+
PollingIngestStats pollingIngestStats;
144146
try {
145147
commitStats = indexShard.commitStats();
146148
seqNoStats = indexShard.seqNoStats();
147149
retentionLeaseStats = indexShard.getRetentionLeaseStats();
150+
pollingIngestStats = indexShard.pollingIngestStats();
148151
} catch (final AlreadyClosedException e) {
149152
// shard is closed - no stats is fine
150153
commitStats = null;
151154
seqNoStats = null;
152155
retentionLeaseStats = null;
156+
pollingIngestStats = null;
153157
}
154-
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats, commitStats, seqNoStats, retentionLeaseStats);
158+
return new ShardStats(
159+
indexShard.routingEntry(),
160+
indexShard.shardPath(),
161+
commonStats,
162+
commitStats,
163+
seqNoStats,
164+
retentionLeaseStats,
165+
pollingIngestStats
166+
);
155167
}
156168
}

server/src/main/java/org/opensearch/index/engine/Engine.java

+8
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
import org.opensearch.index.translog.Translog;
9494
import org.opensearch.index.translog.TranslogDeletionPolicy;
9595
import org.opensearch.index.translog.TranslogManager;
96+
import org.opensearch.indices.pollingingest.PollingIngestStats;
9697
import org.opensearch.search.suggest.completion.CompletionStats;
9798

9899
import java.io.Closeable;
@@ -946,6 +947,13 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
946947
return stats;
947948
}
948949

950+
/**
951+
* @return Stats for pull-based ingestion.
952+
*/
953+
public PollingIngestStats pollingIngestStats() {
954+
return null;
955+
}
956+
949957
protected TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
950958
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
951959
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+6
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.opensearch.index.translog.TranslogStats;
3030
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
3131
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
32+
import org.opensearch.indices.pollingingest.PollingIngestStats;
3233
import org.opensearch.indices.pollingingest.StreamPoller;
3334

3435
import java.io.IOException;
@@ -288,4 +289,9 @@ protected TranslogManager createTranslogManager(
288289
protected Map<String, String> commitDataAsMap() {
289290
return commitDataAsMap(indexWriter);
290291
}
292+
293+
@Override
294+
public PollingIngestStats pollingIngestStats() {
295+
return streamPoller.getStats();
296+
}
291297
}

server/src/main/java/org/opensearch/index/query/AbstractQueryBuilder.java

+24
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,30 @@ protected AbstractQueryBuilder(StreamInput in) throws IOException {
8686
queryName = in.readOptionalString();
8787
}
8888

89+
/**
90+
* Check the input parameters of filter function.
91+
* @param filter filter to combine with current query builder
92+
* @return true if parameters are valid. Returns false when the filter is null.
93+
*/
94+
public static boolean validateFilterParams(QueryBuilder filter) {
95+
return filter != null;
96+
}
97+
98+
/**
99+
* Combine filter with current query builder
100+
* @param filter filter to combine with current query builder
101+
* @return query builder with filter combined
102+
*/
103+
public QueryBuilder filter(QueryBuilder filter) {
104+
if (validateFilterParams(filter) == false) {
105+
return this;
106+
}
107+
final BoolQueryBuilder modifiedQB = new BoolQueryBuilder();
108+
modifiedQB.must(this);
109+
modifiedQB.filter(filter);
110+
return modifiedQB;
111+
}
112+
89113
@Override
90114
public final void writeTo(StreamOutput out) throws IOException {
91115
out.writeFloat(boost);

server/src/main/java/org/opensearch/index/query/BoolQueryBuilder.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,15 @@ public List<QueryBuilder> must() {
135135

136136
/**
137137
* Adds a query that <b>must</b> appear in the matching documents but will
138-
* not contribute to scoring. No {@code null} value allowed.
138+
* not contribute to scoring. If null value passed, then do nothing and return.
139+
* @param filter the filter to add to the current ConstantScoreQuery
140+
* @return query builder with filter combined
139141
*/
140-
public BoolQueryBuilder filter(QueryBuilder queryBuilder) {
141-
if (queryBuilder == null) {
142-
throw new IllegalArgumentException("inner bool query clause cannot be null");
142+
public BoolQueryBuilder filter(QueryBuilder filter) {
143+
if (validateFilterParams(filter) == false) {
144+
return this;
143145
}
144-
filterClauses.add(queryBuilder);
146+
filterClauses.add(filter);
145147
return this;
146148
}
147149

server/src/main/java/org/opensearch/index/query/ConstantScoreQueryBuilder.java

+16
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,22 @@ protected void doXContent(XContentBuilder builder, Params params) throws IOExcep
101101
builder.endObject();
102102
}
103103

104+
/**
105+
* Adds a filter to the current ConstantScoreQuery.
106+
* @param filter the filter to add to the current ConstantScoreQuery
107+
* @return query builder with filter combined
108+
*/
109+
public ConstantScoreQueryBuilder filter(QueryBuilder filter) {
110+
if (validateFilterParams(filter) == false) {
111+
return this;
112+
}
113+
QueryBuilder filteredFilterBuilder = filterBuilder.filter(filter);
114+
if (filteredFilterBuilder != filterBuilder) {
115+
return new ConstantScoreQueryBuilder(filteredFilterBuilder);
116+
}
117+
return this;
118+
}
119+
104120
public static ConstantScoreQueryBuilder fromXContent(XContentParser parser) throws IOException {
105121
QueryBuilder query = null;
106122
boolean queryFound = false;

server/src/main/java/org/opensearch/index/query/QueryBuilder.java

+12
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,18 @@
4747
@PublicApi(since = "1.0.0")
4848
public interface QueryBuilder extends NamedWriteable, ToXContentObject, Rewriteable<QueryBuilder> {
4949

50+
/**
51+
* This function combines a filter with a query builder. If the query builder itself has
52+
* a filter we will combine the filter and return the query builder itself.
53+
* If not we will use a bool query builder to combine the query builder and
54+
* the filter and then return the bool query builder.
55+
* If the filter is null we simply return the query builder without any operation.
56+
*
57+
* @param filter The null filter to be added to the existing filter.
58+
* @return A QueryBuilder with the filter added to the existing filter.
59+
*/
60+
QueryBuilder filter(QueryBuilder filter);
61+
5062
/**
5163
* Converts this QueryBuilder to a lucene {@link Query}.
5264
* Returns {@code null} if this query should be ignored in the context of

0 commit comments

Comments
 (0)