diff --git a/CHANGELOG.md b/CHANGELOG.md index f9036db3..a5ab2f12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.27.0] - 2025-12-05 +### Added +- Chronicle configuration for default API endpoint version to use for all methods +### Updated +- Following module's methods to add support for configuring API endpoint version: + - Feed management (list, get, create, delete, disable, enable, generate secret) + - Reference list management(create, get, list, update) + - Rule management (create, get, list, update, delete, search) + - Rule deployment (get, update) + - Rule retrohunt (create, get) + ## [0.26.0] - 2025-11-26 ### Added - Search curated rule detection method support diff --git a/CLI.md b/CLI.md index d3fb5518..63a9efca 100644 --- a/CLI.md +++ b/CLI.md @@ -39,6 +39,17 @@ You can also save your service account path: secops config set --service-account "/path/to/service-account.json" --customer-id "your-instance-id" --project-id "your-project-id" --region "us" ``` +Set the default API version for Chronicle API calls: + +```bash +secops config set --api-version "v1" +``` + +**Supported API versions:** +- `v1` - Stable production API (recommended) +- `v1beta` - Beta API with newer features +- `v1alpha` - Alpha API with experimental features (default) + Additionally, you can set default time parameters: ```bash @@ -92,11 +103,19 @@ These parameters can be used with most commands: - `--customer-id ID` - Chronicle instance ID - `--project-id ID` - GCP project ID - `--region REGION` - Chronicle API region (default: us) +- `--api-version VERSION` - Chronicle API version (v1, v1beta, v1alpha; default: v1alpha) - `--output FORMAT` - Output format (json, text) - `--start-time TIME` - Start time in ISO format (YYYY-MM-DDTHH:MM:SSZ) - `--end-time TIME` - End time in ISO format (YYYY-MM-DDTHH:MM:SSZ) - `--time-window HOURS` - Time window in hours (alternative to start/end time) +You can override the configured API version on a per-command basis: + +```bash +# Use v1 for a specific command, even if config has v1alpha +secops rule list --api-version v1 +``` + ## Commands ### Search UDM Events diff --git a/README.md b/README.md index 4830fd38..493dce8e 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,37 @@ chronicle = client.chronicle( ``` [See available regions](https://github.com/google/secops-wrapper/blob/main/regions.md) +#### API Version Control + +The SDK supports flexible API version selection: + +- **Default Version**: Set `default_api_version` during client initialization (default is `v1alpha`) +- **Per-Method Override**: Many methods accept an `api_version` parameter to override the default for specific calls + +**Supported API versions:** +- `v1` - Stable production API +- `v1beta` - Beta API with newer features +- `v1alpha` - Alpha API with experimental features + +**Example with per-method version override:** +```python +from secops.chronicle.models import APIVersion + +# Client defaults to v1alpha +chronicle = client.chronicle( + customer_id="your-chronicle-instance-id", + project_id="your-project-id", + region="us", + default_api_version="v1alpha" +) + +# Use v1 for a specific rule operation +rule = chronicle.get_rule( + rule_id="ru_12345678-1234-1234-1234-123456789abc", + api_version=APIVersion.V1 # Override to use v1 for this call +) +``` + ### Log Ingestion Ingest raw logs directly into Chronicle: diff --git a/pyproject.toml b/pyproject.toml index 54c343ad..a8d25448 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.26.0" +version = "0.27.0" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.7" diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index a6910e21..6d195c70 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -25,6 +25,7 @@ from secops.auth import RetryConfig from secops.chronicle.alert import get_alerts as _get_alerts from secops.chronicle.case import get_cases_from_list +from secops.chronicle.models import APIVersion from secops.chronicle.dashboard import DashboardAccessType, DashboardView from secops.chronicle.dashboard import add_chart as _add_chart from secops.chronicle.dashboard import create_dashboard as _create_dashboard @@ -274,6 +275,67 @@ class ValueType(Enum): USERNAME = "USERNAME" +class BaseUrl(str): + """Chronicle base url generation based on version and region. + + Supports production, dev, and staging regions with appropriate + domain names. + """ + + def __new__(cls, version: APIVersion, region: str = "us"): + domain = cls._get_domain(region) + return super().__new__(cls, f"https://{domain}/{version}") + + def __init__(self, version: APIVersion, region: str = "us"): + self._default = version + self._region = region + + @staticmethod + def _get_domain(region: str) -> str: + """Get the appropriate domain for the given region. + + Args: + region: Region identifier (e.g., 'us', 'europe', 'dev', + 'staging') + + Returns: + str: Domain name for the region + """ + if region == "dev": + return "autopush-chronicle.sandbox.googleapis.com" + elif region == "staging": + return "staging-chronicle.sandbox.googleapis.com" + else: + return f"{region}-chronicle.googleapis.com" + + def __call__( + self, version: APIVersion = None, allowed: list[APIVersion] = None + ) -> str: + """ + Returns the base URL for a specific API version. + + Args: + version: (Optional) The API version to use. If not provided, + uses the default. + allowed: (Optional) A list of allowed API versions for the + endpoint. + + Returns: + The base URL string. + + Raises: + SecOpsError: If the requested API version is not supported. + """ + selected_version = APIVersion(version or self._default) + if allowed and selected_version not in allowed: + raise SecOpsError( + f"API version '{selected_version}' is not supported for this " + f"endpoint. Allowed versions: {', '.join(allowed)}" + ) + domain = self._get_domain(self._region) + return f"https://{domain}/{selected_version}" + + def _detect_value_type(value: str) -> tuple[Optional[str], Optional[str]]: """Detect value type from a string. @@ -337,6 +399,7 @@ def __init__( extra_scopes: Optional[List[str]] = None, credentials: Optional[Any] = None, retry_config: Optional[Union[RetryConfig, Dict[str, Any], bool]] = None, + default_api_version: Union[APIVersion, str] = APIVersion.V1ALPHA, ): """Initialize ChronicleClient. @@ -350,48 +413,30 @@ def __init__( credentials: Credentials object retry_config: Request retry configurations. If set to false, retry will be disabled. + default_api_version: Default API version to use for requests. """ self.project_id = project_id self.customer_id = customer_id self.region = region + self.default_api_version = APIVersion(default_api_version) self._default_forwarder_display_name: str = "Wrapper-SDK-Forwarder" self._cached_default_forwarder_id: Optional[str] = None # Format the instance ID to match the expected format if region in ["dev", "staging"]: - # For dev and staging environments, - # use a different instance ID format + # Dev and staging use 'us' as the location self.instance_id = ( f"projects/{project_id}/locations/us/instances/{customer_id}" ) - # Set up the base URL for dev/staging - if region == "dev": - self.base_url = ( - "https://autopush-chronicle.sandbox.googleapis.com/v1alpha" - ) - self.base_v1_url = ( - "https://autopush-chronicle.sandbox.googleapis.com/v1" - ) - else: # staging - self.base_url = ( - "https://staging-chronicle.sandbox.googleapis.com/v1alpha" - ) - self.base_v1_url = ( - "https://staging-chronicle.sandbox.googleapis.com/v1" - ) else: # Standard production regions use the normal format self.instance_id = ( f"projects/{project_id}/locations/{region}/" f"instances/{customer_id}" ) - # Set up the base URL - self.base_url = ( - f"https://{self.region}-chronicle.googleapis.com/v1alpha" - ) - self.base_v1_url = ( - f"https://{self.region}-chronicle.googleapis.com/v1" - ) + + # Set up base URLs using BaseUrl for all regions + self.base_url = BaseUrl(self.default_api_version, self.region) # Create a session with authentication if session: diff --git a/src/secops/chronicle/feeds.py b/src/secops/chronicle/feeds.py index 0def8973..f7d06e84 100644 --- a/src/secops/chronicle/feeds.py +++ b/src/secops/chronicle/feeds.py @@ -17,6 +17,7 @@ """ from secops.exceptions import APIError from dataclasses import dataclass, asdict +from secops.chronicle.models import APIVersion from typing import Dict, Any, List, TypedDict, Optional, Union, Annotated import sys import os @@ -35,6 +36,10 @@ def __str__(self) -> str: return self.value +# List of Allowed version for feed endpoints +ALLOWED_ENDPOINT_VERSIONS = [APIVersion.V1ALPHA, APIVersion.V1BETA] + + @dataclass class CreateFeedModel: """Model for creating a feed. @@ -124,7 +129,10 @@ class FeedSecret(TypedDict): def list_feeds( - client, page_size: int = 100, page_token: str = None + client, + page_size: int = 100, + page_token: str = None, + api_version: Optional[APIVersion] = None, ) -> List[Feed]: """List feeds. @@ -132,6 +140,7 @@ def list_feeds( client: ChronicleClient instance page_size: The maximum number of feeds to return page_token: A page token, received from a previous ListFeeds call + api_version: (Optional) Preferred API version to use. Returns: List of feed dictionaries @@ -141,7 +150,10 @@ def list_feeds( """ feeds: list[dict] = [] - url = f"{client.base_url}/{client.instance_id}/feeds" + url = ( + f"{client.base_url(api_version, ALLOWED_ENDPOINT_VERSIONS)}/" + f"{client.instance_id}/feeds" + ) more = True while more: params = {"pageSize": page_size, "pageToken": page_token} @@ -161,12 +173,15 @@ def list_feeds( return feeds -def get_feed(client, feed_id: str) -> Feed: +def get_feed( + client, feed_id: str, api_version: Optional[APIVersion] = None +) -> Feed: """Get a feed by ID. Args: client: ChronicleClient instance feed_id: Feed ID + api_version: (Optional) Preferred API version to use. Returns: Feed dictionary @@ -175,7 +190,10 @@ def get_feed(client, feed_id: str) -> Feed: APIError: If the API request fails """ feed_id = os.path.basename(feed_id) - url = f"{client.base_url}/{client.instance_id}/feeds/{feed_id}" + url = ( + f"{client.base_url(api_version, ALLOWED_ENDPOINT_VERSIONS)}/" + f"{client.instance_id}/feeds/{feed_id}" + ) response = client.session.get(url) if response.status_code != 200: raise APIError(f"Failed to get feed: {response.text}") @@ -183,12 +201,17 @@ def get_feed(client, feed_id: str) -> Feed: return response.json() -def create_feed(client, feed_config: CreateFeedModel) -> Feed: +def create_feed( + client, + feed_config: CreateFeedModel, + api_version: Optional[APIVersion] = None, +) -> Feed: """Create a new feed. Args: client: ChronicleClient instance feed_config: Feed configuration model + api_version: (Optional) Preferred API version to use. Returns: Created feed dictionary @@ -196,7 +219,10 @@ def create_feed(client, feed_config: CreateFeedModel) -> Feed: Raises: APIError: If the API request fails """ - url = f"{client.base_url}/{client.instance_id}/feeds" + url = ( + f"{client.base_url(api_version, ALLOWED_ENDPOINT_VERSIONS)}/" + f"{client.instance_id}/feeds" + ) response = client.session.post(url, json=feed_config.to_dict()) if response.status_code != 200: raise APIError(f"Failed to create feed: {response.text}") @@ -209,6 +235,7 @@ def update_feed( feed_id: str, feed_config: CreateFeedModel, update_mask: Optional[Union[List[str], None]] = None, + api_version: Optional[APIVersion] = None, ) -> Feed: """Update an existing feed. @@ -217,6 +244,7 @@ def update_feed( feed_id: Feed ID feed_config: Feed configuration model update_mask: Optional list of fields to update + api_version: (Optional) Preferred API version to use. Returns: Updated feed dictionary @@ -224,7 +252,10 @@ def update_feed( Raises: APIError: If the API request fails """ - url = f"{client.base_url}/{client.instance_id}/feeds/{feed_id}" + url = ( + f"{client.base_url(api_version, ALLOWED_ENDPOINT_VERSIONS)}/" + f"{client.instance_id}/feeds/{feed_id}" + ) if update_mask is None: update_mask = [] @@ -246,28 +277,37 @@ def update_feed( return response.json() -def delete_feed(client, feed_id: str) -> None: +def delete_feed( + client, feed_id: str, api_version: Optional[APIVersion] = None +) -> None: """Delete a feed. Args: client: ChronicleClient instance feed_id: Feed ID + api_version: (Optional) Preferred API version to use. Raises: APIError: If the API request fails """ - url = f"{client.base_url}/{client.instance_id}/feeds/{feed_id}" + url = ( + f"{client.base_url(api_version, ALLOWED_ENDPOINT_VERSIONS)}/" + f"{client.instance_id}/feeds/{feed_id}" + ) response = client.session.delete(url) if response.status_code != 200: raise APIError(f"Failed to delete feed: {response.text}") -def disable_feed(client, feed_id: str) -> Feed: +def disable_feed( + client, feed_id: str, api_version: Optional[APIVersion] = None +) -> Feed: """Disable a feed. Args: client: ChronicleClient instance feed_id: Feed ID + api_version: (Optional) Preferred API version to use. Returns: Disabled feed dictionary @@ -275,7 +315,10 @@ def disable_feed(client, feed_id: str) -> Feed: Raises: APIError: If the API request fails """ - url = f"{client.base_url}/{client.instance_id}/feeds/{feed_id}:disable" + url = ( + f"{client.base_url(api_version, ALLOWED_ENDPOINT_VERSIONS)}/" + f"{client.instance_id}/feeds/{feed_id}:disable" + ) response = client.session.post(url) if response.status_code != 200: raise APIError(f"Failed to disable feed: {response.text}") @@ -283,12 +326,15 @@ def disable_feed(client, feed_id: str) -> Feed: return response.json() -def enable_feed(client, feed_id: str) -> Feed: +def enable_feed( + client, feed_id: str, api_version: Optional[APIVersion] = None +) -> Feed: """Enable a feed. Args: client: ChronicleClient instance feed_id: Feed ID + api_version: (Optional) Preferred API version to use. Returns: Enabled feed dictionary @@ -296,7 +342,10 @@ def enable_feed(client, feed_id: str) -> Feed: Raises: APIError: If the API request fails """ - url = f"{client.base_url}/{client.instance_id}/feeds/{feed_id}:enable" + url = ( + f"{client.base_url(api_version, ALLOWED_ENDPOINT_VERSIONS)}/" + f"{client.instance_id}/feeds/{feed_id}:enable" + ) response = client.session.post(url) if response.status_code != 200: raise APIError(f"Failed to enable feed: {response.text}") @@ -304,12 +353,15 @@ def enable_feed(client, feed_id: str) -> Feed: return response.json() -def generate_secret(client, feed_id: str) -> FeedSecret: +def generate_secret( + client, feed_id: str, api_version: Optional[APIVersion] = None +) -> FeedSecret: """Generate a secret for a feed. Args: client: ChronicleClient instance feed_id: Feed ID + api_version: (Optional) Preferred API version to use. Returns: Dictionary containing the generated secret @@ -318,7 +370,8 @@ def generate_secret(client, feed_id: str) -> FeedSecret: APIError: If the API request fails """ url = ( - f"{client.base_url}/{client.instance_id}/feeds/{feed_id}:generateSecret" + f"{client.base_url(api_version, ALLOWED_ENDPOINT_VERSIONS)}/" + f"{client.instance_id}/feeds/{feed_id}:generateSecret" ) response = client.session.post(url) if response.status_code != 200: diff --git a/src/secops/chronicle/models.py b/src/secops/chronicle/models.py index fa8c158b..229fac30 100644 --- a/src/secops/chronicle/models.py +++ b/src/secops/chronicle/models.py @@ -13,6 +13,7 @@ # limitations under the License. # """Data models for Chronicle API responses.""" +import sys import json from dataclasses import asdict, dataclass, field from datetime import datetime @@ -21,6 +22,16 @@ from secops.exceptions import SecOpsError +# Use built-in StrEnum if Python 3.11+, otherwise create a compatible version +if sys.version_info >= (3, 11): + from enum import StrEnum +else: + class StrEnum(str, Enum): + """String enum implementation for Python versions before 3.11.""" + + def __str__(self) -> str: + return self.value + class AlertState(str, Enum): """Alert state for filtering detections. @@ -462,3 +473,9 @@ def update_fields(self) -> List[str]: ] if getattr(self, field) is not None ] + + +class APIVersion(StrEnum): + V1 = "v1" + V1BETA = "v1beta" + V1ALPHA = "v1alpha" diff --git a/src/secops/chronicle/reference_list.py b/src/secops/chronicle/reference_list.py index 5612637a..bf82c621 100644 --- a/src/secops/chronicle/reference_list.py +++ b/src/secops/chronicle/reference_list.py @@ -8,6 +8,7 @@ REF_LIST_DATA_TABLE_ID_REGEX, validate_cidr_entries, ) +from secops.chronicle.models import APIVersion from secops.exceptions import APIError, SecOpsError # Use built-in StrEnum if Python 3.11+, otherwise create a compatible version @@ -66,6 +67,7 @@ def create_reference_list( description: str = "", entries: List[str] = None, syntax_type: ReferenceListSyntaxType = ReferenceListSyntaxType.STRING, + api_version: Optional[APIVersion] = APIVersion.V1, ) -> Dict[str, Any]: """Create a new reference list. @@ -75,6 +77,7 @@ def create_reference_list( description: A user-provided description of the reference list entries: A list of entries for the reference list syntax_type: The syntax type of the reference list + api_version: Preferred API version to use. Defaults to V1 Returns: Dictionary containing the created reference list @@ -100,7 +103,8 @@ def create_reference_list( _validate_cidr_entries(entries) response = client.session.post( - f"{client.base_v1_url}/{client.instance_id}/referenceLists", + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/referenceLists", json={ "description": description, "entries": [{"value": x} for x in entries], @@ -119,7 +123,10 @@ def create_reference_list( def get_reference_list( - client: "Any", name: str, view: ReferenceListView = ReferenceListView.FULL + client: "Any", + name: str, + view: ReferenceListView = ReferenceListView.FULL, + api_version: Optional[APIVersion] = APIVersion.V1, ) -> Dict[str, Any]: """Get a single reference list. @@ -128,6 +135,7 @@ def get_reference_list( name: The name of the reference list view: How much of the ReferenceList to view. Defaults to REFERENCE_LIST_VIEW_FULL. + api_version: Preferred API version to use. Defaults to V1 Returns: Dictionary containing the reference list @@ -140,7 +148,8 @@ def get_reference_list( params["view"] = view.value response = client.session.get( - f"{client.base_v1_url}/{client.instance_id}/referenceLists/{name}", + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/referenceLists/{name}", params=params if params else None, ) @@ -156,6 +165,7 @@ def get_reference_list( def list_reference_lists( client: "Any", view: ReferenceListView = ReferenceListView.BASIC, + api_version: Optional[APIVersion] = APIVersion.V1, ) -> List[Dict[str, Any]]: """List reference lists. @@ -163,6 +173,7 @@ def list_reference_lists( client: ChronicleClient instance view: How much of each ReferenceList to view. Defaults to REFERENCE_LIST_VIEW_BASIC. + api_version: Preferred API version to use. Defaults to V1 Returns: List of reference lists, ordered in ascending alphabetical order by name @@ -178,7 +189,8 @@ def list_reference_lists( while True: response = client.session.get( - f"{client.base_v1_url}/{client.instance_id}/referenceLists", + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/referenceLists", params=params, ) @@ -205,6 +217,7 @@ def update_reference_list( name: str, description: Optional[str] = None, entries: Optional[List[str]] = None, + api_version: Optional[APIVersion] = APIVersion.V1, ) -> Dict[str, Any]: """Update a reference list. @@ -213,6 +226,7 @@ def update_reference_list( name: The name of the reference list description: A user-provided description of the reference list entries: A list of entries for the reference list + api_version: Preferred API version to use. Defaults to V1 Returns: Dictionary containing the updated reference list @@ -252,7 +266,8 @@ def update_reference_list( params = {"updateMask": ",".join(update_paths)} response = client.session.patch( - f"{client.base_v1_url}/{client.instance_id}/referenceLists/{name}", + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/referenceLists/{name}", json=payload, params=params, ) diff --git a/src/secops/chronicle/rule.py b/src/secops/chronicle/rule.py index e0484e4b..860b9bef 100644 --- a/src/secops/chronicle/rule.py +++ b/src/secops/chronicle/rule.py @@ -17,24 +17,30 @@ from typing import Dict, Any, Iterator, Optional, List, Literal from datetime import datetime, timezone import json +from secops.chronicle.models import APIVersion from secops.exceptions import APIError, SecOpsError import re -def create_rule(client, rule_text: str) -> Dict[str, Any]: +def create_rule( + client, rule_text: str, api_version: Optional[APIVersion] = APIVersion.V1 +) -> Dict[str, Any]: """Creates a new detection rule to find matches in logs. Args: client: ChronicleClient instance rule_text: Content of the new detection rule, used to evaluate logs. - + api_version: Preferred API version to use. Defaults to V1 Returns: Dictionary containing the created rule information Raises: APIError: If the API request fails """ - url = f"{client.base_v1_url}/{client.instance_id}/rules" + url = ( + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules" + ) body = { "text": rule_text, @@ -48,7 +54,9 @@ def create_rule(client, rule_text: str) -> Dict[str, Any]: return response.json() -def get_rule(client, rule_id: str) -> Dict[str, Any]: +def get_rule( + client, rule_id: str, api_version: Optional[APIVersion] = APIVersion.V1 +) -> Dict[str, Any]: """Get a rule by ID. Args: @@ -63,7 +71,10 @@ def get_rule(client, rule_id: str) -> Dict[str, Any]: Raises: APIError: If the API request fails """ - url = f"{client.base_v1_url}/{client.instance_id}/rules/{rule_id}" + url = ( + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules/{rule_id}" + ) response = client.session.get(url) @@ -78,6 +89,7 @@ def list_rules( view: Optional[str] = "FULL", page_size: Optional[int] = None, page_token: Optional[str] = None, + api_version: Optional[APIVersion] = APIVersion.V1, ) -> Dict[str, Any]: """Gets a list of rules. @@ -92,6 +104,7 @@ def list_rules( Defaults to "FULL". page_size: Maximum number of rules to return per page. page_token: Token for the next page of results, if available. + api_version: (Optional) Preferred API version to use. Returns: Dictionary containing information about rules @@ -106,7 +119,10 @@ def list_rules( params["pageToken"] = page_token while more: - url = f"{client.base_v1_url}/{client.instance_id}/rules" + url = ( + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules" + ) response = client.session.get(url, params=params) if response.status_code != 200: @@ -136,7 +152,12 @@ def list_rules( return rules -def update_rule(client, rule_id: str, rule_text: str) -> Dict[str, Any]: +def update_rule( + client, + rule_id: str, + rule_text: str, + api_version: Optional[APIVersion] = APIVersion.V1, +) -> Dict[str, Any]: """Updates a rule. Args: @@ -150,7 +171,10 @@ def update_rule(client, rule_id: str, rule_text: str) -> Dict[str, Any]: Raises: APIError: If the API request fails """ - url = f"{client.base_v1_url}/{client.instance_id}/rules/{rule_id}" + url = ( + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules/{rule_id}" + ) body = { "text": rule_text, @@ -166,7 +190,12 @@ def update_rule(client, rule_id: str, rule_text: str) -> Dict[str, Any]: return response.json() -def delete_rule(client, rule_id: str, force: bool = False) -> Dict[str, Any]: +def delete_rule( + client, + rule_id: str, + force: bool = False, + api_version: Optional[APIVersion] = APIVersion.V1, +) -> Dict[str, Any]: """Deletes a rule. Args: @@ -180,7 +209,10 @@ def delete_rule(client, rule_id: str, force: bool = False) -> Dict[str, Any]: Raises: APIError: If the API request fails """ - url = f"{client.base_v1_url}/{client.instance_id}/rules/{rule_id}" + url = ( + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules/{rule_id}" + ) params = {} if force: @@ -231,7 +263,9 @@ def set_rule_alerting( return update_rule_deployment(client, rule_id, alerting=alerting_enabled) -def get_rule_deployment(client, rule_id: str) -> Dict[str, Any]: +def get_rule_deployment( + client, rule_id: str, api_version: Optional[APIVersion] = APIVersion.V1 +) -> Dict[str, Any]: """Gets the current deployment for a rule. Args: @@ -248,7 +282,8 @@ def get_rule_deployment(client, rule_id: str) -> Dict[str, Any]: """ url = ( - f"{client.base_v1_url}/{client.instance_id}/rules/{rule_id}/deployment" + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules/{rule_id}/deployment" ) response = client.session.get(url) if response.status_code != 200: @@ -261,6 +296,7 @@ def list_rule_deployments( page_size: Optional[int] = None, page_token: Optional[str] = None, filter_query: Optional[str] = None, + api_version: Optional[APIVersion] = APIVersion.V1, ) -> Dict[str, Any]: """Lists rule deployments for the instance. @@ -288,7 +324,10 @@ def list_rule_deployments( if filter_query: params["filter"] = filter_query - url = f"{client.base_v1_url}/{client.instance_id}/rules/-/deployments" + url = ( + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules/-/deployments" + ) if page_size: response = client.session.get(url, params=params) @@ -318,7 +357,9 @@ def list_rule_deployments( return deployments -def search_rules(client, query: str) -> Dict[str, Any]: +def search_rules( + client, query: str, api_version: Optional[APIVersion] = APIVersion.V1 +) -> Dict[str, Any]: """Search for rules. Args: @@ -336,7 +377,7 @@ def search_rules(client, query: str) -> Dict[str, Any]: except re.error as e: raise SecOpsError(f"Invalid regular expression: {query}") from e - rules = list_rules(client) + rules = list_rules(client, api_version=api_version) results = {"rules": []} for rule in rules["rules"]: rule_text = rule.get("text", "") @@ -471,6 +512,7 @@ def update_rule_deployment( alerting: Optional[bool] = None, archived: Optional[bool] = None, run_frequency: Optional[Literal["LIVE", "HOURLY", "DAILY"]] = None, + api_version: Optional[APIVersion] = APIVersion.V1, ) -> Dict[str, Any]: """Update deployment settings for a rule. @@ -503,7 +545,8 @@ def update_rule_deployment( they are specified by the caller. """ url = ( - f"{client.base_v1_url}/{client.instance_id}/rules/{rule_id}/deployment" + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules/{rule_id}/deployment" ) body: Dict[str, Any] = {} diff --git a/src/secops/chronicle/rule_retrohunt.py b/src/secops/chronicle/rule_retrohunt.py index 2c75075e..1c309b5f 100644 --- a/src/secops/chronicle/rule_retrohunt.py +++ b/src/secops/chronicle/rule_retrohunt.py @@ -15,12 +15,18 @@ """Retrohunt functionality for Chronicle rules.""" from datetime import datetime -from typing import Dict, Any +from typing import Any, Dict, Optional + +from secops.chronicle.models import APIVersion from secops.exceptions import APIError def create_retrohunt( - client, rule_id: str, start_time: datetime, end_time: datetime + client, + rule_id: str, + start_time: datetime, + end_time: datetime, + api_version: Optional[APIVersion] = APIVersion.V1, ) -> Dict[str, Any]: """Creates a retrohunt for a rule. @@ -32,6 +38,7 @@ def create_retrohunt( rule_id: Unique ID of the rule to run retrohunt for ("ru_") start_time: Start time for retrohunt analysis end_time: End time for retrohunt analysis + api_version: Preferred API version to use. Defaults to V1 Returns: Dictionary containing operation information for the retrohunt @@ -40,7 +47,8 @@ def create_retrohunt( APIError: If the API request fails """ url = ( - f"{client.base_v1_url}/{client.instance_id}/rules/{rule_id}/retrohunts" + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules/{rule_id}/retrohunts" ) body = { @@ -58,7 +66,12 @@ def create_retrohunt( return response.json() -def get_retrohunt(client, rule_id: str, operation_id: str) -> Dict[str, Any]: +def get_retrohunt( + client, + rule_id: str, + operation_id: str, + api_version: Optional[APIVersion] = APIVersion.V1, +) -> Dict[str, Any]: """Get retrohunt status and results. Args: @@ -66,6 +79,7 @@ def get_retrohunt(client, rule_id: str, operation_id: str) -> Dict[str, Any]: rule_id: Unique ID of the rule the retrohunt is for ("ru_" or "ru_@v__") operation_id: Operation ID of the retrohunt + api_version: Preferred API version to use. Defaults to V1 Returns: Dictionary containing retrohunt information @@ -74,8 +88,8 @@ def get_retrohunt(client, rule_id: str, operation_id: str) -> Dict[str, Any]: APIError: If the API request fails """ url = ( - f"{client.base_v1_url}/{client.instance_id}/rules/{rule_id}" - f"/retrohunts/{operation_id}" + f"{client.base_url(api_version, list(APIVersion))}/" + f"{client.instance_id}/rules/{rule_id}/retrohunts/{operation_id}" ) response = client.session.get(url) diff --git a/src/secops/cli/cli_client.py b/src/secops/cli/cli_client.py index 8f46c609..ddbfdd61 100644 --- a/src/secops/cli/cli_client.py +++ b/src/secops/cli/cli_client.py @@ -105,13 +105,18 @@ def _setup_client_core( chronicle_kwargs = {} # Build kwargs with precedence: CLI args > config file > None - for arg in required_args + ["region"]: # region is optional + optional_args = ["region", "api_version"] + for arg in required_args + optional_args: # Check CLI args first if hasattr(args, arg) and getattr(args, arg): - chronicle_kwargs[arg] = getattr(args, arg) + # Map api_version to default_api_version for chronicle() + key = "default_api_version" if arg == "api_version" else arg + chronicle_kwargs[key] = getattr(args, arg) # Fall back to config if not in args elif arg in config: - chronicle_kwargs[arg] = config[arg] + # Map api_version to default_api_version for chronicle() + key = "default_api_version" if arg == "api_version" else arg + chronicle_kwargs[key] = config[arg] # Check for missing required arguments missing = [ diff --git a/src/secops/cli/commands/config.py b/src/secops/cli/commands/config.py index dfdb02e4..c1ff51d6 100644 --- a/src/secops/cli/commands/config.py +++ b/src/secops/cli/commands/config.py @@ -74,6 +74,8 @@ def handle_config_set_command(args, chronicle=None): config["project_id"] = args.project_id if args.region: config["region"] = args.region + if hasattr(args, "api_version") and args.api_version: + config["api_version"] = args.api_version if args.service_account: config["service_account"] = args.service_account if args.start_time: diff --git a/src/secops/cli/utils/common_args.py b/src/secops/cli/utils/common_args.py index fcc4697c..76014ef9 100644 --- a/src/secops/cli/utils/common_args.py +++ b/src/secops/cli/utils/common_args.py @@ -69,6 +69,16 @@ def add_chronicle_args(parser: argparse.ArgumentParser) -> None: default=config.get("region", "us"), help="Chronicle API region", ) + parser.add_argument( + "--api-version", + "--api_version", + dest="api_version", + choices=["v1", "v1beta", "v1alpha"], + default=config.get("api_version", "v1alpha"), + help=( + "Default API version for Chronicle requests " "(default: v1alpha)" + ), + ) def add_time_range_args(parser: argparse.ArgumentParser) -> None: diff --git a/src/secops/client.py b/src/secops/client.py index 1084f919..bf35f5ec 100644 --- a/src/secops/client.py +++ b/src/secops/client.py @@ -53,7 +53,11 @@ def __init__( self._chronicle = None def chronicle( - self, customer_id: str, project_id: str, region: str = "us" + self, + customer_id: str, + project_id: str, + region: str = "us", + default_api_version: Union[str, Any] = "v1alpha", ) -> ChronicleClient: """Get Chronicle API client. @@ -61,6 +65,8 @@ def chronicle( customer_id: Chronicle customer ID project_id: GCP project ID region: Chronicle API region (default: "us") + default_api_version: Default API version for Chronicle requests. + Can be "v1", "v1beta", or "v1alpha" (default: "v1alpha"). Returns: ChronicleClient instance @@ -70,4 +76,5 @@ def chronicle( project_id=project_id, region=region, auth=self.auth, + default_api_version=default_api_version, ) diff --git a/tests/chronicle/test_client.py b/tests/chronicle/test_client.py index 81b2d520..ba237101 100644 --- a/tests/chronicle/test_client.py +++ b/tests/chronicle/test_client.py @@ -57,7 +57,6 @@ def test_chronicle_client_initialization(): assert client.customer_id == "test-customer" assert client.region == "us" assert client.base_url == "https://us-chronicle.googleapis.com/v1alpha" - assert client.base_v1_url == "https://us-chronicle.googleapis.com/v1" def test_chronicle_client_custom_user_agent(): diff --git a/tests/chronicle/test_data_tables.py b/tests/chronicle/test_data_tables.py index 47547852..77928f19 100644 --- a/tests/chronicle/test_data_tables.py +++ b/tests/chronicle/test_data_tables.py @@ -7,6 +7,7 @@ call, ) # Added call for checking multiple calls if needed +from secops.chronicle.models import APIVersion from secops.chronicle.client import ChronicleClient # This will be the actual client # We'll need to import the enums and functions once they are in their final place @@ -25,14 +26,16 @@ @pytest.fixture -def mock_chronicle_client() -> Mock: - """Provides a mock ChronicleClient with a mock session.""" - client = Mock(spec=ChronicleClient) - client.session = Mock() - client.base_url = "https://test-chronicle.googleapis.com/v1alpha" - client.base_v1_url = "https://test-chronicle.googleapis.com/v1" - client.instance_id = "projects/test-project/locations/us/instances/test-customer" - return client +def mock_chronicle_client(): + """Provides a ChronicleClient with a mock session for testing.""" + with patch("secops.auth.SecOpsAuth") as mock_auth: + mock_session = Mock() + mock_session.headers = {} + mock_auth.return_value.session = mock_session + return ChronicleClient( + customer_id="test-customer", + project_id="test-project", + ) # ---- Test Data Tables ---- @@ -421,7 +424,7 @@ def test_create_reference_list_success( assert result["description"] == description assert len(result["entries"]) == 2 mock_chronicle_client.session.post.assert_called_once_with( - f"{mock_chronicle_client.base_v1_url}/{mock_chronicle_client.instance_id}/referenceLists", + f"{mock_chronicle_client.base_url(APIVersion.V1)}/{mock_chronicle_client.instance_id}/referenceLists", params={"referenceListId": rl_name}, json={ "description": description, @@ -489,7 +492,7 @@ def test_get_reference_list_full_view_success( assert result["description"] == "Full RL details" assert len(result["entries"]) == 1 mock_chronicle_client.session.get.assert_called_once_with( - f"{mock_chronicle_client.base_v1_url}/{mock_chronicle_client.instance_id}/referenceLists/{rl_name}", + f"{mock_chronicle_client.base_url(APIVersion.V1)}/{mock_chronicle_client.instance_id}/referenceLists/{rl_name}", params={"view": ReferenceListView.FULL.value}, ) @@ -518,7 +521,7 @@ def test_list_reference_lists_basic_view_success( assert results[0]["displayName"] == "rl_basic1" assert "entries" not in results[0] # Entries are not in BASIC view mock_chronicle_client.session.get.assert_called_once_with( - f"{mock_chronicle_client.base_v1_url}/{mock_chronicle_client.instance_id}/referenceLists", + f"{mock_chronicle_client.base_url(APIVersion.V1)}/{mock_chronicle_client.instance_id}/referenceLists", params={"pageSize": 1000, "view": ReferenceListView.BASIC.value}, ) @@ -564,7 +567,7 @@ def test_update_reference_list_success( assert result["entries"][0]["value"] == "updated_entryX" mock_chronicle_client.session.patch.assert_called_once_with( - f"{mock_chronicle_client.base_v1_url}/{mock_chronicle_client.instance_id}/referenceLists/{rl_name}", + f"{mock_chronicle_client.base_url(APIVersion.V1)}/{mock_chronicle_client.instance_id}/referenceLists/{rl_name}", json={ "description": new_description, "entries": [{"value": "updated_entryX"}, {"value": "new_entryY"}], diff --git a/tests/chronicle/test_rule.py b/tests/chronicle/test_rule.py index 5dd47ed1..71c94718 100644 --- a/tests/chronicle/test_rule.py +++ b/tests/chronicle/test_rule.py @@ -17,6 +17,7 @@ import pytest from unittest.mock import Mock, patch from secops.chronicle.client import ChronicleClient +from secops.chronicle.models import APIVersion from secops.chronicle.rule import ( create_rule, get_rule, @@ -40,7 +41,8 @@ def chronicle_client(): mock_session.headers = {} mock_auth.return_value.session = mock_session return ChronicleClient( - customer_id="test-customer", project_id="test-project" + customer_id="test-customer", project_id="test-project", + default_api_version=APIVersion.V1 ) @@ -96,7 +98,7 @@ def test_create_rule(chronicle_client, mock_response): # Assert mock_post.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules", json={"text": "rule test {}"}, ) assert result == mock_response.json.return_value @@ -127,7 +129,7 @@ def test_get_rule(chronicle_client, mock_response): # Assert mock_get.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/{rule_id}" + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/{rule_id}" ) assert result == mock_response.json.return_value @@ -161,7 +163,7 @@ def test_list_rules(chronicle_client, mock_response): # Assert mock_get.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules", params={"pageSize": 1000, "view": "FULL"}, ) assert result == mock_response.json.return_value @@ -181,7 +183,7 @@ def test_list_rules_empty(chronicle_client, mock_response): # Assert mock_get.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules", params={"pageSize": 1000, "view": "FULL"}, ) assert result == {"rules": []} @@ -261,7 +263,7 @@ def test_update_rule(chronicle_client, mock_response): # Assert mock_patch.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/{rule_id}", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/{rule_id}", params={"update_mask": "text"}, json={"text": rule_text}, ) @@ -298,7 +300,7 @@ def test_delete_rule(chronicle_client, mock_response): # Assert mock_delete.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/{rule_id}", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/{rule_id}", params={}, ) assert result == {} @@ -333,7 +335,7 @@ def test_delete_rule_force(chronicle_client, mock_response): # Assert mock_delete.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/{rule_id}", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/{rule_id}", params={"force": "true"}, ) assert result == {} @@ -352,7 +354,7 @@ def test_enable_rule(chronicle_client, mock_response): # Assert mock_patch.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/{rule_id}/deployment", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/{rule_id}/deployment", params={"update_mask": "enabled"}, json={"enabled": True}, ) @@ -372,7 +374,7 @@ def test_disable_rule(chronicle_client, mock_response): # Assert mock_patch.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/{rule_id}/deployment", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/{rule_id}/deployment", params={"update_mask": "enabled"}, json={"enabled": False}, ) @@ -409,7 +411,7 @@ def test_search_rules(chronicle_client, mock_response): # Assert mock_get.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules", params={"pageSize": 1000, "view": "FULL"}, ) assert result == mock_response.json.return_value @@ -570,7 +572,7 @@ def test_get_rule_deployment(chronicle_client, mock_response): result = get_rule_deployment(chronicle_client, rule_id) mock_get.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/{rule_id}/deployment" + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/{rule_id}/deployment" ) assert result == mock_response.json.return_value @@ -603,7 +605,7 @@ def test_list_rule_deployments_single_page(chronicle_client, mock_response): result = list_rule_deployments(chronicle_client) mock_get.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/-/deployments", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/-/deployments", params={}, ) assert result == { @@ -678,7 +680,7 @@ def test_list_rule_deployments_empty(chronicle_client, mock_response): # Assert mock_get.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/-/deployments", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/-/deployments", params={}, ) assert result == {"ruleDeployments": []} @@ -705,7 +707,7 @@ def test_list_rule_deployments_with_filter(chronicle_client, mock_response): # Assert mock_get.assert_called_once_with( - f"{chronicle_client.base_v1_url}/{chronicle_client.instance_id}/rules/-/deployments", + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/rules/-/deployments", params={"filter": filter_query}, ) assert result == { diff --git a/tests/chronicle/test_rule_deployment.py b/tests/chronicle/test_rule_deployment.py index d576e442..60d02c83 100644 --- a/tests/chronicle/test_rule_deployment.py +++ b/tests/chronicle/test_rule_deployment.py @@ -22,6 +22,7 @@ enable_rule, set_rule_alerting, ) +from secops.chronicle.models import APIVersion from secops.chronicle.client import ChronicleClient from secops.exceptions import APIError, SecOpsError @@ -46,7 +47,7 @@ def response_mock(): def _deployment_url(client: ChronicleClient, rule_id: str) -> str: - return f"{client.base_v1_url}/{client.instance_id}/rules/{rule_id}/deployment" + return f"{client.base_url(APIVersion.V1)}/{client.instance_id}/rules/{rule_id}/deployment" def test_update_rule_deployment_enabled(chronicle_client, response_mock):