diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2892fdf4..cc5f8da9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [0.23.0] - 2025-11-04
+### Added
+- Support for following methods:
+ - List Curated Rules
+ - Get Curated Rule
+ - Get Curated Rule By Name
+ - List Curated Rule Sets
+ - Get Curated Rule Set
+ - List Curated Rule Set Categories
+ - Get Curated Rule Set Category
+ - List Curated Rule Set Deployments
+ - Get Curated Rule Set Deployment
+ - Get Curated Rule Set Deployment By Name
+ - Updated Curated Rule Set Deployment
+
## [0.22.0] - 2025-10-30
### Added
- Support for entity import method
diff --git a/CLI.md b/CLI.md
index e9e207d7..b33254f2 100644
--- a/CLI.md
+++ b/CLI.md
@@ -569,6 +569,63 @@ secops rule test --file "/path/to/rule.yaral" --time-window 24 > udm_events.json
The `rule test` command outputs UDM events as pure JSON objects that can be piped to a file or processed by other tools. This makes it easy to integrate with other systems or perform additional analysis on the events.
+### Curated Rule Set Management
+
+List all curated rules:
+```bash
+secops curated-rule rule list
+```
+Get curated rules:
+```bash
+# Get rule by UUID
+curated-rule rule get --id "ur_ttp_GCP_ServiceAPIDisable"
+
+# Get rule by name
+curated-rule rule get --name "GCP Service API Disable"
+
+```
+
+List all curated rule sets:
+```bash
+secops curated-rule rule-set list
+```
+
+Get specific curated rule set details:
+```bash
+# Get curated rule set by UUID
+secops curated-rule rule-set get --id "f5533b66-9327-9880-93e6-75a738ac2345"
+```
+
+List all curated rule set categories:
+```bash
+secops curated-rule rule-set-category list
+```
+
+Get specific curated rule set category details:
+```bash
+# Get curated rule set category by UUID
+secops curated-rule rule-set-category get --id "db1114d4-569b-5f5d-0fb4-f65aaa766c92"
+```
+
+List all curated rule set deployments:
+```bash
+secops curated-rule rule-set-deployment list
+```
+
+Get specific curated rule set deployment details:
+```bash
+# Get curated rule set deployment by UUID
+secops curated-rule rule-set-deployment get --id "f5533b66-9327-9880-93e6-75a738ac2345"
+
+# Get curated rule set deployment by name
+secops curated-rule rule-set-deployment get --name "Active Breach Priority Host Indicators"
+```
+
+Update curated rule set deployment:
+```bash
+secops curated-rule rule-set-deployment update --category-id "db1114d4-569b-5f5d-0fb4-f65aaa766c92" --rule-set-id "7e52cd71-03c6-97d2-ffcb-b8d7159e08e1" --precision precise --enabled false --alerting false
+```
+
### Alert Management
Get alerts:
diff --git a/README.md b/README.md
index 59bf1b20..87e7a204 100644
--- a/README.md
+++ b/README.md
@@ -1712,11 +1712,82 @@ for rule_alert in alerts_response.get('ruleAlerts', []):
If `tooManyAlerts` is True in the response, consider narrowing your search criteria using a smaller time window or more specific filters.
-### Rule Sets
+### Curated Rule Sets
-Manage curated rule sets:
+Query curated rules:
```python
+# List all curated rules
+rules = chronicle.list_curated_rules()
+for rule in rules:
+ rule_id = rule.get("name", "").split("/")[-1]
+ display_name = rule.get("description")
+ description = rule.get("description")
+ print(f"Rule: {display_name}, Description: {description}")
+
+# Get a curated rule
+rule = chronicle.get_curated_rule("ur_ttp_lol_Atbroker")
+
+# Get a curated rule set by display name
+# NOTE: This is a linear scan of all curated rules which may be inefficient for large rule sets.
+rule_set = chronicle.get_curated_rule_by_name("Atbroker.exe Abuse")
+```
+
+Query curated rule sets:
+
+```python
+# List all curated rule sets
+rule_sets = chronicle.list_curated_rule_sets()
+for rule_set in rule_sets:
+ rule_set_id = rule_set.get("name", "").split("/")[-1]
+ display_name = rule_set.get("displayName")
+ print(f"Rule Set: {display_name}, ID: {rule_set_id}")
+
+# Get a curated rule set by ID
+rule_set = chronicle.get_curated_rule_set("00ad672e-ebb3-0dd1-2a4d-99bd7c5e5f93")
+```
+
+Query curated rule set categories:
+
+```python
+# List all curated rule set categories
+rule_set_categories = chronicle.list_curated_rule_set_categories()
+for rule_set_category in rule_set_categories:
+ rule_set_category_id = rule_set_category.get("name", "").split("/")[-1]
+ display_name = rule_set_category.get("displayName")
+ print(f"Rule Set Category: {display_name}, ID: {rule_set_category_id}")
+
+# Get a curated rule set category by ID
+rule_set_category = chronicle.get_curated_rule_set_category("110fa43d-7165-2355-1985-a63b7cdf90e8")
+```
+
+Manage curated rule set deployments (turn alerting on or off (either precise or broad) for curated rule sets):
+
+```python
+# List all curated rule set deployments
+rule_set_deployments = chronicle.list_curated_rule_set_deployments()
+for rs_deployment in rule_set_deployments:
+ rule_set_id = rs_deployment.get("name", "").split("/")[-3]
+ category_id = rs_deployment.get("name", "").split("/")[-5]
+ deployment_status = rs_deployment.get("name", "").split("/")[-1]
+ display_name = rs_deployment.get("displayName")
+ alerting = rs_deployment.get("alerting", False)
+ print(
+ f"Rule Set: {display_name},"
+ f"Rule Set ID: {rule_set_id}",
+ f"Category ID: {category_id}",
+ f"Precision: {deployment_status}",
+ f"Alerting: {alerting}",
+ )
+
+# Get curated rule set deployment by ID
+rule_set_deployment = chronicle.get_curated_rule_set_deployment("00ad672e-ebb3-0dd1-2a4d-99bd7c5e5f93")
+
+# Get curated rule set deployment by rule set display name
+# NOTE: This is a linear scan of all curated rules which may be inefficient for large rule sets.
+rule_set_deployment = chronicle.get_curated_rule_set_deployment_by_name("Azure - Network")
+
+# Update multiple curated rule set deployments
# Define deployments for rule sets
deployments = [
{
@@ -1728,8 +1799,17 @@ deployments = [
}
]
-# Update rule set deployments
chronicle.batch_update_curated_rule_set_deployments(deployments)
+
+# Update a single curated rule set deployment
+chronicle.update_curated_rule_set_deployment(
+ category_id="category-uuid",
+ rule_set_id="ruleset-uuid",
+ precision="broad",
+ enabled=True,
+ alerting=False
+)
+
```
### Rule Validation
diff --git a/api_module_mapping.md b/api_module_mapping.md
index 9ce0296c..daeb31d2 100644
--- a/api_module_mapping.md
+++ b/api_module_mapping.md
@@ -84,15 +84,17 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/
|bigQueryAccess.provide |v1alpha| | |
|bigQueryExport.provision |v1alpha| | |
|cases.countPriorities |v1alpha| | |
-|curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.batchUpdate|v1alpha|chronicle.rule_set.batch_update_curated_rule_set_deployments| |
-|curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.patch |v1alpha| | |
-|curatedRuleSetCategories.curatedRuleSets.get |v1alpha| | |
-|curatedRuleSetCategories.curatedRuleSets.list |v1alpha| | |
-|curatedRuleSetCategories.get |v1alpha| | |
-|curatedRuleSetCategories.list |v1alpha| | |
-|curatedRules.get |v1alpha| | |
-|curatedRules.list |v1alpha| | |
-|dashboardCharts.batchGet |v1alpha| | |
+|curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.batchUpdate | v1alpha | chronicle.rule_set.batch_update_curated_rule_set_deployments | |
+| curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.patch | v1alpha | chronicle.rule_set.update_curated_rule_set_deployment | secops curated-rule rule-set-deployment update |
+| curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.list | v1alpha | chronicle.rule_set.list_curated_rule_set_deployments | secops curated-rule rule-set-deployment list |
+| curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.get | v1alpha | chronicle.rule_set.get_curated_rule_set_deployment
chronicle.rule_set.get_curated_rule_set_deployment_by_name | secops curated-rule rule-set-deployment get |
+| curatedRuleSetCategories.curatedRuleSets.get | v1alpha | chronicle.rule_set.get_curated_rule_set | secops curated-rule rule-set get |
+| curatedRuleSetCategories.curatedRuleSets.list | v1alpha | chronicle.rule_set.list_curated_rule_sets | secops curated-rule rule-set list |
+| curatedRuleSetCategories.get | v1alpha | chronicle.rule_set.get_curated_rule_set_category | secops curated-rule rule-set-category get |
+| curatedRuleSetCategories.list | v1alpha | chronicle.rule_set.list_curated_rule_set_categories | secops curated-rule rule-set-category list |
+| curatedRules.get | v1alpha | chronicle.rule_set.get_curated_rule
chronicle.rule_set.get_curated_rule_by_name | secops curated-rule rule get |
+| curatedRules.list | v1alpha | chronicle.rule_set.list_curated_rules | secops curated-rule rule list |
+| dashboardCharts.batchGet |v1alpha| | |
|dashboardCharts.get |v1alpha|chronicle.dashboard.get_chart |secops dashboard get-chart |
|dashboardQueries.execute |v1alpha|chronicle.dashboard_query.execute_query |secops dashboard-query execute |
|dashboardQueries.get |v1alpha|chronicle.dashboard_query.get_execute_query |secops dashboard-query get |
diff --git a/examples/rule_set_example.py b/examples/rule_set_example.py
new file mode 100644
index 00000000..5014a8cc
--- /dev/null
+++ b/examples/rule_set_example.py
@@ -0,0 +1,727 @@
+#!/usr/bin/env python3
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Example script demonstrating Chronicle Rule Set functionality."""
+
+import argparse
+
+
+from secops.chronicle.client import ChronicleClient
+
+
+def get_client(project_id: str, customer_id: str, region: str):
+ """Initialize and return the Chronicle client.
+
+ Args:
+ project_id: Google Cloud Project ID
+ customer_id: Chronicle Customer ID (UUID)
+ region: Chronicle region (us or eu)
+
+ Returns:
+ Chronicle client instance
+ """
+ return ChronicleClient(
+ project_id=project_id, customer_id=customer_id, region=region
+ )
+
+
+def example_list_curated_rule_sets(chronicle):
+ """List all curated rule sets.
+
+ Args:
+ chronicle: ChronicleClient instance
+
+ Returns:
+ List of rule set IDs for further operations
+ """
+ print("\n=== List Curated Rule Sets ===")
+
+ try:
+ rule_sets = chronicle.list_curated_rule_sets(page_size=10)
+ print(f"\nFound {len(rule_sets)} curated rule sets")
+
+ # Return the first few rule sets for use in other examples
+ results = []
+ for i, rule_set in enumerate(rule_sets[:5]):
+ # Full name format: projects/PROJECT/locations/LOCATION/curatedRuleSetCategories/CATEGORY_ID/curatedRuleSets/RULE_SET_ID
+ name = rule_set.get("name", "")
+
+ # Extract rule set ID from the full name
+ rule_set_id = name.split("/")[-1] if name else ""
+
+ # Extract category ID from the full name
+ category_parts = name.split("/curatedRuleSets/")[0].split("/")
+ category_id = category_parts[-1] if len(category_parts) > 1 else ""
+
+ display_name = rule_set.get("displayName", "Unknown")
+ print(f"- {display_name}: {rule_set_id}")
+
+ results.append(
+ {
+ "name": name,
+ "rule_set_id": rule_set_id,
+ "category_id": category_id,
+ "display_name": display_name,
+ }
+ )
+
+ return results
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error listing curated rule sets: {e}")
+ return []
+
+
+def example_get_curated_rule_set(chronicle, rule_set_id):
+ """Get a specific curated rule set by ID.
+
+ Args:
+ chronicle: ChronicleClient instance
+ rule_set_id: ID of the rule set to get
+ """
+ print("\n=== Get Curated Rule Set ===")
+
+ try:
+ rule_set = chronicle.get_curated_rule_set(rule_set_id)
+ print("\nCurated Rule Set details:")
+ print(f"Name: {rule_set.get('name')}")
+ print(f"Display Name: {rule_set.get('displayName')}")
+ print(f"Description: {rule_set.get('description')}")
+ print(f"Category: {rule_set.get('ruleSetCategory')}")
+ print(f"Rules Count: {len(rule_set.get('ruleIds', []))}")
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error getting curated rule set: {e}")
+
+
+def example_list_curated_rule_set_categories(chronicle):
+ """List all curated rule set categories.
+
+ Args:
+ chronicle: ChronicleClient instance
+
+ Returns:
+ List of category IDs for further operations
+ """
+ print("\n=== List Curated Rule Set Categories ===")
+
+ try:
+ categories = chronicle.list_curated_rule_set_categories(page_size=10)
+ print(f"\nFound {len(categories)} curated rule set categories")
+
+ results = []
+ for i, category in enumerate(categories[:5]):
+ # Full name format: projects/PROJECT/locations/LOCATION/curatedRuleSetCategories/CATEGORY_ID
+ name = category.get("name", "")
+
+ # Extract category ID from the full name
+ category_id = name.split("/")[-1] if name else ""
+
+ display_name = category.get("displayName", "Unknown")
+ print(f"- {display_name}: {category_id}")
+
+ results.append(
+ {
+ "name": name,
+ "category_id": category_id,
+ "display_name": display_name,
+ }
+ )
+
+ return results
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error listing curated rule set categories: {e}")
+ return []
+
+
+def example_get_curated_rule_set_category(chronicle, category_id):
+ """Get a specific curated rule set category by ID.
+
+ Args:
+ chronicle: ChronicleClient instance
+ category_id: ID of the category to get
+ """
+ print("\n=== Get Curated Rule Set Category ===")
+
+ try:
+ category = chronicle.get_curated_rule_set_category(category_id)
+ print("\nCurated Rule Set Category details:")
+ print(f"Name: {category.get('name')}")
+ print(f"Display Name: {category.get('displayName')}")
+ print(f"Description: {category.get('description', 'No description')}")
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error getting curated rule set category: {e}")
+
+
+def example_list_curated_rules(chronicle):
+ """List all curated rules.
+
+ Args:
+ chronicle: ChronicleClient instance
+
+ Returns:
+ List of rule IDs for further operations
+ """
+ print("\n=== List Curated Rules ===")
+
+ try:
+ rules = chronicle.list_curated_rules(page_size=10)
+ print(f"\nFound {len(rules)} curated rules")
+
+ results = []
+ for i, rule in enumerate(rules[:5]):
+ # Full name format: projects/PROJECT/locations/LOCATION/curatedRules/RULE_ID
+ name = rule.get("name", "")
+
+ # Extract rule ID from the full name
+ rule_id = name.split("/")[-1] if name else ""
+
+ display_name = rule.get("displayName", "Unknown")
+ print(f"- {display_name}: {rule_id}")
+
+ results.append(
+ {"name": name, "rule_id": rule_id, "display_name": display_name}
+ )
+
+ return results
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error listing curated rules: {e}")
+ return []
+
+
+def example_get_curated_rule(chronicle, rule_id):
+ """Get a specific curated rule by ID.
+
+ Args:
+ chronicle: ChronicleClient instance
+ rule_id: ID of the rule to get
+ """
+ print("\n=== Get Curated Rule ===")
+
+ try:
+ rule = chronicle.get_curated_rule(rule_id)
+ print("\nCurated Rule details:")
+ print(f"Name: {rule.get('name')}")
+ print(f"Display Name: {rule.get('displayName')}")
+ print(f"Description: {rule.get('description')}")
+ print(f"Severity: {rule.get('severity')}")
+ print(f"MITRE ATT&CK Tactics: {rule.get('mitreTactics', [])}")
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error getting curated rule: {e}")
+
+
+def example_get_curated_rule_by_name(chronicle, display_name):
+ """Get a curated rule by display name.
+
+ Args:
+ chronicle: ChronicleClient instance
+ display_name: Display name of the rule to find
+ """
+ print("\n=== Get Curated Rule By Name ===")
+
+ try:
+ print(f"\nSearching for rule with display name: {display_name}")
+ rule = chronicle.get_curated_rule_by_name(display_name)
+ print("\nCurated Rule details:")
+ print(f"Name: {rule.get('name')}")
+ print(f"Display Name: {rule.get('displayName')}")
+ print(f"Description: {rule.get('description')}")
+ print(f"Severity: {rule.get('severity')}")
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error getting curated rule by name: {e}")
+
+
+def example_list_curated_rule_set_deployments(chronicle):
+ """List all curated rule set deployments.
+
+ Args:
+ chronicle: ChronicleClient instance
+
+ Returns:
+ Dictionary with rule set ID and deployment details
+ """
+ print("\n=== List Curated Rule Set Deployments ===")
+
+ try:
+ deployments = chronicle.list_curated_rule_set_deployments(
+ page_size=10, only_enabled=False
+ )
+ print(f"\nFound {len(deployments)} curated rule set deployments")
+
+ if deployments:
+ # Return the first deployment for use in other examples
+ deployment = deployments[0]
+
+ # Full name format: projects/PROJECT/locations/LOCATION/curatedRuleSetCategories/CATEGORY_ID/curatedRuleSets/RULE_SET_ID/curatedRuleSetDeployments/PRECISION
+ name = deployment.get("name", "")
+
+ # Parse name to extract IDs
+ parts = name.split("/")
+ precision = parts[-1] if len(parts) > 0 else "precise"
+
+ # The full rule set path is everything before /curatedRuleSetDeployments/{precision}
+ rule_set_path = "/".join(parts[:-2]) if len(parts) > 2 else ""
+
+ # Extract rule set ID - it's the part after the last /curatedRuleSets/ segment
+ rule_set_segments = rule_set_path.split("/curatedRuleSets/")
+ rule_set_id = (
+ rule_set_segments[-1] if len(rule_set_segments) > 1 else ""
+ )
+
+ # Extract category ID - it's the part after the last /curatedRuleSetCategories/ but before /curatedRuleSets/
+ if len(rule_set_segments) > 1:
+ category_path = rule_set_segments[0]
+ category_segments = category_path.split(
+ "/curatedRuleSetCategories/"
+ )
+ category_id = (
+ category_segments[-1] if len(category_segments) > 1 else ""
+ )
+ else:
+ category_id = ""
+
+ display_name = deployment.get("displayName", "Unknown")
+ print(f"- {display_name}")
+ print(f" Enabled: {deployment.get('enabled', False)}")
+ print(f" Alerting: {deployment.get('alerting', False)}")
+ print(f" Precision: {deployment.get('precision', 'Unknown')}")
+
+ return {
+ "name": name,
+ "rule_set_path": rule_set_path,
+ "rule_set_id": rule_set_id,
+ "category_id": category_id,
+ "display_name": display_name,
+ "precision": precision,
+ }
+ return None
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error listing curated rule set deployments: {e}")
+ return None
+
+
+def example_get_curated_rule_set_deployment(
+ chronicle, rule_set_id, precision="precise"
+):
+ """Get deployment status of a curated rule set by ID.
+
+ Args:
+ chronicle: ChronicleClient instance
+ rule_set_id: ID of the rule set
+ precision: Precision level ("precise" or "broad")
+ """
+ print("\n=== Get Curated Rule Set Deployment ===")
+
+ try:
+ print(f"\nGetting deployment for rule set ID: {rule_set_id}")
+ deployment = chronicle.get_curated_rule_set_deployment(
+ rule_set_id, precision
+ )
+ print("\nDeployment details:")
+ print(f"Name: {deployment.get('name')}")
+ print(f"Display Name: {deployment.get('displayName')}")
+ print(f"Enabled: {deployment.get('enabled', False)}")
+ print(f"Alerting: {deployment.get('alerting', False)}")
+ print(f"Precision: {deployment.get('precision')}")
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error getting curated rule set deployment: {e}")
+
+
+def example_get_curated_rule_set_deployment_by_name(
+ chronicle, display_name, precision="precise"
+):
+ """Get deployment status of a curated rule set by name.
+
+ Args:
+ chronicle: ChronicleClient instance
+ display_name: Display name of the rule set
+ precision: Precision level ("precise" or "broad")
+ """
+ print("\n=== Get Curated Rule Set Deployment By Name ===")
+
+ try:
+ print(f"\nGetting deployment for rule set: {display_name}")
+ deployment = chronicle.get_curated_rule_set_deployment_by_name(
+ display_name, precision
+ )
+ print("\nDeployment details:")
+ print(f"Name: {deployment.get('name')}")
+ print(f"Display Name: {deployment.get('displayName')}")
+ print(f"Enabled: {deployment.get('enabled', False)}")
+ print(f"Alerting: {deployment.get('alerting', False)}")
+ print(f"Precision: {deployment.get('precision')}")
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error getting curated rule set deployment by name: {e}")
+
+
+def example_update_curated_rule_set_deployment(
+ chronicle, category_id, rule_set_id, precision="precise"
+):
+ """Update deployment settings of a curated rule set.
+
+ Args:
+ chronicle: ChronicleClient instance
+ category_id: ID of the category
+ rule_set_id: ID of the rule set
+ precision: Precision level ("precise" or "broad")
+
+ Returns:
+ Original deployment status for later cleanup
+ """
+ print("\n=== Update Curated Rule Set Deployment ===")
+
+ try:
+ print(f"\nCategory ID: {category_id}")
+ print(f"Rule Set ID: {rule_set_id}")
+ print(f"Precision: {precision}")
+
+ # First get the current deployment state
+ current = chronicle.get_curated_rule_set_deployment(
+ rule_set_id, precision
+ )
+ original_state = {
+ "category_id": category_id,
+ "rule_set_id": rule_set_id,
+ "precision": precision,
+ "enabled": current.get("enabled", False),
+ "alerting": current.get("alerting", False),
+ }
+ print(
+ f"\nCurrent deployment state: Enabled={original_state['enabled']}, "
+ f"Alerting={original_state['alerting']}"
+ )
+
+ print(f"\nUpdating deployment for rule set ID: {rule_set_id}")
+
+ # Configuration for updating the deployment
+ deployment_config = {
+ "category_id": category_id,
+ "rule_set_id": rule_set_id,
+ "precision": precision,
+ "enabled": True, # Enable the rule set
+ "alerting": True, # Enable alerting for the rule set
+ }
+
+ # Update the deployment
+ updated = chronicle.update_curated_rule_set_deployment(
+ deployment_config
+ )
+
+ print("\nUpdated deployment details:")
+ print(f"Name: {updated.get('name')}")
+ print(f"Enabled: {updated.get('enabled', False)}")
+ print(f"Alerting: {updated.get('alerting', False)}")
+ print(f"Precision: {updated.get('precision')}")
+
+ return original_state
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error updating curated rule set deployment: {e}")
+ return None
+
+
+def example_cleanup_rule_set_deployment(
+ chronicle, category_id, rule_set_id, original_state
+):
+ """Revert deployment settings of a curated rule set to original state.
+
+ Args:
+ chronicle: ChronicleClient instance
+ category_id: ID of the category
+ rule_set_id: ID of the rule set
+ original_state: Dictionary containing original configuration and states
+ """
+ print("\n=== Cleanup: Revert Rule Set Deployment ===")
+
+ if not original_state:
+ print("No original state provided, cannot revert")
+ return
+
+ try:
+ # Get values from original_state, falling back to parameters if not present
+ category_id = original_state.get("category_id", category_id)
+ rule_set_id = original_state.get("rule_set_id", rule_set_id)
+ precision = original_state.get("precision", "precise")
+
+ print(f"\nReverting deployment for rule set ID: {rule_set_id}")
+ print(
+ f"Restoring to: Enabled={original_state.get('enabled', False)}, "
+ f"Alerting={original_state.get('alerting', False)}"
+ )
+
+ # Configuration for reverting the deployment
+ deployment_config = {
+ "category_id": category_id,
+ "rule_set_id": rule_set_id,
+ "precision": precision,
+ "enabled": original_state.get("enabled", False),
+ "alerting": original_state.get("alerting", False),
+ }
+
+ # Update the deployment back to original state
+ reverted = chronicle.update_curated_rule_set_deployment(
+ deployment_config
+ )
+
+ print("\nReverted deployment details:")
+ print(f"Name: {reverted.get('name')}")
+ print(f"Enabled: {reverted.get('enabled', False)}")
+ print(f"Alerting: {reverted.get('alerting', False)}")
+ print(f"Precision: {reverted.get('precision')}")
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error reverting curated rule set deployment: {e}")
+
+
+# Map of example functions
+EXAMPLES = {
+ "1": example_list_curated_rule_sets,
+ "2": example_get_curated_rule_set,
+ "3": example_list_curated_rule_set_categories,
+ "4": example_get_curated_rule_set_category,
+ "5": example_list_curated_rules,
+ "6": example_get_curated_rule,
+ "7": example_get_curated_rule_by_name,
+ "8": example_list_curated_rule_set_deployments,
+ "9": example_get_curated_rule_set_deployment,
+ "10": example_get_curated_rule_set_deployment_by_name,
+ "11": example_update_curated_rule_set_deployment,
+ "12": example_cleanup_rule_set_deployment,
+}
+
+
+def main():
+ """Main function to run examples."""
+ parser = argparse.ArgumentParser(
+ description="Run Chronicle Rule Set API examples"
+ )
+ parser.add_argument(
+ "--project_id", required=True, help="Google Cloud Project ID"
+ )
+ parser.add_argument(
+ "--customer_id", required=True, help="Chronicle Customer ID (UUID)"
+ )
+ parser.add_argument(
+ "--region", default="us", help="Chronicle region (us or eu)"
+ )
+ parser.add_argument(
+ "--example",
+ "-e",
+ help=(
+ "Example number to run (1-12). If not specified, runs all examples."
+ ),
+ )
+ parser.add_argument(
+ "--rule_name", help="Rule display name for get_by_name examples"
+ )
+ parser.add_argument(
+ "--rule_set_name", help="Rule set display name for get_by_name examples"
+ )
+
+ args = parser.parse_args()
+
+ # Initialize the client
+ chronicle = get_client(args.project_id, args.customer_id, args.region)
+
+ # Data needed across examples
+ rule_sets = None
+ categories = None
+ rules = None
+ deployment_info = None
+
+ if args.example:
+ if args.example not in EXAMPLES:
+ print(
+ "Invalid example number. "
+ f"Available examples: {', '.join(EXAMPLES.keys())}"
+ )
+ return
+
+ # Examples that don't need additional input
+ if args.example in ["1", "3", "5", "8"]:
+ if args.example == "1":
+ rule_sets = EXAMPLES[args.example](chronicle)
+ elif args.example == "3":
+ categories = EXAMPLES[args.example](chronicle)
+ elif args.example == "5":
+ rules = EXAMPLES[args.example](chronicle)
+ elif args.example == "8":
+ deployment_info = EXAMPLES[args.example](chronicle)
+
+ # Examples that need rule_set_id
+ elif args.example == "2":
+ if not rule_sets:
+ rule_sets = example_list_curated_rule_sets(chronicle)
+
+ if rule_sets:
+ EXAMPLES[args.example](chronicle, rule_sets[0]["rule_set_id"])
+
+ # Examples that need category_id
+ elif args.example == "4":
+ if not categories:
+ categories = example_list_curated_rule_set_categories(chronicle)
+
+ if categories:
+ EXAMPLES[args.example](chronicle, categories[0]["category_id"])
+
+ # Examples that need rule_id
+ elif args.example == "6":
+ if not rules:
+ rules = example_list_curated_rules(chronicle)
+
+ if rules:
+ EXAMPLES[args.example](chronicle, rules[0]["rule_id"])
+
+ # Examples that need rule_name
+ elif args.example == "7":
+ EXAMPLES[args.example](chronicle, args.rule_name)
+
+ # Examples that need rule_set_id and precision
+ elif args.example == "9":
+ if not rule_sets:
+ rule_sets = example_list_curated_rule_sets(chronicle)
+
+ if rule_sets:
+ EXAMPLES[args.example](chronicle, rule_sets[0]["rule_set_id"])
+
+ # Examples that need rule_set_name and precision
+ elif args.example == "10":
+ EXAMPLES[args.example](chronicle, args.rule_set_name)
+
+ # Examples that need category_id, rule_set_id and precision
+ elif args.example == "11":
+ if not rule_sets:
+ rule_sets = example_list_curated_rule_sets(chronicle)
+
+ if rule_sets:
+ original_state = EXAMPLES[args.example](
+ chronicle,
+ rule_sets[0]["category_id"],
+ rule_sets[0]["rule_set_id"],
+ )
+ # Perform cleanup after update
+ if original_state and args.example != "12":
+ example_cleanup_rule_set_deployment(
+ chronicle,
+ rule_sets[0]["category_id"],
+ rule_sets[0]["rule_set_id"],
+ original_state,
+ )
+ else:
+ # Run all examples in order
+ print("\nRunning all Rule Set examples...")
+
+ # Examples that return data we need for other examples
+ rule_sets = example_list_curated_rule_sets(chronicle)
+ categories = example_list_curated_rule_set_categories(chronicle)
+ rules = example_list_curated_rules(chronicle)
+ deployment_info = example_list_curated_rule_set_deployments(chronicle)
+
+ # If we have the needed data, run the dependent examples
+ if rule_sets and len(rule_sets) > 0:
+ print(
+ f"\nUsing rule set: {rule_sets[0]['display_name']} (ID: {rule_sets[0]['rule_set_id']})"
+ )
+ example_get_curated_rule_set(chronicle, rule_sets[0]["rule_set_id"])
+
+ if categories and len(categories) > 0:
+ print(
+ f"\nUsing category: {categories[0]['display_name']} (ID: {categories[0]['category_id']})"
+ )
+ example_get_curated_rule_set_category(
+ chronicle, categories[0]["category_id"]
+ )
+
+ if rules and len(rules) > 0:
+ print(
+ f"\nUsing rule: {rules[0]['display_name']} (ID: {rules[0]['rule_id']})"
+ )
+ example_get_curated_rule(chronicle, rules[0]["rule_id"])
+
+ # Examples that use display names (prioritize arguments, fallback to list results)
+ # For curated rule by name
+ if args.rule_name:
+ # Use the user-provided rule name
+ print(
+ f"\nLooking up rule by display name: {args.rule_name} (user-provided)"
+ )
+ rule_display_name = args.rule_name
+ elif rules and len(rules) > 0:
+ # Fallback: use the display name from the first rule in the list
+ rule_display_name = rules[0]["display_name"]
+ print(
+ f"\nLooking up rule by display name: {rule_display_name} (from list)"
+ )
+ else:
+ # Default fallback
+ rule_display_name = "Remote Code Execution via Web Request"
+ print(
+ f"\nLooking up rule by display name: {rule_display_name} (default)"
+ )
+
+ example_get_curated_rule_by_name(chronicle, rule_display_name)
+
+ # For curated rule set deployment by name
+ if args.rule_set_name:
+ # Use the user-provided rule set name
+ print(
+ f"\nLooking up rule set deployment by name: {args.rule_set_name} (user-provided)"
+ )
+ rule_set_display_name = args.rule_set_name
+ elif rule_sets and len(rule_sets) > 0:
+ # Fallback: use the display name from the first rule set in the list
+ rule_set_display_name = rule_sets[0]["display_name"]
+ print(
+ f"\nLooking up rule set deployment by name: {rule_set_display_name} (from list)"
+ )
+ else:
+ # Default fallback
+ rule_set_display_name = "Cloud Security"
+ print(
+ f"\nLooking up rule set deployment by name: {rule_set_display_name} (default)"
+ )
+
+ example_get_curated_rule_set_deployment_by_name(
+ chronicle, rule_set_display_name
+ )
+
+ # Examples that need data from other examples
+ if rule_sets and len(rule_sets) > 0:
+ example_get_curated_rule_set_deployment(
+ chronicle, rule_sets[0]["rule_set_id"]
+ )
+
+ # Update example only if we have all the data
+ if rule_sets and len(rule_sets) > 0:
+ print(
+ f"\nUpdating and then reverting rule set: {rule_sets[0]['display_name']}"
+ )
+ print(f"Category ID: {rule_sets[0]['category_id']}")
+ print(f"Rule set ID: {rule_sets[0]['rule_set_id']}")
+
+ original_state = example_update_curated_rule_set_deployment(
+ chronicle,
+ rule_sets[0]["category_id"],
+ rule_sets[0]["rule_set_id"],
+ )
+
+ # Cleanup after update
+ if original_state:
+ example_cleanup_rule_set_deployment(
+ chronicle,
+ rule_sets[0]["category_id"],
+ rule_sets[0]["rule_set_id"],
+ original_state,
+ )
+
+
+if __name__ == "__main__":
+ main()
diff --git a/pyproject.toml b/pyproject.toml
index bc681962..8e38e3e5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "secops"
-version = "0.22.0"
+version = "0.23.0"
description = "Python SDK for wrapping the Google SecOps API for common use cases"
readme = "README.md"
requires-python = ">=3.7"
diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py
index 58ea12bf..d6f66d23 100644
--- a/src/secops/chronicle/__init__.py
+++ b/src/secops/chronicle/__init__.py
@@ -133,7 +133,20 @@
update_rule_exclusion_deployment,
)
from secops.chronicle.rule_retrohunt import create_retrohunt, get_retrohunt
-from secops.chronicle.rule_set import batch_update_curated_rule_set_deployments
+from secops.chronicle.rule_set import (
+ batch_update_curated_rule_set_deployments,
+ list_curated_rule_sets,
+ list_curated_rule_set_categories,
+ list_curated_rules,
+ get_curated_rule,
+ get_curated_rule_set_category,
+ get_curated_rule_set,
+ list_curated_rule_set_deployments,
+ get_curated_rule_set_deployment,
+ get_curated_rule_set_deployment_by_name,
+ get_curated_rule_by_name,
+ update_curated_rule_set_deployment,
+)
from secops.chronicle.rule_validation import ValidationResult
from secops.chronicle.search import search_udm
from secops.chronicle.stats import get_stats
@@ -228,6 +241,17 @@
"get_retrohunt",
# Rule set operations
"batch_update_curated_rule_set_deployments",
+ "list_curated_rule_sets",
+ "list_curated_rule_set_categories",
+ "list_curated_rules",
+ "get_curated_rule",
+ "get_curated_rule_set_category",
+ "get_curated_rule_set",
+ "list_curated_rule_set_deployments",
+ "get_curated_rule_set_deployment",
+ "get_curated_rule_set_deployment_by_name",
+ "get_curated_rule_by_name",
+ "update_curated_rule_set_deployment",
# Native Dashboard
"add_chart",
"create_dashboard",
diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py
index a9f1d640..ec88989a 100644
--- a/src/secops/chronicle/client.py
+++ b/src/secops/chronicle/client.py
@@ -223,6 +223,17 @@
from secops.chronicle.rule_retrohunt import get_retrohunt as _get_retrohunt
from secops.chronicle.rule_set import (
batch_update_curated_rule_set_deployments as _batch_update_curated_rule_set_deployments, # pylint: disable=line-too-long
+ list_curated_rule_sets as _list_curated_rule_sets,
+ list_curated_rule_set_categories as _list_curated_rule_set_categories,
+ list_curated_rules as _list_curated_rules,
+ get_curated_rule as _get_curated_rule,
+ get_curated_rule_set_category as _get_curated_rule_set_category,
+ get_curated_rule_set as _get_curated_rule_set,
+ list_curated_rule_set_deployments as _list_curated_rule_set_deployments,
+ get_curated_rule_set_deployment as _get_curated_rule_set_deployment,
+ get_curated_rule_set_deployment_by_name as _get_curated_rule_set_deployment_by_name,
+ get_curated_rule_by_name as _get_curated_rule_by_name,
+ update_curated_rule_set_deployment as _update_curated_rule_set_deployment,
)
from secops.chronicle.rule_validation import validate_rule as _validate_rule
from secops.chronicle.search import search_udm as _search_udm
@@ -1732,6 +1743,213 @@ def batch_update_curated_rule_set_deployments(
"""
return _batch_update_curated_rule_set_deployments(self, deployments)
+ def list_curated_rule_sets(
+ self,
+ page_size: Optional[int] = None,
+ page_token: Optional[str] = None,
+ ) -> List[Dict[str, Any]]:
+ """Get a list of all curated rule sets.
+
+ Args:
+ page_size: Number of results to return per page
+ page_token: Token for the page to retrieve
+
+ Returns:
+ Dictionary containing the list of curated rule sets
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _list_curated_rule_sets(self, page_size, page_token)
+
+ def list_curated_rule_set_categories(
+ self,
+ page_size: Optional[int] = None,
+ page_token: Optional[str] = None,
+ ) -> List[Dict[str, Any]]:
+ """Get a list of all curated rule set categories.
+
+ Args:
+ page_size: Number of results to return per page
+ page_token: Token for the page to retrieve
+
+ Returns:
+ Dictionary containing the list of curated rule set categories
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _list_curated_rule_set_categories(self, page_size, page_token)
+
+ def list_curated_rule_set_deployments(
+ self,
+ page_size: Optional[int] = None,
+ page_token: Optional[str] = None,
+ only_enabled: Optional[bool] = False,
+ only_alerting: Optional[bool] = False,
+ ) -> List[Dict[str, Any]]:
+ """Get a list of all curated rule set deployments.
+
+ Args:
+ page_size: Number of results to return per page
+ page_token: Token for the page to retrieve
+ only_enabled: Only return enabled rule set deployments
+ only_alerting: Only return alerting rule set deployments
+
+ Returns:
+ Dictionary containing the list of curated rule set deployments
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _list_curated_rule_set_deployments(
+ self, page_size, page_token, only_enabled, only_alerting
+ )
+
+ def get_curated_rule_set_deployment(
+ self,
+ rule_set_id: str,
+ precision: str = "precise",
+ ) -> Dict[str, Any]:
+ """Get a curated rule set deployment by ID
+
+ Args:
+ rule_set_id: Unique ID of the curated rule set
+ precision: Precision level ("precise" or "broad")
+
+ Returns:
+ Dictionary containing the curated rule set deployment
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _get_curated_rule_set_deployment(self, rule_set_id, precision)
+
+ def get_curated_rule_set_deployment_by_name(
+ self,
+ display_name: str,
+ precision: str = "precise",
+ ) -> Dict[str, Any]:
+ """Get a curated rule set deployment by human-readable display name
+ NOTE: This is a linear scan of all curated rules,
+ so it may be inefficient for large rule sets.
+
+ Args:
+ display_name: Display name of the curated rule set
+ precision: Precision level ("precise" or "broad")
+
+ Returns:
+ Dictionary containing the curated rule set deployment
+
+ Raises:
+ APIError: If the API request fails
+ SecOpsError: If the rule set is not found or precision is invalid
+ """
+ return _get_curated_rule_set_deployment_by_name(
+ self, display_name, precision
+ )
+
+ def list_curated_rules(
+ self,
+ page_size: Optional[int] = None,
+ page_token: Optional[str] = None,
+ ) -> List[Dict[str, Any]]:
+ """Get a list of all curated rules.
+
+ Args:
+ page_size: Number of results to return per page
+ page_token: Token for the page to retrieve
+
+ Returns:
+ Dictionary containing the list of curated rules
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _list_curated_rules(self, page_size, page_token)
+
+ def get_curated_rule(self, rule_id: str) -> Dict[str, Any]:
+ """Get a curated rule by ID.
+
+ Args:
+ rule_id: ID of the curated rule
+
+ Returns:
+ Dictionary containing the curated rule
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _get_curated_rule(self, rule_id)
+
+ def get_curated_rule_by_name(self, display_name: str) -> Dict[str, Any]:
+ """Get a curated rule by human-readable display name
+ NOTE: This is a linear scan of all curated rules,
+ so it may be inefficient for large rule sets.
+
+ Args:
+ display_name: Display name of the curated rule
+
+ Returns:
+ Dictionary containing the curated rule
+
+ Raises:
+ APIError: If the API request fails
+ SecOpsError: If the rule is not found
+ """
+ return _get_curated_rule_by_name(self, display_name)
+
+ def update_curated_rule_set_deployment(
+ self, deployment: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """Update a curated rule set deployment to enable or disable
+ alerting or change precision.
+
+ Args:
+ deployment: Dict of deployment configuration containing:
+ - category_id: UUID of the category
+ - rule_set_id: UUID of the rule set
+ - precision: Precision level either "broad" or "precise"
+ - enabled: Whether the rule set should be enabled
+ - alerting: Whether alerting should be enabled for the rule set
+
+ Returns:
+ Dictionary containing the updated curated rule set deployment
+
+ Raises:
+ APIError: If the API request fails
+ SecOpsError: If the rule set is not found or precision is invalid
+ """
+ return _update_curated_rule_set_deployment(self, deployment)
+
+ def get_curated_rule_set_category(self, category_id: str) -> Dict[str, Any]:
+ """Get a curated rule set category by ID.
+
+ Args:
+ category_id: ID of the curated rule set category
+
+ Returns:
+ Dictionary containing the curated rule set category
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _get_curated_rule_set_category(self, category_id)
+
+ def get_curated_rule_set(self, rule_set_id: str) -> Dict[str, Any]:
+ """Get a curated rule set by ID.
+
+ Args:
+ rule_set_id: ID of the curated rule set
+
+ Returns:
+ Dictionary containing the curated rule set
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _get_curated_rule_set(self, rule_set_id)
+
def validate_rule(self, rule_text: str):
"""Validates a YARA-L2 rule against the Chronicle API.
diff --git a/src/secops/chronicle/rule_set.py b/src/secops/chronicle/rule_set.py
index a1e3ffc7..ae075786 100644
--- a/src/secops/chronicle/rule_set.py
+++ b/src/secops/chronicle/rule_set.py
@@ -12,10 +12,463 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-"""Rule set functionality for Chronicle."""
+"""Curated rule set functionality for Chronicle."""
-from typing import Dict, Any, List
-from secops.exceptions import APIError
+from typing import Dict, Any, List, Optional
+from secops.exceptions import APIError, SecOpsError
+
+
+def _paginated_request(
+ client,
+ path: str,
+ items_key: str,
+ *,
+ page_size: Optional[int] = None,
+ page_token: Optional[str] = None,
+ extra_params: Optional[Dict[str, Any]] = None,
+) -> List[Dict[str, Any]]:
+ """
+ Helper to get items from endpoints that use pagination.
+
+ Args:
+ client: ChronicleClient instance
+ path: URL path after {base_url}/{instance_id}/
+ items_key: JSON key holding the array of items (e.g., 'curatedRules')
+ page_size: Maximum number of rules to return per page.
+ page_token: Token for the next page of results, if available.
+ extra_params: extra query params to include on every request
+
+ Returns:
+ List of items from the paginated collection.
+
+ Raises:
+ APIError: If the HTTP request fails.
+ """
+ url = f"{client.base_url}/{client.instance_id}/{path}"
+ results = []
+ next_token = page_token
+
+ while True:
+ # Build params each loop to prevent stale keys being
+ # included in the next request
+ params = {"pageSize": 1000 if not page_size else page_size}
+ if next_token:
+ params["pageToken"] = next_token
+ if extra_params:
+ # copy to avoid passed dict being mutated
+ params.update(dict(extra_params))
+
+ response = client.session.get(url, params=params)
+ if response.status_code != 200:
+ raise APIError(f"Failed to list {items_key}: {response.text}")
+
+ data = response.json()
+ results.extend(data.get(items_key, []))
+
+ # If caller provided page_size, return only this page
+ if page_size is not None:
+ break
+
+ # Otherwise, auto-paginate
+ next_token = data.get("nextPageToken")
+ if not next_token:
+ break
+
+ return results
+
+
+def list_curated_rule_sets(
+ client,
+ page_size: Optional[str] = None,
+ page_token: Optional[str] = None,
+) -> List[Dict[str, Any]]:
+ """Get a list of all curated rule sets
+
+ Args:
+ client: ChronicleClient instance
+ page_size: Number of results to return per page
+ page_token: Token for the page to retrieve
+
+ Returns:
+ List of curated rule sets
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _paginated_request(
+ client,
+ path="curatedRuleSetCategories/-/curatedRuleSets",
+ items_key="curatedRuleSets",
+ page_size=page_size,
+ page_token=page_token,
+ )
+
+
+def get_curated_rule_set(client, rule_set_id: str) -> Dict[str, Any]:
+ """Get a curated rule set by ID
+
+ Args:
+ client: ChronicleClient instance
+ rule_set_id: Unique ID of the curated rule set
+
+ Returns:
+ Dictionary containing the curated rule set
+
+ Raises:
+ APIError: If the API request fails
+ """
+ base_url = (
+ f"{client.base_url}/{client.instance_id}/"
+ f"curatedRuleSetCategories/-/curatedRuleSets/{rule_set_id}"
+ )
+
+ response = client.session.get(base_url)
+ if response.status_code != 200:
+ raise APIError(f"Failed to get rule set: {response.text}")
+
+ return response.json()
+
+
+def list_curated_rule_set_categories(
+ client,
+ page_size: Optional[str] = None,
+ page_token: Optional[str] = None,
+) -> List[Dict[str, Any]]:
+ """Get a list of all curated rule set categories
+
+ Args:
+ client: ChronicleClient instance
+ page_size: Number of results to return per page
+ page_token: Token for the page to retrieve
+
+ Returns:
+ List of curated rule set categories
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _paginated_request(
+ client,
+ path="curatedRuleSetCategories",
+ items_key="curatedRuleSetCategories",
+ page_size=page_size,
+ page_token=page_token,
+ )
+
+
+def get_curated_rule_set_category(client, category_id: str) -> Dict[str, Any]:
+ """Get a curated rule set category by ID
+
+ Args:
+ client: ChronicleClient instance
+ category_id: Unique ID of the curated rule set category
+
+ Returns:
+ Dictionary containing the curated rule set category
+
+ Raises:
+ APIError: If the API request fails
+ """
+ base_url = (
+ f"{client.base_url}/{client.instance_id}/"
+ f"curatedRuleSetCategories/{category_id}"
+ )
+
+ response = client.session.get(base_url)
+ if response.status_code != 200:
+ raise APIError(
+ f"Failed to get curated rule set category: {response.text}"
+ )
+
+ return response.json()
+
+
+def list_curated_rules(
+ client,
+ page_size: Optional[str] = None,
+ page_token: Optional[str] = None,
+) -> List[Dict[str, Any]]:
+ """Get a list of all curated rules
+
+ Args:
+ client: ChronicleClient instance
+ page_size: Number of results to return per page
+ page_token: Token for the page to retrieve
+
+ Returns:
+ List of curated rules
+
+ Raises:
+ APIError: If the API request fails
+ """
+ return _paginated_request(
+ client,
+ path="curatedRules",
+ items_key="curatedRules",
+ page_size=page_size,
+ page_token=page_token,
+ )
+
+
+def get_curated_rule(client, rule_id: str) -> Dict[str, Any]:
+ """Get a curated rule by ID
+
+ Args:
+ client: ChronicleClient instance
+ rule_id: Unique ID of the curated rule to retrieve ("ur_"
+ or "ur_).
+ Examples:
+ `ur_ffac5fa0-5b0b-463e-9f92-2443f8f1b6fd`
+ `ur_ttp_GCP_MassSecretDeletion`
+
+ Returns:
+ Dictionary containing the curated rule
+
+ Raises:
+ APIError: If the API request fails
+ """
+ base_url = f"{client.base_url}/{client.instance_id}/curatedRules/{rule_id}"
+
+ response = client.session.get(base_url)
+ if response.status_code != 200:
+ raise APIError(f"Failed to get curated rule: {response.text}")
+
+ return response.json()
+
+
+def get_curated_rule_by_name(client, display_name: str) -> Dict[str, Any]:
+ """Get a curated rule by display name
+ NOTE: This is a linear scan of all curated rules,
+ so it may be inefficient for large rule sets.
+
+ Args:
+ client: ChronicleClient instance
+ display_name: Display name of the curated rule
+
+ Returns:
+ Dictionary containing the curated rule
+
+ Raises:
+ APIError: If the API request fails
+ """
+ rule = None
+ for r in list_curated_rules(client):
+ if r.get("displayName", "").lower() == display_name.lower():
+ rule = r
+ break
+ if not rule:
+ raise SecOpsError(f"Rule with name '{display_name}' not found")
+
+ return rule
+
+
+def list_curated_rule_set_deployments(
+ client,
+ page_size: Optional[str] = None,
+ page_token: Optional[str] = None,
+ only_enabled: Optional[bool] = False,
+ only_alerting: Optional[bool] = False,
+) -> List[Dict[str, Any]]:
+ """Get a list of all curated rule set deployment statuses
+
+ Args:
+ client: ChronicleClient instance
+ page_size: Number of results to return per page
+ page_token: Token for the page to retrieve
+ only_enabled: Only return enabled rule set deployments
+ only_alerting: Only return alerting rule set deployments
+
+ Returns:
+ List of curated rule set deployments
+
+ Raises:
+ APIError: If the API request fails
+ """
+ rule_set_deployments = _paginated_request(
+ client,
+ path="curatedRuleSetCategories/-/curatedRuleSets/"
+ "-/curatedRuleSetDeployments",
+ items_key="curatedRuleSetDeployments",
+ page_size=page_size,
+ page_token=page_token,
+ )
+
+ # Enrich the deployment data with the rule set displayName
+ all_rule_sets = list_curated_rule_sets(client)
+ for deployment in rule_set_deployments:
+ rule_set_id = (
+ deployment.get("name", "")
+ .split("curatedRuleSetDeployment")[0]
+ .rstrip("/")
+ )
+ for rule_set in all_rule_sets:
+ if rule_set.get("name", "") == rule_set_id:
+ deployment["displayName"] = rule_set.get("displayName", "")
+
+ # Apply filters for only enabled and/or alerting rule sets
+ if only_enabled:
+ rule_set_deployments = [
+ deployment
+ for deployment in rule_set_deployments
+ if deployment.get("enabled", False)
+ ]
+ if only_alerting:
+ rule_set_deployments = [
+ deployment
+ for deployment in rule_set_deployments
+ if deployment.get("alerting", False)
+ ]
+
+ return rule_set_deployments
+
+
+def get_curated_rule_set_deployment(
+ client,
+ rule_set_id: str,
+ precision: str = "precise",
+) -> Dict[str, Any]:
+ """Get the deployment status of a curated rule set by ID
+
+ Args:
+ client: ChronicleClient instance
+ rule_set_id: Unique ID of the curated rule set
+ precision: Precision level ("precise" or "broad")
+
+ Returns:
+ Dictionary containing the curated rule set deployment
+
+ Raises:
+ APIError: If the API request fails
+ SecOpsError: If the rule set is not found or precision is invalid
+ """
+ if precision not in ["precise", "broad"]:
+ raise SecOpsError("Precision must be 'precise' or 'broad'")
+
+ # Get the rule set by ID
+ rule_set = get_curated_rule_set(client, rule_set_id)
+
+ url = (
+ f"{client.base_url}/{rule_set.get('name', '')}/"
+ f"curatedRuleSetDeployments/{precision}"
+ )
+
+ response = client.session.get(url)
+ if response.status_code != 200:
+ raise APIError(
+ f"Failed to get curated rule set deployment: {response.text}"
+ )
+
+ # Enrich the deployment data with the rule set displayName
+ deployment = response.json()
+ deployment["displayName"] = rule_set.get("displayName", "")
+
+ return deployment
+
+
+def get_curated_rule_set_deployment_by_name(
+ client,
+ display_name: str,
+ precision: str = "precise",
+) -> Dict[str, Any]:
+ """Get the deployment status of a curated rule set by its display name
+ NOTE: This is a linear scan of all curated rule sets,
+ so it may be inefficient for large rule sets.
+
+ Args:
+ client: ChronicleClient instance
+ display_name: Display name of the curated rule set (case-insensitive)
+ precision: Precision level ("precise" or "broad")
+
+ Returns:
+ Dictionary containing the curated rule set deployment
+
+ Raises:
+ APIError: If the API request fails
+ SecOpsError: If the rule set is not found or precision is invalid
+ """
+ if precision not in ["precise", "broad"]:
+ raise SecOpsError("Precision must be 'precise' or 'broad'")
+
+ rule_set = None
+ for rs in list_curated_rule_sets(client):
+ # Names normalised as lowercase
+ if rs.get("displayName", "").lower() == display_name.lower():
+ rule_set = rs
+ break
+
+ if not rule_set:
+ raise SecOpsError(f"Rule set with name '{display_name}' not found")
+
+ # Extract the rule set ID from the resource name
+ name_parts = rule_set["name"].split("/")
+ rule_set_id = name_parts[-1]
+
+ # Get the deployment status using existing function
+ return get_curated_rule_set_deployment(client, rule_set_id, precision)
+
+
+def update_curated_rule_set_deployment(
+ client, deployment: Dict[str, Any]
+) -> Dict[str, Any]:
+ """Update a curated rule set deployment to enable or disable
+ alerting or change precision.
+
+ Args:
+ client: ChronicleClient instance
+ deployment: Dict of deployment configuration containing:
+ - category_id: UUID of the category
+ - rule_set_id: UUID of the rule set
+ - precision: Precision level either "broad" or "precise"
+ - enabled: Whether the rule set should be enabled
+ - alerting: Whether alerting should be enabled for the rule set
+
+ Returns:
+ Dictionary containing the updated curated rule set deployment
+
+ Raises:
+ APIError: If the API request fails
+ SecOpsError: If the rule set is not found or precision is invalid
+ """
+ # Check required fields
+ required_fields = ["category_id", "rule_set_id", "precision", "enabled"]
+ missing_fields = [
+ field for field in required_fields if field not in deployment
+ ]
+
+ if missing_fields:
+ raise ValueError(
+ f"Deployment missing required fields: {missing_fields}"
+ )
+
+ # Get deployment configuration
+ category_id = deployment["category_id"]
+ rule_set_id = deployment["rule_set_id"]
+ precision = deployment["precision"]
+ enabled = deployment["enabled"]
+ alerting = deployment.get("alerting", False)
+
+ deployment_name = (
+ f"{client.instance_id}/curatedRuleSetCategories/{category_id}"
+ f"/curatedRuleSets/{rule_set_id}"
+ f"/curatedRuleSetDeployments/{precision}"
+ )
+
+ deployment = {
+ "name": deployment_name,
+ "precision": precision,
+ "enabled": enabled,
+ "alerting": alerting,
+ }
+
+ url = f"{client.base_url}/{deployment_name}"
+
+ response = client.session.patch(url, json=deployment)
+ if response.status_code != 200:
+ raise APIError(
+ f"Failed to patch curated rule set deployment: {response.text}"
+ )
+
+ return response.json()
def batch_update_curated_rule_set_deployments(
diff --git a/src/secops/cli.py b/src/secops/cli.py
index eb116987..2c1aa429 100644
--- a/src/secops/cli.py
+++ b/src/secops/cli.py
@@ -4830,6 +4830,318 @@ def handle_forwarder_delete_command(args, chronicle):
sys.exit(1)
+# --- Curated rule set commands ---
+
+
+def setup_curated_rules_command(subparsers):
+ """Set up the curated-rule command group."""
+ top = subparsers.add_parser(
+ "curated-rule", help="Manage curated rules and rule sets"
+ )
+ lvl1 = top.add_subparsers(dest="curated_cmd", required=True)
+
+ # ---- rules ----
+ rules = lvl1.add_parser("rule", help="Manage curated rules")
+ rules_sp = rules.add_subparsers(dest="rule_cmd", required=True)
+
+ rules_list = rules_sp.add_parser("list", help="List curated rules")
+ rules_list.add_argument(
+ "--page-size",
+ type=int,
+ dest="page_size",
+ help="The number of results to return per page.",
+ )
+ rules_list.add_argument(
+ "--page-token",
+ type=str,
+ dest="page_token",
+ help="A page token, received from a previous `list` call.",
+ )
+ rules_list.set_defaults(func=handle_curated_rules_rules_list_command)
+
+ rules_get = rules_sp.add_parser("get", help="Get a curated rule")
+ rg = rules_get.add_mutually_exclusive_group(required=True)
+ rg.add_argument("--id", help="Rule UUID (e.g., ur_abc...)")
+ rg.add_argument("--name", help="Rule display name")
+ rules_get.set_defaults(func=handle_curated_rules_rules_get_command)
+
+ # ---- rule-set ----
+ rule_set = lvl1.add_parser("rule-set", help="Manage curated rule sets")
+ rule_set_subparser = rule_set.add_subparsers(dest="rset_cmd", required=True)
+
+ rule_set_list = rule_set_subparser.add_parser(
+ "list", help="List curated rule sets"
+ )
+ rule_set_list.add_argument(
+ "--page-size",
+ type=int,
+ dest="page_size",
+ help="The number of results to return per page.",
+ )
+ rule_set_list.add_argument(
+ "--page-token",
+ type=str,
+ dest="page_token",
+ help="A page token, received from a previous `list` call.",
+ )
+ rule_set_list.set_defaults(func=handle_curated_rules_rule_set_list_command)
+
+ rule_set_get = rule_set_subparser.add_parser(
+ "get", help="Get a curated rule set"
+ )
+ rule_set_get.add_argument(
+ "--id", required=True, help="Curated rule set UUID)"
+ )
+ rule_set_get.set_defaults(func=handle_curated_rules_rule_set_get_command)
+
+ # ---- rule-set-category ----
+ rule_set_cat = lvl1.add_parser(
+ "rule-set-category", help="Manage curated rule set categories"
+ )
+ rule_set_cat_subparser = rule_set_cat.add_subparsers(
+ dest="rcat_cmd", required=True
+ )
+
+ rule_set_cat_list = rule_set_cat_subparser.add_parser(
+ "list", help="List curated rule set categories"
+ )
+ rule_set_cat_list.add_argument(
+ "--page-size",
+ type=int,
+ dest="page_size",
+ help="The number of results to return per page.",
+ )
+ rule_set_cat_list.add_argument(
+ "--page-token",
+ type=str,
+ dest="page_token",
+ help="A page token, received from a previous `list` call.",
+ )
+ rule_set_cat_list.set_defaults(
+ func=handle_curated_rules_rule_set_category_list_command
+ )
+
+ rule_set_cat_get = rule_set_cat_subparser.add_parser(
+ "get", help="Get a curated rule set category"
+ )
+ rule_set_cat_get.add_argument("--id", required=True, help="Category UUID")
+ rule_set_cat_get.set_defaults(
+ func=handle_curated_rules_rule_set_category_get_command
+ )
+
+ # ---- rule-set-deployment ----
+ rule_set_deployment = lvl1.add_parser(
+ "rule-set-deployment", help="Manage curated rule set deployments"
+ )
+ rule_set_deployment_subparser = rule_set_deployment.add_subparsers(
+ dest="rdep_cmd", required=True
+ )
+
+ rule_set_deployment_list = rule_set_deployment_subparser.add_parser(
+ "list", help="List curated rule set deployments"
+ )
+ rule_set_deployment_list.add_argument(
+ "--only-enabled", dest="only_enabled", action="store_true"
+ )
+ rule_set_deployment_list.add_argument(
+ "--only-alerting", dest="only_alerting", action="store_true"
+ )
+ rule_set_deployment_list.add_argument(
+ "--page-size",
+ type=int,
+ dest="page_size",
+ help="The number of results to return per page.",
+ )
+ rule_set_deployment_list.add_argument(
+ "--page-token",
+ type=str,
+ dest="page_token",
+ help="A page token, received from a previous `list` call.",
+ )
+ rule_set_deployment_list.set_defaults(
+ func=handle_curated_rules_rule_set_deployment_list_command
+ )
+
+ rule_set_deployment_get = rule_set_deployment_subparser.add_parser(
+ "get", help="Get a curated rule set deployment"
+ )
+ get_group = rule_set_deployment_get.add_mutually_exclusive_group(
+ required=True
+ )
+ get_group.add_argument("--id", help="Curated rule set ID (crs_...)")
+ get_group.add_argument(
+ "--name", help="Curated rule set display name (case-insensitive)"
+ )
+ rule_set_deployment_get.add_argument(
+ "--precision", choices=["precise", "broad"], default="precise"
+ )
+ rule_set_deployment_get.set_defaults(
+ func=handle_curated_rules_rule_set_deployment_get_command
+ )
+
+ rule_set_deployment_update = rule_set_deployment_subparser.add_parser(
+ "update", help="Update a curated rule set deployment"
+ )
+ rule_set_deployment_update.add_argument(
+ "--category-id", required=True, dest="category_id"
+ )
+ rule_set_deployment_update.add_argument(
+ "--rule-set-id", required=True, dest="rule_set_id"
+ )
+ rule_set_deployment_update.add_argument(
+ "--precision", choices=["precise", "broad"], required=True
+ )
+ rule_set_deployment_update.add_argument(
+ "--enabled", choices=["true", "false"], required=True
+ )
+ rule_set_deployment_update.add_argument(
+ "--alerting", choices=["true", "false"], help="Enable/disable alerting"
+ )
+ rule_set_deployment_update.set_defaults(
+ func=handle_curated_rules_rule_set_deployment_update_command
+ )
+
+
+# ----------------- handlers -----------------
+
+
+def handle_curated_rules_rules_list_command(args, chronicle):
+ """List curated rules."""
+ try:
+ out = chronicle.list_curated_rules(
+ page_size=getattr(args, "page_size", None),
+ page_token=getattr(args, "page_token", None),
+ )
+ output_formatter(out, getattr(args, "output", "json"))
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error listing curated rules: {e}", file=sys.stderr)
+ sys.exit(1)
+
+
+def handle_curated_rules_rules_get_command(args, chronicle):
+ """Get curated rule by ID or display name."""
+ try:
+ if args.id:
+ out = chronicle.get_curated_rule(args.id)
+ else:
+ # by display name
+ out = chronicle.get_curated_rule_by_name(args.name)
+ output_formatter(out, getattr(args, "output", "json"))
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error getting curated rule: {e}", file=sys.stderr)
+ sys.exit(1)
+
+
+def handle_curated_rules_rule_set_list_command(args, chronicle):
+ """List all curated rule sets"""
+ try:
+ out = chronicle.list_curated_rule_sets(
+ page_size=getattr(args, "page_size", None),
+ page_token=getattr(args, "page_token", None),
+ )
+ output_formatter(out, getattr(args, "output", "json"))
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error listing curated rule sets: {e}", file=sys.stderr)
+ sys.exit(1)
+
+
+def handle_curated_rules_rule_set_get_command(args, chronicle):
+ """Get curated rule set by ID."""
+ try:
+ out = chronicle.get_curated_rule_set(args.id)
+ output_formatter(out, getattr(args, "output", "json"))
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error getting curated rule set: {e}", file=sys.stderr)
+ sys.exit(1)
+
+
+def handle_curated_rules_rule_set_category_list_command(args, chronicle):
+ """List all curated rule set categories."""
+ try:
+ out = chronicle.list_curated_rule_set_categories(
+ page_size=getattr(args, "page_size", None),
+ page_token=getattr(args, "page_token", None),
+ )
+ output_formatter(out, getattr(args, "output", "json"))
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(
+ f"Error listing curated rule set categories: {e}", file=sys.stderr
+ )
+ sys.exit(1)
+
+
+def handle_curated_rules_rule_set_category_get_command(args, chronicle):
+ """Get curated rule set category by ID."""
+ try:
+ out = chronicle.get_curated_rule_set_category(args.id)
+ output_formatter(out, getattr(args, "output", "json"))
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error getting curated rule set category: {e}", file=sys.stderr)
+ sys.exit(1)
+
+
+def handle_curated_rules_rule_set_deployment_list_command(args, chronicle):
+ try:
+ out = chronicle.list_curated_rule_set_deployments(
+ only_enabled=bool(args.only_enabled),
+ only_alerting=bool(args.only_alerting),
+ page_size=getattr(args, "page_size", None),
+ page_token=getattr(args, "page_token", None),
+ )
+ output_formatter(out, getattr(args, "output", "json"))
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(
+ f"Error listing curated rule set deployments: {e}", file=sys.stderr
+ )
+ sys.exit(1)
+
+
+def handle_curated_rules_rule_set_deployment_get_command(args, chronicle):
+ try:
+ if args.name:
+ out = chronicle.get_curated_rule_set_deployment_by_name(
+ args.name, precision=args.precision
+ )
+ else:
+ out = chronicle.get_curated_rule_set_deployment(
+ args.id, precision=args.precision
+ )
+ output_formatter(out, getattr(args, "output", "json"))
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(
+ f"Error getting curated rule set deployment: {e}", file=sys.stderr
+ )
+ sys.exit(1)
+
+
+def handle_curated_rules_rule_set_deployment_update_command(args, chronicle):
+ """Update curated rule set deployment fields."""
+ try:
+
+ def _convert_bool(s):
+ """Convert "true"/"false" to bool."""
+ return None if s is None else str(s).lower() == "true"
+
+ payload = {
+ "category_id": args.category_id,
+ "rule_set_id": args.rule_set_id,
+ "precision": args.precision,
+ "enabled": _convert_bool(args.enabled),
+ }
+ if args.alerting is not None:
+ payload["alerting"] = _convert_bool(args.alerting)
+ out = chronicle.update_curated_rule_set_deployment(payload)
+ output_formatter(out, getattr(args, "output", "json"))
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(
+ f"Error updating curated rule set deployment: {e}", file=sys.stderr
+ )
+ sys.exit(1)
+
+
+# --- ---
+
+
def main() -> None:
"""Main entry point for the CLI."""
parser = argparse.ArgumentParser(description="Google SecOps CLI")
@@ -4862,6 +5174,7 @@ def main() -> None:
setup_reference_list_command(subparsers) # Add reference list command
setup_rule_exclusion_command(subparsers) # Add rule exclusion command
setup_forwarder_command(subparsers) # Add forwarder command
+ setup_curated_rules_command(subparsers) # Add rule set command
setup_config_command(subparsers)
setup_help_command(subparsers)
setup_dashboard_command(subparsers)
@@ -4892,6 +5205,7 @@ def main() -> None:
"export",
"gemini",
"rule-exclusion",
+ "curated-rule",
"forwarder",
"dashboard",
]
diff --git a/tests/chronicle/test_curated_rule_integration.py b/tests/chronicle/test_curated_rule_integration.py
new file mode 100644
index 00000000..cb96c33a
--- /dev/null
+++ b/tests/chronicle/test_curated_rule_integration.py
@@ -0,0 +1,355 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Integration tests for curated rule set functionality in Chronicle API.
+
+These tests require valid credentials and API access.
+"""
+import pytest
+
+from secops import SecOpsClient
+
+from ..config import CHRONICLE_CONFIG, SERVICE_ACCOUNT_JSON
+
+
+@pytest.fixture(scope="module")
+def chronicle():
+ """Fixture to create a Chronicle client for testing."""
+ client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON)
+ return client.chronicle(**CHRONICLE_CONFIG)
+
+
+@pytest.mark.integration
+def test_curated_rule_sets(chronicle):
+ """Test listing and retrieving curated rule sets."""
+ # Test basic listing
+ rule_sets = chronicle.list_curated_rule_sets()
+ assert isinstance(rule_sets, list)
+ assert len(rule_sets) > 0, "Expected at least one curated rule set to exist"
+
+ # Test with pagination parameters
+ page_size = 5
+ rule_sets_paged = chronicle.list_curated_rule_sets(page_size=page_size)
+ assert isinstance(rule_sets_paged, list)
+ assert len(rule_sets_paged) <= page_size
+
+ # Keep first rule set for get test and later use in other tests
+ first_rule_set = rule_sets[0]
+ assert "name" in first_rule_set
+ assert "displayName" in first_rule_set
+
+ # Extract rule set ID from the name field
+ rule_set_id = first_rule_set["name"].split("/")[-1]
+ assert rule_set_id, "Failed to extract rule set ID from name"
+
+ # Test get operation
+ print(f"\nTesting get_curated_rule_set with ID: {rule_set_id}")
+ rule_set = chronicle.get_curated_rule_set(rule_set_id)
+ assert rule_set["name"] == first_rule_set["name"]
+ assert rule_set["displayName"] == first_rule_set["displayName"]
+
+ return first_rule_set
+
+
+@pytest.mark.integration
+def test_curated_rule_set_categories(chronicle):
+ """Test listing and retrieving curated rule set categories."""
+ # Test basic listing
+ categories = chronicle.list_curated_rule_set_categories()
+ assert isinstance(categories, list)
+ assert len(categories) > 0, "Expected at least one category to exist"
+
+ # Test with pagination parameters
+ page_size = 5
+ categories_paged = chronicle.list_curated_rule_set_categories(
+ page_size=page_size
+ )
+ assert isinstance(categories_paged, list)
+ assert len(categories_paged) <= page_size
+
+ # Keep first category for get test and later use in other tests
+ first_category = categories[0]
+ assert "name" in first_category
+ assert "displayName" in first_category
+
+ # Extract category ID from the name field
+ category_id = first_category["name"].split("/")[-1]
+ assert category_id, "Failed to extract category ID from name"
+
+ # Test get operation
+ print(f"\nTesting get_curated_rule_set_category with ID: {category_id}")
+ category = chronicle.get_curated_rule_set_category(category_id)
+ assert category["name"] == first_category["name"]
+ assert category["displayName"] == first_category["displayName"]
+
+ return first_category
+
+
+@pytest.mark.integration
+def test_curated_rules(chronicle):
+ """Test listing and retrieving curated rules."""
+ # Test basic listing
+ rules = chronicle.list_curated_rules()
+ assert isinstance(rules, list)
+ assert len(rules) > 0, "Expected at least one curated rule to exist"
+
+ # Test with pagination parameters
+ page_size = 5
+ rules_paged = chronicle.list_curated_rules(page_size=page_size)
+ assert isinstance(rules_paged, list)
+ assert len(rules_paged) <= page_size
+
+ # Keep first rule for get tests and later use in other tests
+ first_rule = rules[0]
+ assert "name" in first_rule
+ assert "displayName" in first_rule
+
+ # Extract rule ID from the name field
+ rule_id = first_rule["name"].split("/")[-1]
+ assert rule_id, "Failed to extract rule ID from name"
+
+ # Test get operation by ID
+ print(f"\nTesting get_curated_rule with ID: {rule_id}")
+ rule = chronicle.get_curated_rule(rule_id)
+ assert rule["name"] == first_rule["name"]
+ assert rule["displayName"] == first_rule["displayName"]
+
+ # Test get operation by display name
+ display_name = first_rule["displayName"]
+ print(f"\nTesting get_curated_rule_by_name with name: {display_name}")
+ rule_by_name = chronicle.get_curated_rule_by_name(display_name)
+ assert rule_by_name["name"] == first_rule["name"]
+ assert rule_by_name["displayName"].lower() == display_name.lower()
+
+ return first_rule
+
+
+@pytest.mark.integration
+def test_curated_rule_set_deployments(chronicle):
+ """Test listing and retrieving curated rule set deployments."""
+ # Part 1: Test listing deployments
+ print("\nTesting list_curated_rule_set_deployments")
+ deployments = chronicle.list_curated_rule_set_deployments()
+ assert isinstance(deployments, list)
+
+ if not deployments:
+ pytest.skip("No rule set deployments found to test with")
+
+ # Test with filters
+ enabled_deployments = chronicle.list_curated_rule_set_deployments(
+ only_enabled=True
+ )
+ assert isinstance(enabled_deployments, list)
+ for deployment in enabled_deployments:
+ assert deployment.get("enabled") is True
+
+ alerting_deployments = chronicle.list_curated_rule_set_deployments(
+ only_alerting=True
+ )
+ assert isinstance(alerting_deployments, list)
+ for deployment in alerting_deployments:
+ assert deployment.get("alerting") is True
+
+ # Test with pagination parameters
+ page_size = 5
+ deployments_paged = chronicle.list_curated_rule_set_deployments(
+ page_size=page_size
+ )
+ assert isinstance(deployments_paged, list)
+ assert len(deployments_paged) <= page_size
+
+ # Keep first deployment for reference
+ first_deployment = deployments[0]
+ assert "name" in first_deployment
+ assert "displayName" in first_deployment
+
+ # Part 2: Test getting deployment by rule set ID and precision
+ print("\nTesting get_curated_rule_set_deployment")
+ rule_sets = chronicle.list_curated_rule_sets()
+ assert rule_sets, "No rule sets found to test with"
+
+ # Get the first rule set's ID
+ first_rule_set = rule_sets[0]
+ rule_set_id = first_rule_set["name"].split("/")[-1]
+
+ # Try to get deployment for both precision levels
+ deployment_found = False
+ for precision in ["precise", "broad"]:
+ try:
+ deployment = chronicle.get_curated_rule_set_deployment(
+ rule_set_id, precision
+ )
+ print(f"Found {precision} deployment for rule set {rule_set_id}")
+ assert "name" in deployment
+ assert "displayName" in deployment
+ # Ensure the precision in the response matches what we requested
+ assert deployment.get("precision", "").upper() == precision.upper()
+ deployment_found = True
+ break # If we succeed with either precision, continue to next test
+ except Exception as e:
+ # Some rule sets might not have deployments for both precision levels
+ print(f"No {precision} deployment for rule set {rule_set_id}: {e}")
+
+ if not deployment_found:
+ pytest.skip("No deployments found for any rule sets")
+
+ # Part 3: Test getting deployment by display name and precision
+ print("\nTesting get_curated_rule_set_deployment_by_name")
+ display_name = first_rule_set["displayName"]
+
+ # Try to get deployment by display name for both precision levels
+ found_by_name = False
+ for precision in ["precise", "broad"]:
+ try:
+ deployment_by_name = (
+ chronicle.get_curated_rule_set_deployment_by_name(
+ display_name, precision
+ )
+ )
+ print(f"Found {precision} deployment for rule set '{display_name}'")
+ assert "name" in deployment_by_name
+ assert (
+ deployment_by_name.get("displayName").lower()
+ == display_name.lower()
+ )
+ # Ensure the precision in the response matches what we requested
+ assert (
+ deployment_by_name.get("precision", "").upper()
+ == precision.upper()
+ )
+ found_by_name = True
+ break # If we succeed with either precision, that's enough
+ except Exception as e:
+ print(
+ f"No {precision} deployment for rule set '{display_name}': {e}"
+ )
+
+ if not found_by_name:
+ pytest.skip(f"No deployments found for rule set '{display_name}'")
+
+ return first_deployment
+
+
+@pytest.mark.integration
+def test_update_curated_rule_set_deployment(chronicle):
+ """Test updating and restoring a curated rule set deployment."""
+ print("\nTesting update_curated_rule_set_deployment lifecycle")
+
+ # 1. Find valid rule set and category IDs
+ rule_sets = chronicle.list_curated_rule_sets()
+ assert rule_sets, "No rule sets found to test with"
+
+ # Get a rule set ID
+ first_rule_set = rule_sets[0]
+ rule_set_name = first_rule_set["name"]
+ rule_set_id = rule_set_name.split("/")[-1]
+
+ # Extract category ID from rule set name
+ # Format: projects/PROJECT/locations/LOCATION/curatedRuleSetCategories/CATEGORY_ID/curatedRuleSets/RULE_SET_ID
+ name_parts = rule_set_name.split("/")
+ category_index = name_parts.index("curatedRuleSetCategories")
+ category_id = name_parts[category_index + 1]
+
+ print(
+ f"Using rule set: {first_rule_set['displayName']} (ID: {rule_set_id})"
+ )
+ print(f"Category ID: {category_id}")
+
+ # Try both precision levels to find one that works
+ deployment_found = False
+ precision = None
+ current = None
+
+ for prec in ["precise", "broad"]:
+ try:
+ current = chronicle.get_curated_rule_set_deployment(
+ rule_set_id, prec
+ )
+ deployment_found = True
+ precision = prec
+ print(f"Found {prec} deployment for rule set {rule_set_id}")
+ break
+ except Exception as e:
+ print(f"No {prec} deployment available: {e}")
+
+ if not deployment_found:
+ pytest.skip(f"No deployments found for rule set {rule_set_id}")
+
+ # Save original state for restoration
+ original_enabled = current.get("enabled")
+ original_alerting = current.get("alerting")
+
+ if original_enabled is None or original_alerting is None:
+ pytest.skip("Original state not found")
+
+ print(
+ f"Original state - enabled: {original_enabled}, alerting: {original_alerting}"
+ )
+
+ try:
+ # Define the deployment configuration with opposite values
+ deployment_config = {
+ "category_id": category_id,
+ "rule_set_id": rule_set_id,
+ "precision": precision,
+ "enabled": not original_enabled,
+ "alerting": not original_alerting,
+ }
+
+ print(
+ f"Updating to - enabled: {not original_enabled}, alerting: {not original_alerting}"
+ )
+
+ # Update the deployment
+ updated = chronicle.update_curated_rule_set_deployment(
+ deployment_config
+ )
+ print("Update successful")
+
+ # Verify the update
+ assert updated is not None
+
+ # Double-check by getting the deployment again
+ updated_get = chronicle.get_curated_rule_set_deployment(
+ rule_set_id, precision
+ )
+ if "enabled" in updated_get:
+ assert updated_get.get("enabled") == (not original_enabled)
+
+ if "alerting" in updated_get:
+ assert updated_get.get("alerting") == (not original_alerting)
+
+ finally:
+ # Always restore the original state
+ try:
+ print(
+ f"Restoring to original state - enabled: {original_enabled}, alerting: {original_alerting}"
+ )
+ restore_config = {
+ "category_id": category_id,
+ "rule_set_id": rule_set_id,
+ "precision": precision,
+ "enabled": original_enabled,
+ "alerting": original_alerting,
+ }
+
+ chronicle.update_curated_rule_set_deployment(restore_config)
+ print(f"Successfully restored deployment to original state")
+ except Exception as cleanup_error:
+ print(f"Warning: Failed to restore original state: {cleanup_error}")
+
+
+if __name__ == "__main__":
+ # This allows running the tests directly from this file
+ pytest.main(["-v", __file__, "-m", "integration"])
diff --git a/tests/chronicle/test_rule_set.py b/tests/chronicle/test_rule_set.py
new file mode 100644
index 00000000..7de438db
--- /dev/null
+++ b/tests/chronicle/test_rule_set.py
@@ -0,0 +1,600 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Tests for Chronicle curated rule set functions."""
+
+import pytest
+from typing import Optional
+from unittest.mock import Mock, patch
+from secops.chronicle.client import ChronicleClient
+from secops.chronicle.rule_set import (
+ _paginated_request,
+ get_curated_rule,
+ list_curated_rules,
+ get_curated_rule_by_name,
+ get_curated_rule_set,
+ get_curated_rule_set_category,
+ list_curated_rule_sets,
+ list_curated_rule_set_categories,
+ list_curated_rule_set_deployments,
+ get_curated_rule_set_deployment,
+ get_curated_rule_set_deployment_by_name,
+ update_curated_rule_set_deployment,
+ batch_update_curated_rule_set_deployments,
+)
+from secops.exceptions import APIError, SecOpsError
+
+
+@pytest.fixture
+def chronicle_client():
+ """Create a Chronicle client for testing."""
+ with patch("secops.auth.SecOpsAuth") as mock_auth:
+ mock_session = Mock()
+ mock_session.headers = {}
+ mock_auth.return_value.session = mock_session
+ return ChronicleClient(
+ customer_id="test-customer", project_id="test-project"
+ )
+
+
+@pytest.fixture
+def mock_response():
+ """Create a mock API response object."""
+ mock = Mock()
+ mock.status_code = 200
+ # Default return value, can be overridden in specific tests
+ mock.json.return_value = {}
+ return mock
+
+
+@pytest.fixture
+def mock_error_response():
+ """Create a mock error API response object."""
+ mock = Mock()
+ mock.status_code = 400
+ mock.text = "Error message"
+ mock.raise_for_status.side_effect = Exception(
+ "API Error"
+ ) # To simulate requests.exceptions.HTTPError
+ return mock
+
+
+# --- get_curated_rule tests ---
+def test_get_curated_rule_success(chronicle_client, mock_response):
+ """Test get_curated_rule returns the JSON for a curated rule when the request succeeds."""
+ mock_response.json.return_value = {
+ "name": "projects/test-project/locations/us/curatedRules/ur_abc-123",
+ "displayName": "Test ABC 123",
+ }
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_response
+ ) as mocked_request:
+ result = get_curated_rule(chronicle_client, "ur_abc-123")
+ assert result == mock_response.json.return_value
+ # Verify URL
+ expected_url = (
+ f"{chronicle_client.base_url}/{chronicle_client.instance_id}/"
+ f"curatedRules/ur_abc-123"
+ )
+ mocked_request.assert_called_once_with(expected_url)
+
+
+def test_get_curated_rule_error(chronicle_client, mock_error_response):
+ """Test get_curated_rule raises APIError when the API returns non-200."""
+ # Arrange
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_error_response
+ ):
+ # Act and Assert
+ with pytest.raises(APIError):
+ get_curated_rule(chronicle_client, "ur_abc-123")
+
+
+# --- helpers ---
+
+
+def _page(items_key: str, items: list[dict], next_token: Optional[str] = None):
+ """Helper function for paginated 200 OK responses."""
+ data = {items_key: items}
+ if next_token:
+ data["nextPageToken"] = next_token
+ resp = Mock()
+ resp.status_code = 200
+ resp.json.return_value = data
+ return resp
+
+
+# --- _paginated_request tests ---
+
+def test_paginated_request_auto_paginates_success(chronicle_client):
+ p1 = _page("curatedRules", [{"name": ".../ur_1"}], next_token="t2")
+ p2 = _page("curatedRules", [{"name": ".../ur_2"}])
+ with patch.object(chronicle_client.session, "get", side_effect=[p1, p2]) as mocked:
+ result = _paginated_request(
+ chronicle_client,
+ path="curatedRules",
+ items_key="curatedRules",
+ page_size=None,
+ )
+ assert [r["name"] for r in result] == [".../ur_1", ".../ur_2"]
+ base = f"{chronicle_client.base_url}/{chronicle_client.instance_id}/curatedRules"
+ assert mocked.call_args_list[0].args[0] == base
+ assert mocked.call_args_list[0].kwargs["params"] == {"pageSize": 1000}
+ assert mocked.call_args_list[1].kwargs["params"] == {
+ "pageSize": 1000,
+ "pageToken": "t2",
+ }
+
+
+def test_paginated_request_when_page_size_given_success(chronicle_client):
+ p1 = _page("curatedRules", [{"name": ".../ur_1"}], next_token="t2")
+ with patch.object(chronicle_client.session, "get", return_value=p1) as mocked:
+ result = _paginated_request(
+ chronicle_client,
+ path="curatedRules",
+ items_key="curatedRules",
+ page_size=1000,
+ )
+ assert [r["name"] for r in result] == [".../ur_1"]
+ # Only one call, no follow-up with nextPageToken
+ assert mocked.call_count == 1
+ assert mocked.call_args.kwargs["params"] == {"pageSize": 1000}
+
+
+
+def test_paginated_request_error(chronicle_client, mock_error_response):
+ """Test helper function _paginated_request raises APIError on HTTP errors."""
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_error_response
+ ):
+ with pytest.raises(APIError):
+ _paginated_request(
+ chronicle_client,
+ path="curatedRules",
+ items_key="curatedRules",
+ )
+
+
+# --- list_curated_rule_sets & list_curated_rule_set_categories function tests ---
+def test_list_curated_rules_success(chronicle_client, mock_response):
+ """Test list_curated_rules"""
+ mock_response.json.return_value = {"curatedRules": [{"name": "n1"}]}
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_response
+ ):
+ rules = list_curated_rules(chronicle_client, page_size=50)
+ assert rules == [{"name": "n1"}]
+
+
+def test_list_curated_rules_error(chronicle_client, mock_error_response):
+ """Test list_curated_rules failure."""
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_error_response
+ ):
+ with pytest.raises(APIError):
+ list_curated_rules(chronicle_client)
+
+
+def test_list_curated_rule_sets_and_categories_success(
+ chronicle_client, mock_response
+):
+ """Test the two list_ functions."""
+ mock_response.json.side_effect = [
+ {"curatedRuleSets": [{"name": "rs1"}]},
+ {"curatedRuleSetCategories": [{"name": "cat1"}]},
+ ]
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_response
+ ) as mocked_response:
+ rule_sets = list_curated_rule_sets(chronicle_client)
+ categories = list_curated_rule_set_categories(chronicle_client)
+ assert rule_sets == [{"name": "rs1"}]
+ assert categories == [{"name": "cat1"}]
+ # ensure two calls happened
+ assert mocked_response.call_count == 2
+
+
+def test_list_curated_rule_sets_and_categories_error(
+ chronicle_client, mock_error_response
+):
+ """Test the two list_ functions error correctly."""
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_error_response
+ ):
+ with pytest.raises(APIError):
+ list_curated_rule_sets(chronicle_client)
+ list_curated_rule_set_categories(chronicle_client)
+
+
+# --- get_curated_rule_by_name tests---
+
+
+def test_get_curated_rule_by_name_success(chronicle_client):
+ """Test get_curated_rule_by_name returns the rule matching displayName (case-insensitive)."""
+ p = _page(
+ "curatedRules",
+ [
+ {"displayName": "Alpha", "name": ".../ur_A"},
+ {"displayName": "Bravo", "name": ".../ur_B"},
+ ],
+ )
+ with patch.object(chronicle_client.session, "get", return_value=p):
+ out = get_curated_rule_by_name(chronicle_client, "bravo")
+ assert out["name"].endswith("ur_B")
+
+
+def test_get_curated_rule_by_name_error(chronicle_client):
+ """Test get_curated_rule_by_name raises SecOpsError when not found."""
+ p = _page("curatedRules", [{"displayName": "Alpha"}])
+ with patch.object(chronicle_client.session, "get", return_value=p):
+ with pytest.raises(SecOpsError):
+ get_curated_rule_by_name(chronicle_client, "charlie")
+
+
+# --- get_curated_rule_set tests ---
+
+
+def test_get_curated_rule_set_success(chronicle_client, mock_response):
+ """Test get_curated_rule_set returns the rule set matching name."""
+ mock_response.json.return_value = {"name": ".../curatedRuleSets/crs_1"}
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_response
+ ) as get_:
+ out = get_curated_rule_set(chronicle_client, "crs_1")
+ assert out["name"].endswith("/crs_1")
+ expected = (
+ f"{chronicle_client.base_url}/{chronicle_client.instance_id}/"
+ "curatedRuleSetCategories/-/curatedRuleSets/crs_1"
+ )
+ get_.assert_called_once()
+ assert get_.call_args.args[0] == expected
+
+
+def test_get_curated_rule_set_error(chronicle_client, mock_error_response):
+ """Test get_curated_rule_set raises APIError on HTTP errors."""
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_error_response
+ ):
+ with pytest.raises(APIError):
+ get_curated_rule_set(chronicle_client, "crs_1")
+
+
+# --- get_curated_rule_set_category tests ---
+def test_get_curated_rule_set_category_success(chronicle_client, mock_response):
+ mock_response.json.return_value = {
+ "name": ".../curatedRuleSetCategories/cat_1"
+ }
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_response
+ ) as get_:
+ out = get_curated_rule_set_category(chronicle_client, "cat_1")
+ assert out["name"].endswith("/cat_1")
+ expected = (
+ f"{chronicle_client.base_url}/{chronicle_client.instance_id}/"
+ "curatedRuleSetCategories/cat_1"
+ )
+ assert get_.call_args.args[0] == expected
+
+
+def test_get_curated_rule_set_category_error(
+ chronicle_client, mock_error_response
+):
+ with patch.object(
+ chronicle_client.session, "get", return_value=mock_error_response
+ ):
+ with pytest.raises(APIError):
+ get_curated_rule_set_category(chronicle_client, "cat_1")
+
+
+# --- list_curated_rule_set_deployments ---
+
+
+def test_list_deployments_success(chronicle_client):
+ """Test list_curated_rule_set_deployments enriches deployments with displayName and respects filters."""
+ deployments_page = _page(
+ "curatedRuleSetDeployments",
+ [
+ {
+ "name": f"{chronicle_client.instance_id}/curatedRuleSetCategories/c1/curatedRuleSets/crs_1/curatedRuleSetDeployments/precise",
+ "enabled": True,
+ "alerting": False,
+ },
+ {
+ "name": f"{chronicle_client.instance_id}/curatedRuleSetCategories/c1/curatedRuleSets/crs_2/curatedRuleSetDeployments/broad",
+ "enabled": False,
+ "alerting": True,
+ },
+ ],
+ )
+ rulesets_page = _page(
+ "curatedRuleSets",
+ [
+ {
+ "name": f"{chronicle_client.instance_id}/curatedRuleSetCategories/c1/curatedRuleSets/crs_1",
+ "displayName": "One",
+ },
+ {
+ "name": f"{chronicle_client.instance_id}/curatedRuleSetCategories/c1/curatedRuleSets/crs_2",
+ "displayName": "Two",
+ },
+ ],
+ )
+
+ # First: list deployments & list rulesets for enrichment
+ with patch.object(
+ chronicle_client.session,
+ "get",
+ side_effect=[deployments_page, rulesets_page],
+ ):
+ out = list_curated_rule_set_deployments(chronicle_client)
+ names = {d["displayName"] for d in out}
+ assert names == {"One", "Two"}
+
+ # Now verify only_enabled and only_alerting filters
+ with patch.object(
+ chronicle_client.session,
+ "get",
+ side_effect=[deployments_page, rulesets_page],
+ ):
+ out_enabled = list_curated_rule_set_deployments(
+ chronicle_client, only_enabled=True
+ )
+ assert len(out_enabled) == 1 and out_enabled[0]["displayName"] == "One"
+
+ with patch.object(
+ chronicle_client.session,
+ "get",
+ side_effect=[deployments_page, rulesets_page],
+ ):
+ out_alerting = list_curated_rule_set_deployments(
+ chronicle_client, only_alerting=True
+ )
+ assert (
+ len(out_alerting) == 1 and out_alerting[0]["displayName"] == "Two"
+ )
+
+
+# --- get_curated_rule_set_deployment ---
+
+
+def test_get_ruleset_deployment_success(chronicle_client, mock_response):
+ """Test get_curated_rule_set_deployment returns the deployment matching name."""
+ ruleset = Mock()
+ ruleset.status_code = 200
+ ruleset.json.return_value = {
+ "name": f"{chronicle_client.instance_id}/curatedRuleSetCategories/c1/curatedRuleSets/crs_1",
+ "displayName": "My Ruleset",
+ }
+
+ deployment = Mock()
+ deployment.status_code = 200
+ deployment.json.return_value = {"enabled": True, "alerting": False}
+
+ with patch.object(
+ chronicle_client.session, "get", side_effect=[ruleset, deployment]
+ ) as mocked_request:
+ out = get_curated_rule_set_deployment(chronicle_client, "crs_1", "precise")
+ assert out["displayName"] == "My Ruleset"
+
+ dep_url = (
+ f"{chronicle_client.base_url}/"
+ f"{chronicle_client.instance_id}/curatedRuleSetCategories/c1/curatedRuleSets/crs_1/"
+ "curatedRuleSetDeployments/precise"
+ )
+ assert mocked_request.call_args_list[1].args[0] == dep_url
+
+
+def test_get_ruleset_deployment_error_invalid_precision(chronicle_client):
+ """Test get_curated_rule_set_deployment failure."""
+ with pytest.raises(SecOpsError):
+ get_curated_rule_set_deployment(chronicle_client, "crs_1", "medium")
+
+
+def test_get_ruleset_deployment_ruleset_error_not_found(chronicle_client):
+ """Test get_curated_rule_set_deployment failure when ruleset ID doesn't exist."""
+ not_found = Mock()
+ not_found.status_code = 404
+ not_found.text = "Not found"
+
+ with patch.object(chronicle_client.session, "get", return_value=not_found):
+ with pytest.raises(APIError):
+ get_curated_rule_set_deployment(chronicle_client, "crs_404", "precise")
+
+
+
+# --- get_curated_rule_set_deployment_by_name ---
+
+
+def test_get_ruleset_deployment_by_name_success(chronicle_client):
+ """Test get_curated_rule_set_deployment_by_name success."""
+ rulesets_data = [
+ {
+ "name": f"{chronicle_client.instance_id}/curatedRuleSetCategories/c1/curatedRuleSets/crs_1",
+ "displayName": "Case Insensitive Name",
+ }
+ ]
+ deployment = Mock()
+ deployment.status_code = 200
+ deployment.json.return_value = {"enabled": True}
+
+ with patch(
+ "secops.chronicle.rule_set.list_curated_rule_sets",
+ return_value=rulesets_data,
+ ):
+ with patch.object(
+ chronicle_client.session, "get", return_value=deployment
+ ):
+ out = get_curated_rule_set_deployment_by_name(
+ chronicle_client, "case insensitive name", "broad"
+ )
+ assert out["enabled"] is True
+
+
+def test_get_ruleset_deployment_by_name_error(chronicle_client):
+ """Test get_curated_rule_set_deployment_by_name failure."""
+ rulesets = _page("curatedRuleSets", [{"displayName": "Other"}])
+ with patch.object(chronicle_client.session, "get", return_value=rulesets):
+ with pytest.raises(SecOpsError):
+ get_curated_rule_set_deployment_by_name(chronicle_client, "missing")
+
+
+# --- update_curated_rule_set_deployment ---
+
+
+def test_update_ruleset_deployment_success(
+ chronicle_client,
+):
+ """Test update_curated_rule_set_deployment builds the correct PATCH payload and URL."""
+ patch_resp = Mock()
+ patch_resp.status_code = 200
+ patch_resp.json.return_value = {"ok": True}
+ with patch.object(
+ chronicle_client.session, "patch", return_value=patch_resp
+ ) as mocked_request:
+ out = update_curated_rule_set_deployment(
+ chronicle_client,
+ {
+ "category_id": "c1",
+ "rule_set_id": "crs_1",
+ "precision": "precise",
+ "enabled": True,
+ "alerting": False,
+ },
+ )
+ assert out == {"ok": True}
+ name = (
+ f"{chronicle_client.instance_id}/curatedRuleSetCategories/c1/"
+ "curatedRuleSets/crs_1/curatedRuleSetDeployments/precise"
+ )
+ expected_url = f"{chronicle_client.base_url}/{name}"
+ mocked_request.assert_called_once()
+ assert mocked_request.call_args.args[0] == expected_url
+ assert mocked_request.call_args.kwargs["json"] == {
+ "name": name,
+ "precision": "precise",
+ "enabled": True,
+ "alerting": False,
+ }
+
+
+def test_update_ruleset_deployment_error_missing_fields(chronicle_client):
+ """Test update_curated_rule_set_deployment failure."""
+ with pytest.raises(ValueError):
+ update_curated_rule_set_deployment(
+ chronicle_client,
+ {
+ "category_id": "c1",
+ # 'rule_set_id' missing
+ "precision": "precise",
+ "enabled": True,
+ },
+ )
+
+
+def test_update_ruleset_deployment_error_http(
+ chronicle_client, mock_error_response
+):
+ """Test update_curated_rule_set_deployment failure."""
+ with patch.object(
+ chronicle_client.session, "patch", return_value=mock_error_response
+ ):
+ with pytest.raises(APIError):
+ update_curated_rule_set_deployment(
+ chronicle_client,
+ {
+ "category_id": "c1",
+ "rule_set_id": "crs_1",
+ "precision": "precise",
+ "enabled": True,
+ },
+ )
+
+
+# --- batch_update_curated_rule_set_deployments ---
+
+
+def test_batch_update_curated_rule_set_success(chronicle_client):
+ """Test batch_update_curated_rule_set_deployments success."""
+ post_resp = Mock()
+ post_resp.status_code = 200
+ post_resp.json.return_value = {"status": "ok"}
+ with patch.object(
+ chronicle_client.session, "post", return_value=post_resp
+ ) as post_:
+ out = batch_update_curated_rule_set_deployments(
+ chronicle_client,
+ [
+ {
+ "category_id": "c1",
+ "rule_set_id": "r1",
+ "precision": "precise",
+ "enabled": True,
+ "alerting": True,
+ },
+ {
+ "category_id": "c2",
+ "rule_set_id": "r2",
+ "precision": "broad",
+ "enabled": False,
+ },
+ ],
+ )
+ assert out == {"status": "ok"}
+
+ # Inspect payload
+ payload = post_.call_args.kwargs["json"]
+ assert payload["parent"].startswith(
+ chronicle_client.instance_id + "/curatedRuleSetCategories/-"
+ )
+ reqs = payload["requests"]
+ assert len(reqs) == 2
+ assert reqs[0]["curated_rule_set_deployment"]["enabled"] is True
+ assert reqs[0]["curated_rule_set_deployment"]["alerting"] is True
+ assert reqs[0]["update_mask"]["paths"] == ["alerting", "enabled"]
+
+
+def test_batch_update_curated_rule_set_error_missing_fields(chronicle_client):
+ """Test batch_update_curated_rule_set_deployments failure."""
+ with pytest.raises(ValueError):
+ batch_update_curated_rule_set_deployments(
+ chronicle_client,
+ [
+ {
+ "category_id": "c1",
+ "precision": "precise",
+ "enabled": True,
+ }, # rule_set_id missing
+ ],
+ )
+
+
+def test_batch_update_curated_rule_set_error_http(
+ chronicle_client, mock_error_response
+):
+ """Test batch_update_curated_rule_set_deployments failure."""
+ with patch.object(
+ chronicle_client.session, "post", return_value=mock_error_response
+ ):
+ with pytest.raises(APIError):
+ batch_update_curated_rule_set_deployments(
+ chronicle_client,
+ [
+ {
+ "category_id": "c1",
+ "rule_set_id": "r1",
+ "precision": "precise",
+ "enabled": True,
+ },
+ ],
+ )
diff --git a/tests/cli/test_curated_rule_cli_integration.py b/tests/cli/test_curated_rule_cli_integration.py
new file mode 100644
index 00000000..0b0e486f
--- /dev/null
+++ b/tests/cli/test_curated_rule_cli_integration.py
@@ -0,0 +1,542 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""CLI Integration tests for curated rule set functionality in Chronicle.
+
+These tests require valid credentials and API access.
+"""
+import json
+import pytest
+import subprocess
+
+
+@pytest.mark.integration
+def test_cli_curated_rule_sets(cli_env, common_args):
+ """Test CLI commands for listing and getting curated rule sets.
+
+ Args:
+ cli_env: Environment variables for CLI execution.
+ common_args: Common CLI arguments.
+ """
+ print("\nTesting rule-set list and get commands")
+
+ # Test list command
+ print("1. Listing curated rule sets")
+ list_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + ["curated-rule", "rule-set", "list"]
+ )
+
+ list_result = subprocess.run(
+ list_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert list_result.returncode == 0, f"Command failed: {list_result.stderr}"
+
+ # Parse the output
+ rule_sets = json.loads(list_result.stdout)
+ assert isinstance(rule_sets, list), "Expected a list of rule sets"
+ assert len(rule_sets) > 0, "Expected at least one rule set"
+
+ # Check structure of first rule set
+ first_rule_set = rule_sets[0]
+ assert "name" in first_rule_set, "Missing name in rule set"
+ assert "displayName" in first_rule_set, "Missing displayName in rule set"
+
+ # Extract rule set ID from first result
+ rule_set_id = first_rule_set["name"].split("/")[-1]
+ print(
+ f"Found rule set: {first_rule_set['displayName']} (ID: {rule_set_id})"
+ )
+
+ # Test get command with the extracted ID
+ print("\n2. Getting specific rule set by ID")
+ get_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + [
+ "curated-rule",
+ "rule-set",
+ "get",
+ "--id", # parameter is --id, not --rule-set-id
+ rule_set_id,
+ ]
+ )
+
+ get_result = subprocess.run(
+ get_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert get_result.returncode == 0, f"Command failed: {get_result.stderr}"
+
+ # Parse and verify the output
+ rule_set_data = json.loads(get_result.stdout)
+ assert (
+ rule_set_data["name"] == first_rule_set["name"]
+ ), "Rule set name doesn't match"
+ assert (
+ rule_set_data["displayName"] == first_rule_set["displayName"]
+ ), "Rule set display name doesn't match"
+
+ return rule_set_id, first_rule_set["displayName"]
+
+
+@pytest.mark.integration
+def test_cli_curated_rule_set_categories(cli_env, common_args):
+ """Test CLI commands for listing and getting curated rule set categories.
+
+ Args:
+ cli_env: Environment variables for CLI execution.
+ common_args: Common CLI arguments.
+ """
+ print("\nTesting rule-set categories commands")
+
+ # Test list categories command
+ print("1. Listing curated rule set categories")
+ list_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + ["curated-rule", "rule-set-category", "list"]
+ )
+
+ list_result = subprocess.run(
+ list_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert list_result.returncode == 0, f"Command failed: {list_result.stderr}"
+
+ # Parse the output
+ categories = json.loads(list_result.stdout)
+ assert isinstance(categories, list), "Expected a list of categories"
+ assert len(categories) > 0, "Expected at least one category"
+
+ # Check structure of first category
+ first_category = categories[0]
+ assert "name" in first_category, "Missing name in category"
+ assert "displayName" in first_category, "Missing displayName in category"
+
+ # Extract category ID from first result
+ category_id = first_category["name"].split("/")[-1]
+ print(
+ f"Found category: {first_category['displayName']} (ID: {category_id})"
+ )
+
+ # Test get category command
+ print("\n2. Getting specific category by ID")
+ get_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + [
+ "curated-rule",
+ "rule-set-category",
+ "get",
+ "--id", # parameter is --id, not --category-id
+ category_id,
+ ]
+ )
+
+ get_result = subprocess.run(
+ get_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert get_result.returncode == 0, f"Command failed: {get_result.stderr}"
+
+ # Parse and verify the output
+ category_data = json.loads(get_result.stdout)
+ assert (
+ category_data["name"] == first_category["name"]
+ ), "Category name doesn't match"
+ assert (
+ category_data["displayName"] == first_category["displayName"]
+ ), "Category display name doesn't match"
+
+ return category_id, first_category["displayName"]
+
+
+@pytest.mark.integration
+def test_cli_curated_rules(cli_env, common_args):
+ """Test CLI commands for listing and getting curated rules.
+
+ Args:
+ cli_env: Environment variables for CLI execution.
+ common_args: Common CLI arguments.
+ """
+ print("\nTesting curated rules commands")
+
+ # List curated rules
+ print("1. Listing curated rules")
+ list_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + ["curated-rule", "rule", "list"]
+ )
+
+ list_result = subprocess.run(
+ list_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert list_result.returncode == 0, f"Command failed: {list_result.stderr}"
+
+ # Parse the output
+ rules = json.loads(list_result.stdout)
+ assert isinstance(rules, list), "Expected a list of rules"
+ assert len(rules) > 0, "Expected at least one rule"
+
+ # Check structure of first rule
+ first_rule = rules[0]
+ assert "name" in first_rule, "Missing name in rule"
+ assert "displayName" in first_rule, "Missing displayName in rule"
+
+ # Extract rule ID and display name
+ rule_id = first_rule["name"].split("/")[-1]
+ display_name = first_rule["displayName"]
+ print(f"Found rule: {display_name} (ID: {rule_id})")
+
+ # Get rule by ID
+ print("\n2. Getting specific rule by ID")
+ get_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + [
+ "curated-rule",
+ "rule",
+ "get",
+ "--id", # parameter is --id, not --rule-id
+ rule_id,
+ ]
+ )
+
+ get_result = subprocess.run(
+ get_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert get_result.returncode == 0, f"Command failed: {get_result.stderr}"
+
+ # Parse and verify the output
+ rule_data = json.loads(get_result.stdout)
+ assert rule_data["name"] == first_rule["name"], "Rule name doesn't match"
+ assert (
+ rule_data["displayName"] == first_rule["displayName"]
+ ), "Rule display name doesn't match"
+
+ # Get rule by display name
+ print(f"\n3. Getting rule by display name: {display_name}")
+ # Need to quote the display name to handle spaces and special characters
+ name_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + [
+ "curated-rule",
+ "rule",
+ "get",
+ "--name", # Use --name instead of ID
+ f"{display_name}",
+ ]
+ )
+
+ name_result = subprocess.run(
+ name_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert name_result.returncode == 0, f"Command failed: {name_result.stderr}"
+
+ # Parse and verify the output
+ rule_by_name_data = json.loads(name_result.stdout)
+ assert (
+ rule_by_name_data["name"] == first_rule["name"]
+ ), "Rule name doesn't match"
+ assert (
+ rule_by_name_data["displayName"].lower() == display_name.lower()
+ ), "Rule display name doesn't match"
+
+ return rule_id, display_name
+
+
+@pytest.mark.integration
+def test_cli_curated_rule_set_deployments(cli_env, common_args):
+ """Test CLI commands for listing, getting, and updating curated rule set deployments.
+
+ Args:
+ cli_env: Environment variables for CLI execution.
+ common_args: Common CLI arguments.
+ """
+ print("\nTesting rule-set deployment commands")
+
+ # Part 1: List deployments
+ print("1. Listing curated rule set deployments")
+ list_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + ["curated-rule", "rule-set-deployment", "list"]
+ )
+
+ list_result = subprocess.run(
+ list_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert list_result.returncode == 0, f"Command failed: {list_result.stderr}"
+
+ # Parse the output
+ deployments = json.loads(list_result.stdout)
+ assert isinstance(deployments, list), "Expected a list of deployments"
+
+ # Part 2: Get rule set for testing
+ print("\n2. First list rule sets to get a valid ID")
+ list_rs_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + ["curated-rule", "rule-set", "list"]
+ )
+
+ list_rs_result = subprocess.run(
+ list_rs_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert (
+ list_rs_result.returncode == 0
+ ), f"Command failed: {list_rs_result.stderr}"
+
+ # Parse the output
+ rule_sets = json.loads(list_rs_result.stdout)
+ assert len(rule_sets) > 0, "No rule sets found for testing"
+
+ # Get first rule set metadata
+ first_rule_set = rule_sets[0]
+ rule_set_name = first_rule_set["name"]
+ rule_set_id = rule_set_name.split("/")[-1]
+ display_name = first_rule_set["displayName"]
+ print(f"Using rule set: {display_name} (ID: {rule_set_id})")
+
+ # Extract category ID from the rule set name
+ try:
+ # Format: projects/PROJECT/locations/LOCATION/curatedRuleSetCategories/CATEGORY_ID/curatedRuleSets/RULE_SET_ID
+ name_parts = rule_set_name.split("/")
+ category_index = name_parts.index("curatedRuleSetCategories")
+ category_id = name_parts[category_index + 1]
+ print(f"Category ID: {category_id}")
+ except (ValueError, IndexError) as e:
+ pytest.skip(f"Cannot extract category ID from rule set name: {e}")
+
+ # Part 3: Try to get a deployment
+ print("\n3. Finding a valid deployment")
+ deployment_found = False
+ working_precision = None
+
+ for precision in ["precise", "broad"]:
+ print(f"Trying {precision} precision...")
+ get_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + [
+ "curated-rule",
+ "rule-set-deployment",
+ "get",
+ "--id", # parameter is --id, not --rule-set-id
+ rule_set_id,
+ "--precision",
+ precision,
+ ]
+ )
+
+ get_result = subprocess.run(
+ get_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ if get_result.returncode == 0:
+ deployment_data = json.loads(get_result.stdout)
+ assert "name" in deployment_data, "Missing name in deployment"
+ assert (
+ deployment_data.get("precision", "").upper()
+ == precision.upper()
+ )
+
+ # We found a working deployment
+ deployment_found = True
+ working_precision = precision
+ print(f"Found {precision} deployment for rule set {rule_set_id}")
+
+ # Get original state for restoration
+ original_enabled = deployment_data.get("enabled")
+ original_alerting = deployment_data.get("alerting")
+
+ if original_enabled is None or original_alerting is None:
+ print(
+ "Warning: Couldn't determine original state, skipping update test"
+ )
+ pytest.skip("Original state not available")
+
+ print(
+ f"Original state - enabled: {original_enabled}, alerting: {original_alerting}"
+ )
+ break
+ else:
+ print(f"No {precision} deployment available: {get_result.stderr}")
+
+ if not deployment_found:
+ pytest.skip(f"No deployments found for rule set {rule_set_id}")
+
+ # Part 4: Test update
+ print("\n4. Testing update-deployment command")
+ print(
+ f"Updating to - enabled: {not original_enabled}, alerting: {not original_alerting}"
+ )
+
+ update_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + [
+ "curated-rule",
+ "rule-set-deployment",
+ "update",
+ "--category-id",
+ category_id,
+ "--rule-set-id",
+ rule_set_id,
+ "--precision",
+ working_precision,
+ "--enabled",
+ str(not original_enabled).lower(),
+ "--alerting",
+ str(not original_alerting).lower(),
+ ]
+ )
+
+ try:
+ update_result = subprocess.run(
+ update_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert (
+ update_result.returncode == 0
+ ), f"Update failed: {update_result.stderr}"
+ print("Update command successful")
+
+ # Verify the update worked by getting the deployment again
+ print("\n5. Verifying the update")
+ verify_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + [
+ "curated-rule",
+ "rule-set-deployment",
+ "get",
+ "--id",
+ rule_set_id,
+ "--precision",
+ working_precision,
+ ]
+ )
+
+ verify_result = subprocess.run(
+ verify_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert (
+ verify_result.returncode == 0
+ ), f"Verification failed: {verify_result.stderr}"
+
+ # Parse and verify
+ verify_data = json.loads(verify_result.stdout)
+
+ # Check only if fields exist, they might not always be updated
+ if "enabled" in verify_data:
+ assert verify_data.get("enabled") == (
+ not original_enabled
+ ), "Enabled state not updated"
+
+ if "alerting" in verify_data:
+ assert verify_data.get("alerting") == (
+ not original_alerting
+ ), "Alerting state not updated"
+
+ print("Update verified successfully")
+
+ finally:
+ # Part 6: Restore the original state
+ print("\n6. Restoring original state")
+ restore_cmd = (
+ [
+ "secops",
+ ]
+ + common_args
+ + [
+ "curated-rule",
+ "rule-set-deployment",
+ "update",
+ "--category-id",
+ category_id,
+ "--rule-set-id",
+ rule_set_id,
+ "--precision",
+ working_precision,
+ "--enabled",
+ str(original_enabled).lower(),
+ "--alerting",
+ str(original_alerting).lower(),
+ ]
+ )
+
+ try:
+ restore_result = subprocess.run(
+ restore_cmd, env=cli_env, capture_output=True, text=True
+ )
+
+ # Check that the command executed successfully
+ assert (
+ restore_result.returncode == 0
+ ), f"Restore failed: {restore_result.stderr}"
+ print(f"Successfully restored deployment to original state")
+ except Exception as cleanup_error:
+ print(f"Warning: Failed to restore original state: {cleanup_error}")
+
+
+if __name__ == "__main__":
+ # This allows running the tests directly from this file
+ pytest.main(["-v", __file__, "-m", "integration"])