Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pass in order terms as sorted to TermInSetQuery() #17714

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
- Added Kinesis support as a plugin for the pull-based ingestion ([#17615](https://github.com/opensearch-project/OpenSearch/pull/17615))
- [Security Manager Replacement] Create initial Java Agent to intercept Socket::connect calls ([#17724](https://github.com/opensearch-project/OpenSearch/pull/17724))
- Faster `terms_query` with already sorted terms ([#17714](https://github.com/opensearch-project/OpenSearch/pull/17714))
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))

### Changed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.mapper;

import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.util.BytesRef;

import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Purposed for passing terms into {@link TermInSetQuery}.
* If the given terms are sorted already, it wrap it with a SortedSet stub.
* Otherwise, it passes terms as list.
*/
public class BytesRefsCollectionBuilder implements Consumer<BytesRef>, Supplier<Collection<BytesRef>> {

/**
* Strategy for building BytesRef collection.
* */
protected interface CollectorStrategy extends Function<BytesRef, CollectorStrategy>, Supplier<Collection<BytesRef>> {}

protected final List<BytesRef> terms = new ArrayList<>();
protected CollectorStrategy delegate = createStartStrategy();

@Override
public void accept(BytesRef bytesRef) {
delegate = delegate.apply(bytesRef);
}

@Override
public Collection<BytesRef> get() {
Collection<BytesRef> result = delegate.get();
delegate = createFrozenStrategy(result);
return result;
}

protected CollectorStrategy createStartStrategy() {
return new CollectorStrategy() {
@Override
public CollectorStrategy apply(BytesRef firstBytes) {
terms.add(firstBytes); // firstly, just store
return createSortedStrategy(firstBytes);
}

@Override
public Collection<BytesRef> get() {
return terms; // empty list
}
};
}

protected CollectorStrategy createSortedStrategy(BytesRef firstBytes) {
return new CollectorStrategy() {
BytesRef prev = firstBytes;

@Override
public CollectorStrategy apply(BytesRef bytesRef) {
terms.add(bytesRef);
if (bytesRef.compareTo(prev) >= 0) { // keep checking sorted
prev = bytesRef;
return this;
} else { // isn't sorted
return createNotSortedStrategy();
}
}

@Override
public Collection<BytesRef> get() {
return new SortedBytesSet(terms);
}
};
}

protected CollectorStrategy createNotSortedStrategy() {
return new CollectorStrategy() {
@Override
public CollectorStrategy apply(BytesRef bytesRef) { // just storing
terms.add(bytesRef);
return this;
}

@Override
public Collection<BytesRef> get() {
return terms;
}
};
}

protected CollectorStrategy createFrozenStrategy(Collection<BytesRef> result) {
return new CollectorStrategy() {

@Override
public CollectorStrategy apply(BytesRef bytesRef) {
throw new IllegalStateException("already build");
}

@Override
public Collection<BytesRef> get() {
return result;
}
};
}

/**
* {@link SortedSet<BytesRef>} for passing into TermInSetQuery()
* */
protected static class SortedBytesSet extends AbstractSet<BytesRef> implements SortedSet<BytesRef> {

private final List<BytesRef> bytesRefs;

public SortedBytesSet(List<BytesRef> bytesRefs) {
this.bytesRefs = bytesRefs;
}

@Override
public Iterator<BytesRef> iterator() {
return bytesRefs.iterator();
}

@Override
public int size() {
return bytesRefs.size();
}

@Override
public Comparator<? super BytesRef> comparator() {
return null;
}

@Override
public SortedSet<BytesRef> subSet(BytesRef fromElement, BytesRef toElement) {
throw new UnsupportedOperationException();
}

@Override
public SortedSet<BytesRef> headSet(BytesRef toElement) {
throw new UnsupportedOperationException();
}

@Override
public SortedSet<BytesRef> tailSet(BytesRef fromElement) {
throw new UnsupportedOperationException();
}

@Override
public BytesRef first() {
throw new UnsupportedOperationException();
}

@Override
public BytesRef last() {
throw new UnsupportedOperationException();
}

/**
* Dedicated for {@link TermInSetQuery#TermInSetQuery(String, Collection)}.
*/
@Override
public <T> T[] toArray(T[] a) {
return bytesRefs.toArray(a);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@
import org.opensearch.search.sort.SortOrder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
Expand Down Expand Up @@ -166,15 +164,15 @@ public Query existsQuery(QueryShardContext context) {
@Override
public Query termsQuery(List<?> values, QueryShardContext context) {
failIfNotIndexed();
Collection<BytesRef> bytesRefs = new ArrayList<>(values.size());
BytesRefsCollectionBuilder bytesRefs = new BytesRefsCollectionBuilder();
for (int i = 0; i < values.size(); i++) {
Object idObject = values.get(i);
if (idObject instanceof BytesRef) {
idObject = ((BytesRef) idObject).utf8ToString();
}
bytesRefs.add(Uid.encodeId(idObject.toString()));
bytesRefs.accept(Uid.encodeId(idObject.toString()));
}
return new TermInSetQuery(MultiTermQuery.CONSTANT_SCORE_BLENDED_REWRITE, name(), bytesRefs);
return new TermInSetQuery(MultiTermQuery.CONSTANT_SCORE_BLENDED_REWRITE, name(), bytesRefs.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -449,23 +447,26 @@ public Query termsQuery(List<?> values, QueryShardContext context) {
if (!context.keywordFieldIndexOrDocValuesEnabled()) {
return super.termsQuery(values, context);
}
Collection<BytesRef> iBytesRefs = new ArrayList<>(values.size());
Collection<BytesRef> dVByteRefs = new ArrayList<>(values.size());
BytesRefsCollectionBuilder iBytesRefs = new BytesRefsCollectionBuilder();
BytesRefsCollectionBuilder dVByteRefs = new BytesRefsCollectionBuilder();
for (int i = 0; i < values.size(); i++) {
iBytesRefs.add(indexedValueForSearch(values.get(i)));
dVByteRefs.add(indexedValueForSearch(rewriteForDocValue(values.get(i))));
BytesRef idxBytes = indexedValueForSearch(values.get(i));
iBytesRefs.accept(idxBytes);
BytesRef dvBytes = indexedValueForSearch(rewriteForDocValue(values.get(i)));
dVByteRefs.accept(dvBytes);
}
Query indexQuery = new TermInSetQuery(MultiTermQuery.CONSTANT_SCORE_BLENDED_REWRITE, name(), iBytesRefs);
Query dvQuery = new TermInSetQuery(MultiTermQuery.DOC_VALUES_REWRITE, name(), dVByteRefs);
Query indexQuery = new TermInSetQuery(MultiTermQuery.CONSTANT_SCORE_BLENDED_REWRITE, name(), iBytesRefs.get());
Query dvQuery = new TermInSetQuery(MultiTermQuery.DOC_VALUES_REWRITE, name(), dVByteRefs.get());
return new IndexOrDocValuesQuery(indexQuery, dvQuery);
}
// if we only have doc_values enabled, we construct a new query with doc_values re-written
if (hasDocValues()) {
Collection<BytesRef> bytesRefs = new ArrayList<>(values.size());
BytesRefsCollectionBuilder bytesCollector = new BytesRefsCollectionBuilder();
for (int i = 0; i < values.size(); i++) {
bytesRefs.add(indexedValueForSearch(rewriteForDocValue(values.get(i))));
BytesRef dvBytes = indexedValueForSearch(rewriteForDocValue(values.get(i)));
bytesCollector.accept(dvBytes);
}
return new TermInSetQuery(MultiTermQuery.DOC_VALUES_REWRITE, name(), bytesRefs);
return new TermInSetQuery(MultiTermQuery.DOC_VALUES_REWRITE, name(), bytesCollector.get());
}
// has index enabled, we're going to return the query as is
return super.termsQuery(values, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.opensearch.common.lucene.search.AutomatonQueries;
import org.opensearch.index.query.QueryShardContext;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -96,11 +94,12 @@ public Query termQuery(Object value, QueryShardContext context) {
@Override
public Query termsQuery(List<?> values, QueryShardContext context) {
failIfNotIndexed();
Collection<BytesRef> bytesRefs = new ArrayList<>(values.size());
BytesRefsCollectionBuilder bytesCollector = new BytesRefsCollectionBuilder();
for (int i = 0; i < values.size(); i++) {
bytesRefs.add(indexedValueForSearch(values.get(i)));
BytesRef elem = indexedValueForSearch(values.get(i));
bytesCollector.accept(elem);
}
return new TermInSetQuery(MultiTermQuery.CONSTANT_SCORE_BLENDED_REWRITE, name(), bytesRefs);
return new TermInSetQuery(MultiTermQuery.CONSTANT_SCORE_BLENDED_REWRITE, name(), bytesCollector.get());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.mapper;

import org.apache.lucene.util.BytesRef;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.stream.Stream;

public class BytesRefsCollectionBuilderTests extends OpenSearchTestCase {

public void testBuildSortedNotSorted() {
String[] seedStrings = generateRandomStringArray(10, 10, false, true);
List<BytesRef> bytesRefList = Arrays.stream(seedStrings).map(BytesRef::new).toList();
List<BytesRef> sortedBytesRefs = bytesRefList.stream().sorted().toList();

Collection<BytesRef> sortedSet = assertCollectionBuilt(sortedBytesRefs);
assertCollectionBuilt(bytesRefList);

assertTrue(sortedSet instanceof SortedSet<BytesRef>);
assertNull(((SortedSet<BytesRef>) sortedSet).comparator());
}

public void testBuildFooBar() {
String[] reverseOrderStrings = new String[] { "foo", "bar" };
List<BytesRef> bytesRefList = Arrays.stream(reverseOrderStrings).map(BytesRef::new).toList();
List<BytesRef> sortedBytesRefs = bytesRefList.stream().sorted().toList();

Collection<BytesRef> sortedSet = assertCollectionBuilt(sortedBytesRefs);
Collection<BytesRef> reverseList = assertCollectionBuilt(bytesRefList);

assertTrue(sortedSet instanceof SortedSet<BytesRef>);
assertNull(((SortedSet<BytesRef>) sortedSet).comparator());

assertTrue(reverseList instanceof List<BytesRef>);
}

public void testFrozen() {
BytesRefsCollectionBuilder builder = new BytesRefsCollectionBuilder();
String[] seedStrings = generateRandomStringArray(5, 10, false, true);
Arrays.stream(seedStrings).map(BytesRef::new).forEachOrdered(builder);
Collection<BytesRef> bytesRefCollection = builder.get();
assertNotNull(bytesRefCollection);
assertEquals(seedStrings.length, bytesRefCollection.size());
assertThrows(IllegalStateException.class, () -> builder.accept(new BytesRef("illegal state")));
assertSame(bytesRefCollection, builder.get());
}

private static Collection<BytesRef> assertCollectionBuilt(List<BytesRef> sortedBytesRefs) {
BytesRefsCollectionBuilder builder = new BytesRefsCollectionBuilder();
sortedBytesRefs.stream().forEachOrdered(builder);
Collection<BytesRef> bytesRefCollection = builder.get();
assertEquals(bytesRefCollection.size(), sortedBytesRefs.size());
for (Iterator<BytesRef> iterator = bytesRefCollection.iterator(), iterator2 = sortedBytesRefs.iterator(); iterator.hasNext()
|| iterator2.hasNext();) {
assertTrue(iterator.next().bytesEquals(iterator2.next()));
}
return bytesRefCollection;
}

public void testCoverUnsupported() {
BytesRefsCollectionBuilder builder = new BytesRefsCollectionBuilder();
Stream.of("in", "order").map(BytesRef::new).forEachOrdered(builder);
SortedSet<BytesRef> bytesRefCollection = (SortedSet<BytesRef>) builder.get();
assertThrows(UnsupportedOperationException.class, () -> bytesRefCollection.subSet(new BytesRef("a"), new BytesRef("z")));
assertThrows(UnsupportedOperationException.class, () -> bytesRefCollection.headSet(new BytesRef("a")));
assertThrows(UnsupportedOperationException.class, () -> bytesRefCollection.tailSet(new BytesRef("a")));
assertThrows(UnsupportedOperationException.class, bytesRefCollection::first);
assertThrows(UnsupportedOperationException.class, bytesRefCollection::last);
}
}
Loading
Loading