diff --git a/CHANGELOG.md b/CHANGELOG.md index 90bfd2f..1b8217b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.30.0] - 2025-12-22 +### Added +- Support for following watchlist management methods: + - List watchlists + - Create watchlist + - Get watchlist details + - Update watchlist + - Delete watchlist + ## [0.29.0] - 2025-12-17 ### Added - Support for following log/data processing pipeline methods: diff --git a/CLI.md b/CLI.md index ba997e6..65945ac 100644 --- a/CLI.md +++ b/CLI.md @@ -666,6 +666,49 @@ secops parser-extension activate --log-type OKTA --id "1234567890" secops parser-extension delete --log-type OKTA --id "1234567890" ``` +### Watchlist Management + +List watchlists: + +```bash +# List all watchlists +secops watchlist list + +# List watchlist with pagination +secops watchlist list --page-size 50 +``` + +Get watchlist details: + +```bash +secops watchlist get --watchlist-id "abc-123-def" +``` + +Create a new watchlist: + +```bash +secops watchlist create --name "my_watchlist" --display-name "my_watchlist" --description "My watchlist description" --multiplying-factor 1.5 +``` + +Update a watchlist: + +```bash +# Update display name and description +secops watchlist update --watchlist-id "abc-123-def" --display-name "Updated Name" --description "Updated description" + +# Update multiplying factor and pin the watchlist +secops watchlist update --watchlist-id "abc-123-def" --multiplying-factor 2.0 --pinned true + +# Update entity population mechanism (JSON string or file path) +secops watchlist update --watchlist-id "abc-123-def" --entity-population-mechanism '{"manual": {}}' +``` + +Delete a watchlist: + +```bash +secops watchlist delete --watchlist-id "abc-123-def" +``` + ### Rule Management List detection rules: diff --git a/README.md b/README.md index d9fce04..61d7b74 100644 --- a/README.md +++ b/README.md @@ -1704,6 +1704,60 @@ extension_id = "1234567890" chronicle.delete_parser_extension(log_type, extension_id) ``` +## Watchlist Management + +### Creating a Watchlist + +Create a new watchlist: + +```python +watchlist = chronicle.create_watchlist( + name="my_watchlist", + display_name="my_watchlist", + multiplying_factor=1.5, + description="My new watchlist" +) +``` + +### Updating a Watchlist + +Update a watchlist by ID: + +```python +updated_watchlist = chronicle.update_watchlist( + watchlist_id="abc-123-def", + display_name="Updated Watchlist Name", + description="Updated description", + multiplying_factor=2.0, + entity_population_mechanism={"manual": {}}, + watchlist_user_preferences={"pinned": True} +) +``` + +### Deleting a Watchlist + +Delete a watchlist by ID: + +```python +chronicle.delete_watchlist("acb-123-def", force=True) +``` + +### Getting a Watchlist + +Get a watchlist by ID: + +```python +watchlist = chronicle.get_watchlist("acb-123-def") +``` + +### List all Watchlists + +List all watchlists: + +```python +watchlists = chronicle.list_watchlists() +``` + ## Rule Management The SDK provides comprehensive support for managing Chronicle detection rules: diff --git a/api_module_mapping.md b/api_module_mapping.md index 0ed9f0f..726b930 100644 --- a/api_module_mapping.md +++ b/api_module_mapping.md @@ -4,87 +4,87 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ **Note:** All the REST resources mentioned have suffix `projects.locations.instances`. -|REST Resource |Version|secops-wrapper module |CLI Command | -|------------------------------------------------------------------------------|-------|------------------------------------------------------------|---------------------------------------| -|dataAccessLabels.create |v1 | | | -|dataAccessLabels.delete |v1 | | | -|dataAccessLabels.get |v1 | | | -|dataAccessLabels.list |v1 | | | -|dataAccessLabels.patch |v1 | | | -|dataAccessScopes.create |v1 | | | -|dataAccessScopes.delete |v1 | | | -|dataAccessScopes.get |v1 | | | -|dataAccessScopes.list |v1 | | | -|dataAccessScopes.patch |v1 | | | -|get |v1 | | | -|operations.cancel |v1 | | | -|operations.delete |v1 | | | -|operations.get |v1 | | | -|operations.list |v1 | | | -|referenceLists.create |v1 |chronicle.reference_list.create_reference_list |secops reference-list create | -|referenceLists.get |v1 |chronicle.reference_list.get_reference_list |secops reference-list get | -|referenceLists.list |v1 |chronicle.reference_list.list_reference_lists |secops reference-list list | -|referenceLists.patch |v1 |chronicle.reference_list.update_reference_list |secops reference-list update | -|rules.create |v1 |chronicle.rule.create_rule |secops rule create | -|rules.delete |v1 |chronicle.rule.delete_rule |secops rule delete | -|rules.deployments.list |v1 | | | -|rules.get |v1 |chronicle.rule.get_rule |secops rule get | -|rules.getDeployment |v1 | | | -|rules.list |v1 |chronicle.rule.list_rules |secops rule list | -|rules.listRevisions |v1 | | | -|rules.patch |v1 |chronicle.rule.update_rule |secops rule update | -|rules.retrohunts.create |v1 |chronicle.rule_retrohunt.create_retrohunt | | -|rules.retrohunts.get |v1 |chronicle.rule_retrohunt.get_retrohunt | | -|rules.retrohunts.list |v1 | | | -|rules.updateDeployment |v1 |chronicle.rule.enable_rule |secops rule enable | -|watchlists.create |v1 | | | -|watchlists.delete |v1 | | | -|watchlists.get |v1 | | | -|watchlists.list |v1 | | | -|watchlists.patch |v1 | | | -|dataAccessLabels.create |v1beta | | | -|dataAccessLabels.delete |v1beta | | | -|dataAccessLabels.get |v1beta | | | -|dataAccessLabels.list |v1beta | | | -|dataAccessLabels.patch |v1beta | | | -|dataAccessScopes.create |v1beta | | | -|dataAccessScopes.delete |v1beta | | | -|dataAccessScopes.get |v1beta | | | -|dataAccessScopes.list |v1beta | | | -|dataAccessScopes.patch |v1beta | | | -|get |v1beta | | | -|operations.cancel |v1beta | | | -|operations.delete |v1beta | | | -|operations.get |v1beta | | | -|operations.list |v1beta | | | -|referenceLists.create |v1beta | | | -|referenceLists.get |v1beta | | | -|referenceLists.list |v1beta | | | -|referenceLists.patch |v1beta | | | -|rules.create |v1beta | | | -|rules.delete |v1beta | | | -|rules.deployments.list |v1beta | | | -|rules.get |v1beta | | | -|rules.getDeployment |v1beta | | | -|rules.list |v1beta | | | -|rules.listRevisions |v1beta | | | -|rules.patch |v1beta | | | -|rules.retrohunts.create |v1beta | | | -|rules.retrohunts.get |v1beta | | | -|rules.retrohunts.list |v1beta | | | -|rules.updateDeployment |v1beta | | | -|watchlists.create |v1beta | | | -|watchlists.delete |v1beta | | | -|watchlists.get |v1beta | | | -|watchlists.list |v1beta | | | -|watchlists.patch |v1beta | | | -|analytics.entities.analyticValues.list |v1alpha| | | -|analytics.list |v1alpha| | | -|batchValidateWatchlistEntities |v1alpha| | | -|bigQueryAccess.provide |v1alpha| | | -|bigQueryExport.provision |v1alpha| | | -|cases.countPriorities |v1alpha| | | -|curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.batchUpdate | v1alpha | chronicle.rule_set.batch_update_curated_rule_set_deployments | | +| REST Resource | Version | secops-wrapper module | CLI Command | +|--------------------------------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------|------------------------------------------------| +| dataAccessLabels.create | v1 | | | +| dataAccessLabels.delete | v1 | | | +| dataAccessLabels.get | v1 | | | +| dataAccessLabels.list | v1 | | | +| dataAccessLabels.patch | v1 | | | +| dataAccessScopes.create | v1 | | | +| dataAccessScopes.delete | v1 | | | +| dataAccessScopes.get | v1 | | | +| dataAccessScopes.list | v1 | | | +| dataAccessScopes.patch | v1 | | | +| get | v1 | | | +| operations.cancel | v1 | | | +| operations.delete | v1 | | | +| operations.get | v1 | | | +| operations.list | v1 | | | +| referenceLists.create | v1 | chronicle.reference_list.create_reference_list | secops reference-list create | +| referenceLists.get | v1 | chronicle.reference_list.get_reference_list | secops reference-list get | +| referenceLists.list | v1 | chronicle.reference_list.list_reference_lists | secops reference-list list | +| referenceLists.patch | v1 | chronicle.reference_list.update_reference_list | secops reference-list update | +| rules.create | v1 | chronicle.rule.create_rule | secops rule create | +| rules.delete | v1 | chronicle.rule.delete_rule | secops rule delete | +| rules.deployments.list | v1 | | | +| rules.get | v1 | chronicle.rule.get_rule | secops rule get | +| rules.getDeployment | v1 | | | +| rules.list | v1 | chronicle.rule.list_rules | secops rule list | +| rules.listRevisions | v1 | | | +| rules.patch | v1 | chronicle.rule.update_rule | secops rule update | +| rules.retrohunts.create | v1 | chronicle.rule_retrohunt.create_retrohunt | | +| rules.retrohunts.get | v1 | chronicle.rule_retrohunt.get_retrohunt | | +| rules.retrohunts.list | v1 | | | +| rules.updateDeployment | v1 | chronicle.rule.enable_rule | secops rule enable | +| watchlists.create | v1 | chronicle.watchlist.create_watchlist | secops watchlist create | +| watchlists.delete | v1 | chronicle.watchlist.delete_watchlist | secops watchlist delete | +| watchlists.get | v1 | chronicle.watchlist.get_watchlist | secops watchlist get | +| watchlists.list | v1 | chronicle.watchlist.list_watchlists | secops watchlist list | +| watchlists.patch | v1 | chronicle.watchlist.update_watchlist | secops watchlist update | +| dataAccessLabels.create | v1beta | | | +| dataAccessLabels.delete | v1beta | | | +| dataAccessLabels.get | v1beta | | | +| dataAccessLabels.list | v1beta | | | +| dataAccessLabels.patch | v1beta | | | +| dataAccessScopes.create | v1beta | | | +| dataAccessScopes.delete | v1beta | | | +| dataAccessScopes.get | v1beta | | | +| dataAccessScopes.list | v1beta | | | +| dataAccessScopes.patch | v1beta | | | +| get | v1beta | | | +| operations.cancel | v1beta | | | +| operations.delete | v1beta | | | +| operations.get | v1beta | | | +| operations.list | v1beta | | | +| referenceLists.create | v1beta | | | +| referenceLists.get | v1beta | | | +| referenceLists.list | v1beta | | | +| referenceLists.patch | v1beta | | | +| rules.create | v1beta | | | +| rules.delete | v1beta | | | +| rules.deployments.list | v1beta | | | +| rules.get | v1beta | | | +| rules.getDeployment | v1beta | | | +| rules.list | v1beta | | | +| rules.listRevisions | v1beta | | | +| rules.patch | v1beta | | | +| rules.retrohunts.create | v1beta | | | +| rules.retrohunts.get | v1beta | | | +| rules.retrohunts.list | v1beta | | | +| rules.updateDeployment | v1beta | | | +| watchlists.create | v1beta | | | +| watchlists.delete | v1beta | | | +| watchlists.get | v1beta | | | +| watchlists.list | v1beta | | | +| watchlists.patch | v1beta | | | +| analytics.entities.analyticValues.list | v1alpha | | | +| analytics.list | v1alpha | | | +| batchValidateWatchlistEntities | v1alpha | | | +| bigQueryAccess.provide | v1alpha | | | +| bigQueryExport.provision | v1alpha | | | +| cases.countPriorities | v1alpha | | | +| curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.batchUpdate | v1alpha | chronicle.rule_set.batch_update_curated_rule_set_deployments | | | curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.patch | v1alpha | chronicle.rule_set.update_curated_rule_set_deployment | secops curated-rule rule-set-deployment update | | curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.list | v1alpha | chronicle.rule_set.list_curated_rule_set_deployments | secops curated-rule rule-set-deployment list | | curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments.get | v1alpha | chronicle.rule_set.get_curated_rule_set_deployment
chronicle.rule_set.get_curated_rule_set_deployment_by_name | secops curated-rule rule-set-deployment get | diff --git a/pyproject.toml b/pyproject.toml index 897997a..120e676 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.29.0" +version = "0.30.0" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.10" diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index 50522d9..b874c10 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -175,6 +175,13 @@ find_udm_field_values, ) from secops.chronicle.validate import validate_query +from secops.chronicle.watchlist import ( + list_watchlists, + get_watchlist, + delete_watchlist, + create_watchlist, + update_watchlist, +) __all__ = [ # Client @@ -327,4 +334,10 @@ "fetch_associated_pipeline", "fetch_sample_logs_by_streams", "test_pipeline", + # Watchlist + "list_watchlists", + "get_watchlist", + "delete_watchlist", + "create_watchlist", + "update_watchlist", ] diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index 9267222..c96aedc 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -309,6 +309,13 @@ find_udm_field_values as _find_udm_field_values, ) from secops.chronicle.validate import validate_query as _validate_query +from secops.chronicle.watchlist import ( + list_watchlists as _list_watchlists, + get_watchlist as _get_watchlist, + delete_watchlist as _delete_watchlist, + create_watchlist as _create_watchlist, + update_watchlist as _update_watchlist, +) from secops.exceptions import SecOpsError @@ -605,6 +612,132 @@ def validate_query(self, query: str) -> dict[str, Any]: """ return _validate_query(self, query) + def list_watchlists( + self, + page_size: int | None = None, + page_token: str | None = None, + ) -> dict[str, Any]: + """Get a list of all watchlists. + + Args: + page_size: Maximum number of watchlists to return per page + page_token: Token for the next page of results, if available + + Returns: + Dictionary with list of watchlists + + Raises: + APIError: If the API request fails + """ + return _list_watchlists(self, page_size, page_token) + + def get_watchlist( + self, + watchlist_id: str, + ) -> dict[str, Any]: + """Get a specific watchlist by ID. + + Args: + watchlist_id: ID of the watchlist to retrieve + + Returns: + Watchlist + + Raises: + APIError: If the API request fails + """ + return _get_watchlist(self, watchlist_id) + + def delete_watchlist( + self, + watchlist_id: str, + force: bool | None = None, + ) -> dict[str, Any]: + """Delete a watchlist by ID. + + Args: + watchlist_id: ID of the watchlist to delete + force: Optional. If set to true, any entities under this + watchlist will also be deleted. + (Otherwise, the request will only work if the + watchlist has no entities.) + + Returns: + Deleted watchlist + + Raises: + APIError: If the API request fails + """ + return _delete_watchlist(self, watchlist_id, force) + + def create_watchlist( + self, + name: str, + display_name: str, + multiplying_factor: float, + description: str | None = None, + ) -> dict[str, Any]: + """Create a watchlist + + Args: + name: Name of the watchlist + display_name: Display name of the watchlist + multiplying_factor: Multiplying factor for the watchlist + description: Optional. Description of the watchlist + + Returns: + Created watchlist + + Raises: + APIError: If the API request fails + """ + return _create_watchlist( + self, name, display_name, multiplying_factor, description + ) + + def update_watchlist( + self, + watchlist_id: str, + display_name: str | None = None, + description: str | None = None, + multiplying_factor: float | None = None, + entity_population_mechanism: dict[str, Any] | None = None, + watchlist_user_preferences: dict[str, Any] | None = None, + update_mask: str | None = None, + ) -> dict[str, Any]: + """Update a watchlist. + + Args: + watchlist_id: ID of the watchlist to update. + display_name: Optional. Display name of the watchlist. + Must be 1-63 characters. + description: Optional. Description of the watchlist. + multiplying_factor: Optional. Weight applied to risk score + for entities in this watchlist. Default is 1.0. + entity_population_mechanism: Optional. Mechanism to populate + entities in the watchlist. Example: {"manual": {}}. + watchlist_user_preferences: Optional. User preferences for + watchlist configuration. Example: {"pinned": True}. + update_mask: Optional. Comma-separated list of fields to + update. If not provided, all non-None fields are updated. + + Returns: + Updated watchlist. + + Raises: + APIError: If the API request fails. + """ + return _update_watchlist( + self, + watchlist_id, + display_name, + description, + multiplying_factor, + entity_population_mechanism, + watchlist_user_preferences, + update_mask, + ) + def get_stats( self, query: str, diff --git a/src/secops/chronicle/utils/__init__.py b/src/secops/chronicle/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/secops/chronicle/utils/request_utils.py b/src/secops/chronicle/utils/request_utils.py new file mode 100644 index 0000000..0cfe925 --- /dev/null +++ b/src/secops/chronicle/utils/request_utils.py @@ -0,0 +1,157 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Helper functions for Chronicle.""" + +from typing import Any + +from secops.exceptions import APIError +from secops.chronicle.models import APIVersion + + +DEFAULT_PAGE_SIZE = 1000 + + +def chronicle_paginated_request( + client: "ChronicleClient", + base_url: str, + path: str, + items_key: str, + *, + page_size: int | None = None, + page_token: str | None = None, + extra_params: dict[str, Any] | None = None, +) -> dict[str, list[Any]] | list[Any]: + """Helper to get items from endpoints that use pagination. + + Args: + client: ChronicleClient instance + base_url: The base URL to use, example: + - v1alpha (ChronicleClient.base_url) + - v1 (ChronicleClient.base_v1_url) + path: URL path after {base_url}/{instance_id}/ + items_key: JSON key holding the array of items (e.g., 'curatedRules') + page_size: Maximum number of rules to return per page. + page_token: Token for the next page of results, if available. + extra_params: extra query params to include on every request + + Returns: + Union[Dict[str, List[Any]], List[Any]]: List of items from the + paginated collection. If the API returns a dictionary, it will + return the dictionary. Otherwise, it will return the list of items. + + Raises: + APIError: If the HTTP request fails. + """ + url = f"{base_url}/{client.instance_id}/{path}" + results = [] + next_token = page_token + + while True: + # Build params each loop to prevent stale keys being + # included in the next request + params = {"pageSize": DEFAULT_PAGE_SIZE if not page_size else page_size} + if next_token: + params["pageToken"] = next_token + if extra_params: + # copy to avoid passed dict being mutated + params.update(dict(extra_params)) + + response = client.session.get(url, params=params) + if response.status_code != 200: + raise APIError(f"Failed to list {items_key}: {response.text}") + + data = response.json() + results.extend(data.get(items_key, [])) + + # If caller provided page_size, return only this page + if page_size is not None: + break + + # Otherwise, auto-paginate + next_token = data.get("nextPageToken") + if not next_token: + break + + # Return a list if the API returns a list, otherwise return a dict + if isinstance(data, list): + return results + response = {items_key: results} + + if data.get("nextPageToken"): + response["nextPageToken"] = data.get("nextPageToken") + + return response + + +def chronicle_request( + client: "ChronicleClient", + method: str, + endpoint_path: str, + *, + api_version: str = APIVersion.V1, + params: dict[str, Any] | None = None, + json: dict[str, Any] | None = None, + expected_status: int = 200, + error_message: str | None = None, +) -> dict[str, Any]: + """Perform an HTTP request and return JSON, raising APIError on failure. + + Args: + client: requests.Session (or compatible) instance + method: HTTP method, e.g. 'GET', 'POST', 'PATCH' + endpoint_path: URL path after {base_url}/{instance_id}/ + api_version: API version to use + params: Optional query parameters + json: Optional JSON body + expected_status: Expected HTTP status code (default: 200) + error_message: Optional base error message to include on failure + + Returns: + Parsed JSON response. + + Raises: + APIError: If the request fails, returns a non-JSON body, or status + code does not match expected_status. + """ + url = f"{client.base_url(api_version)}/{client.instance_id}/{endpoint_path}" + response = client.session.request( + method=method, url=url, params=params, json=json + ) + + # Try to parse JSON even on error, so we can get more details + try: + data = response.json() + except ValueError: + data = None + + if response.status_code != expected_status: + base_msg = error_message or "API request failed" + if data is not None: + raise APIError( + f"{base_msg}: status={response.status_code}, response={data}" + ) from None + + raise APIError( + f"{base_msg}: status={response.status_code}," + f" response_text={response.text}" + ) from None + + if data is None: + raise APIError( + f"Expected JSON response from {url}" + f" but got non-JSON body: {response.text}" + ) + + return data diff --git a/src/secops/chronicle/watchlist.py b/src/secops/chronicle/watchlist.py new file mode 100644 index 0000000..ee8327d --- /dev/null +++ b/src/secops/chronicle/watchlist.py @@ -0,0 +1,216 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Watchlist functionality for Chronicle.""" + +from typing import Any + +from secops.chronicle.models import APIVersion +from secops.chronicle.utils.request_utils import ( + chronicle_paginated_request, + chronicle_request, +) + + +def list_watchlists( + client: "ChronicleClient", + page_size: int | None = None, + page_token: str | None = None, +) -> dict[str, Any]: + """Get a list of watchlists. + + Args: + client: ChronicleClient instance + page_size: Number of results to return per page + page_token: Token for the page to retrieve + + Returns: + List of watchlists + + Raises: + APIError: If the API request fails + """ + return chronicle_paginated_request( + client, + base_url=client.base_url(APIVersion.V1), + path="watchlists", + items_key="watchlists", + page_size=page_size, + page_token=page_token, + ) + + +def get_watchlist( + client: "ChronicleClient", watchlist_id: str +) -> dict[str, Any]: + """Get a watchlist by ID + + Args: + client: ChronicleClient instance + watchlist_id: ID of the watchlist to retrieve + + Returns: + Watchlist + + Raises: + APIError: If the API request fails + """ + return chronicle_request( + client, + method="GET", + endpoint_path=f"watchlists/{watchlist_id}", + api_version=APIVersion.V1, + ) + + +def delete_watchlist( + client: "ChronicleClient", watchlist_id: str, force: bool | None = None +) -> dict[str, Any]: + """Delete a watchlist by ID + + Args: + client: ChronicleClient instance + watchlist_id: ID of the watchlist to delete + force: Optional. If set to true, any entities under this + watchlist will also be deleted. + (Otherwise, the request will only work if the + watchlist has no entities.) + + Returns: + Deleted watchlist + + Raises: + APIError: If the API request fails + """ + params = {} + + if force is not None: + params["force"] = force + + return chronicle_request( + client, + method="DELETE", + endpoint_path=f"watchlists/{watchlist_id}", + api_version=APIVersion.V1, + params=params, + ) + + +def create_watchlist( + client: "ChronicleClient", + name: str, + display_name: str, + multiplying_factor: float, + description: str | None = None, +) -> dict[str, Any]: + """Create a watchlist + + Args: + client: ChronicleClient instance + name: Name of the watchlist + display_name: Display name of the watchlist + multiplying_factor: Multiplying factor for the watchlist + description: Optional. Description of the watchlist + + Returns: + Created watchlist + + Raises: + APIError: If the API request fails + """ + return chronicle_request( + client, + method="POST", + endpoint_path="watchlists", + api_version=APIVersion.V1, + json={ + "name": name, + "displayName": display_name, + "multiplyingFactor": multiplying_factor, + "description": description, + "entityPopulationMechanism": {"manual": {}}, + }, + ) + + +def update_watchlist( + client: "ChronicleClient", + watchlist_id: str, + display_name: str | None = None, + description: str | None = None, + multiplying_factor: float | None = None, + entity_population_mechanism: dict[str, Any] | None = None, + watchlist_user_preferences: dict[str, Any] | None = None, + update_mask: str | None = None, +) -> dict[str, Any]: + """Update a watchlist. + + Args: + client: ChronicleClient instance. + watchlist_id: ID of the watchlist to update. + display_name: Optional. Display name of the watchlist. + Must be 1-63 characters. + description: Optional. Description of the watchlist. + multiplying_factor: Optional. Weight applied to risk score + for entities in this watchlist. Default is 1.0. + entity_population_mechanism: Optional. Mechanism to populate + entities in the watchlist. Example: {"manual": {}}. + watchlist_user_preferences: Optional. User preferences for + watchlist configuration. Example: {"pinned": True}. + update_mask: Optional. Comma-separated list of fields to update. + If not provided, all non-None fields will be updated. + + Returns: + Updated watchlist. + + Raises: + APIError: If the API request fails. + """ + body = {} + mask_fields = [] + + if display_name is not None: + body["displayName"] = display_name + mask_fields.append("display_name") + + if description is not None: + body["description"] = description + mask_fields.append("description") + + if multiplying_factor is not None: + body["multiplyingFactor"] = multiplying_factor + mask_fields.append("multiplying_factor") + + if entity_population_mechanism is not None: + body["entityPopulationMechanism"] = entity_population_mechanism + mask_fields.append("entity_population_mechanism") + + if watchlist_user_preferences is not None: + body["watchlistUserPreferences"] = watchlist_user_preferences + mask_fields.append("watchlist_user_preferences") + + params = {} + if update_mask is not None: + params["updateMask"] = update_mask + elif mask_fields: + params["updateMask"] = ",".join(mask_fields) + + return chronicle_request( + client, + method="PATCH", + endpoint_path=f"watchlists/{watchlist_id}", + api_version=APIVersion.V1, + params=params if params else None, + json=body, + ) diff --git a/src/secops/cli/cli_client.py b/src/secops/cli/cli_client.py index f48a3f8..e318512 100644 --- a/src/secops/cli/cli_client.py +++ b/src/secops/cli/cli_client.py @@ -33,6 +33,7 @@ from secops.cli.commands.search import setup_search_command from secops.cli.commands.stats import setup_stats_command from secops.cli.commands.udm_search import setup_udm_search_view_command +from secops.cli.commands.watchlist import setup_watchlist_command from secops.cli.utils.common_args import add_chronicle_args, add_common_args from secops.cli.utils.config_utils import load_config from secops.exceptions import AuthenticationError, SecOpsError @@ -179,6 +180,7 @@ def build_parser() -> argparse.ArgumentParser: setup_help_command(subparsers) setup_dashboard_command(subparsers) setup_dashboard_query_command(subparsers) + setup_watchlist_command(subparsers) return parser diff --git a/src/secops/cli/commands/watchlist.py b/src/secops/cli/commands/watchlist.py new file mode 100644 index 0000000..303baf8 --- /dev/null +++ b/src/secops/cli/commands/watchlist.py @@ -0,0 +1,229 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Google SecOps CLI watchlist commands""" + +import sys + +from secops.cli.utils.formatters import output_formatter +from secops.cli.utils.common_args import ( + add_time_range_args, + add_pagination_args, +) +from secops.cli.utils.input_utils import load_json_or_file + + +def setup_watchlist_command(subparsers): + """Setup watchlist command""" + watchlist_parser = subparsers.add_parser( + "watchlist", + help="Manage Chronicle watchlists", + ) + lvl1 = watchlist_parser.add_subparsers( + dest="watchlist_command", help="Watchlist command" + ) + + # list command + list_parser = lvl1.add_parser("list", help="List watchlists") + add_time_range_args(list_parser) + add_pagination_args(list_parser) + list_parser.set_defaults(func=handle_watchlist_list_command) + + # get command + get_parser = lvl1.add_parser("get", help="Get watchlist by ID") + get_parser.add_argument( + "--watchlist-id", + type=str, + help="ID of watchlist to get", + dest="watchlist_id", + required=True, + ) + get_parser.set_defaults(func=handle_watchlist_get_command) + + # delete command + delete_parser = lvl1.add_parser("delete", help="Delete watchlist by ID") + delete_parser.add_argument( + "--watchlist-id", + type=str, + help="ID of the watchlist to delete", + dest="watchlist_id", + required=True, + ) + delete_parser.add_argument( + "--force", + action="store_true", + help="Flag to remove entities under watchlist", + ) + delete_parser.set_defaults(func=handle_watchlist_delete_command) + + # create command + create_parser = lvl1.add_parser("create", help="Create watchlist") + create_parser.add_argument( + "--name", type=str, help="Watchlist name", dest="name", required=True + ) + create_parser.add_argument( + "--display-name", + type=str, + help="Watchlist display name", + dest="display_name", + required=True, + ) + create_parser.add_argument( + "--multiplying-factor", + type=float, + help="Watchlist multiplying factor", + dest="multiplying_factor", + required=True, + ) + create_parser.add_argument( + "--description", + type=str, + help="Watchlist description", + dest="description", + required=False, + ) + create_parser.set_defaults(func=handle_watchlist_create_command) + + # update command + update_parser = lvl1.add_parser("update", help="Update watchlist by ID") + update_parser.add_argument( + "--watchlist-id", + type=str, + help="ID of the watchlist to update", + dest="watchlist_id", + required=True, + ) + update_parser.add_argument( + "--display-name", + type=str, + help="New display name for the watchlist", + dest="display_name", + required=False, + ) + update_parser.add_argument( + "--description", + type=str, + help="New description for the watchlist", + dest="description", + required=False, + ) + update_parser.add_argument( + "--multiplying-factor", + type=float, + help="New multiplying factor for the watchlist", + dest="multiplying_factor", + required=False, + ) + update_parser.add_argument( + "--pinned", + type=str, + choices=["true", "false"], + help="Pin or unpin the watchlist on dashboard", + dest="pinned", + required=False, + ) + update_parser.add_argument( + "--entity-population-mechanism", + type=str, + help="Entity population mechanism as JSON string or file path", + dest="entity_population_mechanism", + required=False, + ) + update_parser.add_argument( + "--update-mask", + type=str, + help="Comma-separated list of fields to update", + dest="update_mask", + required=False, + ) + update_parser.set_defaults(func=handle_watchlist_update_command) + + +def handle_watchlist_list_command(args, chronicle): + """List watchlists""" + try: + out = chronicle.list_watchlists( + page_size=getattr(args, "page_size", None), + page_token=getattr(args, "page_token", None), + ) + output_formatter(out, getattr(args, "output", "json")) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error listing watchlists: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_watchlist_get_command(args, chronicle): + """Get watchlist by ID""" + try: + out = chronicle.get_watchlist(args.watchlist_id) + output_formatter(out, getattr(args, "output", "json")) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error getting watchlist: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_watchlist_delete_command(args, chronicle): + """Delete watchlist by ID""" + try: + out = chronicle.delete_watchlist(args.watchlist_id, args.force) + output_formatter(out, getattr(args, "output", "json")) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error deleting watchlist: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_watchlist_create_command(args, chronicle): + """Create watchlist""" + try: + out = chronicle.create_watchlist( + name=args.name, + display_name=args.display_name, + multiplying_factor=args.multiplying_factor, + description=args.description, + ) + output_formatter(out, getattr(args, "output", "json")) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error creating watchlist: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_watchlist_update_command(args, chronicle): + """Update watchlist by ID.""" + try: + # Build watchlist_user_preferences if pinned is provided + watchlist_user_preferences = None + if args.pinned is not None: + watchlist_user_preferences = { + "pinned": args.pinned.lower() == "true" + } + + # Parse entity_population_mechanism if provided + entity_population_mechanism = None + epm_value = getattr(args, "entity_population_mechanism", None) + if epm_value is not None: + entity_population_mechanism = load_json_or_file(epm_value) + + out = chronicle.update_watchlist( + watchlist_id=args.watchlist_id, + display_name=getattr(args, "display_name", None), + description=getattr(args, "description", None), + multiplying_factor=getattr(args, "multiplying_factor", None), + entity_population_mechanism=entity_population_mechanism, + watchlist_user_preferences=watchlist_user_preferences, + update_mask=getattr(args, "update_mask", None), + ) + output_formatter(out, getattr(args, "output", "json")) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error updating watchlist: {e}", file=sys.stderr) + sys.exit(1) diff --git a/tests/chronicle/test_watchlist.py b/tests/chronicle/test_watchlist.py new file mode 100644 index 0000000..10e0d64 --- /dev/null +++ b/tests/chronicle/test_watchlist.py @@ -0,0 +1,455 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Tests for Chronicle watchlist functions.""" + +from typing import Any, Dict +from unittest.mock import Mock, patch + +import pytest + +from secops.chronicle.client import ChronicleClient +from secops.chronicle.models import APIVersion +from secops.chronicle.watchlist import ( + list_watchlists, + get_watchlist, + delete_watchlist, + create_watchlist, + update_watchlist, +) +from secops.exceptions import APIError + + +@pytest.fixture +def chronicle_client(): + """Create a Chronicle client for testing.""" + with patch("secops.auth.SecOpsAuth") as mock_auth: + mock_session = Mock() + mock_session.headers = {} + mock_auth.return_value.session = mock_session + return ChronicleClient( + customer_id="test-customer", + project_id="test-project", + default_api_version=APIVersion.V1, + ) + + +@pytest.fixture +def mock_response() -> Mock: + """Create a mock API response object.""" + mock = Mock() + mock.status_code = 200 + mock.json.return_value = {} + return mock + + +@pytest.fixture +def mock_error_response() -> Mock: + """Create a mock error API response object.""" + mock = Mock() + mock.status_code = 400 + mock.text = "Error message" + mock.raise_for_status.side_effect = Exception("API Error") + return mock + + +# -- list_watchlists tests -- + + +def test_list_watchlists_success(chronicle_client): + """Test list_watchlists delegates to chronicle_paginated_request.""" + expected: Dict[str, Any] = { + "watchlists": [ + {"name": "watchlist1"}, + {"name": "watchlist2"}, + ] + } + + with patch( + "secops.chronicle.watchlist.chronicle_paginated_request", + return_value=expected, + ) as mock_paginated: + result = list_watchlists( + chronicle_client, + page_size=10, + page_token="next-token", + ) + + assert result == expected + + mock_paginated.assert_called_once_with( + chronicle_client, + base_url=chronicle_client.base_url(APIVersion.V1), + path="watchlists", + items_key="watchlists", + page_size=10, + page_token="next-token", + ) + + +def test_list_watchlists_default_args(chronicle_client): + """Test list_watchlists with default pagination args.""" + expected: Dict[str, Any] = {"watchlists": []} + + with patch( + "secops.chronicle.watchlist.chronicle_paginated_request", + return_value=expected, + ) as mock_paginated: + result = list_watchlists(chronicle_client) + + assert result == expected + + mock_paginated.assert_called_once_with( + chronicle_client, + base_url=chronicle_client.base_url(APIVersion.V1), + path="watchlists", + items_key="watchlists", + page_size=None, + page_token=None, + ) + + +def test_list_watchlists_error(chronicle_client): + """Test list_watchlists propagates APIError from helper.""" + with patch( + "secops.chronicle.watchlist.chronicle_paginated_request", + side_effect=APIError("Failed to list watchlists"), + ): + with pytest.raises(APIError) as exc_info: + list_watchlists(chronicle_client) + + assert "Failed to list watchlists" in str(exc_info.value) + + +# -- get_watchlist tests -- + + +def test_get_watchlist_success(chronicle_client): + """Test get_watchlist returns expected result.""" + expected = { + "name": "test-watchlist-id", + "displayName": "test-watchlist", + "multiplyingFactor": 1, + "description": "test-description", + } + + with patch( + "secops.chronicle.watchlist.chronicle_request", + return_value=expected, + ) as mock_request: + result = get_watchlist(chronicle_client, "test-watchlist-id") + + assert result == expected + + mock_request.assert_called_once_with( + chronicle_client, + method="GET", + endpoint_path="watchlists/test-watchlist-id", + api_version=APIVersion.V1, + ) + + +def test_get_watchlist_error(chronicle_client): + """Test get_watchlist raises APIError on error.""" + with patch( + "secops.chronicle.watchlist.chronicle_request", + side_effect=APIError("Failed to get watchlist test-watchlist-id"), + ): + with pytest.raises(APIError) as exc_info: + get_watchlist(chronicle_client, "test-watchlist-id") + + assert "Failed to get watchlist" in str(exc_info.value) + + +# -- delete_watchlist tests -- + + +def test_delete_watchlist_success(chronicle_client): + """Test delete_watchlist calls helper and returns response JSON.""" + expected: Dict[str, Any] = {} + + with patch( + "secops.chronicle.watchlist.chronicle_request", + return_value=expected, + ) as mock_request: + result = delete_watchlist(chronicle_client, "watchlist-123") + + assert result == expected + + mock_request.assert_called_once_with( + chronicle_client, + method="DELETE", + endpoint_path="watchlists/watchlist-123", + api_version=APIVersion.V1, + params={}, + ) + + +def test_delete_watchlist_force_true(chronicle_client): + """Test delete_watchlist with force=True.""" + expected: Dict[str, Any] = {} + + with patch( + "secops.chronicle.watchlist.chronicle_request", + return_value=expected, + ) as mock_request: + result = delete_watchlist( + chronicle_client, + "watchlist-123", + force=True, + ) + + assert result == expected + + mock_request.assert_called_once_with( + chronicle_client, + method="DELETE", + endpoint_path="watchlists/watchlist-123", + api_version=APIVersion.V1, + params={"force": True}, + ) + + +# -- create_watchlist tests -- + + +def test_create_watchlist_success(chronicle_client): + """Test create_watchlist calls helper and returns response JSON.""" + expected = { + "name": "watchlist-123", + "displayName": "My Watchlist", + "multiplyingFactor": 1.5, + "description": "Test description", + } + + with patch( + "secops.chronicle.watchlist.chronicle_request", + return_value=expected, + ) as mock_request: + result = create_watchlist( + chronicle_client, + name="watchlist-123", + display_name="My Watchlist", + multiplying_factor=1.5, + description="Test description", + ) + + assert result == expected + + mock_request.assert_called_once_with( + chronicle_client, + method="POST", + endpoint_path="watchlists", + api_version=APIVersion.V1, + json={ + "name": "watchlist-123", + "displayName": "My Watchlist", + "multiplyingFactor": 1.5, + "description": "Test description", + "entityPopulationMechanism": {"manual": {}}, + }, + ) + + +def test_create_watchlist_without_description(chronicle_client): + """Test create_watchlist when description is None.""" + expected = { + "name": "watchlist-123", + "displayName": "My Watchlist", + "multiplyingFactor": 2.0, + "description": None, + } + + with patch( + "secops.chronicle.watchlist.chronicle_request", + return_value=expected, + ) as mock_request: + result = create_watchlist( + chronicle_client, + name="watchlist-123", + display_name="My Watchlist", + multiplying_factor=2.0, + ) + + assert result == expected + + mock_request.assert_called_once_with( + chronicle_client, + method="POST", + endpoint_path="watchlists", + api_version=APIVersion.V1, + json={ + "name": "watchlist-123", + "displayName": "My Watchlist", + "multiplyingFactor": 2.0, + "description": None, + "entityPopulationMechanism": {"manual": {}}, + }, + ) + + +# -- update_watchlist tests -- + + +def test_update_watchlist_success_all_fields(chronicle_client): + """Test update_watchlist with all fields provided.""" + expected = { + "name": "watchlist-123", + "displayName": "Updated Watchlist", + "description": "Updated description", + "multiplyingFactor": 2.5, + "entityPopulationMechanism": {"manual": {}}, + "watchlistUserPreferences": {"pinned": True}, + } + + with patch( + "secops.chronicle.watchlist.chronicle_request", + return_value=expected, + ) as mock_request: + result = update_watchlist( + chronicle_client, + watchlist_id="watchlist-123", + display_name="Updated Watchlist", + description="Updated description", + multiplying_factor=2.5, + entity_population_mechanism={"manual": {}}, + watchlist_user_preferences={"pinned": True}, + ) + + assert result == expected + + mock_request.assert_called_once_with( + chronicle_client, + method="PATCH", + endpoint_path="watchlists/watchlist-123", + api_version=APIVersion.V1, + params={ + "updateMask": ( + "display_name,description,multiplying_factor," + "entity_population_mechanism,watchlist_user_preferences" + ) + }, + json={ + "displayName": "Updated Watchlist", + "description": "Updated description", + "multiplyingFactor": 2.5, + "entityPopulationMechanism": {"manual": {}}, + "watchlistUserPreferences": {"pinned": True}, + }, + ) + + +def test_update_watchlist_single_field(chronicle_client): + """Test update_watchlist with only display_name.""" + expected = { + "name": "watchlist-123", + "displayName": "New Name", + } + + with patch( + "secops.chronicle.watchlist.chronicle_request", + return_value=expected, + ) as mock_request: + result = update_watchlist( + chronicle_client, + watchlist_id="watchlist-123", + display_name="New Name", + ) + + assert result == expected + + mock_request.assert_called_once_with( + chronicle_client, + method="PATCH", + endpoint_path="watchlists/watchlist-123", + api_version=APIVersion.V1, + params={"updateMask": "display_name"}, + json={"displayName": "New Name"}, + ) + + +def test_update_watchlist_explicit_update_mask(chronicle_client): + """Test update_watchlist with explicit update_mask overrides auto-mask.""" + expected = { + "name": "watchlist-123", + "displayName": "Updated Name", + "description": "Updated desc", + } + + with patch( + "secops.chronicle.watchlist.chronicle_request", + return_value=expected, + ) as mock_request: + result = update_watchlist( + chronicle_client, + watchlist_id="watchlist-123", + display_name="Updated Name", + description="Updated desc", + update_mask="display_name", + ) + + assert result == expected + + mock_request.assert_called_once_with( + chronicle_client, + method="PATCH", + endpoint_path="watchlists/watchlist-123", + api_version=APIVersion.V1, + params={"updateMask": "display_name"}, + json={ + "displayName": "Updated Name", + "description": "Updated desc", + }, + ) + + +def test_update_watchlist_no_fields(chronicle_client): + """Test update_watchlist with no optional fields (edge case).""" + expected = {"name": "watchlist-123"} + + with patch( + "secops.chronicle.watchlist.chronicle_request", + return_value=expected, + ) as mock_request: + result = update_watchlist( + chronicle_client, + watchlist_id="watchlist-123", + ) + + assert result == expected + + mock_request.assert_called_once_with( + chronicle_client, + method="PATCH", + endpoint_path="watchlists/watchlist-123", + api_version=APIVersion.V1, + params=None, + json={}, + ) + + +def test_update_watchlist_error(chronicle_client): + """Test update_watchlist raises APIError on failure.""" + with patch( + "secops.chronicle.watchlist.chronicle_request", + side_effect=APIError("Failed to update watchlist watchlist-123"), + ): + with pytest.raises(APIError) as exc_info: + update_watchlist( + chronicle_client, + watchlist_id="watchlist-123", + display_name="New Name", + ) + + assert "Failed to update watchlist" in str(exc_info.value) diff --git a/tests/chronicle/test_watchlist_integration.py b/tests/chronicle/test_watchlist_integration.py new file mode 100644 index 0000000..01fbbaf --- /dev/null +++ b/tests/chronicle/test_watchlist_integration.py @@ -0,0 +1,172 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Integration tests for Chronicle watchlist.""" +import pytest +from datetime import datetime, timezone +from secops import SecOpsClient +from ..config import CHRONICLE_CONFIG, SERVICE_ACCOUNT_JSON + + +@pytest.fixture(scope="module") +def chronicle(): + """Fixture to create a Chronicle client.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + return client.chronicle(**CHRONICLE_CONFIG) + + +@pytest.mark.integration +def test_watchlist_crud_workflow(chronicle): + """Test complete watchlist CRUD workflow including update.""" + + ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + watchlist_name = f"secops-test-watchlist-{ts}" + display_name = f"SecOps Test Watchlist {ts}" + description = ( + "Integration test watchlist - created by test_watchlist_integration.py" + ) + multiplying_factor = 1.0 + + created_watchlist = None + watchlist_id = None + + try: + # 1. Create watchlist + print("\n1. Creating watchlist...") + created_watchlist = chronicle.create_watchlist( + name=watchlist_name, + display_name=display_name, + multiplying_factor=multiplying_factor, + description=description, + ) + + assert isinstance(created_watchlist, dict) + assert "name" in created_watchlist + assert created_watchlist.get("displayName") == display_name + assert created_watchlist.get("description") == description + + watchlist_id = created_watchlist["name"].split("/")[-1] + print(f" Created watchlist: {display_name} (ID: {watchlist_id})") + + # 2. Get watchlist + print("\n2. Getting watchlist...") + fetched_watchlist = chronicle.get_watchlist(watchlist_id) + + assert isinstance(fetched_watchlist, dict) + assert fetched_watchlist.get("name") == created_watchlist["name"] + assert fetched_watchlist.get("displayName") == display_name + print(f" Fetched watchlist: {fetched_watchlist.get('displayName')}") + + # 3. Update watchlist - change display_name, description, multiplying_factor + print("\n3. Updating watchlist fields...") + updated_display_name = f"Updated Watchlist {ts}" + updated_description = "Updated description - integration test" + updated_multiplying_factor = 2.5 + + updated_watchlist = chronicle.update_watchlist( + watchlist_id=watchlist_id, + display_name=updated_display_name, + description=updated_description, + multiplying_factor=updated_multiplying_factor, + ) + + assert isinstance(updated_watchlist, dict) + assert updated_watchlist.get("displayName") == updated_display_name + assert updated_watchlist.get("description") == updated_description + assert ( + updated_watchlist.get("multiplyingFactor") + == updated_multiplying_factor + ) + print( + f" Updated display_name: {updated_watchlist.get('displayName')}" + ) + print(f" Updated description: {updated_watchlist.get('description')}") + print( + f" Updated multiplying_factor: " + f"{updated_watchlist.get('multiplyingFactor')}" + ) + + # 4. Update watchlist user preferences (pinned) + print("\n4. Updating watchlist user preferences (pinned=True)...") + pinned_watchlist = chronicle.update_watchlist( + watchlist_id=watchlist_id, + watchlist_user_preferences={"pinned": True}, + ) + + assert isinstance(pinned_watchlist, dict) + user_prefs = pinned_watchlist.get("watchlistUserPreferences", {}) + assert user_prefs.get("pinned") is True + print(f" Pinned: {user_prefs.get('pinned')}") + + # 5. List watchlists and verify our watchlist is present + print("\n5. Listing watchlists...") + watchlists_response = chronicle.list_watchlists(page_size=100) + + assert isinstance(watchlists_response, dict) + watchlists = watchlists_response.get("watchlists", []) + watchlist_names = [w.get("name") for w in watchlists] + assert created_watchlist["name"] in watchlist_names + print( + f" Found {len(watchlists)} watchlists, " + f"verified test watchlist is present" + ) + + # 6. Delete watchlist (cleanup) + print("\n6. Deleting watchlist...") + delete_result = chronicle.delete_watchlist(watchlist_id) + + assert isinstance(delete_result, dict) + print(f" Successfully deleted watchlist {watchlist_id}") + + # Verify deletion + print("\n7. Verifying deletion...") + try: + chronicle.get_watchlist(watchlist_id) + pytest.fail("Watchlist should have been deleted") + except Exception: + print(" Watchlist successfully deleted (get returned error)") + + except Exception as e: + # Cleanup on failure + if watchlist_id: + try: + print( + f"\nCleanup: Attempting to delete watchlist {watchlist_id}" + ) + chronicle.delete_watchlist(watchlist_id, force=True) + print("Cleanup: Successfully deleted watchlist") + except Exception as cleanup_error: + print(f"Cleanup failed: {cleanup_error}") + raise e + + +@pytest.mark.integration +def test_watchlist_list(chronicle): + """Test listing watchlists with pagination.""" + print("\nTesting watchlist list with pagination...") + + # List with small page size + result = chronicle.list_watchlists(page_size=1) + + assert isinstance(result, dict) + watchlists = result.get("watchlists", []) + assert isinstance(watchlists, list) + print(f"Listed {len(watchlists)} watchlist(s) with page_size=1") + + # If there's more data, verify pagination token exists + if len(watchlists) == 1: + # List all to check total count + all_result = chronicle.list_watchlists() + all_watchlists = all_result.get("watchlists", []) + print(f"Total watchlists available: {len(all_watchlists)}") diff --git a/tests/cli/test_watchlist_cli_integration.py b/tests/cli/test_watchlist_cli_integration.py new file mode 100644 index 0000000..d63af97 --- /dev/null +++ b/tests/cli/test_watchlist_cli_integration.py @@ -0,0 +1,277 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""CLI Integration tests for watchlist functionality in Chronicle. + +These tests require valid credentials and API access. +""" + +import json +import subprocess +from datetime import datetime, timezone + +import pytest + + +@pytest.mark.integration +def test_cli_watchlist_list_and_get(cli_env, common_args): + """Test CLI commands for listing and getting watchlists. + + Args: + cli_env: Environment variables for CLI execution. + common_args: Common CLI arguments. + """ + print("\nTesting watchlist list and get commands") + + # 1. List watchlists + print("1. Listing watchlists") + list_cmd = ["secops"] + common_args + ["watchlist", "list"] + + list_result = subprocess.run( + list_cmd, + env=cli_env, + capture_output=True, + text=True, + ) + + # Ensure command succeeded + assert list_result.returncode == 0, f"Command failed: {list_result.stderr}" + + # Parse output + data = json.loads(list_result.stdout) + assert isinstance(data, dict), "Expected dict response from watchlist list" + assert "watchlists" in data, "Missing 'watchlists' key in response" + + watchlists = data["watchlists"] + assert isinstance(watchlists, list), "Expected 'watchlists' to be a list" + assert len(watchlists) > 0, "Expected at least one watchlist" + + first_watchlist = watchlists[0] + assert "name" in first_watchlist, "Missing 'name' in watchlist" + assert ( + "displayName" in first_watchlist + ), "Missing 'displayName' in watchlist" + + # Extract watchlist ID (name is a resource path, ID is last component) + watchlist_name = first_watchlist["name"] + watchlist_id = watchlist_name.split("/")[-1] + display_name = first_watchlist["displayName"] + + print(f"Found watchlist: {display_name} (ID: {watchlist_id})") + + # 2. Get specific watchlist by ID + print("\n2. Getting specific watchlist by ID") + get_cmd = ( + ["secops"] + + common_args + + [ + "watchlist", + "get", + "--watchlist-id", + watchlist_id, + ] + ) + + get_result = subprocess.run( + get_cmd, + env=cli_env, + capture_output=True, + text=True, + ) + + assert get_result.returncode == 0, f"Command failed: {get_result.stderr}" + + watchlist_data = json.loads(get_result.stdout) + assert isinstance( + watchlist_data, dict + ), "Expected dict response from watchlist get" + assert ( + watchlist_data.get("name") == watchlist_name + ), "Watchlist name doesn't match" + assert ( + watchlist_data.get("displayName") == display_name + ), "Watchlist display name doesn't match" + + +@pytest.mark.integration +def test_cli_watchlist_create_update_delete(cli_env, common_args): + """Test CLI commands for creating, updating, and deleting a watchlist. + + Args: + cli_env: Environment variables for CLI execution. + common_args: Common CLI arguments. + """ + print("\nTesting watchlist create, update, and delete commands") + + # Use a timestamped name to avoid collisions + ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + watchlist_name = f"secops-test-watchlist-{ts}" + display_name = f"SecOps Test Watchlist {ts}" + multiplying_factor = 1.5 + description = "Integration test watchlist" + + # 1. Create watchlist + print("1. Creating watchlist") + create_cmd = ( + ["secops"] + + common_args + + [ + "watchlist", + "create", + "--name", + watchlist_name, + "--display-name", + display_name, + "--multiplying-factor", + str(multiplying_factor), + "--description", + description, + ] + ) + + create_result = subprocess.run( + create_cmd, + env=cli_env, + capture_output=True, + text=True, + ) + + assert ( + create_result.returncode == 0 + ), f"Create failed: {create_result.stderr}" + + created_data = json.loads(create_result.stdout) + assert isinstance(created_data, dict), "Expected dict response" + assert created_data.get("name"), "Missing 'name' in created watchlist" + assert ( + created_data.get("displayName") == display_name + ), "Created watchlist display name mismatch" + + created_name = created_data["name"] + created_id = created_name.split("/")[-1] + print(f"Created watchlist: {display_name} (ID: {created_id})") + + # 2. Update watchlist + print("\n2. Updating watchlist") + updated_display_name = f"Updated Watchlist {ts}" + updated_multiplying_factor = 2.5 + updated_description = "Updated integration test watchlist" + + update_cmd = ( + ["secops"] + + common_args + + [ + "watchlist", + "update", + "--watchlist-id", + created_id, + "--display-name", + updated_display_name, + "--multiplying-factor", + str(updated_multiplying_factor), + "--description", + updated_description, + "--pinned", + "true", + ] + ) + + update_result = subprocess.run( + update_cmd, + env=cli_env, + capture_output=True, + text=True, + ) + + assert ( + update_result.returncode == 0 + ), f"Update failed: {update_result.stderr}" + + update_data = json.loads(update_result.stdout) + assert isinstance(update_data, dict), "Expected dict response" + assert ( + update_data.get("displayName") == updated_display_name + ), "Updated display name mismatch" + assert ( + update_data.get("multiplyingFactor") == updated_multiplying_factor + ), "Updated multiplying factor mismatch" + user_prefs = update_data.get("watchlistUserPreferences", {}) + assert user_prefs.get("pinned") is True, "Watchlist should be pinned" + print(f"Updated watchlist: {updated_display_name}") + + # 3. Verify updates via get command + print("\n3. Verifying updates via get command") + get_cmd = ( + ["secops"] + + common_args + + [ + "watchlist", + "get", + "--watchlist-id", + created_id, + ] + ) + + get_result = subprocess.run( + get_cmd, + env=cli_env, + capture_output=True, + text=True, + ) + + assert get_result.returncode == 0, f"Get failed: {get_result.stderr}" + + get_data = json.loads(get_result.stdout) + assert get_data.get("name") == created_name, "Get watchlist name mismatch" + assert ( + get_data.get("displayName") == updated_display_name + ), "Get watchlist display name mismatch" + print("Verified updates successfully") + + # 4. Delete created watchlist (cleanup) + print("\n4. Deleting created watchlist") + delete_cmd = ( + ["secops"] + + common_args + + [ + "watchlist", + "delete", + "--watchlist-id", + created_id, + ] + ) + + delete_result = subprocess.run( + delete_cmd, + env=cli_env, + capture_output=True, + text=True, + ) + + assert ( + delete_result.returncode == 0 + ), f"Delete failed: {delete_result.stderr}" + + if delete_result.stdout.strip(): + delete_data = json.loads(delete_result.stdout) + assert isinstance( + delete_data, dict + ), "Expected dict or empty response from delete" + + print(f"Successfully deleted watchlist {created_id}") + + +if __name__ == "__main__": + # Allow running directly + pytest.main(["-v", __file__, "-m", "integration"])