From a6d7e2aa98fb403d1f21d77937cf2d24b2f3548f Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Tue, 2 Dec 2025 14:18:11 +0530 Subject: [PATCH 01/10] feat: added support for log processing pipeline methods --- examples/log_processing_pipelines_example.py | 404 ++++++++++++++++++ src/secops/chronicle/__init__.py | 23 + src/secops/chronicle/client.py | 216 ++++++++++ .../chronicle/log_processing_pipelines.py | 370 ++++++++++++++++ 4 files changed, 1013 insertions(+) create mode 100644 examples/log_processing_pipelines_example.py create mode 100644 src/secops/chronicle/log_processing_pipelines.py diff --git a/examples/log_processing_pipelines_example.py b/examples/log_processing_pipelines_example.py new file mode 100644 index 00000000..9a6a1c02 --- /dev/null +++ b/examples/log_processing_pipelines_example.py @@ -0,0 +1,404 @@ +#!/usr/bin/env python3 +"""Example usage of the Google SecOps SDK for Log Processing Pipelines.""" + +import argparse +import json +import time +import uuid + +from secops import SecOpsClient + + +def get_client(project_id, customer_id, region): + """Initialize and return the Chronicle client. + + Args: + project_id: Google Cloud Project ID. + customer_id: Chronicle Customer ID (UUID). + region: Chronicle region (us or eu). + + Returns: + Chronicle client instance. + """ + client = SecOpsClient() + chronicle = client.chronicle( + customer_id=customer_id, project_id=project_id, region=region + ) + return chronicle + + +def example_list_pipelines(chronicle): + """Example 1: List Log Processing Pipelines.""" + print("\n=== Example 1: List Log Processing Pipelines ===") + + try: + # List all pipelines + response = chronicle.list_log_processing_pipelines() + pipelines = response.get("logProcessingPipelines", []) + + print(f"\nFound {len(pipelines)} pipeline(s)") + + if pipelines: + print("\nSample pipeline details:") + sample_pipeline = pipelines[0] + print(f"Name: {sample_pipeline.get('name')}") + print(f"Display Name: {sample_pipeline.get('displayName')}") + print(f"Description: {sample_pipeline.get('description', 'N/A')}") + + # Extract pipeline ID from the name + pipeline_id = sample_pipeline.get("name", "").split("/")[-1] + print(f"Pipeline ID: {pipeline_id}") + + # Print processor count + processors = sample_pipeline.get("processors", []) + print(f"Number of processors: {len(processors)}") + else: + print("No pipelines found in your Chronicle instance.") + + except Exception as e: + print(f"Error listing pipelines: {e}") + + +def example_create_and_get_pipeline(chronicle): + """Example 2: Create and Get Pipeline.""" + print("\n=== Example 2: Create and Get Pipeline ===") + + # Generate unique pipeline name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + # Define a simple filter processor pipeline + pipeline_config = { + "displayName": display_name, + "description": "Example pipeline created by SDK", + "processors": [ + { + "filterProcessor": { + "include": {"logMatchType": "LOG_MATCH_TYPE_UNSPECIFIED"}, + "errorMode": "ERROR_MODE_UNSPECIFIED", + } + } + ], + "customMetadata": [ + {"key": "environment", "value": "test"}, + {"key": "created_by", "value": "sdk_example"}, + ], + } + + created_pipeline = None + + try: + # Create the pipeline + print(f"\nCreating pipeline: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + # Extract pipeline ID from the name + pipeline_id = created_pipeline.get("name", "").split("/")[-1] 
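+        # The "name" field holds the full resource path, ending in
+        # ".../logProcessingPipelines/{pipeline_id}", so the last path
+        # segment is the bare pipeline ID used by the other methods.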
+ + print(f"Pipeline created successfully!") + print(f"Pipeline ID: {pipeline_id}") + print(f"Display Name: {created_pipeline.get('displayName')}") + + # Wait for pipeline to be fully created + time.sleep(2) + + # Get the pipeline to verify it was created + print(f"\nRetrieving pipeline details for ID: {pipeline_id}") + retrieved_pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + + print("Pipeline details retrieved:") + print(f"Name: {retrieved_pipeline.get('name')}") + print(f"Display Name: {retrieved_pipeline.get('displayName')}") + print(f"Description: {retrieved_pipeline.get('description', 'N/A')}") + + except Exception as e: + print(f"Error creating or getting pipeline: {e}") + + finally: + # Clean up: delete the pipeline if it was created + if created_pipeline: + try: + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"\nCleaning up: Deleting pipeline ID: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + except Exception as e: + print(f"Warning: Failed to delete test pipeline: {e}") + + +def example_update_pipeline(chronicle): + """Example 3: Update (Patch) Pipeline.""" + print("\n=== Example 3: Update Pipeline ===") + + # Generate unique pipeline name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + # Initial pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Original description", + "processors": [ + { + "filterProcessor": { + "include": {"logMatchType": "LOG_MATCH_TYPE_UNSPECIFIED"}, + "errorMode": "ERROR_MODE_UNSPECIFIED", + } + } + ], + } + + created_pipeline = None + + try: + # Create the pipeline + print(f"\nCreating pipeline to update: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"Pipeline created with ID: {pipeline_id}") + + # Wait for pipeline to be fully created + time.sleep(2) + + # Update the pipeline with new display name and description + updated_pipeline_config = { + "name": created_pipeline.get("name"), + "displayName": f"Updated {display_name}", + "description": "Updated description via SDK", + } + + print("\nUpdating pipeline...") + updated_pipeline = chronicle.patch_log_processing_pipeline( + pipeline_id=pipeline_id, + pipeline=updated_pipeline_config, + update_mask="displayName,description", + ) + + print("Pipeline updated successfully!") + print(f"New Display Name: {updated_pipeline.get('displayName')}") + print(f"New Description: {updated_pipeline.get('description', 'N/A')}") + + except Exception as e: + print(f"Error updating pipeline: {e}") + + finally: + # Clean up: delete the pipeline if it was created + if created_pipeline: + try: + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"\nCleaning up: Deleting pipeline ID: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + except Exception as e: + print(f"Warning: Failed to delete test pipeline: {e}") + + +def example_stream_association(chronicle): + """Example 4: Associate and Dissociate Streams.""" + print("\n=== Example 4: Associate and Dissociate Streams ===") + + # Generate unique pipeline name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Pipeline for stream association 
example", + "processors": [ + { + "filterProcessor": { + "include": {"logMatchType": "LOG_MATCH_TYPE_UNSPECIFIED"}, + "errorMode": "ERROR_MODE_UNSPECIFIED", + } + } + ], + } + + created_pipeline = None + + try: + # Create the pipeline + print(f"\nCreating pipeline: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"Pipeline created with ID: {pipeline_id}") + + # Wait for pipeline to be fully created + time.sleep(2) + + # Define streams to associate + # Note: Replace with actual log type or feed ID from environment + streams = [{"logType": "WINEVTLOG"}] + + print("\nAssociating streams with pipeline...") + print(f"Streams: {json.dumps(streams, indent=2)}") + + chronicle.associate_streams(pipeline_id=pipeline_id, streams=streams) + print("Streams associated successfully!") + + # Wait a moment + time.sleep(2) + + # Dissociate the streams + print("\nDissociating streams from pipeline...") + chronicle.dissociate_streams(pipeline_id=pipeline_id, streams=streams) + print("Streams dissociated successfully!") + + except Exception as e: + print(f"Error in stream association operations: {e}") + print( + "Note: Make sure the log type or feed ID exists " + "in your environment." + ) + + finally: + # Clean up: delete the pipeline if it was created + if created_pipeline: + try: + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"\nCleaning up: Deleting pipeline ID: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + except Exception as e: + print(f"Warning: Failed to delete test pipeline: {e}") + + +def example_test_pipeline(chronicle): + """Example 5: Test Pipeline with Sample Logs.""" + print("\n=== Example 5: Test Pipeline ===") + + # Define a pipeline configuration to test + pipeline_config = { + "displayName": "Test Pipeline (Not Created)", + "processors": [ + { + "filterProcessor": { + "include": {"logMatchType": "LOG_MATCH_TYPE_UNSPECIFIED"}, + "errorMode": "ERROR_MODE_UNSPECIFIED", + } + } + ], + } + + # Sample input logs + input_logs = [ + {"logText": "Sample log entry 1"}, + {"logText": "Sample log entry 2"}, + ] + + try: + print("\nTesting pipeline configuration...") + print(f"Pipeline: {pipeline_config['displayName']}") + print(f"Number of input logs: {len(input_logs)}") + + result = chronicle.test_pipeline( + pipeline=pipeline_config, input_logs=input_logs + ) + + processed_logs = result.get("logs", []) + print(f"\nProcessed {len(processed_logs)} log(s)") + + if processed_logs: + print("\nFirst processed log:") + print(json.dumps(processed_logs[0], indent=2)) + + except Exception as e: + print(f"Error testing pipeline: {e}") + print( + "Note: This example uses simplified log structure. " + "Actual logs may need more fields." 
+ ) + + +def example_fetch_associated_pipeline(chronicle): + """Example 6: Fetch Pipeline Associated with a Stream.""" + print("\n=== Example 6: Fetch Associated Pipeline ===") + + # Define a stream to query + # Note: Replace with actual log type or feed ID from your environment + stream = {"logType": "WINEVTLOG"} + + try: + print(f"\nFetching pipeline for stream: {json.dumps(stream)}") + result = chronicle.fetch_associated_pipeline(stream=stream) + + if result: + print("\nAssociated pipeline found:") + print(f"Name: {result.get('name')}") + print(f"Display Name: {result.get('displayName')}") + print(f"Description: {result.get('description', 'N/A')}") + else: + print("No pipeline associated with this stream.") + + except Exception as e: + print(f"Error fetching associated pipeline: {e}") + print( + "Note: Make sure the stream exists and has an " + "associated pipeline." + ) + + +# Map of example functions +EXAMPLES = { + "1": example_list_pipelines, + "2": example_create_and_get_pipeline, + "3": example_update_pipeline, + "4": example_stream_association, + "5": example_test_pipeline, + "6": example_fetch_associated_pipeline, +} + + +def main(): + """Main function to run examples.""" + parser = argparse.ArgumentParser( + description="Run Chronicle Log Processing Pipeline examples" + ) + parser.add_argument( + "--project_id", required=True, help="Google Cloud Project ID" + ) + parser.add_argument( + "--customer_id", required=True, help="Chronicle Customer ID (UUID)" + ) + parser.add_argument( + "--region", default="us", help="Chronicle region (us or eu)" + ) + parser.add_argument( + "--example", + "-e", + help=( + "Example number to run (1-6). " + "If not specified, runs all examples." + ), + ) + + args = parser.parse_args() + + # Initialize the client + chronicle = get_client(args.project_id, args.customer_id, args.region) + + if args.example: + if args.example not in EXAMPLES: + print( + f"Invalid example number. 
Available examples: " + f"{', '.join(EXAMPLES.keys())}" + ) + return + EXAMPLES[args.example](chronicle) + else: + # Run all examples in order + for example_num in sorted(EXAMPLES.keys()): + EXAMPLES[example_num](chronicle) + + +if __name__ == "__main__": + main() diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index 403640df..560211f7 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -78,6 +78,18 @@ is_valid_log_type, search_log_types, ) +from secops.chronicle.log_processing_pipelines import ( + associate_streams, + create_log_processing_pipeline, + delete_log_processing_pipeline, + dissociate_streams, + fetch_associated_pipeline, + fetch_sample_logs_by_streams, + get_log_processing_pipeline, + list_log_processing_pipelines, + patch_log_processing_pipeline, + test_pipeline, +) from secops.chronicle.models import ( AlertCount, AlertState, @@ -304,4 +316,15 @@ "update_data_table", "update_data_table_rows", "replace_data_table_rows", + # Log Processing Pipelines + "list_log_processing_pipelines", + "get_log_processing_pipeline", + "create_log_processing_pipeline", + "patch_log_processing_pipeline", + "delete_log_processing_pipeline", + "associate_streams", + "dissociate_streams", + "fetch_associated_pipeline", + "fetch_sample_logs_by_streams", + "test_pipeline", ] diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index a6910e21..0a1412af 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -113,6 +113,36 @@ ) from secops.chronicle.log_types import is_valid_log_type as _is_valid_log_type from secops.chronicle.log_types import search_log_types as _search_log_types +from secops.chronicle.log_processing_pipelines import ( + associate_streams as _associate_streams, +) +from secops.chronicle.log_processing_pipelines import ( + create_log_processing_pipeline as _create_log_processing_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + delete_log_processing_pipeline as _delete_log_processing_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + dissociate_streams as _dissociate_streams, +) +from secops.chronicle.log_processing_pipelines import ( + fetch_associated_pipeline as _fetch_associated_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + fetch_sample_logs_by_streams as _fetch_sample_logs_by_streams, +) +from secops.chronicle.log_processing_pipelines import ( + get_log_processing_pipeline as _get_log_processing_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + list_log_processing_pipelines as _list_log_processing_pipelines, +) +from secops.chronicle.log_processing_pipelines import ( + patch_log_processing_pipeline as _patch_log_processing_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + test_pipeline as _test_pipeline, +) from secops.chronicle.models import ( CaseList, DashboardChart, @@ -1109,6 +1139,192 @@ def generate_secret(self, feed_id: str) -> Dict[str, Any]: def delete_feed(self, feed_id: str) -> Dict[str, Any]: return _delete_feed(self, feed_id) + # Log Processing Pipeline methods + + def list_log_processing_pipelines( + self, + page_size: Optional[int] = None, + page_token: Optional[str] = None, + filter_expr: Optional[str] = None, + ) -> Dict[str, Any]: + """Lists log processing pipelines. + + Args: + page_size: Maximum number of pipelines to return. + page_token: Page token for pagination. + filter_expr: Filter expression to restrict results. 
+ + Returns: + Dictionary containing pipelines and pagination info. + + Raises: + APIError: If the API request fails. + """ + return _list_log_processing_pipelines( + self, page_size, page_token, filter_expr + ) + + def get_log_processing_pipeline(self, pipeline_id: str) -> Dict[str, Any]: + """Gets a log processing pipeline by ID. + + Args: + pipeline_id: ID of the pipeline to retrieve. + + Returns: + Dictionary containing pipeline information. + + Raises: + APIError: If the API request fails. + """ + return _get_log_processing_pipeline(self, pipeline_id) + + def create_log_processing_pipeline( + self, + pipeline: Dict[str, Any], + pipeline_id: Optional[str] = None, + ) -> Dict[str, Any]: + """Creates a new log processing pipeline. + + Args: + pipeline: Pipeline configuration dict. + pipeline_id: Optional ID for the pipeline. + + Returns: + Dictionary containing the created pipeline. + + Raises: + APIError: If the API request fails. + """ + return _create_log_processing_pipeline(self, pipeline, pipeline_id) + + def patch_log_processing_pipeline( + self, + pipeline_id: str, + pipeline: Dict[str, Any], + update_mask: Optional[str] = None, + ) -> Dict[str, Any]: + """Updates a log processing pipeline. + + Args: + pipeline_id: ID of the pipeline to update. + pipeline: Pipeline configuration with fields to update. + update_mask: Optional comma-separated list of fields. + + Returns: + Dictionary containing the updated pipeline. + + Raises: + APIError: If the API request fails. + """ + return _patch_log_processing_pipeline( + self, pipeline_id, pipeline, update_mask + ) + + def delete_log_processing_pipeline( + self, pipeline_id: str, etag: Optional[str] = None + ) -> Dict[str, Any]: + """Deletes a log processing pipeline. + + Args: + pipeline_id: ID of the pipeline to delete. + etag: Optional etag for optimistic concurrency control. + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. + """ + return _delete_log_processing_pipeline(self, pipeline_id, etag) + + def associate_streams( + self, pipeline_id: str, streams: List[Dict[str, Any]] + ) -> Dict[str, Any]: + """Associates streams with a pipeline. + + Args: + pipeline_id: ID of the pipeline. + streams: List of stream dicts. + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. + """ + return _associate_streams(self, pipeline_id, streams) + + def dissociate_streams( + self, pipeline_id: str, streams: List[Dict[str, Any]] + ) -> Dict[str, Any]: + """Dissociates streams from a pipeline. + + Args: + pipeline_id: ID of the pipeline. + streams: List of stream dicts. + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. + """ + return _dissociate_streams(self, pipeline_id, streams) + + def fetch_associated_pipeline( + self, stream: Dict[str, Any] + ) -> Dict[str, Any]: + """Fetches the pipeline associated with a stream. + + Args: + stream: Stream dict (logType or feedId). + + Returns: + Dictionary containing the associated pipeline. + + Raises: + APIError: If the API request fails. + """ + return _fetch_associated_pipeline(self, stream) + + def fetch_sample_logs_by_streams( + self, + streams: List[Dict[str, Any]], + sample_logs_count: Optional[int] = None, + ) -> Dict[str, Any]: + """Fetches sample logs for specified streams. + + Args: + streams: List of stream dicts. + sample_logs_count: Number of sample logs per stream. + + Returns: + Dictionary containing sample logs. + + Raises: + APIError: If the API request fails. 
+ """ + return _fetch_sample_logs_by_streams(self, streams, sample_logs_count) + + def test_pipeline( + self, + pipeline: Dict[str, Any], + input_logs: List[Dict[str, Any]], + ) -> Dict[str, Any]: + """Tests a pipeline with input logs. + + Args: + pipeline: Pipeline configuration to test. + input_logs: List of log objects to process. + + Returns: + Dictionary containing processed logs. + + Raises: + APIError: If the API request fails. + """ + return _test_pipeline(self, pipeline, input_logs) + def list_rules( self, view: Optional[str] = "FULL", diff --git a/src/secops/chronicle/log_processing_pipelines.py b/src/secops/chronicle/log_processing_pipelines.py new file mode 100644 index 00000000..b3ce3af0 --- /dev/null +++ b/src/secops/chronicle/log_processing_pipelines.py @@ -0,0 +1,370 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Provides log processing pipeline management for Chronicle.""" + +from typing import Any, Dict, List, Optional + +from secops.exceptions import APIError + + +def list_log_processing_pipelines( + client, + page_size: Optional[int] = None, + page_token: Optional[str] = None, + filter_expr: Optional[str] = None, +) -> Dict[str, Any]: + """Lists log processing pipelines. + + Args: + client: ChronicleClient instance. + page_size: Maximum number of pipelines to return. If not + specified, server determines the number. + page_token: Page token from a previous list call to retrieve + the next page. + filter_expr: Filter expression (AIP-160) to restrict results. + + Returns: + Dictionary containing: + - logProcessingPipelines: List of pipeline dicts + - nextPageToken: Token for next page (if more results exist) + + Raises: + APIError: If the API request fails. + """ + url = f"{client.base_url}/{client.instance_id}/logProcessingPipelines" + + params: Dict[str, Any] = {} + if page_size is not None: + params["pageSize"] = page_size + if page_token: + params["pageToken"] = page_token + if filter_expr: + params["filter"] = filter_expr + + response = client.session.get(url, params=params) + if response.status_code != 200: + raise APIError( + f"Failed to list log processing pipelines: {response.text}" + ) + + return response.json() + + +def get_log_processing_pipeline(client, pipeline_id: str) -> Dict[str, Any]: + """Gets a log processing pipeline by ID. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to retrieve. + + Returns: + Dictionary containing pipeline information. + + Raises: + APIError: If the API request fails. + """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}" + ) + + response = client.session.get(url) + if response.status_code != 200: + raise APIError( + f"Failed to get log processing pipeline: {response.text}" + ) + + return response.json() + + +def create_log_processing_pipeline( + client, + pipeline: Dict[str, Any], + pipeline_id: Optional[str] = None, +) -> Dict[str, Any]: + """Creates a new log processing pipeline. 
+ + Args: + client: ChronicleClient instance. + pipeline: LogProcessingPipeline configuration dict containing: + - displayName: Display name for the pipeline + - description: Optional description + - processors: List of processor configurations + - customMetadata: Optional custom metadata list + pipeline_id: Optional ID for the pipeline. If omitted, server + assigns a unique ID. + + Returns: + Dictionary containing the created pipeline. + + Raises: + APIError: If the API request fails. + """ + url = f"{client.base_url}/{client.instance_id}/logProcessingPipelines" + + params: Dict[str, Any] = {} + if pipeline_id: + params["logProcessingPipelineId"] = pipeline_id + + response = client.session.post(url, json=pipeline, params=params) + if response.status_code != 200: + raise APIError( + f"Failed to create log processing pipeline: {response.text}" + ) + + return response.json() + + +def patch_log_processing_pipeline( + client, + pipeline_id: str, + pipeline: Dict[str, Any], + update_mask: Optional[str] = None, +) -> Dict[str, Any]: + """Updates a log processing pipeline. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to update. + pipeline: LogProcessingPipeline configuration dict with fields + to update. + update_mask: Optional comma-separated list of fields to update + (e.g., "displayName,description"). If not included, all + fields with default/non-default values will be overwritten. + + Returns: + Dictionary containing the updated pipeline. + + Raises: + APIError: If the API request fails. + """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}" + ) + + params: Dict[str, Any] = {} + if update_mask: + params["updateMask"] = update_mask + + response = client.session.patch(url, json=pipeline, params=params) + if response.status_code != 200: + raise APIError( + f"Failed to patch log processing pipeline: {response.text}" + ) + + return response.json() + + +def delete_log_processing_pipeline( + client, pipeline_id: str, etag: Optional[str] = None +) -> Dict[str, Any]: + """Deletes a log processing pipeline. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to delete. + etag: Optional etag value. If provided, deletion only succeeds + if the resource's current etag matches this value. + + Returns: + Empty dictionary on successful deletion. + + Raises: + APIError: If the API request fails. + """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}" + ) + + params: Dict[str, Any] = {} + if etag: + params["etag"] = etag + + response = client.session.delete(url, params=params) + if response.status_code != 200: + raise APIError( + f"Failed to delete log processing pipeline: {response.text}" + ) + + return response.json() + + +def associate_streams( + client, pipeline_id: str, streams: List[Dict[str, Any]] +) -> Dict[str, Any]: + """Associates streams with a log processing pipeline. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to associate streams with. + streams: List of stream dicts. Each stream can be: + - {"logType": "LOG_TYPE_NAME"} or + - {"feedId": "FEED_ID"} + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. 
+ """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}:associateStreams" + ) + + body = {"streams": streams} + + response = client.session.post(url, json=body) + if response.status_code != 200: + raise APIError(f"Failed to associate streams: {response.text}") + + return response.json() + + +def dissociate_streams( + client, pipeline_id: str, streams: List[Dict[str, Any]] +) -> Dict[str, Any]: + """Dissociates streams from a log processing pipeline. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to dissociate streams from. + streams: List of stream dicts. Each stream can be: + - {"logType": "LOG_TYPE_NAME"} or + - {"feedId": "FEED_ID"} + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. + """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}:dissociateStreams" + ) + + body = {"streams": streams} + + response = client.session.post(url, json=body) + if response.status_code != 200: + raise APIError(f"Failed to dissociate streams: {response.text}") + + return response.json() + + +def fetch_associated_pipeline(client, stream: Dict[str, Any]) -> Dict[str, Any]: + """Fetches the pipeline associated with a specific stream. + + Args: + client: ChronicleClient instance. + stream: Stream dict, can be: + - {"logType": "LOG_TYPE_NAME"} or + - {"feedId": "FEED_ID"} + + Returns: + Dictionary containing the associated pipeline. + + Raises: + APIError: If the API request fails. + """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines:fetchAssociatedPipeline" + ) + + params = {"stream": stream} + + response = client.session.get(url, params=params) + if response.status_code != 200: + raise APIError(f"Failed to fetch associated pipeline: {response.text}") + + return response.json() + + +def fetch_sample_logs_by_streams( + client, + streams: List[Dict[str, Any]], + sample_logs_count: Optional[int] = None, +) -> Dict[str, Any]: + """Fetches sample logs for specified streams. + + Args: + client: ChronicleClient instance. + streams: List of stream dicts. Each stream can be: + - {"logType": "LOG_TYPE_NAME"} or + - {"feedId": "FEED_ID"} + sample_logs_count: Number of sample logs to fetch per stream. + Default is 100. Max is 1000 or 4MB per stream. + + Returns: + Dictionary containing: + - logs: List of log objects + - sampleLogs: List of base64-encoded log strings (deprecated) + + Raises: + APIError: If the API request fails. + """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines:fetchSampleLogsByStreams" + ) + + body: Dict[str, Any] = {"streams": streams} + if sample_logs_count is not None: + body["sampleLogsCount"] = sample_logs_count + + response = client.session.post(url, json=body) + if response.status_code != 200: + raise APIError( + f"Failed to fetch sample logs by streams: {response.text}" + ) + + return response.json() + + +def test_pipeline( + client, + pipeline: Dict[str, Any], + input_logs: List[Dict[str, Any]], +) -> Dict[str, Any]: + """Tests a log processing pipeline with input logs. + + Args: + client: ChronicleClient instance. + pipeline: LogProcessingPipeline configuration to test. + input_logs: List of log objects to process through the pipeline. + + Returns: + Dictionary containing: + - logs: List of processed log objects + + Raises: + APIError: If the API request fails. 
+ """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines:testPipeline" + ) + + body = {"logProcessingPipeline": pipeline, "inputLogs": input_logs} + + response = client.session.post(url, json=body) + if response.status_code != 200: + raise APIError(f"Failed to test pipeline: {response.text}") + + return response.json() From a22f9fab61a8fdf704bb4c26e9d8598a68afda45 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Thu, 11 Dec 2025 11:51:49 +0530 Subject: [PATCH 02/10] chore: updated for p310 syntax --- src/secops/chronicle/client.py | 50 ++++++++-------- .../chronicle/log_processing_pipelines.py | 60 +++++++++---------- 2 files changed, 55 insertions(+), 55 deletions(-) diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index 2ab558c5..cca68320 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -1336,10 +1336,10 @@ def delete_feed( def list_log_processing_pipelines( self, - page_size: Optional[int] = None, - page_token: Optional[str] = None, - filter_expr: Optional[str] = None, - ) -> Dict[str, Any]: + page_size: int | None = None, + page_token: str | None = None, + filter_expr: str | None = None, + ) -> dict[str, Any]: """Lists log processing pipelines. Args: @@ -1357,7 +1357,7 @@ def list_log_processing_pipelines( self, page_size, page_token, filter_expr ) - def get_log_processing_pipeline(self, pipeline_id: str) -> Dict[str, Any]: + def get_log_processing_pipeline(self, pipeline_id: str) -> dict[str, Any]: """Gets a log processing pipeline by ID. Args: @@ -1373,9 +1373,9 @@ def get_log_processing_pipeline(self, pipeline_id: str) -> Dict[str, Any]: def create_log_processing_pipeline( self, - pipeline: Dict[str, Any], - pipeline_id: Optional[str] = None, - ) -> Dict[str, Any]: + pipeline: dict[str, Any], + pipeline_id: str | None = None, + ) -> dict[str, Any]: """Creates a new log processing pipeline. Args: @@ -1393,9 +1393,9 @@ def create_log_processing_pipeline( def patch_log_processing_pipeline( self, pipeline_id: str, - pipeline: Dict[str, Any], - update_mask: Optional[str] = None, - ) -> Dict[str, Any]: + pipeline: dict[str, Any], + update_mask: str | None = None, + ) -> dict[str, Any]: """Updates a log processing pipeline. Args: @@ -1414,8 +1414,8 @@ def patch_log_processing_pipeline( ) def delete_log_processing_pipeline( - self, pipeline_id: str, etag: Optional[str] = None - ) -> Dict[str, Any]: + self, pipeline_id: str, etag: str | None = None + ) -> dict[str, Any]: """Deletes a log processing pipeline. Args: @@ -1431,8 +1431,8 @@ def delete_log_processing_pipeline( return _delete_log_processing_pipeline(self, pipeline_id, etag) def associate_streams( - self, pipeline_id: str, streams: List[Dict[str, Any]] - ) -> Dict[str, Any]: + self, pipeline_id: str, streams: list[dict[str, Any]] + ) -> dict[str, Any]: """Associates streams with a pipeline. Args: @@ -1448,8 +1448,8 @@ def associate_streams( return _associate_streams(self, pipeline_id, streams) def dissociate_streams( - self, pipeline_id: str, streams: List[Dict[str, Any]] - ) -> Dict[str, Any]: + self, pipeline_id: str, streams: list[dict[str, Any]] + ) -> dict[str, Any]: """Dissociates streams from a pipeline. 
Args: @@ -1465,8 +1465,8 @@ def dissociate_streams( return _dissociate_streams(self, pipeline_id, streams) def fetch_associated_pipeline( - self, stream: Dict[str, Any] - ) -> Dict[str, Any]: + self, stream: dict[str, Any] + ) -> dict[str, Any]: """Fetches the pipeline associated with a stream. Args: @@ -1482,9 +1482,9 @@ def fetch_associated_pipeline( def fetch_sample_logs_by_streams( self, - streams: List[Dict[str, Any]], - sample_logs_count: Optional[int] = None, - ) -> Dict[str, Any]: + streams: list[dict[str, Any]], + sample_logs_count: int | None = None, + ) -> dict[str, Any]: """Fetches sample logs for specified streams. Args: @@ -1501,9 +1501,9 @@ def fetch_sample_logs_by_streams( def test_pipeline( self, - pipeline: Dict[str, Any], - input_logs: List[Dict[str, Any]], - ) -> Dict[str, Any]: + pipeline: dict[str, Any], + input_logs: list[dict[str, Any]], + ) -> dict[str, Any]: """Tests a pipeline with input logs. Args: diff --git a/src/secops/chronicle/log_processing_pipelines.py b/src/secops/chronicle/log_processing_pipelines.py index b3ce3af0..815f06b3 100644 --- a/src/secops/chronicle/log_processing_pipelines.py +++ b/src/secops/chronicle/log_processing_pipelines.py @@ -14,17 +14,17 @@ # """Provides log processing pipeline management for Chronicle.""" -from typing import Any, Dict, List, Optional +from typing import Any from secops.exceptions import APIError def list_log_processing_pipelines( client, - page_size: Optional[int] = None, - page_token: Optional[str] = None, - filter_expr: Optional[str] = None, -) -> Dict[str, Any]: + page_size: int | None = None, + page_token: str | None = None, + filter_expr: str | None = None, +) -> dict[str, Any]: """Lists log processing pipelines. Args: @@ -45,7 +45,7 @@ def list_log_processing_pipelines( """ url = f"{client.base_url}/{client.instance_id}/logProcessingPipelines" - params: Dict[str, Any] = {} + params: dict[str, Any] = {} if page_size is not None: params["pageSize"] = page_size if page_token: @@ -62,7 +62,7 @@ def list_log_processing_pipelines( return response.json() -def get_log_processing_pipeline(client, pipeline_id: str) -> Dict[str, Any]: +def get_log_processing_pipeline(client, pipeline_id: str) -> dict[str, Any]: """Gets a log processing pipeline by ID. Args: @@ -91,9 +91,9 @@ def get_log_processing_pipeline(client, pipeline_id: str) -> Dict[str, Any]: def create_log_processing_pipeline( client, - pipeline: Dict[str, Any], - pipeline_id: Optional[str] = None, -) -> Dict[str, Any]: + pipeline: dict[str, Any], + pipeline_id: str | None = None, +) -> dict[str, Any]: """Creates a new log processing pipeline. Args: @@ -114,7 +114,7 @@ def create_log_processing_pipeline( """ url = f"{client.base_url}/{client.instance_id}/logProcessingPipelines" - params: Dict[str, Any] = {} + params: dict[str, Any] = {} if pipeline_id: params["logProcessingPipelineId"] = pipeline_id @@ -130,9 +130,9 @@ def create_log_processing_pipeline( def patch_log_processing_pipeline( client, pipeline_id: str, - pipeline: Dict[str, Any], - update_mask: Optional[str] = None, -) -> Dict[str, Any]: + pipeline: dict[str, Any], + update_mask: str | None = None, +) -> dict[str, Any]: """Updates a log processing pipeline. 
Args: @@ -155,7 +155,7 @@ def patch_log_processing_pipeline( f"logProcessingPipelines/{pipeline_id}" ) - params: Dict[str, Any] = {} + params: dict[str, Any] = {} if update_mask: params["updateMask"] = update_mask @@ -169,8 +169,8 @@ def patch_log_processing_pipeline( def delete_log_processing_pipeline( - client, pipeline_id: str, etag: Optional[str] = None -) -> Dict[str, Any]: + client, pipeline_id: str, etag: str | None = None +) -> dict[str, Any]: """Deletes a log processing pipeline. Args: @@ -190,7 +190,7 @@ def delete_log_processing_pipeline( f"logProcessingPipelines/{pipeline_id}" ) - params: Dict[str, Any] = {} + params: dict[str, Any] = {} if etag: params["etag"] = etag @@ -204,8 +204,8 @@ def delete_log_processing_pipeline( def associate_streams( - client, pipeline_id: str, streams: List[Dict[str, Any]] -) -> Dict[str, Any]: + client, pipeline_id: str, streams: list[dict[str, Any]] +) -> dict[str, Any]: """Associates streams with a log processing pipeline. Args: @@ -236,8 +236,8 @@ def associate_streams( def dissociate_streams( - client, pipeline_id: str, streams: List[Dict[str, Any]] -) -> Dict[str, Any]: + client, pipeline_id: str, streams: list[dict[str, Any]] +) -> dict[str, Any]: """Dissociates streams from a log processing pipeline. Args: @@ -267,7 +267,7 @@ def dissociate_streams( return response.json() -def fetch_associated_pipeline(client, stream: Dict[str, Any]) -> Dict[str, Any]: +def fetch_associated_pipeline(client, stream: dict[str, Any]) -> dict[str, Any]: """Fetches the pipeline associated with a specific stream. Args: @@ -298,9 +298,9 @@ def fetch_associated_pipeline(client, stream: Dict[str, Any]) -> Dict[str, Any]: def fetch_sample_logs_by_streams( client, - streams: List[Dict[str, Any]], - sample_logs_count: Optional[int] = None, -) -> Dict[str, Any]: + streams: list[dict[str, Any]], + sample_logs_count: int | None = None, +) -> dict[str, Any]: """Fetches sample logs for specified streams. Args: @@ -324,7 +324,7 @@ def fetch_sample_logs_by_streams( f"logProcessingPipelines:fetchSampleLogsByStreams" ) - body: Dict[str, Any] = {"streams": streams} + body: dict[str, Any] = {"streams": streams} if sample_logs_count is not None: body["sampleLogsCount"] = sample_logs_count @@ -339,9 +339,9 @@ def fetch_sample_logs_by_streams( def test_pipeline( client, - pipeline: Dict[str, Any], - input_logs: List[Dict[str, Any]], -) -> Dict[str, Any]: + pipeline: dict[str, Any], + input_logs: list[dict[str, Any]], +) -> dict[str, Any]: """Tests a log processing pipeline with input logs. 
Args: From ebd838b7c1736c0cfe9f7b23cd74048bcdd61ab1 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Thu, 11 Dec 2025 18:02:42 +0530 Subject: [PATCH 03/10] chore: added CLI support --- src/secops/cli/cli_client.py | 4 + src/secops/cli/commands/log_processing.py | 342 ++++++++++++++++++++++ src/secops/cli/utils/input_utils.py | 79 +++++ 3 files changed, 425 insertions(+) create mode 100644 src/secops/cli/commands/log_processing.py create mode 100644 src/secops/cli/utils/input_utils.py diff --git a/src/secops/cli/cli_client.py b/src/secops/cli/cli_client.py index 499949ec..f48a3f82 100644 --- a/src/secops/cli/cli_client.py +++ b/src/secops/cli/cli_client.py @@ -22,6 +22,9 @@ from secops.cli.commands.help import setup_help_command from secops.cli.commands.iocs import setup_iocs_command from secops.cli.commands.log import setup_log_command +from secops.cli.commands.log_processing import ( + setup_log_processing_command, +) from secops.cli.commands.parser import setup_parser_command from secops.cli.commands.parser_extension import setup_parser_extension_command from secops.cli.commands.reference_list import setup_reference_list_command @@ -158,6 +161,7 @@ def build_parser() -> argparse.ArgumentParser: setup_entity_command(subparsers) setup_iocs_command(subparsers) setup_log_command(subparsers) + setup_log_processing_command(subparsers) setup_parser_command(subparsers) setup_parser_extension_command(subparsers) setup_feed_command(subparsers) diff --git a/src/secops/cli/commands/log_processing.py b/src/secops/cli/commands/log_processing.py new file mode 100644 index 00000000..25b5ca7d --- /dev/null +++ b/src/secops/cli/commands/log_processing.py @@ -0,0 +1,342 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Google SecOps CLI log processing pipeline commands""" + +import sys + +from secops.cli.utils.formatters import output_formatter +from secops.cli.utils.input_utils import load_json_or_file + + +def setup_log_processing_command(subparsers): + """Set up the log-processing command parser.""" + log_processing_parser = subparsers.add_parser( + "log-processing", help="Manage log processing pipelines" + ) + log_processing_subparsers = log_processing_parser.add_subparsers( + dest="log_processing_command", help="Log processing command" + ) + log_processing_parser.set_defaults( + func=lambda args, _: log_processing_parser.print_help() + ) + + # List pipelines command + list_parser = log_processing_subparsers.add_parser( + "list", help="List log processing pipelines" + ) + list_parser.add_argument( + "--page-size", + "--page_size", + dest="page_size", + type=int, + help="Maximum number of pipelines to return", + ) + list_parser.add_argument( + "--page-token", + "--page_token", + dest="page_token", + help="Page token for pagination", + ) + list_parser.add_argument( + "--filter", help="Filter expression to restrict results" + ) + list_parser.set_defaults(func=handle_list_command) + + # Get pipeline command + get_parser = log_processing_subparsers.add_parser( + "get", help="Get a log processing pipeline" + ) + get_parser.add_argument("--id", required=True, help="Pipeline ID") + get_parser.set_defaults(func=handle_get_command) + + # Create pipeline command + create_parser = log_processing_subparsers.add_parser( + "create", help="Create a log processing pipeline" + ) + create_parser.add_argument( + "--pipeline", + required=True, + help="Pipeline config as JSON string or file path", + ) + create_parser.add_argument("--id", help="Optional pipeline ID") + create_parser.set_defaults(func=handle_create_command) + + # Update pipeline command + update_parser = log_processing_subparsers.add_parser( + "update", help="Update a log processing pipeline" + ) + update_parser.add_argument("--id", required=True, help="Pipeline ID") + update_parser.add_argument( + "--pipeline", + required=True, + help="Pipeline config as JSON string or file path", + ) + update_parser.add_argument( + "--update-mask", + "--update_mask", + dest="update_mask", + help="Comma-separated list of fields to update", + ) + update_parser.set_defaults(func=handle_update_command) + + # Delete pipeline command + delete_parser = log_processing_subparsers.add_parser( + "delete", help="Delete a log processing pipeline" + ) + delete_parser.add_argument("--id", required=True, help="Pipeline ID") + delete_parser.add_argument( + "--etag", help="Optional etag for concurrency control" + ) + delete_parser.set_defaults(func=handle_delete_command) + + # Associate streams command + associate_streams_parser = log_processing_subparsers.add_parser( + "associate-streams", help="Associate streams with a pipeline" + ) + associate_streams_parser.add_argument( + "--id", required=True, help="Pipeline ID" + ) + associate_streams_parser.add_argument( + "--streams", + required=True, + help="JSON array of stream objects or file path", + ) + associate_streams_parser.set_defaults(func=handle_associate_streams_command) + + # Dissociate streams command + dissociate_streams_parser = log_processing_subparsers.add_parser( + "dissociate-streams", help="Dissociate streams from a pipeline" + ) + dissociate_streams_parser.add_argument( + "--id", required=True, help="Pipeline ID" + ) + dissociate_streams_parser.add_argument( + "--streams", + required=True, + help="JSON array of stream 
objects or file path", + ) + dissociate_streams_parser.set_defaults( + func=handle_dissociate_streams_command + ) + + # Fetch associated pipeline command + fetch_associated_parser = log_processing_subparsers.add_parser( + "fetch-associated", help="Fetch pipeline associated with a stream" + ) + fetch_associated_parser.add_argument( + "--stream", + required=True, + help="Stream object as JSON string or file path", + ) + fetch_associated_parser.set_defaults(func=handle_fetch_associated_command) + + # Fetch sample logs command + fetch_sample_logs_parser = log_processing_subparsers.add_parser( + "fetch-sample-logs", help="Fetch sample logs by streams" + ) + fetch_sample_logs_parser.add_argument( + "--streams", + required=True, + help="JSON array of stream objects or file path", + ) + fetch_sample_logs_parser.add_argument( + "--count", type=int, help="Number of sample logs per stream (max 1000)" + ) + fetch_sample_logs_parser.set_defaults(func=handle_fetch_sample_logs_command) + + # Test pipeline command + test_parser = log_processing_subparsers.add_parser( + "test", help="Test a pipeline with input logs" + ) + test_parser.add_argument( + "--pipeline", + required=True, + help="Pipeline config as JSON or file path", + ) + test_parser.add_argument( + "--input-logs", + "--input_logs", + dest="input_logs", + required=True, + help="Input logs as JSON array or file path", + ) + test_parser.set_defaults(func=handle_test_command) + + +def handle_list_command(args, chronicle): + """Handle list log processing pipelines command.""" + try: + result = chronicle.list_log_processing_pipelines( + page_size=args.page_size, + page_token=args.page_token, + filter_expr=args.filter, + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_get_command(args, chronicle): + """Handle get log processing pipeline command.""" + try: + result = chronicle.get_log_processing_pipeline(args.id) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_create_command(args, chronicle): + """Handle create log processing pipeline command.""" + try: + pipeline_config = load_json_or_file(args.pipeline) + + if not isinstance(pipeline_config, dict): + print("Error: pipeline must be a JSON object", file=sys.stderr) + sys.exit(1) + + result = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config, pipeline_id=args.id + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_update_command(args, chronicle): + """Handle update log processing pipeline command.""" + try: + pipeline_config = load_json_or_file(args.pipeline) + + if not isinstance(pipeline_config, dict): + print("Error: pipeline must be a JSON object", file=sys.stderr) + sys.exit(1) + + result = chronicle.patch_log_processing_pipeline( + pipeline_id=args.id, + pipeline=pipeline_config, + update_mask=args.update_mask, + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_delete_command(args, chronicle): + """Handle delete log processing pipeline command.""" + try: + result = chronicle.delete_log_processing_pipeline( + pipeline_id=args.id, etag=args.etag + ) + output_formatter(result, 
args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_associate_streams_command(args, chronicle): + """Handle associate streams command.""" + try: + streams = load_json_or_file(args.streams) + + if not isinstance(streams, list): + print("Error: streams must be a JSON array", file=sys.stderr) + sys.exit(1) + + result = chronicle.associate_streams( + pipeline_id=args.id, streams=streams + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_dissociate_streams_command(args, chronicle): + """Handle dissociate streams command.""" + try: + streams = load_json_or_file(args.streams) + + if not isinstance(streams, list): + print("Error: streams must be a JSON array", file=sys.stderr) + sys.exit(1) + + result = chronicle.dissociate_streams( + pipeline_id=args.id, streams=streams + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_fetch_associated_command(args, chronicle): + """Handle fetch associated pipeline command.""" + try: + stream = load_json_or_file(args.stream) + + if not isinstance(stream, dict): + print("Error: stream must be a JSON object", file=sys.stderr) + sys.exit(1) + + result = chronicle.fetch_associated_pipeline(stream=stream) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_fetch_sample_logs_command(args, chronicle): + """Handle fetch sample logs by streams command.""" + try: + streams = load_json_or_file(args.streams) + + if not isinstance(streams, list): + print("Error: streams must be a JSON array", file=sys.stderr) + sys.exit(1) + + result = chronicle.fetch_sample_logs_by_streams( + streams=streams, sample_logs_count=args.count + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_test_command(args, chronicle): + """Handle test pipeline command.""" + try: + pipeline = load_json_or_file(args.pipeline) + input_logs = load_json_or_file(args.input_logs) + + if not isinstance(pipeline, dict): + print("Error: pipeline must be a JSON object", file=sys.stderr) + sys.exit(1) + + if not isinstance(input_logs, list): + print("Error: input_logs must be a JSON array", file=sys.stderr) + sys.exit(1) + + result = chronicle.test_pipeline( + pipeline=pipeline, input_logs=input_logs + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) diff --git a/src/secops/cli/utils/input_utils.py b/src/secops/cli/utils/input_utils.py new file mode 100644 index 00000000..f810c72a --- /dev/null +++ b/src/secops/cli/utils/input_utils.py @@ -0,0 +1,79 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Google SecOps CLI input utilities""" + +import json +import sys +from pathlib import Path +from typing import Any + + +def load_json_or_file(value: str) -> Any: + """Load JSON from string or file path. + + Args: + value: JSON string or file path + + Returns: + Parsed JSON object (dict, list, etc.) + + Raises: + SystemExit: If file not found or JSON parsing fails + """ + try: + file_path = Path(value) + if file_path.exists() and file_path.is_file(): + with open(file_path, encoding="utf-8") as f: + return json.load(f) + except json.JSONDecodeError as e: + print(f"Error parsing JSON from file: {e}", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"Error reading file: {e}", file=sys.stderr) + sys.exit(1) + + try: + return json.loads(value) + except json.JSONDecodeError as e: + print( + f"Error: Not a valid JSON string or file path: {value}", + file=sys.stderr, + ) + print(f"JSON parse error: {e}", file=sys.stderr) + sys.exit(1) + + +def load_string_or_file(value: str) -> str: + """Load string content from direct value or file path. + + Args: + value: String content or file path + + Returns: + String content + + Raises: + SystemExit: If file exists but cannot be read + """ + try: + file_path = Path(value) + if file_path.exists() and file_path.is_file(): + with open(file_path, encoding="utf-8") as f: + return f.read() + except Exception as e: + print(f"Error reading file: {e}", file=sys.stderr) + sys.exit(1) + + return value From ed0babf727d61166c376d1048bdb65b496bf6f3a Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Fri, 12 Dec 2025 18:25:01 +0530 Subject: [PATCH 04/10] chore: fixed examples. 
--- examples/log_processing_pipelines_example.py | 78 ++++++++++++++++--- .../chronicle/log_processing_pipelines.py | 16 ++-- 2 files changed, 77 insertions(+), 17 deletions(-) diff --git a/examples/log_processing_pipelines_example.py b/examples/log_processing_pipelines_example.py index 9a6a1c02..97a7c5f4 100644 --- a/examples/log_processing_pipelines_example.py +++ b/examples/log_processing_pipelines_example.py @@ -2,9 +2,11 @@ """Example usage of the Google SecOps SDK for Log Processing Pipelines.""" import argparse +import base64 import json import time import uuid +from datetime import datetime, timezone from secops import SecOpsClient @@ -74,8 +76,11 @@ def example_create_and_get_pipeline(chronicle): "processors": [ { "filterProcessor": { - "include": {"logMatchType": "LOG_MATCH_TYPE_UNSPECIFIED"}, - "errorMode": "ERROR_MODE_UNSPECIFIED", + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", } } ], @@ -143,8 +148,11 @@ def example_update_pipeline(chronicle): "processors": [ { "filterProcessor": { - "include": {"logMatchType": "LOG_MATCH_TYPE_UNSPECIFIED"}, - "errorMode": "ERROR_MODE_UNSPECIFIED", + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", } } ], @@ -170,6 +178,7 @@ def example_update_pipeline(chronicle): "name": created_pipeline.get("name"), "displayName": f"Updated {display_name}", "description": "Updated description via SDK", + "processors": created_pipeline.get("processors"), } print("\nUpdating pipeline...") @@ -213,8 +222,11 @@ def example_stream_association(chronicle): "processors": [ { "filterProcessor": { - "include": {"logMatchType": "LOG_MATCH_TYPE_UNSPECIFIED"}, - "errorMode": "ERROR_MODE_UNSPECIFIED", + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", } } ], @@ -282,17 +294,30 @@ def example_test_pipeline(chronicle): "processors": [ { "filterProcessor": { - "include": {"logMatchType": "LOG_MATCH_TYPE_UNSPECIFIED"}, - "errorMode": "ERROR_MODE_UNSPECIFIED", + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", } } ], } - # Sample input logs + # Sample input logs with proper Log resource structure + current_time = datetime.now(timezone.utc).isoformat() + input_logs = [ - {"logText": "Sample log entry 1"}, - {"logText": "Sample log entry 2"}, + { + "data": base64.b64encode(b"Sample log entry 1").decode("utf-8"), + "logEntryTime": current_time, + "collectionTime": current_time, + }, + { + "data": base64.b64encode(b"Sample log entry 2").decode("utf-8"), + "logEntryTime": current_time, + "collectionTime": current_time, + }, ] try: @@ -347,6 +372,34 @@ def example_fetch_associated_pipeline(chronicle): ) +def example_fetch_sample_logs(chronicle): + """Example 7: Fetch Sample Logs by Streams.""" + print("\n=== Example 7: Fetch Sample Logs by Streams ===") + + # Define streams to fetch sample logs from + # Note: Replace with actual log type or feed ID from your environment + streams = [{"logType": "WINEVTLOG"}] + + try: + print(f"\nFetching sample logs for streams: {json.dumps(streams)}") + result = chronicle.fetch_sample_logs_by_streams( + streams=streams, sample_logs_count=5 + ) + + logs = result.get("logs", []) + print(f"\nFetched {len(logs)} sample log(s)") + + if logs: + print("\nFirst sample log:") + print(json.dumps(logs[0], indent=2)) + else: + print("No sample logs available for the specified streams.") + + except Exception as e: + print(f"Error fetching sample logs: {e}") + print("Note: Make 
sure the streams exist and have ingested logs.") + + # Map of example functions EXAMPLES = { "1": example_list_pipelines, @@ -355,6 +408,7 @@ def example_fetch_associated_pipeline(chronicle): "4": example_stream_association, "5": example_test_pipeline, "6": example_fetch_associated_pipeline, + "7": example_fetch_sample_logs, } @@ -376,7 +430,7 @@ def main(): "--example", "-e", help=( - "Example number to run (1-6). " + "Example number to run (1-7). " "If not specified, runs all examples." ), ) diff --git a/src/secops/chronicle/log_processing_pipelines.py b/src/secops/chronicle/log_processing_pipelines.py index 815f06b3..ccd73360 100644 --- a/src/secops/chronicle/log_processing_pipelines.py +++ b/src/secops/chronicle/log_processing_pipelines.py @@ -150,10 +150,13 @@ def patch_log_processing_pipeline( Raises: APIError: If the API request fails. """ - url = ( - f"{client.base_url}/{client.instance_id}/" - f"logProcessingPipelines/{pipeline_id}" - ) + if "/projects/" not in pipeline_id: + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}" + ) + else: + url = f"{client.base_url}/{pipeline_id}" params: dict[str, Any] = {} if update_mask: @@ -287,7 +290,10 @@ def fetch_associated_pipeline(client, stream: dict[str, Any]) -> dict[str, Any]: f"logProcessingPipelines:fetchAssociatedPipeline" ) - params = {"stream": stream} + # Pass stream fields as separate query parameters with stream. prefix + params = {} + for key, value in stream.items(): + params[f"stream.{key}"] = value response = client.session.get(url, params=params) if response.status_code != 200: From 757d7dfe87144dd5788fa1092e87dbe1cf557d2c Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Mon, 15 Dec 2025 14:46:56 +0530 Subject: [PATCH 05/10] chore: added unit tests. updated pipeline id handling. --- .../chronicle/log_processing_pipelines.py | 47 +- .../chronicle/test_log_processing_pipeline.py | 706 ++++++++++++++++++ 2 files changed, 735 insertions(+), 18 deletions(-) create mode 100644 tests/chronicle/test_log_processing_pipeline.py diff --git a/src/secops/chronicle/log_processing_pipelines.py b/src/secops/chronicle/log_processing_pipelines.py index ccd73360..2b818d10 100644 --- a/src/secops/chronicle/log_processing_pipelines.py +++ b/src/secops/chronicle/log_processing_pipelines.py @@ -75,10 +75,13 @@ def get_log_processing_pipeline(client, pipeline_id: str) -> dict[str, Any]: Raises: APIError: If the API request fails. """ - url = ( - f"{client.base_url}/{client.instance_id}/" - f"logProcessingPipelines/{pipeline_id}" - ) + if not pipeline_id.startswith("projects/"): + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}" + ) + else: + url = f"{client.base_url}/{pipeline_id}" response = client.session.get(url) if response.status_code != 200: @@ -150,7 +153,7 @@ def patch_log_processing_pipeline( Raises: APIError: If the API request fails. """ - if "/projects/" not in pipeline_id: + if not pipeline_id.startswith("projects/"): url = ( f"{client.base_url}/{client.instance_id}/" f"logProcessingPipelines/{pipeline_id}" @@ -188,10 +191,13 @@ def delete_log_processing_pipeline( Raises: APIError: If the API request fails. 
""" - url = ( - f"{client.base_url}/{client.instance_id}/" - f"logProcessingPipelines/{pipeline_id}" - ) + if not pipeline_id.startswith("projects/"): + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}" + ) + else: + url = f"{client.base_url}/{pipeline_id}" params: dict[str, Any] = {} if etag: @@ -224,11 +230,13 @@ def associate_streams( Raises: APIError: If the API request fails. """ - url = ( - f"{client.base_url}/{client.instance_id}/" - f"logProcessingPipelines/{pipeline_id}:associateStreams" - ) - + if not pipeline_id.startswith("projects/"): + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}:associateStreams" + ) + else: + url = f"{client.base_url}/{pipeline_id}:associateStreams" body = {"streams": streams} response = client.session.post(url, json=body) @@ -256,10 +264,13 @@ def dissociate_streams( Raises: APIError: If the API request fails. """ - url = ( - f"{client.base_url}/{client.instance_id}/" - f"logProcessingPipelines/{pipeline_id}:dissociateStreams" - ) + if not pipeline_id.startswith("projects/"): + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}:dissociateStreams" + ) + else: + url = f"{client.base_url}/{pipeline_id}:dissociateStreams" body = {"streams": streams} diff --git a/tests/chronicle/test_log_processing_pipeline.py b/tests/chronicle/test_log_processing_pipeline.py new file mode 100644 index 00000000..8e3f10f4 --- /dev/null +++ b/tests/chronicle/test_log_processing_pipeline.py @@ -0,0 +1,706 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Tests for Chronicle log processing pipeline functions.""" + +import pytest +from unittest.mock import Mock, patch + +from secops.chronicle.client import ChronicleClient +from secops.chronicle.log_processing_pipelines import ( + list_log_processing_pipelines, + get_log_processing_pipeline, + create_log_processing_pipeline, + patch_log_processing_pipeline, + delete_log_processing_pipeline, + associate_streams, + dissociate_streams, + fetch_associated_pipeline, + fetch_sample_logs_by_streams, + test_pipeline as pipeline_test_function, +) +from secops.exceptions import APIError + + +@pytest.fixture +def chronicle_client(): + """Create a Chronicle client for testing.""" + with patch("secops.auth.SecOpsAuth") as mock_auth: + mock_session = Mock() + mock_session.headers = {} + mock_auth.return_value.session = mock_session + return ChronicleClient( + customer_id="test-customer", project_id="test-project" + ) + + +@pytest.fixture +def mock_response(): + """Create a mock API response.""" + mock = Mock() + mock.status_code = 200 + mock.json.return_value = { + "name": "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345", + "displayName": "Test Pipeline", + "description": "Test pipeline description", + "processors": [{"filterProcessor": {"include": {}}}], + } + return mock + + +@pytest.fixture +def mock_error_response(): + """Create a mock error API response.""" + mock = Mock() + mock.status_code = 400 + mock.text = "Error message" + return mock + + +def test_list_log_processing_pipelines(chronicle_client, mock_response): + """Test list_log_processing_pipelines function.""" + mock_response.json.return_value = { + "logProcessingPipelines": [ + {"name": "pipeline1"}, + {"name": "pipeline2"}, + ] + } + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = list_log_processing_pipelines(chronicle_client) + + mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines", + params={}, + ) + assert result == mock_response.json.return_value + + +def test_list_log_processing_pipelines_with_params( + chronicle_client, mock_response +): + """Test list_log_processing_pipelines with pagination and filter.""" + mock_response.json.return_value = { + "logProcessingPipelines": [{"name": "pipeline1"}], + "nextPageToken": "token123", + } + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = list_log_processing_pipelines( + chronicle_client, + page_size=50, + page_token="prev_token", + filter_expr='displayName="Test"', + ) + + mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines", + params={ + "pageSize": 50, + "pageToken": "prev_token", + "filter": 'displayName="Test"', + }, + ) + assert result == mock_response.json.return_value + + +def test_list_log_processing_pipelines_error( + chronicle_client, mock_error_response +): + """Test list_log_processing_pipelines with error response.""" + with patch.object( + chronicle_client.session, "get", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + list_log_processing_pipelines(chronicle_client) + + assert "Failed to list log processing pipelines" in str(exc_info.value) + + +def test_get_log_processing_pipeline(chronicle_client, mock_response): + """Test get_log_processing_pipeline function.""" + pipeline_id = "pipeline_12345" + + with patch.object( + 
chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = get_log_processing_pipeline(chronicle_client, pipeline_id) + + mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}" + ) + assert result == mock_response.json.return_value + + +def test_get_log_processing_pipeline_error( + chronicle_client, mock_error_response +): + """Test get_log_processing_pipeline with error response.""" + pipeline_id = "pipeline_12345" + + with patch.object( + chronicle_client.session, "get", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + get_log_processing_pipeline(chronicle_client, pipeline_id) + + assert "Failed to get log processing pipeline" in str(exc_info.value) + + +def test_create_log_processing_pipeline(chronicle_client, mock_response): + """Test create_log_processing_pipeline function.""" + pipeline_config = { + "displayName": "Test Pipeline", + "description": "Test description", + "processors": [{"filterProcessor": {"include": {}}}], + } + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = create_log_processing_pipeline( + chronicle_client, pipeline_config + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines", + json=pipeline_config, + params={}, + ) + assert result == mock_response.json.return_value + + +def test_create_log_processing_pipeline_with_id( + chronicle_client, mock_response +): + """Test create_log_processing_pipeline with custom pipeline ID.""" + pipeline_config = { + "displayName": "Test Pipeline", + "processors": [{"filterProcessor": {"include": {}}}], + } + pipeline_id = "custom_pipeline_id" + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = create_log_processing_pipeline( + chronicle_client, pipeline_config, pipeline_id=pipeline_id + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines", + json=pipeline_config, + params={"logProcessingPipelineId": pipeline_id}, + ) + assert result == mock_response.json.return_value + + +def test_create_log_processing_pipeline_error( + chronicle_client, mock_error_response +): + """Test create_log_processing_pipeline with error response.""" + pipeline_config = {"displayName": "Test Pipeline"} + + with patch.object( + chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + create_log_processing_pipeline(chronicle_client, pipeline_config) + + assert "Failed to create log processing pipeline" in str(exc_info.value) + + +def test_patch_log_processing_pipeline(chronicle_client, mock_response): + """Test patch_log_processing_pipeline function.""" + pipeline_id = "pipeline_12345" + pipeline_config = { + "name": "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345", + "displayName": "Updated Pipeline", + "processors": [{"filterProcessor": {"include": {}}}], + } + + with patch.object( + chronicle_client.session, "patch", return_value=mock_response + ) as mock_patch: + result = patch_log_processing_pipeline( + chronicle_client, pipeline_id, pipeline_config + ) + + mock_patch.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}", + json=pipeline_config, + params={}, + 
) + assert result == mock_response.json.return_value + + +def test_patch_log_processing_pipeline_with_update_mask( + chronicle_client, mock_response +): + """Test patch_log_processing_pipeline with update mask.""" + pipeline_id = "pipeline_12345" + pipeline_config = { + "name": "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345", + "displayName": "Updated Pipeline", + } + update_mask = "displayName,description" + + with patch.object( + chronicle_client.session, "patch", return_value=mock_response + ) as mock_patch: + result = patch_log_processing_pipeline( + chronicle_client, + pipeline_id, + pipeline_config, + update_mask=update_mask, + ) + + mock_patch.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}", + json=pipeline_config, + params={"updateMask": update_mask}, + ) + assert result == mock_response.json.return_value + + +def test_patch_log_processing_pipeline_with_full_name( + chronicle_client, mock_response +): + """Test patch_log_processing_pipeline with full resource name.""" + full_name = "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345" + pipeline_config = { + "name": full_name, + "displayName": "Updated Pipeline", + } + + with patch.object( + chronicle_client.session, "patch", return_value=mock_response + ) as mock_patch: + result = patch_log_processing_pipeline( + chronicle_client, full_name, pipeline_config + ) + + mock_patch.assert_called_once_with( + f"{chronicle_client.base_url}/{full_name}", + json=pipeline_config, + params={}, + ) + assert result == mock_response.json.return_value + + +def test_patch_log_processing_pipeline_error( + chronicle_client, mock_error_response +): + """Test patch_log_processing_pipeline with error response.""" + pipeline_id = "pipeline_12345" + pipeline_config = {"displayName": "Updated Pipeline"} + + with patch.object( + chronicle_client.session, "patch", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + patch_log_processing_pipeline( + chronicle_client, pipeline_id, pipeline_config + ) + + assert "Failed to patch log processing pipeline" in str(exc_info.value) + + +def test_delete_log_processing_pipeline(chronicle_client, mock_response): + """Test delete_log_processing_pipeline function.""" + pipeline_id = "pipeline_12345" + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "delete", return_value=mock_response + ) as mock_delete: + result = delete_log_processing_pipeline(chronicle_client, pipeline_id) + + mock_delete.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}", + params={}, + ) + assert result == {} + + +def test_delete_log_processing_pipeline_with_etag( + chronicle_client, mock_response +): + """Test delete_log_processing_pipeline with etag.""" + pipeline_id = "pipeline_12345" + etag = "etag_value_123" + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "delete", return_value=mock_response + ) as mock_delete: + result = delete_log_processing_pipeline( + chronicle_client, pipeline_id, etag=etag + ) + + mock_delete.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}", + params={"etag": etag}, + ) + assert result == {} + + +def test_delete_log_processing_pipeline_error( + chronicle_client, mock_error_response +): + 
"""Test delete_log_processing_pipeline with error response.""" + pipeline_id = "pipeline_12345" + + with patch.object( + chronicle_client.session, "delete", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + delete_log_processing_pipeline(chronicle_client, pipeline_id) + + assert "Failed to delete log processing pipeline" in str(exc_info.value) + + +def test_associate_streams(chronicle_client, mock_response): + """Test associate_streams function.""" + pipeline_id = "pipeline_12345" + streams = [{"logType": "WINEVTLOG"}, {"feedId": "feed_123"}] + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = associate_streams(chronicle_client, pipeline_id, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}:associateStreams", + json={"streams": streams}, + ) + assert result == {} + + +def test_associate_streams_error(chronicle_client, mock_error_response): + """Test associate_streams with error response.""" + pipeline_id = "pipeline_12345" + streams = [{"logType": "WINEVTLOG"}] + + with patch.object( + chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + associate_streams(chronicle_client, pipeline_id, streams) + + assert "Failed to associate streams" in str(exc_info.value) + + +def test_associate_streams_empty_list(chronicle_client, mock_response): + """Test associate_streams with empty streams list.""" + pipeline_id = "pipeline_12345" + streams = [] + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = associate_streams(chronicle_client, pipeline_id, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}:associateStreams", + json={"streams": []}, + ) + assert result == {} + + +def test_dissociate_streams(chronicle_client, mock_response): + """Test dissociate_streams function.""" + pipeline_id = "pipeline_12345" + streams = [{"logType": "WINEVTLOG"}, {"feedId": "feed_123"}] + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = dissociate_streams(chronicle_client, pipeline_id, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}:dissociateStreams", + json={"streams": streams}, + ) + assert result == {} + + +def test_dissociate_streams_error(chronicle_client, mock_error_response): + """Test dissociate_streams with error response.""" + pipeline_id = "pipeline_12345" + streams = [{"logType": "WINEVTLOG"}] + + with patch.object( + chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + dissociate_streams(chronicle_client, pipeline_id, streams) + + assert "Failed to dissociate streams" in str(exc_info.value) + + +def test_fetch_associated_pipeline_with_log_type( + chronicle_client, mock_response +): + """Test fetch_associated_pipeline with logType.""" + stream = {"logType": "WINEVTLOG"} + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = fetch_associated_pipeline(chronicle_client, stream) + + 
mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchAssociatedPipeline", + params={"stream.logType": "WINEVTLOG"}, + ) + assert result == mock_response.json.return_value + + +def test_fetch_associated_pipeline_with_feed_id( + chronicle_client, mock_response +): + """Test fetch_associated_pipeline with feedId.""" + stream = {"feedId": "feed_123"} + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = fetch_associated_pipeline(chronicle_client, stream) + + mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchAssociatedPipeline", + params={"stream.feedId": "feed_123"}, + ) + assert result == mock_response.json.return_value + + +def test_fetch_associated_pipeline_with_multiple_fields( + chronicle_client, mock_response +): + """Test fetch_associated_pipeline with multiple stream fields.""" + stream = {"logType": "WINEVTLOG", "namespace": "test"} + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = fetch_associated_pipeline(chronicle_client, stream) + + mock_get.assert_called_once() + call_args = mock_get.call_args + assert "stream.logType" in call_args[1]["params"] + assert "stream.namespace" in call_args[1]["params"] + assert result == mock_response.json.return_value + + +def test_fetch_associated_pipeline_error(chronicle_client, mock_error_response): + """Test fetch_associated_pipeline with error response.""" + stream = {"logType": "WINEVTLOG"} + + with patch.object( + chronicle_client.session, "get", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + fetch_associated_pipeline(chronicle_client, stream) + + assert "Failed to fetch associated pipeline" in str(exc_info.value) + + +def test_fetch_sample_logs_by_streams(chronicle_client, mock_response): + """Test fetch_sample_logs_by_streams function.""" + streams = [{"logType": "WINEVTLOG"}, {"feedId": "feed_123"}] + mock_response.json.return_value = { + "logs": [{"data": "log1"}, {"data": "log2"}] + } + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = fetch_sample_logs_by_streams(chronicle_client, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchSampleLogsByStreams", + json={"streams": streams}, + ) + assert result == mock_response.json.return_value + + +def test_fetch_sample_logs_by_streams_with_count( + chronicle_client, mock_response +): + """Test fetch_sample_logs_by_streams with sample count.""" + streams = [{"logType": "WINEVTLOG"}] + sample_logs_count = 50 + mock_response.json.return_value = {"logs": []} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = fetch_sample_logs_by_streams( + chronicle_client, streams, sample_logs_count=sample_logs_count + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchSampleLogsByStreams", + json={"streams": streams, "sampleLogsCount": sample_logs_count}, + ) + assert result == mock_response.json.return_value + + +def test_fetch_sample_logs_by_streams_error( + chronicle_client, mock_error_response +): + """Test fetch_sample_logs_by_streams with error response.""" + streams = [{"logType": "WINEVTLOG"}] + + with patch.object( 
+ chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + fetch_sample_logs_by_streams(chronicle_client, streams) + + assert "Failed to fetch sample logs by streams" in str(exc_info.value) + + +def test_fetch_sample_logs_by_streams_empty_streams( + chronicle_client, mock_response +): + """Test fetch_sample_logs_by_streams with empty streams list.""" + streams = [] + mock_response.json.return_value = {"logs": []} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = fetch_sample_logs_by_streams(chronicle_client, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchSampleLogsByStreams", + json={"streams": []}, + ) + assert result == mock_response.json.return_value + + +def test_test_pipeline(chronicle_client, mock_response): + """Test test_pipeline function.""" + pipeline_config = { + "displayName": "Test Pipeline", + "processors": [{"filterProcessor": {"include": {}}}], + } + input_logs = [ + {"data": "bG9nMQ==", "logEntryTime": "2024-01-01T00:00:00Z"}, + {"data": "bG9nMg==", "logEntryTime": "2024-01-01T00:00:01Z"}, + ] + mock_response.json.return_value = {"logs": input_logs} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = pipeline_test_function( + chronicle_client, pipeline_config, input_logs + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:testPipeline", + json={ + "logProcessingPipeline": pipeline_config, + "inputLogs": input_logs, + }, + ) + assert result == mock_response.json.return_value + + +def test_test_pipeline_error(chronicle_client, mock_error_response): + """Test test_pipeline with error response.""" + pipeline_config = {"displayName": "Test Pipeline"} + input_logs = [{"data": "bG9nMQ=="}] + + with patch.object( + chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + pipeline_test_function( + chronicle_client, pipeline_config, input_logs + ) + + assert "Failed to test pipeline" in str(exc_info.value) + + +def test_test_pipeline_empty_logs(chronicle_client, mock_response): + """Test test_pipeline with empty input logs.""" + pipeline_config = { + "displayName": "Test Pipeline", + "processors": [{"filterProcessor": {"include": {}}}], + } + input_logs = [] + mock_response.json.return_value = {"logs": []} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = pipeline_test_function( + chronicle_client, pipeline_config, input_logs + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:testPipeline", + json={ + "logProcessingPipeline": pipeline_config, + "inputLogs": [], + }, + ) + assert result == mock_response.json.return_value + + +def test_test_pipeline_with_complex_processors(chronicle_client, mock_response): + """Test test_pipeline with complex processor configuration.""" + pipeline_config = { + "displayName": "Complex Pipeline", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*error.*"], + } + } + }, + { + "transformProcessor": { + "fields": [{"field": "message", "transformation": "upper"}] + } + }, + ], + } + input_logs = [{"data": "bG9nMQ==", "logEntryTime": 
"2024-01-01T00:00:00Z"}] + mock_response.json.return_value = {"logs": input_logs} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = pipeline_test_function( + chronicle_client, pipeline_config, input_logs + ) + + mock_post.assert_called_once() + call_kwargs = mock_post.call_args[1] + assert call_kwargs["json"]["logProcessingPipeline"] == pipeline_config + assert result == mock_response.json.return_value From 345f714e10b970ec91254fe3d2c74432aef50548 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Tue, 16 Dec 2025 16:22:09 +0530 Subject: [PATCH 06/10] chore: added integration tests. Added documentation. --- CLI.md | 181 +++++ README.md | 246 +++++++ .../chronicle/log_processing_pipelines.py | 2 +- ...est_log_processing_pipeline_integration.py | 406 +++++++++++ tests/cli/test_log_processing_integration.py | 655 ++++++++++++++++++ 5 files changed, 1489 insertions(+), 1 deletion(-) create mode 100644 tests/chronicle/test_log_processing_pipeline_integration.py create mode 100644 tests/cli/test_log_processing_integration.py diff --git a/CLI.md b/CLI.md index 63a9efca..ba997e68 100644 --- a/CLI.md +++ b/CLI.md @@ -325,6 +325,187 @@ secops log generate-udm-mapping \ --compress-array-fields "false" ``` +### Log Processing Pipelines + +Chronicle log processing pipelines allow you to transform, filter, and enrich log data before it is stored in Chronicle. Common use cases include removing empty key-value pairs, redacting sensitive data, adding ingestion labels, filtering logs by field values, and extracting host information. Pipelines can be associated with log types (with optional collector IDs) and feeds, providing flexible control over your data ingestion workflow. + +The CLI provides comprehensive commands for managing pipelines, associating streams, testing configurations, and fetching sample logs. 
+
+#### List pipelines
+
+```bash
+# List all log processing pipelines
+secops log-processing list
+
+# List with pagination
+secops log-processing list --page-size 50
+
+# List with filter expression
+secops log-processing list --filter "displayName:production*"
+
+# List with pagination token
+secops log-processing list --page-size 50 --page-token "next_page_token"
+```
+
+#### Get pipeline details
+
+```bash
+# Get a specific pipeline by ID
+secops log-processing get --id "1234567890"
+```
+
+#### Create a pipeline
+
+```bash
+# Create from inline JSON
+secops log-processing create --pipeline '{"displayName":"My Pipeline","description":"Filters error logs","processors":[{"filterProcessor":{"include":{"logMatchType":"REGEXP","logBodies":[".*error.*"]},"errorMode":"IGNORE"}}]}'
+
+# Create from JSON file
+secops log-processing create --pipeline pipeline_config.json
+```
+
+Example `pipeline_config.json`:
+```json
+{
+  "displayName": "Production Pipeline",
+  "description": "Filters and transforms production logs",
+  "processors": [
+    {
+      "filterProcessor": {
+        "include": {
+          "logMatchType": "REGEXP",
+          "logBodies": [".*error.*", ".*warning.*"]
+        },
+        "errorMode": "IGNORE"
+      }
+    }
+  ],
+  "customMetadata": [
+    {"key": "environment", "value": "production"},
+    {"key": "team", "value": "security"}
+  ]
+}
+```
+
+#### Update a pipeline
+
+```bash
+# Update from JSON file with update mask
+secops log-processing update --id "1234567890" --pipeline updated_config.json --update-mask "description"
+
+# Update from inline JSON
+secops log-processing update --id "1234567890" --pipeline '{"description":"Updated description"}' --update-mask "description"
+```
+
+#### Delete a pipeline
+
+```bash
+# Delete a pipeline by ID
+secops log-processing delete --id "1234567890"
+
+# Delete with etag for concurrency control
+secops log-processing delete --id "1234567890" --etag "etag_value"
+```
+
+#### Associate streams with a pipeline
+
+Associate log streams (by log type or feed) with a pipeline:
+
+```bash
+# Associate by log type (inline)
+secops log-processing associate-streams --id "1234567890" --streams '[{"logType":"WINEVTLOG"},{"logType":"LINUX"}]'
+
+# Associate by feed ID
+secops log-processing associate-streams --id "1234567890" --streams '[{"feed":"feed-uuid-1"},{"feed":"feed-uuid-2"}]'
+
+# Associate by log type (from file)
+secops log-processing associate-streams --id "1234567890" --streams streams.json
+```
+
+Example `streams.json`:
+```json
+[
+  {"logType": "WINEVTLOG"},
+  {"logType": "LINUX"},
+  {"logType": "OKTA"}
+]
+```
+
+#### Dissociate streams from a pipeline
+
+```bash
+# Dissociate streams (from file)
+secops log-processing dissociate-streams --id "1234567890" --streams streams.json
+
+# Dissociate streams (inline)
+secops log-processing dissociate-streams --id "1234567890" --streams '[{"logType":"WINEVTLOG"}]'
+```
+
+#### Fetch associated pipeline
+
+Find which pipeline is associated with a specific stream:
+
+```bash
+# Find pipeline for a log type (inline)
+secops log-processing fetch-associated --stream '{"logType":"WINEVTLOG"}'
+
+# Find pipeline for a feed
+secops log-processing fetch-associated --stream '{"feed":"feed-uuid"}'
+
+# Find pipeline for a log type (from file)
+secops log-processing fetch-associated --stream stream_query.json
+```
+
+Example `stream_query.json`:
+```json
+{
+  "logType": "WINEVTLOG"
+}
+```
+
+#### Fetch sample logs
+
+Retrieve sample logs for specific streams:
+
+```bash
+# Fetch sample logs for log types (from file)
+secops log-processing 
fetch-sample-logs --streams streams.json --count 10 + +# Fetch sample logs (inline) +secops log-processing fetch-sample-logs --streams '[{"logType":"WINEVTLOG"},{"logType":"LINUX"}]' --count 5 + +# Fetch sample logs for feeds +secops log-processing fetch-sample-logs --streams '[{"feed":"feed-uuid"}]' --count 10 +``` + +#### Test a pipeline + +Test a pipeline configuration against sample logs before deployment: + +```bash +# Test with inline JSON +secops log-processing test --pipeline '{"displayName":"Test","processors":[{"filterProcessor":{"include":{"logMatchType":"REGEXP","logBodies":[".*"]},"errorMode":"IGNORE"}}]}' --input-logs input_logs.json + +# Test with files +secops log-processing test --pipeline pipeline_config.json --input-logs test_logs.json +``` + +Example `input_logs.json` (logs must have base64-encoded data): +```json +[ + { + "data": "U2FtcGxlIGxvZyBlbnRyeQ==", + "logEntryTime": "2024-01-01T00:00:00Z", + "collectionTime": "2024-01-01T00:00:00Z" + }, + { + "data": "QW5vdGhlciBsb2cgZW50cnk=", + "logEntryTime": "2024-01-01T00:01:00Z", + "collectionTime": "2024-01-01T00:01:00Z" + } +] +``` + ### Parser Management Parsers in Chronicle are used to process and normalize raw log data into UDM (Unified Data Model) format. The CLI provides comprehensive parser management capabilities. diff --git a/README.md b/README.md index acaccf99..fe0a0a38 100644 --- a/README.md +++ b/README.md @@ -518,6 +518,252 @@ chronicle.delete_forwarder(forwarder_id="1234567890") print("Forwarder deleted successfully") ``` +### Log Processing Pipelines + +Chronicle log processing pipelines allow you to transform, filter, and enrich log data before it is stored in Chronicle. Common use cases include removing empty key-value pairs, redacting sensitive data, adding ingestion labels, filtering logs by field values, and extracting host information. Pipelines can be associated with log types (with optional collector IDs) and feeds, providing flexible control over your data ingestion workflow. + +The SDK provides comprehensive methods for managing pipelines, associating streams, testing configurations, and fetching sample logs. 
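+
+For example, a minimal end-to-end sketch using the methods documented below (the processor configuration and log type are illustrative placeholders, and `chronicle` is an initialized Chronicle client as shown earlier in this README):
+
+```python
+# Create a pipeline with a single permissive filter processor
+pipeline = chronicle.create_log_processing_pipeline(
+    pipeline={
+        "displayName": "Quickstart Pipeline",
+        "processors": [
+            {
+                "filterProcessor": {
+                    "include": {"logMatchType": "REGEXP", "logBodies": [".*"]},
+                    "errorMode": "IGNORE",
+                }
+            }
+        ],
+    }
+)
+pipeline_id = pipeline["name"].split("/")[-1]
+
+# Route WINEVTLOG logs through the pipeline, then undo the association and delete it
+chronicle.associate_streams(pipeline_id=pipeline_id, streams=[{"logType": "WINEVTLOG"}])
+chronicle.dissociate_streams(pipeline_id=pipeline_id, streams=[{"logType": "WINEVTLOG"}])
+chronicle.delete_log_processing_pipeline(pipeline_id)
+```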
+ +#### List pipelines + +Retrieve all log processing pipelines in your Chronicle instance: + +```python +# Get all pipelines +result = chronicle.list_log_processing_pipelines() +pipelines = result.get("logProcessingPipelines", []) + +for pipeline in pipelines: + pipeline_id = pipeline["name"].split("/")[-1] + print(f"Pipeline: {pipeline['displayName']} (ID: {pipeline_id})") + +# List with pagination +result = chronicle.list_log_processing_pipelines( + page_size=50, + page_token="next_page_token" +) +``` + +#### Get pipeline details + +Retrieve details about a specific pipeline: + +```python +# Get pipeline by ID +pipeline_id = "1234567890" +pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + +print(f"Name: {pipeline['displayName']}") +print(f"Description: {pipeline.get('description', 'N/A')}") +print(f"Processors: {len(pipeline.get('processors', []))}") +``` + +#### Create a pipeline + +Create a new log processing pipeline with processors: + +```python +# Define pipeline configuration +pipeline_config = { + "displayName": "My Custom Pipeline", + "description": "Filters and transforms application logs", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*error.*", ".*warning.*"], + }, + "errorMode": "IGNORE", + } + } + ], + "customMetadata": [ + {"key": "environment", "value": "production"}, + {"key": "team", "value": "security"} + ] +} + +# Create the pipeline (server generates ID) +created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config +) + +pipeline_id = created_pipeline["name"].split("/")[-1] +print(f"Created pipeline with ID: {pipeline_id}") +``` + +#### Update a pipeline + +Update an existing pipeline's configuration: + +```python +# Get the existing pipeline first +pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + +# Update specific fields +updated_config = { + "name": pipeline["name"], + "description": "Updated description", + "processors": pipeline["processors"] +} + +# Patch with update mask +updated_pipeline = chronicle.patch_log_processing_pipeline( + pipeline_id=pipeline_id, + pipeline=updated_config, + update_mask="description" +) + +print(f"Updated: {updated_pipeline['displayName']}") +``` + +#### Delete a pipeline + +Delete an existing pipeline: + +```python +# Delete by ID +chronicle.delete_log_processing_pipeline(pipeline_id) +print("Pipeline deleted successfully") + +# Delete with etag for concurrency control +chronicle.delete_log_processing_pipeline( + pipeline_id=pipeline_id, + etag="etag_value" +) +``` + +#### Associate streams with a pipeline + +Associate log streams (by log type or feed) with a pipeline: + +```python +# Associate by log type +streams = [ + {"logType": "WINEVTLOG"}, + {"logType": "LINUX"} +] + +chronicle.associate_streams( + pipeline_id=pipeline_id, + streams=streams +) +print("Streams associated successfully") + +# Associate by feed ID +feed_streams = [ + {"feed": "feed-uuid-1"}, + {"feed": "feed-uuid-2"} +] + +chronicle.associate_streams( + pipeline_id=pipeline_id, + streams=feed_streams +) +``` + +#### Dissociate streams from a pipeline + +Remove stream associations from a pipeline: + +```python +# Dissociate streams +streams = [{"logType": "WINEVTLOG"}] + +chronicle.dissociate_streams( + pipeline_id=pipeline_id, + streams=streams +) +print("Streams dissociated successfully") +``` + +#### Fetch associated pipeline + +Find which pipeline is associated with a specific stream: + +```python +# Find pipeline for a log type +stream_query = 
{"logType": "WINEVTLOG"} +associated = chronicle.fetch_associated_pipeline(stream=stream_query) + +if associated: + print(f"Associated pipeline: {associated['name']}") +else: + print("No pipeline associated with this stream") + +# Find pipeline for a feed +feed_query = {"feed": "feed-uuid"} +associated = chronicle.fetch_associated_pipeline(stream=feed_query) +``` + +#### Fetch sample logs + +Retrieve sample logs for specific streams: + +```python +# Fetch sample logs for log types +streams = [ + {"logType": "WINEVTLOG"}, + {"logType": "LINUX"} +] + +result = chronicle.fetch_sample_logs_by_streams( + streams=streams, + sample_logs_count=10 +) + +for log in result.get("logs", []): + print(f"Log: {log}") +``` + +#### Test a pipeline + +Test a pipeline configuration against sample logs before deployment: + +```python +import base64 +from datetime import datetime, timezone + +# Define pipeline to test +pipeline_config = { + "displayName": "Test Pipeline", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ] +} + +# Create test logs with base64-encoded data +current_time = datetime.now(timezone.utc).isoformat() +log_data = base64.b64encode(b"Sample log entry").decode("utf-8") + +input_logs = [ + { + "data": log_data, + "logEntryTime": current_time, + "collectionTime": current_time, + } +] + +# Test the pipeline +result = chronicle.test_pipeline( + pipeline=pipeline_config, + input_logs=input_logs +) + +print(f"Processed {len(result.get('logs', []))} logs") +for processed_log in result.get("logs", []): + print(f"Result: {processed_log}") +``` + 5. Use custom timestamps: ```python from datetime import datetime, timedelta, timezone diff --git a/src/secops/chronicle/log_processing_pipelines.py b/src/secops/chronicle/log_processing_pipelines.py index 2b818d10..da33818a 100644 --- a/src/secops/chronicle/log_processing_pipelines.py +++ b/src/secops/chronicle/log_processing_pipelines.py @@ -341,7 +341,7 @@ def fetch_sample_logs_by_streams( f"logProcessingPipelines:fetchSampleLogsByStreams" ) - body: dict[str, Any] = {"streams": streams} + body = {"streams": streams} if sample_logs_count is not None: body["sampleLogsCount"] = sample_logs_count diff --git a/tests/chronicle/test_log_processing_pipeline_integration.py b/tests/chronicle/test_log_processing_pipeline_integration.py new file mode 100644 index 00000000..ac49ca19 --- /dev/null +++ b/tests/chronicle/test_log_processing_pipeline_integration.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Integration tests for log processing pipeline endpoints. + +These tests require valid credentials and API access. 
+""" +import pytest +import time +import uuid +import base64 +from datetime import datetime, timezone +from secops import SecOpsClient +from ..config import CHRONICLE_CONFIG, SERVICE_ACCOUNT_JSON +from secops.exceptions import APIError + + +@pytest.mark.integration +def test_log_processing_pipeline_crud_workflow(): + """Test CRUD workflow for log processing pipelines.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Generate unique display name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Integration test pipeline", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + created_pipeline = None + + try: + # Test CREATE + print(f"Creating pipeline: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + # Extract pipeline ID from the name + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + + assert created_pipeline is not None + assert "name" in created_pipeline + assert created_pipeline.get("displayName") == display_name + print(f"Pipeline created: {created_pipeline['name']}") + + # Wait for pipeline to be fully created + time.sleep(2) + + # Test GET + print(f"Getting pipeline: {pipeline_id}") + retrieved_pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + assert retrieved_pipeline is not None + assert retrieved_pipeline.get("displayName") == display_name + print(f"Pipeline retrieved: {retrieved_pipeline['name']}") + + # Test LIST + print("Listing pipelines") + list_result = chronicle.list_log_processing_pipelines(page_size=10) + assert "logProcessingPipelines" in list_result + pipelines = list_result["logProcessingPipelines"] + pipeline_ids = [p["name"].split("/")[-1] for p in pipelines] + assert pipeline_id in pipeline_ids + print(f"Found {len(pipelines)} pipelines") + + # Test PATCH + updated_display_name = f"Updated Pipeline {unique_id}" + updated_config = { + "name": created_pipeline.get("name"), + "displayName": updated_display_name, + "description": "Updated description", + "processors": created_pipeline.get("processors"), + } + print(f"Updating pipeline: {pipeline_id}") + updated_pipeline = chronicle.patch_log_processing_pipeline( + pipeline_id=pipeline_id, + pipeline=updated_config, + update_mask="displayName,description", + ) + assert updated_pipeline is not None + assert updated_pipeline.get("displayName") == updated_display_name + print(f"Pipeline updated: {updated_pipeline['displayName']}") + + # Verify update + time.sleep(2) + verified_pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + assert verified_pipeline.get("displayName") == updated_display_name + print("Pipeline update verified") + + except APIError as e: + print(f"Pipeline CRUD test failed: {str(e)}") + pytest.fail(f"Pipeline CRUD test failed due to API error: {str(e)}") + + finally: + # Test DELETE - cleanup + if created_pipeline: + try: + print(f"Deleting pipeline: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + + # Verify deletion + time.sleep(2) + try: + chronicle.get_log_processing_pipeline(pipeline_id) + pytest.fail("Pipeline still exists after deletion") + except APIError: + print("Pipeline deletion verified") + + except APIError as 
e: + print(f"Warning: Failed to delete test pipeline: {str(e)}") + + +@pytest.mark.integration +def test_log_processing_pipeline_stream_operations(): + """Test stream association and dissociation workflow.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Generate unique display name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Stream Test Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Integration test for stream operations", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + created_pipeline = None + + try: + # Create pipeline + print(f"Creating pipeline for stream test: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + # Extract pipeline ID from the name + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + assert created_pipeline is not None + print(f"Pipeline created: {created_pipeline['name']}") + time.sleep(2) + + # Test ASSOCIATE STREAMS + streams = [{"logType": "WINEVTLOG"}] + print(f"Associating streams to pipeline: {pipeline_id}") + associate_result = chronicle.associate_streams( + pipeline_id=pipeline_id, streams=streams + ) + assert associate_result is not None + print("Streams associated successfully") + time.sleep(2) + + # Test FETCH ASSOCIATED PIPELINE + print("Fetching associated pipeline by stream") + stream_query = {"logType": "WINEVTLOG"} + associated_pipeline = chronicle.fetch_associated_pipeline( + stream=stream_query + ) + assert associated_pipeline is not None + print(f"Associated pipeline: {associated_pipeline.get('name', 'N/A')}") + + # Test DISSOCIATE STREAMS + print(f"Dissociating streams from pipeline: {pipeline_id}") + dissociate_result = chronicle.dissociate_streams( + pipeline_id=pipeline_id, streams=streams + ) + assert dissociate_result is not None + print("Streams dissociated successfully") + + except APIError as e: + print(f"Stream operations test failed: {str(e)}") + pytest.fail(f"Stream operations test failed due to API error: {str(e)}") + + finally: + # Cleanup + if created_pipeline: + try: + print(f"Deleting pipeline: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + except APIError as e: + print(f"Warning: Failed to delete test pipeline: {str(e)}") + + +@pytest.mark.integration +def test_fetch_sample_logs_by_streams(): + """Test fetching sample logs by streams.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Generate unique display name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Sample Logs Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Pipeline for testing sample logs", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + pipeline_id = None + try: + # Create the pipeline + print(f"Creating pipeline: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + pipeline_id = created_pipeline["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + # Associate CS_EDR log type with the pipeline + 
streams = [{"logType": "CS_EDR"}] + print(f"Associating streams: {streams}") + chronicle.associate_streams(pipeline_id=pipeline_id, streams=streams) + print("Streams associated successfully") + + # Wait briefly for association to propagate + time.sleep(10) + + # Fetch sample logs for the log type + print(f"Fetching sample logs for streams: {streams}") + result = chronicle.fetch_sample_logs_by_streams( + streams=streams, sample_logs_count=5 + ) + + assert result is not None + if not result or ("logs" not in result and "sampleLogs" not in result): + pytest.skip("No sample logs found for CS_EDR log type") + + logs = result.get("logs", result.get("sampleLogs", [])) + print(f"Fetched sample logs: {len(logs)} logs") + assert len(logs) > 0, "Expected at least one sample log" + + except APIError as e: + print(f"Fetch sample logs test failed: {str(e)}") + pytest.skip( + f"Fetch sample logs test skipped due to API error: {str(e)}" + ) + + finally: + # Cleanup: Delete the created pipeline + if pipeline_id: + try: + print(f"Deleting pipeline: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Test pipeline deleted successfully") + except APIError as e: + print(f"Warning: Failed to delete test pipeline: {str(e)}") + + +@pytest.mark.integration +def test_pipeline_testing_functionality(): + """Test the test_pipeline functionality.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Pipeline configuration for testing + pipeline_config = { + "displayName": "Test Pipeline Config", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + # Create test input logs + current_time = datetime.now(timezone.utc).isoformat() + log_data_1 = base64.b64encode(b"Sample log line 1").decode("utf-8") + log_data_2 = base64.b64encode(b"Sample log line 2").decode("utf-8") + + input_logs = [ + { + "data": log_data_1, + "logEntryTime": current_time, + "collectionTime": current_time, + }, + { + "data": log_data_2, + "logEntryTime": current_time, + "collectionTime": current_time, + }, + ] + + try: + print("Testing pipeline with input logs") + print(f"Pipeline: {pipeline_config['displayName']}") + print(f"Number of input logs: {len(input_logs)}") + + result = chronicle.test_pipeline( + pipeline=pipeline_config, input_logs=input_logs + ) + + assert result is not None + assert "logs" in result + + processed_logs = result.get("logs", []) + print(f"Pipeline test completed: {len(processed_logs)} logs processed") + + if processed_logs: + print("First processed log data present") + assert len(processed_logs) > 0 + + except APIError as e: + print(f"Test pipeline functionality failed: {str(e)}") + pytest.skip( + f"Test pipeline functionality skipped due to API error: {str(e)}" + ) + + +@pytest.mark.integration +def test_list_pipelines_with_pagination(): + """Test listing pipelines with pagination.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + try: + # Get first page with small page size + print("Fetching first page of pipelines") + first_page = chronicle.list_log_processing_pipelines(page_size=1) + + assert first_page is not None + assert "logProcessingPipelines" in first_page + pipelines = first_page.get("logProcessingPipelines", []) + print(f"First page: {len(pipelines)} pipelines") + + # If there's a next page token, fetch next page + next_token = 
first_page.get("nextPageToken") + if next_token: + print("Fetching second page of pipelines") + second_page = chronicle.list_log_processing_pipelines( + page_size=1, page_token=next_token + ) + assert second_page is not None + pipelines_2 = second_page.get("logProcessingPipelines", []) + print(f"Second page: {len(pipelines_2)} pipelines") + + # Verify pagination works correctly + if pipelines and pipelines_2: + assert pipelines[0].get("name") != pipelines_2[0].get("name") + print("Pagination verified successfully") + else: + print("No second page available for pagination test") + + except APIError as e: + print(f"List pipelines pagination test failed: {str(e)}") + pytest.skip( + f"List pipelines pagination test skipped due to API error: {str(e)}" + ) diff --git a/tests/cli/test_log_processing_integration.py b/tests/cli/test_log_processing_integration.py new file mode 100644 index 00000000..1493ef4b --- /dev/null +++ b/tests/cli/test_log_processing_integration.py @@ -0,0 +1,655 @@ +"""Integration tests for the SecOps CLI log processing commands.""" + +import base64 +import json +import os +import subprocess +import time +import uuid +from datetime import datetime, timezone + +import pytest + +from tests.config import CHRONICLE_CONFIG + + +@pytest.mark.integration +def test_cli_log_processing_crud_workflow(cli_env, common_args, tmp_path): + """Test the log processing pipeline create, update, and delete.""" + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + pipeline_config = { + "displayName": display_name, + "description": "CLI integration test pipeline", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + config_file = tmp_path / "pipeline_config.json" + config_file.write_text(json.dumps(pipeline_config)) + + pipeline_id = None + + try: + create_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "create", + "--pipeline", + str(config_file), + ] + ) + + create_result = subprocess.run( + create_cmd, env=cli_env, capture_output=True, text=True + ) + + assert create_result.returncode == 0 + + pipeline_data = json.loads(create_result.stdout) + assert "name" in pipeline_data + pipeline_id = pipeline_data["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + time.sleep(2) + + updated_display_name = f"Updated Pipeline {unique_id}" + updated_config = { + "name": pipeline_data.get("name"), + "displayName": updated_display_name, + "description": "Updated CLI integration test pipeline", + "processors": pipeline_data.get("processors"), + } + + updated_config_file = tmp_path / "updated_pipeline_config.json" + updated_config_file.write_text(json.dumps(updated_config)) + + update_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "update", + "--id", + pipeline_id, + "--pipeline", + str(updated_config_file), + "--update-mask", + "displayName,description", + ] + ) + + update_result = subprocess.run( + update_cmd, env=cli_env, capture_output=True, text=True + ) + + assert update_result.returncode == 0 + + updated_pipeline = json.loads(update_result.stdout) + assert updated_pipeline["displayName"] == updated_display_name + print(f"Updated pipeline to: {updated_display_name}") + + finally: + if pipeline_id: + delete_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "delete", + "--id", + pipeline_id, + ] + ) + + delete_result = subprocess.run( + delete_cmd, env=cli_env, 
capture_output=True, text=True + ) + + if delete_result.returncode == 0: + print(f"Successfully deleted pipeline: {pipeline_id}") + else: + print(f"Failed to delete test pipeline: {delete_result.stderr}") + + +@pytest.mark.integration +def test_cli_log_processing_stream_operations(cli_env, common_args, tmp_path): + """Test stream association and dissociation commands.""" + unique_id = str(uuid.uuid4())[:8] + display_name = f"Stream Test Pipeline {unique_id}" + + pipeline_config = { + "displayName": display_name, + "description": "CLI test for stream operations", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + config_file = tmp_path / "pipeline_config.json" + config_file.write_text(json.dumps(pipeline_config)) + + pipeline_id = None + + try: + create_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "create", + "--pipeline", + str(config_file), + ] + ) + + create_result = subprocess.run( + create_cmd, env=cli_env, capture_output=True, text=True + ) + + assert create_result.returncode == 0 + + pipeline_data = json.loads(create_result.stdout) + pipeline_id = pipeline_data["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + time.sleep(2) + + streams = [{"logType": "WINEVTLOG"}] + streams_file = tmp_path / "streams.json" + streams_file.write_text(json.dumps(streams)) + + associate_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "associate-streams", + "--id", + pipeline_id, + "--streams", + str(streams_file), + ] + ) + + associate_result = subprocess.run( + associate_cmd, env=cli_env, capture_output=True, text=True + ) + + assert associate_result.returncode == 0 + print("Streams associated successfully") + + time.sleep(2) + + dissociate_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "dissociate-streams", + "--id", + pipeline_id, + "--streams", + str(streams_file), + ] + ) + + dissociate_result = subprocess.run( + dissociate_cmd, env=cli_env, capture_output=True, text=True + ) + + assert dissociate_result.returncode == 0 + print("Streams dissociated successfully") + + finally: + if pipeline_id: + delete_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "delete", + "--id", + pipeline_id, + ] + ) + + delete_result = subprocess.run( + delete_cmd, env=cli_env, capture_output=True, text=True + ) + + if delete_result.returncode == 0: + print(f"Successfully deleted pipeline: {pipeline_id}") + else: + print(f"Failed to delete test pipeline: {delete_result.stderr}") + + +@pytest.mark.integration +def test_cli_log_processing_fetch_associated(cli_env, common_args, tmp_path): + """Test fetch associated pipeline command.""" + unique_id = str(uuid.uuid4())[:8] + display_name = f"Fetch Test Pipeline {unique_id}" + + pipeline_config = { + "displayName": display_name, + "description": "CLI test for fetch associated", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + config_file = tmp_path / "pipeline_config.json" + config_file.write_text(json.dumps(pipeline_config)) + + pipeline_id = None + + try: + create_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "create", + "--pipeline", + str(config_file), + ] + ) + + create_result = subprocess.run( + create_cmd, env=cli_env, capture_output=True, text=True + ) + + assert create_result.returncode == 0 + + 
pipeline_data = json.loads(create_result.stdout) + pipeline_id = pipeline_data["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + time.sleep(2) + + streams = [{"logType": "WINEVTLOG"}] + streams_file = tmp_path / "streams.json" + streams_file.write_text(json.dumps(streams)) + + associate_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "associate-streams", + "--id", + pipeline_id, + "--streams", + str(streams_file), + ] + ) + + associate_result = subprocess.run( + associate_cmd, env=cli_env, capture_output=True, text=True + ) + + assert associate_result.returncode == 0 + print("Streams associated successfully") + + time.sleep(2) + + stream_query = {"logType": "WINEVTLOG"} + stream_file = tmp_path / "stream_query.json" + stream_file.write_text(json.dumps(stream_query)) + + fetch_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "fetch-associated", + "--stream", + str(stream_file), + ] + ) + + fetch_result = subprocess.run( + fetch_cmd, env=cli_env, capture_output=True, text=True + ) + + assert fetch_result.returncode == 0 + + associated_pipeline = json.loads(fetch_result.stdout) + assert "name" in associated_pipeline + print(f"Fetched associated pipeline: {associated_pipeline['name']}") + + finally: + if pipeline_id: + delete_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "delete", + "--id", + pipeline_id, + ] + ) + + delete_result = subprocess.run( + delete_cmd, env=cli_env, capture_output=True, text=True + ) + + if delete_result.returncode == 0: + print(f"Successfully deleted pipeline: {pipeline_id}") + else: + print(f"Failed to delete test pipeline: {delete_result.stderr}") + + +@pytest.mark.integration +def test_cli_log_processing_fetch_sample_logs(cli_env, common_args, tmp_path): + """Test fetch sample logs command.""" + # Generate unique display name + unique_id = str(uuid.uuid4())[:8] + display_name = f"CLI Test Sample Logs Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "CLI test pipeline for sample logs", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + pipeline_config_file = tmp_path / "pipeline_config.json" + pipeline_config_file.write_text(json.dumps(pipeline_config)) + + pipeline_id = None + try: + # Create pipeline + create_cmd = ( + ["secops"] + + common_args + + [ + "log-processing", + "create", + "--pipeline", + str(pipeline_config_file), + ] + ) + + print(f"Creating pipeline: {display_name}") + create_result = subprocess.run( + create_cmd, env=cli_env, capture_output=True, text=True + ) + + if create_result.returncode != 0: + pytest.skip(f"Failed to create pipeline: {create_result.stderr}") + + created_pipeline = json.loads(create_result.stdout) + pipeline_id = created_pipeline["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + # Associate CS_EDR log type with pipeline + streams = [{"logType": "CS_EDR"}] + streams_file = tmp_path / "streams.json" + streams_file.write_text(json.dumps(streams)) + + associate_cmd = ( + ["secops"] + + common_args + + [ + "log-processing", + "associate-streams", + "--id", + pipeline_id, + "--streams", + str(streams_file), + ] + ) + + print(f"Associating streams: {streams}") + associate_result = subprocess.run( + associate_cmd, env=cli_env, capture_output=True, text=True + ) + + if associate_result.returncode != 0: + pytest.skip( + 
f"Failed to associate streams: {associate_result.stderr}" + ) + + print("Streams associated successfully") + + # Wait for association to propagate + time.sleep(10) + + # Fetch sample logs + fetch_cmd = ( + ["secops"] + + common_args + + [ + "log-processing", + "fetch-sample-logs", + "--streams", + str(streams_file), + "--count", + "5", + ] + ) + + print(f"Fetching sample logs for streams: {streams}") + result = subprocess.run( + fetch_cmd, env=cli_env, capture_output=True, text=True + ) + + if result.returncode == 0: + output = json.loads(result.stdout) + if not output or ( + "logs" not in output and "sampleLogs" not in output + ): + pytest.skip("No sample logs available for CS_EDR log type") + + logs = output.get("logs", output.get("sampleLogs", [])) + print(f"Fetched sample logs: {len(logs)} logs") + assert len(logs) > 0, "Expected at least one sample log" + else: + pytest.skip(f"Fetch sample logs command skipped: {result.stderr}") + + finally: + # Cleanup: Delete the created pipeline + if pipeline_id: + delete_cmd = ( + ["secops"] + + common_args + + ["log-processing", "delete", "--id", pipeline_id] + ) + + print(f"Deleting pipeline: {pipeline_id}") + delete_result = subprocess.run( + delete_cmd, env=cli_env, capture_output=True, text=True + ) + + if delete_result.returncode == 0: + print("Test pipeline deleted successfully") + else: + print( + f"Warning: Failed to delete test pipeline: " + f"{delete_result.stderr}" + ) + + +@pytest.mark.integration +def test_cli_log_processing_test_pipeline(cli_env, common_args, tmp_path): + """Test the test pipeline command.""" + pipeline_config = { + "displayName": "Test Pipeline Config", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + current_time = datetime.now(timezone.utc).isoformat() + log_data_1 = base64.b64encode(b"Sample log line 1").decode("utf-8") + log_data_2 = base64.b64encode(b"Sample log line 2").decode("utf-8") + + input_logs = [ + { + "data": log_data_1, + "logEntryTime": current_time, + "collectionTime": current_time, + }, + { + "data": log_data_2, + "logEntryTime": current_time, + "collectionTime": current_time, + }, + ] + + config_file = tmp_path / "pipeline_config.json" + config_file.write_text(json.dumps(pipeline_config)) + + logs_file = tmp_path / "input_logs.json" + logs_file.write_text(json.dumps(input_logs)) + + cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "test", + "--pipeline", + str(config_file), + "--input-logs", + str(logs_file), + ] + ) + + result = subprocess.run(cmd, env=cli_env, capture_output=True, text=True) + + if result.returncode == 0: + output = json.loads(result.stdout) + assert "logs" in output + print( + f"Pipeline test completed: {len(output.get('logs', []))} processed" + ) + else: + pytest.skip(f"Test pipeline command skipped: {result.stderr}") + + +@pytest.mark.integration +def test_cli_log_processing_list_with_pagination(cli_env, common_args): + """Test listing pipelines with pagination.""" + cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "list", + "--page-size", + "1", + ] + ) + + result = subprocess.run(cmd, env=cli_env, capture_output=True, text=True) + + assert result.returncode == 0 + + output = json.loads(result.stdout) + assert "logProcessingPipelines" in output + pipelines = output.get("logProcessingPipelines", []) + print(f"First page: {len(pipelines)} pipelines") + + if "nextPageToken" in output: + next_page_token = 
output["nextPageToken"] + + next_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "list", + "--page-size", + "1", + "--page-token", + next_page_token, + ] + ) + + next_result = subprocess.run( + next_cmd, env=cli_env, capture_output=True, text=True + ) + + assert next_result.returncode == 0 + + next_output = json.loads(next_result.stdout) + assert "logProcessingPipelines" in next_output + next_pipelines = next_output.get("logProcessingPipelines", []) + print(f"Second page: {len(next_pipelines)} pipelines") From a32f025d91b309c0a51f5988929e9d68e7586ddf Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Tue, 16 Dec 2025 17:37:20 +0530 Subject: [PATCH 07/10] chore: updated doc string --- src/secops/chronicle/client.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index cca68320..2a4b97c1 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -1379,7 +1379,11 @@ def create_log_processing_pipeline( """Creates a new log processing pipeline. Args: - pipeline: Pipeline configuration dict. + pipeline: Pipeline configuration dict containing: + - displayName: Display name for the pipeline + - description: Optional description + - processors: List of processor configurations + - customMetadata: Optional custom metadata list pipeline_id: Optional ID for the pipeline. Returns: @@ -1400,7 +1404,11 @@ def patch_log_processing_pipeline( Args: pipeline_id: ID of the pipeline to update. - pipeline: Pipeline configuration with fields to update. + pipeline: Pipeline configuration with fields to update containing: + - displayName: Display name for the pipeline + - description: Optional description + - processors: List of processor configurations + - customMetadata: Optional custom metadata list update_mask: Optional comma-separated list of fields. Returns: From 71ddae5708339efd7b8ea0525d77c496ecd40a43 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Wed, 17 Dec 2025 12:10:35 +0530 Subject: [PATCH 08/10] chore: renamed patch to update. added changelog. updated project version. refactored and formatting. --- CHANGELOG.md | 14 ++++++++++ README.md | 2 +- examples/log_processing_pipelines_example.py | 2 +- pyproject.toml | 2 +- src/secops/chronicle/__init__.py | 4 +-- src/secops/chronicle/client.py | 6 ++--- .../chronicle/log_processing_pipelines.py | 26 +++++++++++-------- src/secops/cli/commands/log_processing.py | 2 +- src/secops/cli/utils/input_utils.py | 4 +-- .../chronicle/test_log_processing_pipeline.py | 26 +++++++++---------- ...est_log_processing_pipeline_integration.py | 2 +- 11 files changed, 54 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d985ea5..90bfd2f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [0.29.0] - 2025-12-17 +### Added +- Support for following log/data processing pipeline methods: + - List pipelines + - Create pipeline + - Get pipeline details + - Update pipeline + - Delete pipeline + - Associate stream to pipeline + - Dissociate stream from pipeline + - Fetch associated pipeline using stream + - Fetch sample logs by stream + - Test pipeline + ## [0.28.1] - 2025-12-11 ### Updated - CLI to show help when required sub-command/argument not provided. diff --git a/README.md b/README.md index fe0a0a38..d9fce04b 100644 --- a/README.md +++ b/README.md @@ -609,7 +609,7 @@ updated_config = { } # Patch with update mask -updated_pipeline = chronicle.patch_log_processing_pipeline( +updated_pipeline = chronicle.update_log_processing_pipeline( pipeline_id=pipeline_id, pipeline=updated_config, update_mask="description" diff --git a/examples/log_processing_pipelines_example.py b/examples/log_processing_pipelines_example.py index 97a7c5f4..4eff92ce 100644 --- a/examples/log_processing_pipelines_example.py +++ b/examples/log_processing_pipelines_example.py @@ -182,7 +182,7 @@ def example_update_pipeline(chronicle): } print("\nUpdating pipeline...") - updated_pipeline = chronicle.patch_log_processing_pipeline( + updated_pipeline = chronicle.update_log_processing_pipeline( pipeline_id=pipeline_id, pipeline=updated_pipeline_config, update_mask="displayName,description", diff --git a/pyproject.toml b/pyproject.toml index 4388a25d..897997ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.28.1" +version = "0.29.0" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.10" diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index 560211f7..95660c2e 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -87,7 +87,7 @@ fetch_sample_logs_by_streams, get_log_processing_pipeline, list_log_processing_pipelines, - patch_log_processing_pipeline, + update_log_processing_pipeline, test_pipeline, ) from secops.chronicle.models import ( @@ -320,7 +320,7 @@ "list_log_processing_pipelines", "get_log_processing_pipeline", "create_log_processing_pipeline", - "patch_log_processing_pipeline", + "update_log_processing_pipeline", "delete_log_processing_pipeline", "associate_streams", "dissociate_streams", diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index 2a4b97c1..92672228 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -139,7 +139,7 @@ list_log_processing_pipelines as _list_log_processing_pipelines, ) from secops.chronicle.log_processing_pipelines import ( - patch_log_processing_pipeline as _patch_log_processing_pipeline, + update_log_processing_pipeline as _update_log_processing_pipeline, ) from secops.chronicle.log_processing_pipelines import ( test_pipeline as _test_pipeline, @@ -1394,7 +1394,7 @@ def create_log_processing_pipeline( """ return _create_log_processing_pipeline(self, pipeline, pipeline_id) - def patch_log_processing_pipeline( + def update_log_processing_pipeline( self, pipeline_id: str, pipeline: dict[str, Any], @@ -1417,7 +1417,7 @@ def patch_log_processing_pipeline( Raises: APIError: If the API request fails. 
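+        Example (illustrative sketch; the pipeline ID and field values are
+        placeholders):
+            updated = chronicle.update_log_processing_pipeline(
+                pipeline_id="pipeline_12345",
+                pipeline={"description": "Updated description"},
+                update_mask="description",
+            )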
""" - return _patch_log_processing_pipeline( + return _update_log_processing_pipeline( self, pipeline_id, pipeline, update_mask ) diff --git a/src/secops/chronicle/log_processing_pipelines.py b/src/secops/chronicle/log_processing_pipelines.py index da33818a..3d2779f7 100644 --- a/src/secops/chronicle/log_processing_pipelines.py +++ b/src/secops/chronicle/log_processing_pipelines.py @@ -20,7 +20,7 @@ def list_log_processing_pipelines( - client, + client: "ChronicleClient", page_size: int | None = None, page_token: str | None = None, filter_expr: str | None = None, @@ -62,7 +62,9 @@ def list_log_processing_pipelines( return response.json() -def get_log_processing_pipeline(client, pipeline_id: str) -> dict[str, Any]: +def get_log_processing_pipeline( + client: "ChronicleClient", pipeline_id: str +) -> dict[str, Any]: """Gets a log processing pipeline by ID. Args: @@ -93,7 +95,7 @@ def get_log_processing_pipeline(client, pipeline_id: str) -> dict[str, Any]: def create_log_processing_pipeline( - client, + client: "ChronicleClient", pipeline: dict[str, Any], pipeline_id: str | None = None, ) -> dict[str, Any]: @@ -130,8 +132,8 @@ def create_log_processing_pipeline( return response.json() -def patch_log_processing_pipeline( - client, +def update_log_processing_pipeline( + client: "ChronicleClient", pipeline_id: str, pipeline: dict[str, Any], update_mask: str | None = None, @@ -175,7 +177,7 @@ def patch_log_processing_pipeline( def delete_log_processing_pipeline( - client, pipeline_id: str, etag: str | None = None + client: "ChronicleClient", pipeline_id: str, etag: str | None = None ) -> dict[str, Any]: """Deletes a log processing pipeline. @@ -213,7 +215,7 @@ def delete_log_processing_pipeline( def associate_streams( - client, pipeline_id: str, streams: list[dict[str, Any]] + client: "ChronicleClient", pipeline_id: str, streams: list[dict[str, Any]] ) -> dict[str, Any]: """Associates streams with a log processing pipeline. @@ -247,7 +249,7 @@ def associate_streams( def dissociate_streams( - client, pipeline_id: str, streams: list[dict[str, Any]] + client: "ChronicleClient", pipeline_id: str, streams: list[dict[str, Any]] ) -> dict[str, Any]: """Dissociates streams from a log processing pipeline. @@ -281,7 +283,9 @@ def dissociate_streams( return response.json() -def fetch_associated_pipeline(client, stream: dict[str, Any]) -> dict[str, Any]: +def fetch_associated_pipeline( + client: "ChronicleClient", stream: dict[str, Any] +) -> dict[str, Any]: """Fetches the pipeline associated with a specific stream. 
Args: @@ -314,7 +318,7 @@ def fetch_associated_pipeline(client, stream: dict[str, Any]) -> dict[str, Any]: def fetch_sample_logs_by_streams( - client, + client: "ChronicleClient", streams: list[dict[str, Any]], sample_logs_count: int | None = None, ) -> dict[str, Any]: @@ -355,7 +359,7 @@ def fetch_sample_logs_by_streams( def test_pipeline( - client, + client: "ChronicleClient", pipeline: dict[str, Any], input_logs: list[dict[str, Any]], ) -> dict[str, Any]: diff --git a/src/secops/cli/commands/log_processing.py b/src/secops/cli/commands/log_processing.py index 25b5ca7d..489c0710 100644 --- a/src/secops/cli/commands/log_processing.py +++ b/src/secops/cli/commands/log_processing.py @@ -226,7 +226,7 @@ def handle_update_command(args, chronicle): print("Error: pipeline must be a JSON object", file=sys.stderr) sys.exit(1) - result = chronicle.patch_log_processing_pipeline( + result = chronicle.update_log_processing_pipeline( pipeline_id=args.id, pipeline=pipeline_config, update_mask=args.update_mask, diff --git a/src/secops/cli/utils/input_utils.py b/src/secops/cli/utils/input_utils.py index f810c72a..8e666a6a 100644 --- a/src/secops/cli/utils/input_utils.py +++ b/src/secops/cli/utils/input_utils.py @@ -40,7 +40,7 @@ def load_json_or_file(value: str) -> Any: except json.JSONDecodeError as e: print(f"Error parsing JSON from file: {e}", file=sys.stderr) sys.exit(1) - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught print(f"Error reading file: {e}", file=sys.stderr) sys.exit(1) @@ -72,7 +72,7 @@ def load_string_or_file(value: str) -> str: if file_path.exists() and file_path.is_file(): with open(file_path, encoding="utf-8") as f: return f.read() - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught print(f"Error reading file: {e}", file=sys.stderr) sys.exit(1) diff --git a/tests/chronicle/test_log_processing_pipeline.py b/tests/chronicle/test_log_processing_pipeline.py index 8e3f10f4..15a427ae 100644 --- a/tests/chronicle/test_log_processing_pipeline.py +++ b/tests/chronicle/test_log_processing_pipeline.py @@ -22,7 +22,7 @@ list_log_processing_pipelines, get_log_processing_pipeline, create_log_processing_pipeline, - patch_log_processing_pipeline, + update_log_processing_pipeline, delete_log_processing_pipeline, associate_streams, dissociate_streams, @@ -225,8 +225,8 @@ def test_create_log_processing_pipeline_error( assert "Failed to create log processing pipeline" in str(exc_info.value) -def test_patch_log_processing_pipeline(chronicle_client, mock_response): - """Test patch_log_processing_pipeline function.""" +def test_update_log_processing_pipeline(chronicle_client, mock_response): + """Test update_log_processing_pipeline function.""" pipeline_id = "pipeline_12345" pipeline_config = { "name": "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345", @@ -237,7 +237,7 @@ def test_patch_log_processing_pipeline(chronicle_client, mock_response): with patch.object( chronicle_client.session, "patch", return_value=mock_response ) as mock_patch: - result = patch_log_processing_pipeline( + result = update_log_processing_pipeline( chronicle_client, pipeline_id, pipeline_config ) @@ -249,10 +249,10 @@ def test_patch_log_processing_pipeline(chronicle_client, mock_response): assert result == mock_response.json.return_value -def test_patch_log_processing_pipeline_with_update_mask( +def test_update_log_processing_pipeline_with_update_mask( chronicle_client, mock_response ): - """Test 
patch_log_processing_pipeline with update mask.""" + """Test update_log_processing_pipeline with update mask.""" pipeline_id = "pipeline_12345" pipeline_config = { "name": "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345", @@ -263,7 +263,7 @@ def test_patch_log_processing_pipeline_with_update_mask( with patch.object( chronicle_client.session, "patch", return_value=mock_response ) as mock_patch: - result = patch_log_processing_pipeline( + result = update_log_processing_pipeline( chronicle_client, pipeline_id, pipeline_config, @@ -278,10 +278,10 @@ def test_patch_log_processing_pipeline_with_update_mask( assert result == mock_response.json.return_value -def test_patch_log_processing_pipeline_with_full_name( +def test_update_log_processing_pipeline_with_full_name( chronicle_client, mock_response ): - """Test patch_log_processing_pipeline with full resource name.""" + """Test update_log_processing_pipeline with full resource name.""" full_name = "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345" pipeline_config = { "name": full_name, @@ -291,7 +291,7 @@ def test_patch_log_processing_pipeline_with_full_name( with patch.object( chronicle_client.session, "patch", return_value=mock_response ) as mock_patch: - result = patch_log_processing_pipeline( + result = update_log_processing_pipeline( chronicle_client, full_name, pipeline_config ) @@ -303,10 +303,10 @@ def test_patch_log_processing_pipeline_with_full_name( assert result == mock_response.json.return_value -def test_patch_log_processing_pipeline_error( +def test_update_log_processing_pipeline_error( chronicle_client, mock_error_response ): - """Test patch_log_processing_pipeline with error response.""" + """Test update_log_processing_pipeline with error response.""" pipeline_id = "pipeline_12345" pipeline_config = {"displayName": "Updated Pipeline"} @@ -314,7 +314,7 @@ def test_patch_log_processing_pipeline_error( chronicle_client.session, "patch", return_value=mock_error_response ): with pytest.raises(APIError) as exc_info: - patch_log_processing_pipeline( + update_log_processing_pipeline( chronicle_client, pipeline_id, pipeline_config ) diff --git a/tests/chronicle/test_log_processing_pipeline_integration.py b/tests/chronicle/test_log_processing_pipeline_integration.py index ac49ca19..8781b9a6 100644 --- a/tests/chronicle/test_log_processing_pipeline_integration.py +++ b/tests/chronicle/test_log_processing_pipeline_integration.py @@ -99,7 +99,7 @@ def test_log_processing_pipeline_crud_workflow(): "processors": created_pipeline.get("processors"), } print(f"Updating pipeline: {pipeline_id}") - updated_pipeline = chronicle.patch_log_processing_pipeline( + updated_pipeline = chronicle.update_log_processing_pipeline( pipeline_id=pipeline_id, pipeline=updated_config, update_mask="displayName,description", From 7617b7a9ddaa35c798a97ad9fe98414b7f68c092 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Wed, 17 Dec 2025 12:19:10 +0530 Subject: [PATCH 09/10] chore: added API mapping --- api_module_mapping.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/api_module_mapping.md b/api_module_mapping.md index 900d3894..0ed9f0fc 100644 --- a/api_module_mapping.md +++ b/api_module_mapping.md @@ -300,6 +300,16 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ |logTypes.patch |v1alpha| | | |logTypes.runParser |v1alpha|chronicle.parser.run_parser |secops parser 
run | |logTypes.updateLogTypeSetting |v1alpha| | | +|logProcessingPipelines.associateStreams |v1alpha|chronicle.log_processing_pipelines.associate_streams |secops log-processing associate-streams| +|logProcessingPipelines.create |v1alpha|chronicle.log_processing_pipelines.create_log_processing_pipeline|secops log-processing create | +|logProcessingPipelines.delete |v1alpha|chronicle.log_processing_pipelines.delete_log_processing_pipeline|secops log-processing delete | +|logProcessingPipelines.dissociateStreams |v1alpha|chronicle.log_processing_pipelines.dissociate_streams |secops log-processing dissociate-streams| +|logProcessingPipelines.fetchAssociatedPipeline |v1alpha|chronicle.log_processing_pipelines.fetch_associated_pipeline|secops log-processing fetch-associated | +|logProcessingPipelines.fetchSampleLogsByStreams |v1alpha|chronicle.log_processing_pipelines.fetch_sample_logs_by_streams|secops log-processing fetch-sample-logs| +|logProcessingPipelines.get |v1alpha|chronicle.log_processing_pipelines.get_log_processing_pipeline|secops log-processing get | +|logProcessingPipelines.list |v1alpha|chronicle.log_processing_pipelines.list_log_processing_pipelines|secops log-processing list | +|logProcessingPipelines.patch |v1alpha|chronicle.log_processing_pipelines.update_log_processing_pipeline|secops log-processing update | +|logProcessingPipelines.testPipeline |v1alpha|chronicle.log_processing_pipelines.test_pipeline |secops log-processing test | |logs.classify |v1alpha| | | | nativeDashboards.addChart | v1alpha |chronicle.dashboard.add_chart |secops dashboard add-chart | | nativeDashboards.create | v1alpha |chronicle.dashboard.create_dashboard |secops dashboard create | From 9f8f9ab8d7a5c70e5995af923638385630c863d3 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Wed, 17 Dec 2025 15:07:22 +0530 Subject: [PATCH 10/10] chore: minor refactor --- examples/log_processing_pipelines_example.py | 14 ++++++++++ src/secops/chronicle/__init__.py | 28 ++++++++++---------- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/examples/log_processing_pipelines_example.py b/examples/log_processing_pipelines_example.py index 4eff92ce..cbeff285 100644 --- a/examples/log_processing_pipelines_example.py +++ b/examples/log_processing_pipelines_example.py @@ -1,4 +1,18 @@ #!/usr/bin/env python3 +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# """Example usage of the Google SecOps SDK for Log Processing Pipelines.""" import argparse diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index 95660c2e..50522d9b 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -72,12 +72,6 @@ list_forwarders, update_forwarder, ) -from secops.chronicle.log_types import ( - get_all_log_types, - get_log_type_description, - is_valid_log_type, - search_log_types, -) from secops.chronicle.log_processing_pipelines import ( associate_streams, create_log_processing_pipeline, @@ -87,8 +81,14 @@ fetch_sample_logs_by_streams, get_log_processing_pipeline, list_log_processing_pipelines, - update_log_processing_pipeline, test_pipeline, + update_log_processing_pipeline, +) +from secops.chronicle.log_types import ( + get_all_log_types, + get_log_type_description, + is_valid_log_type, + search_log_types, ) from secops.chronicle.models import ( AlertCount, @@ -149,18 +149,18 @@ from secops.chronicle.rule_retrohunt import create_retrohunt, get_retrohunt from secops.chronicle.rule_set import ( batch_update_curated_rule_set_deployments, - list_curated_rule_sets, - list_curated_rule_set_categories, - list_curated_rules, get_curated_rule, - get_curated_rule_set_category, + get_curated_rule_by_name, get_curated_rule_set, - list_curated_rule_set_deployments, + get_curated_rule_set_category, get_curated_rule_set_deployment, get_curated_rule_set_deployment_by_name, - get_curated_rule_by_name, - update_curated_rule_set_deployment, + list_curated_rule_set_categories, + list_curated_rule_set_deployments, + list_curated_rule_sets, + list_curated_rules, search_curated_detections, + update_curated_rule_set_deployment, ) from secops.chronicle.rule_validation import ValidationResult from secops.chronicle.search import search_udm