Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rule based auto tagging] Add in-memory rule processing service #17365

Merged
merged 19 commits into from
Apr 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
- Added Kinesis support as a plugin for the pull-based ingestion ([#17615](https://github.com/opensearch-project/OpenSearch/pull/17615))
- Add FilterFieldType for developers who want to wrap MappedFieldType ([#17627](https://github.com/opensearch-project/OpenSearch/pull/17627))
- [Rule Based Auto-tagging] Add in-memory rule processing service ([#17365](https://github.com/opensearch-project/OpenSearch/pull/17365))
- [Security Manager Replacement] Create initial Java Agent to intercept Socket::connect calls ([#17724](https://github.com/opensearch-project/OpenSearch/pull/17724))
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))
- [Security Manager Replacement] Enhance Java Agent to intercept System::exit ([#17746](https://github.com/opensearch-project/OpenSearch/pull/17746))
Expand Down
20 changes: 20 additions & 0 deletions libs/autotagging-commons/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

// Dependencies of the autotagging-commons library.
dependencies {
// Exposed as `api` so consumers of this library also see it on their compile classpath.
// commons-collections4 provides the PatriciaTrie used by DefaultAttributeValueStore.
api 'org.apache.commons:commons-collections4:4.4'
// NOTE(review): presumably needed for the org.opensearch.autotagging types (Attribute, FeatureType, Rule) — confirm.
api project(":server")

// NOTE(review): the reason opensearch-core is excluded from the test framework is not visible here — confirm.
testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'opensearch-core'
}
}

// Dependencies whose name matches /commons-collections.*/ are mapped to the shared 'commons-collections' name
// for the dependencyLicenses task (presumably the license file base name — verify against the build plugin).
tasks.named("dependencyLicenses").configure {
mapping from: /commons-collections.*/, to: 'commons-collections'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.rule;

import org.opensearch.autotagging.Attribute;
import org.opensearch.autotagging.FeatureType;
import org.opensearch.autotagging.Rule;
import org.opensearch.rule.attribute_extractor.AttributeExtractor;
import org.opensearch.rule.storage.AttributeValueStore;
import org.opensearch.rule.storage.AttributeValueStoreFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/**
 * Maintains the in-memory view of {@link Rule}s and resolves the label matching a request.
 * <p>
 * Each auto-tagging feature should use its own instance of this class, as this avoids potential
 * concurrency overhead in case of dynamic updates and attribute sharing scenarios.
 */
public class InMemoryRuleProcessingService {

    private final AttributeValueStoreFactory attributeValueStoreFactory;

    /**
     * Constructor
     * @param featureType feature type handed to the {@link AttributeValueStoreFactory}
     * @param attributeValueStoreSupplier supplier handed to the {@link AttributeValueStoreFactory} to create the stores
     */
    public InMemoryRuleProcessingService(
        FeatureType featureType,
        Supplier<AttributeValueStore<String, String>> attributeValueStoreSupplier
    ) {
        this.attributeValueStoreFactory = new AttributeValueStoreFactory(featureType, attributeValueStoreSupplier);
    }

    /**
     * Adds the rule to the in-memory view: every attribute value of the rule is stored against the
     * rule's feature value in the store of the corresponding attribute.
     * @param rule to be added
     */
    public void add(final Rule rule) {
        perform(rule, this::addOperation);
    }

    /**
     * Removes the rule from the in-memory view: every attribute value of the rule is removed from the
     * store of the corresponding attribute.
     * @param rule to be removed
     */
    public void remove(final Rule rule) {
        perform(rule, this::removeOperation);
    }

    /**
     * Applies the given operation once per attribute entry of the rule.
     */
    private void perform(Rule rule, BiConsumer<Map.Entry<Attribute, Set<String>>, Rule> ruleOperation) {
        rule.getAttributeMap().entrySet().forEach(entry -> ruleOperation.accept(entry, rule));
    }

    /**
     * Removes all values of a single attribute entry from that attribute's store.
     */
    private void removeOperation(Map.Entry<Attribute, Set<String>> attributeEntry, Rule rule) {
        final AttributeValueStore<String, String> store = attributeValueStoreFactory.getAttributeValueStore(attributeEntry.getKey());
        attributeEntry.getValue().forEach(attributeValue -> store.remove(attributeValue));
    }

    /**
     * Stores all values of a single attribute entry, each mapped to the rule's feature value.
     */
    private void addOperation(Map.Entry<Attribute, Set<String>> attributeEntry, Rule rule) {
        final AttributeValueStore<String, String> store = attributeValueStoreFactory.getAttributeValueStore(attributeEntry.getKey());
        attributeEntry.getValue().forEach(attributeValue -> store.put(attributeValue, rule.getFeatureValue()));
    }

    /**
     * Evaluates the label for the current request. Every value produced by every extractor is looked up
     * in the store of the extractor's attribute, and the result is the intersection of all the matches.
     * @param attributeExtractors list of extractors which are used to get the attribute values to find the
     *                            matching rule
     * @return the label when every extracted value has a match and all matches agree on the same label;
     *         empty when any value has no match, when two matches differ, or when no value was extracted
     */
    public Optional<String> evaluateLabel(List<AttributeExtractor<String>> attributeExtractors) {
        assert attributeValueStoreFactory != null;
        // null means "no label seen yet"; Optional values are never null so this is unambiguous
        String agreedLabel = null;
        for (AttributeExtractor<String> extractor : attributeExtractors) {
            final AttributeValueStore<String, String> store = attributeValueStoreFactory.getAttributeValueStore(
                extractor.getAttribute()
            );
            for (String extractedValue : extractor.extract()) {
                final Optional<String> match = store.get(extractedValue);
                if (match.isEmpty()) {
                    // a single unmatched value rules out any label
                    return Optional.empty();
                }

                final String candidate = match.get();
                if (agreedLabel == null) {
                    agreedLabel = candidate;
                } else if (!candidate.equals(agreedLabel)) {
                    // conflicting labels, hence no unique label
                    return Optional.empty();
                }
            }
        }
        return Optional.ofNullable(agreedLabel);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.rule.attribute_extractor;

import org.opensearch.autotagging.Attribute;

/**
* This interface defines the contract for extracting the attributes for Rule based auto-tagging feature.
* An implementation is tied to a single {@link Attribute} and produces the values of that attribute.
* @param <V> type of the attribute values returned by {@link #extract()}
*/
public interface AttributeExtractor<V> {
/**
* This method returns the Attribute which it is responsible for extracting
* @return attribute
*/
Attribute getAttribute();

/**
* This method returns the attribute values in context of the current request
* @return the attribute values as an {@link Iterable}
*/
Iterable<V> extract();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains the feature attribute extractor interface and its implementations
*/
package org.opensearch.rule.attribute_extractor;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Rule based auto-tagging generic constructs
*/
package org.opensearch.rule;
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.plugin.wlm.rule.storage;
package org.opensearch.rule.storage;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.rule.storage;

import org.opensearch.autotagging.Attribute;
import org.opensearch.autotagging.FeatureType;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
* Factory class for AttributeValueStore per feature type as two feature types can potentially share same attribute
*/
public class AttributeValueStoreFactory {
private final Map<String, AttributeValueStore<String, String>> attributeValueStores = new HashMap<>();

/**
* Constructor
* @param featureType is the feature which are using rule based auto tagging
* @param attributeValueStoreSupplier supplies the feature level AttributeValueStore instance
*/
public AttributeValueStoreFactory(FeatureType featureType, Supplier<AttributeValueStore<String, String>> attributeValueStoreSupplier) {
for (Attribute attribute : featureType.getAllowedAttributesRegistry().values()) {
attributeValueStores.put(attribute.getName(), attributeValueStoreSupplier.get());
}
}

/**
* Factory method which returns the {@link AttributeValueStore} for the given attribute
* @param attribute
* @return
*/
public AttributeValueStore<String, String> getAttributeValueStore(final Attribute attribute) {
final String attributeName = attribute.getName();
if (attributeValueStores == null) {
throw new IllegalStateException("AttributeValueStoreFactory is not initialized yet.");

Check warning on line 43 in libs/autotagging-commons/src/main/java/org/opensearch/rule/storage/AttributeValueStoreFactory.java

View check run for this annotation

Codecov / codecov/patch

libs/autotagging-commons/src/main/java/org/opensearch/rule/storage/AttributeValueStoreFactory.java#L43

Added line #L43 was not covered by tests
}

if (!attributeValueStores.containsKey(attributeName)) {
throw new IllegalArgumentException("[" + attributeName + "] is not a valid attribute for enabled features.");
}

return attributeValueStores.get(attributeName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
* compatible open source license.
*/

package org.opensearch.plugin.wlm.rule.storage;
package org.opensearch.rule.storage;

import org.apache.commons.collections4.trie.PatriciaTrie;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* This is a patricia trie based implementation of AttributeValueStore
Expand All @@ -20,7 +21,10 @@
* ref: https://commons.apache.org/proper/commons-collections/javadocs/api-4.4/org/apache/commons/collections4/trie/PatriciaTrie.html
*/
public class DefaultAttributeValueStore<K extends String, V> implements AttributeValueStore<K, V> {
PatriciaTrie<V> trie;
private final PatriciaTrie<V> trie;
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private static final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

/**
* Default constructor
Expand All @@ -39,34 +43,48 @@ public DefaultAttributeValueStore(PatriciaTrie<V> trie) {

@Override
public void put(K key, V value) {
trie.put(key, value);
writeLock.lock();
try {
trie.put(key, value);
} finally {
writeLock.unlock();
}
}

@Override
public void remove(String key) {
trie.remove(key);
writeLock.lock();
try {
trie.remove(key);
} finally {
writeLock.unlock();
}
}

@Override
public Optional<V> get(String key) {
/**
* Since we are inserting prefixes into the trie and searching for larger strings
* It is important to find the largest matching prefix key in the trie efficiently
* Hence we can do binary search
*/
final String longestMatchingPrefix = findLongestMatchingPrefix(key);
readLock.lock();
try {
/**
* Since we are inserting prefixes into the trie and searching for larger strings
* It is important to find the largest matching prefix key in the trie efficiently
* Hence we can do binary search
*/
final String longestMatchingPrefix = findLongestMatchingPrefix(key);

/**
* Now there are following cases for this prefix
* 1. There is a Rule which has this prefix as one of the attribute values. In this case we should return the
* Rule's label otherwise send empty
*/
for (Map.Entry<String, V> possibleMatch : trie.prefixMap(longestMatchingPrefix).entrySet()) {
if (key.startsWith(possibleMatch.getKey())) {
return Optional.of(possibleMatch.getValue());
/**
* Now there are following cases for this prefix
* 1. There is a Rule which has this prefix as one of the attribute values. In this case we should return the
* Rule's label otherwise send empty
*/
for (Map.Entry<String, V> possibleMatch : trie.prefixMap(longestMatchingPrefix).entrySet()) {
if (key.startsWith(possibleMatch.getKey())) {
return Optional.of(possibleMatch.getValue());
}
}
} finally {
readLock.unlock();
}

return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
/**
* This package contains interfaces and implementations for in memory rule storage mechanisms
*/
package org.opensearch.plugin.wlm.rule.storage;
package org.opensearch.rule.storage;
Loading
Loading