diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d985ea5..90bfd2f7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.29.0] - 2025-12-17
+### Added
+- Support for the following log/data processing pipeline methods:
+  - List pipelines
+  - Create pipeline
+  - Get pipeline details
+  - Update pipeline
+  - Delete pipeline
+  - Associate stream with pipeline
+  - Dissociate stream from pipeline
+  - Fetch associated pipeline using stream
+  - Fetch sample logs by stream
+  - Test pipeline
+
 ## [0.28.1] - 2025-12-11
 ### Updated
 - CLI to show help when required sub-command/argument not provided.
diff --git a/CLI.md b/CLI.md
index 63a9efca..ba997e68 100644
--- a/CLI.md
+++ b/CLI.md
@@ -325,6 +325,187 @@ secops log generate-udm-mapping \
   --compress-array-fields "false"
 ```
 
+### Log Processing Pipelines
+
+Chronicle log processing pipelines allow you to transform, filter, and enrich log data before it is stored in Chronicle. Common use cases include removing empty key-value pairs, redacting sensitive data, adding ingestion labels, filtering logs by field values, and extracting host information. Pipelines can be associated with log types (with optional collector IDs) and feeds, providing flexible control over your data ingestion workflow.
+
+The CLI provides comprehensive commands for managing pipelines, associating streams, testing configurations, and fetching sample logs.
+
+#### List pipelines
+
+```bash
+# List all log processing pipelines
+secops log-processing list
+
+# List with pagination
+secops log-processing list --page-size 50
+
+# List with filter expression
+secops log-processing list --filter "displayName:production*"
+
+# List with pagination token
+secops log-processing list --page-size 50 --page-token "next_page_token"
+```
+
+#### Get pipeline details
+
+```bash
+# Get a specific pipeline by ID
+secops log-processing get --id "1234567890"
+```
+
+#### Create a pipeline
+
+```bash
+# Create from inline JSON
+secops log-processing create --pipeline '{"displayName":"My Pipeline","description":"Filters error logs","processors":[{"filterProcessor":{"include":{"logMatchType":"REGEXP","logBodies":[".*error.*"]},"errorMode":"IGNORE"}}]}'
+
+# Create from JSON file
+secops log-processing create --pipeline pipeline_config.json
+```
+
+Example `pipeline_config.json`:
+```json
+{
+  "displayName": "Production Pipeline",
+  "description": "Filters and transforms production logs",
+  "processors": [
+    {
+      "filterProcessor": {
+        "include": {
+          "logMatchType": "REGEXP",
+          "logBodies": [".*error.*", ".*warning.*"]
+        },
+        "errorMode": "IGNORE"
+      }
+    }
+  ],
+  "customMetadata": [
+    {"key": "environment", "value": "production"},
+    {"key": "team", "value": "security"}
+  ]
+}
+```
+
+#### Update a pipeline
+
+```bash
+# Update from JSON file with update mask
+secops log-processing update --id "1234567890" --pipeline updated_config.json --update-mask "description"
+
+# Update from inline JSON
+secops log-processing update --id "1234567890" --pipeline '{"description":"Updated description"}' --update-mask "description"
+```
+
+#### Delete a pipeline
+
+```bash
+# Delete a pipeline by ID
+secops log-processing delete --id "1234567890"
+
+# Delete with etag for concurrency control
+secops log-processing delete --id "1234567890" --etag "etag_value"
+```
+
+#### Associate 
streams with a pipeline + +Associate log streams (by log type or feed) with a pipeline: + +```bash +# Associate by log type (inline) +secops log-processing associate-streams --id "1234567890" --streams '[{"logType":"WINEVTLOG"},{"logType":"LINUX"}]' + +# Associate by feed ID +secops log-processing associate-streams --id "1234567890" --streams '[{"feed":"feed-uuid-1"},{"feed":"feed-uuid-2"}]' + +# Associate by log type (from file) +secops log-processing associate-streams --id "1234567890" --streams streams.json +``` + +Example `streams.json`: +```json +[ + {"logType": "WINEVTLOG"}, + {"logType": "LINUX"}, + {"logType": "OKTA"} +] +``` + +#### Dissociate streams from a pipeline + +```bash +# Dissociate streams (from file) +secops log-processing dissociate-streams --id "1234567890" --streams streams.json + +# Dissociate streams (inline) +secops log-processing dissociate-streams --id "1234567890" --streams '[{"logType":"WINEVTLOG"}]' +``` + +#### Fetch associated pipeline + +Find which pipeline is associated with a specific stream: + +```bash +# Find pipeline for a log type (inline) +secops log-processing fetch-associated --stream '{"logType":"WINEVTLOG"}' + +# Find pipeline for a feed +secops log-processing fetch-associated --stream '{"feed":"feed-uuid"}' + +# Find pipeline for a log type (from file) +secops log-processing fetch-associated --stream stream_query.json +``` + +Example `stream_query.json`: +```json +{ + "logType": "WINEVTLOG" +} +``` + +#### Fetch sample logs + +Retrieve sample logs for specific streams: + +```bash +# Fetch sample logs for log types (from file) +secops log-processing fetch-sample-logs --streams streams.json --count 10 + +# Fetch sample logs (inline) +secops log-processing fetch-sample-logs --streams '[{"logType":"WINEVTLOG"},{"logType":"LINUX"}]' --count 5 + +# Fetch sample logs for feeds +secops log-processing fetch-sample-logs --streams '[{"feed":"feed-uuid"}]' --count 10 +``` + +#### Test a pipeline + +Test a pipeline configuration against sample logs before deployment: + +```bash +# Test with inline JSON +secops log-processing test --pipeline '{"displayName":"Test","processors":[{"filterProcessor":{"include":{"logMatchType":"REGEXP","logBodies":[".*"]},"errorMode":"IGNORE"}}]}' --input-logs input_logs.json + +# Test with files +secops log-processing test --pipeline pipeline_config.json --input-logs test_logs.json +``` + +Example `input_logs.json` (logs must have base64-encoded data): +```json +[ + { + "data": "U2FtcGxlIGxvZyBlbnRyeQ==", + "logEntryTime": "2024-01-01T00:00:00Z", + "collectionTime": "2024-01-01T00:00:00Z" + }, + { + "data": "QW5vdGhlciBsb2cgZW50cnk=", + "logEntryTime": "2024-01-01T00:01:00Z", + "collectionTime": "2024-01-01T00:01:00Z" + } +] +``` + ### Parser Management Parsers in Chronicle are used to process and normalize raw log data into UDM (Unified Data Model) format. The CLI provides comprehensive parser management capabilities. diff --git a/README.md b/README.md index acaccf99..d9fce04b 100644 --- a/README.md +++ b/README.md @@ -518,6 +518,252 @@ chronicle.delete_forwarder(forwarder_id="1234567890") print("Forwarder deleted successfully") ``` +### Log Processing Pipelines + +Chronicle log processing pipelines allow you to transform, filter, and enrich log data before it is stored in Chronicle. Common use cases include removing empty key-value pairs, redacting sensitive data, adding ingestion labels, filtering logs by field values, and extracting host information. 
Pipelines can be associated with log types (with optional collector IDs) and feeds, providing flexible control over your data ingestion workflow. + +The SDK provides comprehensive methods for managing pipelines, associating streams, testing configurations, and fetching sample logs. + +#### List pipelines + +Retrieve all log processing pipelines in your Chronicle instance: + +```python +# Get all pipelines +result = chronicle.list_log_processing_pipelines() +pipelines = result.get("logProcessingPipelines", []) + +for pipeline in pipelines: + pipeline_id = pipeline["name"].split("/")[-1] + print(f"Pipeline: {pipeline['displayName']} (ID: {pipeline_id})") + +# List with pagination +result = chronicle.list_log_processing_pipelines( + page_size=50, + page_token="next_page_token" +) +``` + +#### Get pipeline details + +Retrieve details about a specific pipeline: + +```python +# Get pipeline by ID +pipeline_id = "1234567890" +pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + +print(f"Name: {pipeline['displayName']}") +print(f"Description: {pipeline.get('description', 'N/A')}") +print(f"Processors: {len(pipeline.get('processors', []))}") +``` + +#### Create a pipeline + +Create a new log processing pipeline with processors: + +```python +# Define pipeline configuration +pipeline_config = { + "displayName": "My Custom Pipeline", + "description": "Filters and transforms application logs", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*error.*", ".*warning.*"], + }, + "errorMode": "IGNORE", + } + } + ], + "customMetadata": [ + {"key": "environment", "value": "production"}, + {"key": "team", "value": "security"} + ] +} + +# Create the pipeline (server generates ID) +created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config +) + +pipeline_id = created_pipeline["name"].split("/")[-1] +print(f"Created pipeline with ID: {pipeline_id}") +``` + +#### Update a pipeline + +Update an existing pipeline's configuration: + +```python +# Get the existing pipeline first +pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + +# Update specific fields +updated_config = { + "name": pipeline["name"], + "description": "Updated description", + "processors": pipeline["processors"] +} + +# Patch with update mask +updated_pipeline = chronicle.update_log_processing_pipeline( + pipeline_id=pipeline_id, + pipeline=updated_config, + update_mask="description" +) + +print(f"Updated: {updated_pipeline['displayName']}") +``` + +#### Delete a pipeline + +Delete an existing pipeline: + +```python +# Delete by ID +chronicle.delete_log_processing_pipeline(pipeline_id) +print("Pipeline deleted successfully") + +# Delete with etag for concurrency control +chronicle.delete_log_processing_pipeline( + pipeline_id=pipeline_id, + etag="etag_value" +) +``` + +#### Associate streams with a pipeline + +Associate log streams (by log type or feed) with a pipeline: + +```python +# Associate by log type +streams = [ + {"logType": "WINEVTLOG"}, + {"logType": "LINUX"} +] + +chronicle.associate_streams( + pipeline_id=pipeline_id, + streams=streams +) +print("Streams associated successfully") + +# Associate by feed ID +feed_streams = [ + {"feed": "feed-uuid-1"}, + {"feed": "feed-uuid-2"} +] + +chronicle.associate_streams( + pipeline_id=pipeline_id, + streams=feed_streams +) +``` + +#### Dissociate streams from a pipeline + +Remove stream associations from a pipeline: + +```python +# Dissociate streams +streams = [{"logType": "WINEVTLOG"}] + 
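+# Dissociating only removes the stream/pipeline association; the pipeline itself is left intact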
+chronicle.dissociate_streams( + pipeline_id=pipeline_id, + streams=streams +) +print("Streams dissociated successfully") +``` + +#### Fetch associated pipeline + +Find which pipeline is associated with a specific stream: + +```python +# Find pipeline for a log type +stream_query = {"logType": "WINEVTLOG"} +associated = chronicle.fetch_associated_pipeline(stream=stream_query) + +if associated: + print(f"Associated pipeline: {associated['name']}") +else: + print("No pipeline associated with this stream") + +# Find pipeline for a feed +feed_query = {"feed": "feed-uuid"} +associated = chronicle.fetch_associated_pipeline(stream=feed_query) +``` + +#### Fetch sample logs + +Retrieve sample logs for specific streams: + +```python +# Fetch sample logs for log types +streams = [ + {"logType": "WINEVTLOG"}, + {"logType": "LINUX"} +] + +result = chronicle.fetch_sample_logs_by_streams( + streams=streams, + sample_logs_count=10 +) + +for log in result.get("logs", []): + print(f"Log: {log}") +``` + +#### Test a pipeline + +Test a pipeline configuration against sample logs before deployment: + +```python +import base64 +from datetime import datetime, timezone + +# Define pipeline to test +pipeline_config = { + "displayName": "Test Pipeline", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ] +} + +# Create test logs with base64-encoded data +current_time = datetime.now(timezone.utc).isoformat() +log_data = base64.b64encode(b"Sample log entry").decode("utf-8") + +input_logs = [ + { + "data": log_data, + "logEntryTime": current_time, + "collectionTime": current_time, + } +] + +# Test the pipeline +result = chronicle.test_pipeline( + pipeline=pipeline_config, + input_logs=input_logs +) + +print(f"Processed {len(result.get('logs', []))} logs") +for processed_log in result.get("logs", []): + print(f"Result: {processed_log}") +``` + 5. 
Use custom timestamps: ```python from datetime import datetime, timedelta, timezone diff --git a/api_module_mapping.md b/api_module_mapping.md index 900d3894..0ed9f0fc 100644 --- a/api_module_mapping.md +++ b/api_module_mapping.md @@ -300,6 +300,16 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ |logTypes.patch |v1alpha| | | |logTypes.runParser |v1alpha|chronicle.parser.run_parser |secops parser run | |logTypes.updateLogTypeSetting |v1alpha| | | +|logProcessingPipelines.associateStreams |v1alpha|chronicle.log_processing_pipelines.associate_streams |secops log-processing associate-streams| +|logProcessingPipelines.create |v1alpha|chronicle.log_processing_pipelines.create_log_processing_pipeline|secops log-processing create | +|logProcessingPipelines.delete |v1alpha|chronicle.log_processing_pipelines.delete_log_processing_pipeline|secops log-processing delete | +|logProcessingPipelines.dissociateStreams |v1alpha|chronicle.log_processing_pipelines.dissociate_streams |secops log-processing dissociate-streams| +|logProcessingPipelines.fetchAssociatedPipeline |v1alpha|chronicle.log_processing_pipelines.fetch_associated_pipeline|secops log-processing fetch-associated | +|logProcessingPipelines.fetchSampleLogsByStreams |v1alpha|chronicle.log_processing_pipelines.fetch_sample_logs_by_streams|secops log-processing fetch-sample-logs| +|logProcessingPipelines.get |v1alpha|chronicle.log_processing_pipelines.get_log_processing_pipeline|secops log-processing get | +|logProcessingPipelines.list |v1alpha|chronicle.log_processing_pipelines.list_log_processing_pipelines|secops log-processing list | +|logProcessingPipelines.patch |v1alpha|chronicle.log_processing_pipelines.update_log_processing_pipeline|secops log-processing update | +|logProcessingPipelines.testPipeline |v1alpha|chronicle.log_processing_pipelines.test_pipeline |secops log-processing test | |logs.classify |v1alpha| | | | nativeDashboards.addChart | v1alpha |chronicle.dashboard.add_chart |secops dashboard add-chart | | nativeDashboards.create | v1alpha |chronicle.dashboard.create_dashboard |secops dashboard create | diff --git a/examples/log_processing_pipelines_example.py b/examples/log_processing_pipelines_example.py new file mode 100644 index 00000000..cbeff285 --- /dev/null +++ b/examples/log_processing_pipelines_example.py @@ -0,0 +1,472 @@ +#!/usr/bin/env python3 +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Example usage of the Google SecOps SDK for Log Processing Pipelines.""" + +import argparse +import base64 +import json +import time +import uuid +from datetime import datetime, timezone + +from secops import SecOpsClient + + +def get_client(project_id, customer_id, region): + """Initialize and return the Chronicle client. + + Args: + project_id: Google Cloud Project ID. + customer_id: Chronicle Customer ID (UUID). + region: Chronicle region (us or eu). + + Returns: + Chronicle client instance. 
+ """ + client = SecOpsClient() + chronicle = client.chronicle( + customer_id=customer_id, project_id=project_id, region=region + ) + return chronicle + + +def example_list_pipelines(chronicle): + """Example 1: List Log Processing Pipelines.""" + print("\n=== Example 1: List Log Processing Pipelines ===") + + try: + # List all pipelines + response = chronicle.list_log_processing_pipelines() + pipelines = response.get("logProcessingPipelines", []) + + print(f"\nFound {len(pipelines)} pipeline(s)") + + if pipelines: + print("\nSample pipeline details:") + sample_pipeline = pipelines[0] + print(f"Name: {sample_pipeline.get('name')}") + print(f"Display Name: {sample_pipeline.get('displayName')}") + print(f"Description: {sample_pipeline.get('description', 'N/A')}") + + # Extract pipeline ID from the name + pipeline_id = sample_pipeline.get("name", "").split("/")[-1] + print(f"Pipeline ID: {pipeline_id}") + + # Print processor count + processors = sample_pipeline.get("processors", []) + print(f"Number of processors: {len(processors)}") + else: + print("No pipelines found in your Chronicle instance.") + + except Exception as e: + print(f"Error listing pipelines: {e}") + + +def example_create_and_get_pipeline(chronicle): + """Example 2: Create and Get Pipeline.""" + print("\n=== Example 2: Create and Get Pipeline ===") + + # Generate unique pipeline name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + # Define a simple filter processor pipeline + pipeline_config = { + "displayName": display_name, + "description": "Example pipeline created by SDK", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + "customMetadata": [ + {"key": "environment", "value": "test"}, + {"key": "created_by", "value": "sdk_example"}, + ], + } + + created_pipeline = None + + try: + # Create the pipeline + print(f"\nCreating pipeline: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + # Extract pipeline ID from the name + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + + print(f"Pipeline created successfully!") + print(f"Pipeline ID: {pipeline_id}") + print(f"Display Name: {created_pipeline.get('displayName')}") + + # Wait for pipeline to be fully created + time.sleep(2) + + # Get the pipeline to verify it was created + print(f"\nRetrieving pipeline details for ID: {pipeline_id}") + retrieved_pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + + print("Pipeline details retrieved:") + print(f"Name: {retrieved_pipeline.get('name')}") + print(f"Display Name: {retrieved_pipeline.get('displayName')}") + print(f"Description: {retrieved_pipeline.get('description', 'N/A')}") + + except Exception as e: + print(f"Error creating or getting pipeline: {e}") + + finally: + # Clean up: delete the pipeline if it was created + if created_pipeline: + try: + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"\nCleaning up: Deleting pipeline ID: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + except Exception as e: + print(f"Warning: Failed to delete test pipeline: {e}") + + +def example_update_pipeline(chronicle): + """Example 3: Update (Patch) Pipeline.""" + print("\n=== Example 3: Update Pipeline ===") + + # Generate unique pipeline name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + # 
Initial pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Original description", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + created_pipeline = None + + try: + # Create the pipeline + print(f"\nCreating pipeline to update: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"Pipeline created with ID: {pipeline_id}") + + # Wait for pipeline to be fully created + time.sleep(2) + + # Update the pipeline with new display name and description + updated_pipeline_config = { + "name": created_pipeline.get("name"), + "displayName": f"Updated {display_name}", + "description": "Updated description via SDK", + "processors": created_pipeline.get("processors"), + } + + print("\nUpdating pipeline...") + updated_pipeline = chronicle.update_log_processing_pipeline( + pipeline_id=pipeline_id, + pipeline=updated_pipeline_config, + update_mask="displayName,description", + ) + + print("Pipeline updated successfully!") + print(f"New Display Name: {updated_pipeline.get('displayName')}") + print(f"New Description: {updated_pipeline.get('description', 'N/A')}") + + except Exception as e: + print(f"Error updating pipeline: {e}") + + finally: + # Clean up: delete the pipeline if it was created + if created_pipeline: + try: + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"\nCleaning up: Deleting pipeline ID: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + except Exception as e: + print(f"Warning: Failed to delete test pipeline: {e}") + + +def example_stream_association(chronicle): + """Example 4: Associate and Dissociate Streams.""" + print("\n=== Example 4: Associate and Dissociate Streams ===") + + # Generate unique pipeline name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Pipeline for stream association example", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + created_pipeline = None + + try: + # Create the pipeline + print(f"\nCreating pipeline: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"Pipeline created with ID: {pipeline_id}") + + # Wait for pipeline to be fully created + time.sleep(2) + + # Define streams to associate + # Note: Replace with actual log type or feed ID from environment + streams = [{"logType": "WINEVTLOG"}] + + print("\nAssociating streams with pipeline...") + print(f"Streams: {json.dumps(streams, indent=2)}") + + chronicle.associate_streams(pipeline_id=pipeline_id, streams=streams) + print("Streams associated successfully!") + + # Wait a moment + time.sleep(2) + + # Dissociate the streams + print("\nDissociating streams from pipeline...") + chronicle.dissociate_streams(pipeline_id=pipeline_id, streams=streams) + print("Streams dissociated successfully!") + + except Exception as e: + print(f"Error in stream association operations: {e}") + print( + "Note: Make sure the log type or feed ID exists " + "in your environment." 
+ ) + + finally: + # Clean up: delete the pipeline if it was created + if created_pipeline: + try: + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + print(f"\nCleaning up: Deleting pipeline ID: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + except Exception as e: + print(f"Warning: Failed to delete test pipeline: {e}") + + +def example_test_pipeline(chronicle): + """Example 5: Test Pipeline with Sample Logs.""" + print("\n=== Example 5: Test Pipeline ===") + + # Define a pipeline configuration to test + pipeline_config = { + "displayName": "Test Pipeline (Not Created)", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + # Sample input logs with proper Log resource structure + current_time = datetime.now(timezone.utc).isoformat() + + input_logs = [ + { + "data": base64.b64encode(b"Sample log entry 1").decode("utf-8"), + "logEntryTime": current_time, + "collectionTime": current_time, + }, + { + "data": base64.b64encode(b"Sample log entry 2").decode("utf-8"), + "logEntryTime": current_time, + "collectionTime": current_time, + }, + ] + + try: + print("\nTesting pipeline configuration...") + print(f"Pipeline: {pipeline_config['displayName']}") + print(f"Number of input logs: {len(input_logs)}") + + result = chronicle.test_pipeline( + pipeline=pipeline_config, input_logs=input_logs + ) + + processed_logs = result.get("logs", []) + print(f"\nProcessed {len(processed_logs)} log(s)") + + if processed_logs: + print("\nFirst processed log:") + print(json.dumps(processed_logs[0], indent=2)) + + except Exception as e: + print(f"Error testing pipeline: {e}") + print( + "Note: This example uses simplified log structure. " + "Actual logs may need more fields." + ) + + +def example_fetch_associated_pipeline(chronicle): + """Example 6: Fetch Pipeline Associated with a Stream.""" + print("\n=== Example 6: Fetch Associated Pipeline ===") + + # Define a stream to query + # Note: Replace with actual log type or feed ID from your environment + stream = {"logType": "WINEVTLOG"} + + try: + print(f"\nFetching pipeline for stream: {json.dumps(stream)}") + result = chronicle.fetch_associated_pipeline(stream=stream) + + if result: + print("\nAssociated pipeline found:") + print(f"Name: {result.get('name')}") + print(f"Display Name: {result.get('displayName')}") + print(f"Description: {result.get('description', 'N/A')}") + else: + print("No pipeline associated with this stream.") + + except Exception as e: + print(f"Error fetching associated pipeline: {e}") + print( + "Note: Make sure the stream exists and has an " + "associated pipeline." 
+ ) + + +def example_fetch_sample_logs(chronicle): + """Example 7: Fetch Sample Logs by Streams.""" + print("\n=== Example 7: Fetch Sample Logs by Streams ===") + + # Define streams to fetch sample logs from + # Note: Replace with actual log type or feed ID from your environment + streams = [{"logType": "WINEVTLOG"}] + + try: + print(f"\nFetching sample logs for streams: {json.dumps(streams)}") + result = chronicle.fetch_sample_logs_by_streams( + streams=streams, sample_logs_count=5 + ) + + logs = result.get("logs", []) + print(f"\nFetched {len(logs)} sample log(s)") + + if logs: + print("\nFirst sample log:") + print(json.dumps(logs[0], indent=2)) + else: + print("No sample logs available for the specified streams.") + + except Exception as e: + print(f"Error fetching sample logs: {e}") + print("Note: Make sure the streams exist and have ingested logs.") + + +# Map of example functions +EXAMPLES = { + "1": example_list_pipelines, + "2": example_create_and_get_pipeline, + "3": example_update_pipeline, + "4": example_stream_association, + "5": example_test_pipeline, + "6": example_fetch_associated_pipeline, + "7": example_fetch_sample_logs, +} + + +def main(): + """Main function to run examples.""" + parser = argparse.ArgumentParser( + description="Run Chronicle Log Processing Pipeline examples" + ) + parser.add_argument( + "--project_id", required=True, help="Google Cloud Project ID" + ) + parser.add_argument( + "--customer_id", required=True, help="Chronicle Customer ID (UUID)" + ) + parser.add_argument( + "--region", default="us", help="Chronicle region (us or eu)" + ) + parser.add_argument( + "--example", + "-e", + help=( + "Example number to run (1-7). " + "If not specified, runs all examples." + ), + ) + + args = parser.parse_args() + + # Initialize the client + chronicle = get_client(args.project_id, args.customer_id, args.region) + + if args.example: + if args.example not in EXAMPLES: + print( + f"Invalid example number. 
Available examples: " + f"{', '.join(EXAMPLES.keys())}" + ) + return + EXAMPLES[args.example](chronicle) + else: + # Run all examples in order + for example_num in sorted(EXAMPLES.keys()): + EXAMPLES[example_num](chronicle) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 4388a25d..897997ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.28.1" +version = "0.29.0" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.10" diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index 403640df..50522d9b 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -72,6 +72,18 @@ list_forwarders, update_forwarder, ) +from secops.chronicle.log_processing_pipelines import ( + associate_streams, + create_log_processing_pipeline, + delete_log_processing_pipeline, + dissociate_streams, + fetch_associated_pipeline, + fetch_sample_logs_by_streams, + get_log_processing_pipeline, + list_log_processing_pipelines, + test_pipeline, + update_log_processing_pipeline, +) from secops.chronicle.log_types import ( get_all_log_types, get_log_type_description, @@ -137,18 +149,18 @@ from secops.chronicle.rule_retrohunt import create_retrohunt, get_retrohunt from secops.chronicle.rule_set import ( batch_update_curated_rule_set_deployments, - list_curated_rule_sets, - list_curated_rule_set_categories, - list_curated_rules, get_curated_rule, - get_curated_rule_set_category, + get_curated_rule_by_name, get_curated_rule_set, - list_curated_rule_set_deployments, + get_curated_rule_set_category, get_curated_rule_set_deployment, get_curated_rule_set_deployment_by_name, - get_curated_rule_by_name, - update_curated_rule_set_deployment, + list_curated_rule_set_categories, + list_curated_rule_set_deployments, + list_curated_rule_sets, + list_curated_rules, search_curated_detections, + update_curated_rule_set_deployment, ) from secops.chronicle.rule_validation import ValidationResult from secops.chronicle.search import search_udm @@ -304,4 +316,15 @@ "update_data_table", "update_data_table_rows", "replace_data_table_rows", + # Log Processing Pipelines + "list_log_processing_pipelines", + "get_log_processing_pipeline", + "create_log_processing_pipeline", + "update_log_processing_pipeline", + "delete_log_processing_pipeline", + "associate_streams", + "dissociate_streams", + "fetch_associated_pipeline", + "fetch_sample_logs_by_streams", + "test_pipeline", ] diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index d60a4d8d..92672228 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -114,6 +114,36 @@ ) from secops.chronicle.log_types import is_valid_log_type as _is_valid_log_type from secops.chronicle.log_types import search_log_types as _search_log_types +from secops.chronicle.log_processing_pipelines import ( + associate_streams as _associate_streams, +) +from secops.chronicle.log_processing_pipelines import ( + create_log_processing_pipeline as _create_log_processing_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + delete_log_processing_pipeline as _delete_log_processing_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + dissociate_streams as _dissociate_streams, +) +from secops.chronicle.log_processing_pipelines import ( + fetch_associated_pipeline as 
_fetch_associated_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + fetch_sample_logs_by_streams as _fetch_sample_logs_by_streams, +) +from secops.chronicle.log_processing_pipelines import ( + get_log_processing_pipeline as _get_log_processing_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + list_log_processing_pipelines as _list_log_processing_pipelines, +) +from secops.chronicle.log_processing_pipelines import ( + update_log_processing_pipeline as _update_log_processing_pipeline, +) +from secops.chronicle.log_processing_pipelines import ( + test_pipeline as _test_pipeline, +) from secops.chronicle.models import ( APIVersion, CaseList, @@ -1302,6 +1332,200 @@ def delete_feed( """ return _delete_feed(self, feed_id, api_version) + # Log Processing Pipeline methods + + def list_log_processing_pipelines( + self, + page_size: int | None = None, + page_token: str | None = None, + filter_expr: str | None = None, + ) -> dict[str, Any]: + """Lists log processing pipelines. + + Args: + page_size: Maximum number of pipelines to return. + page_token: Page token for pagination. + filter_expr: Filter expression to restrict results. + + Returns: + Dictionary containing pipelines and pagination info. + + Raises: + APIError: If the API request fails. + """ + return _list_log_processing_pipelines( + self, page_size, page_token, filter_expr + ) + + def get_log_processing_pipeline(self, pipeline_id: str) -> dict[str, Any]: + """Gets a log processing pipeline by ID. + + Args: + pipeline_id: ID of the pipeline to retrieve. + + Returns: + Dictionary containing pipeline information. + + Raises: + APIError: If the API request fails. + """ + return _get_log_processing_pipeline(self, pipeline_id) + + def create_log_processing_pipeline( + self, + pipeline: dict[str, Any], + pipeline_id: str | None = None, + ) -> dict[str, Any]: + """Creates a new log processing pipeline. + + Args: + pipeline: Pipeline configuration dict containing: + - displayName: Display name for the pipeline + - description: Optional description + - processors: List of processor configurations + - customMetadata: Optional custom metadata list + pipeline_id: Optional ID for the pipeline. + + Returns: + Dictionary containing the created pipeline. + + Raises: + APIError: If the API request fails. + """ + return _create_log_processing_pipeline(self, pipeline, pipeline_id) + + def update_log_processing_pipeline( + self, + pipeline_id: str, + pipeline: dict[str, Any], + update_mask: str | None = None, + ) -> dict[str, Any]: + """Updates a log processing pipeline. + + Args: + pipeline_id: ID of the pipeline to update. + pipeline: Pipeline configuration with fields to update containing: + - displayName: Display name for the pipeline + - description: Optional description + - processors: List of processor configurations + - customMetadata: Optional custom metadata list + update_mask: Optional comma-separated list of fields. + + Returns: + Dictionary containing the updated pipeline. + + Raises: + APIError: If the API request fails. + """ + return _update_log_processing_pipeline( + self, pipeline_id, pipeline, update_mask + ) + + def delete_log_processing_pipeline( + self, pipeline_id: str, etag: str | None = None + ) -> dict[str, Any]: + """Deletes a log processing pipeline. + + Args: + pipeline_id: ID of the pipeline to delete. + etag: Optional etag for optimistic concurrency control. + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. 
+ """ + return _delete_log_processing_pipeline(self, pipeline_id, etag) + + def associate_streams( + self, pipeline_id: str, streams: list[dict[str, Any]] + ) -> dict[str, Any]: + """Associates streams with a pipeline. + + Args: + pipeline_id: ID of the pipeline. + streams: List of stream dicts. + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. + """ + return _associate_streams(self, pipeline_id, streams) + + def dissociate_streams( + self, pipeline_id: str, streams: list[dict[str, Any]] + ) -> dict[str, Any]: + """Dissociates streams from a pipeline. + + Args: + pipeline_id: ID of the pipeline. + streams: List of stream dicts. + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. + """ + return _dissociate_streams(self, pipeline_id, streams) + + def fetch_associated_pipeline( + self, stream: dict[str, Any] + ) -> dict[str, Any]: + """Fetches the pipeline associated with a stream. + + Args: + stream: Stream dict (logType or feedId). + + Returns: + Dictionary containing the associated pipeline. + + Raises: + APIError: If the API request fails. + """ + return _fetch_associated_pipeline(self, stream) + + def fetch_sample_logs_by_streams( + self, + streams: list[dict[str, Any]], + sample_logs_count: int | None = None, + ) -> dict[str, Any]: + """Fetches sample logs for specified streams. + + Args: + streams: List of stream dicts. + sample_logs_count: Number of sample logs per stream. + + Returns: + Dictionary containing sample logs. + + Raises: + APIError: If the API request fails. + """ + return _fetch_sample_logs_by_streams(self, streams, sample_logs_count) + + def test_pipeline( + self, + pipeline: dict[str, Any], + input_logs: list[dict[str, Any]], + ) -> dict[str, Any]: + """Tests a pipeline with input logs. + + Args: + pipeline: Pipeline configuration to test. + input_logs: List of log objects to process. + + Returns: + Dictionary containing processed logs. + + Raises: + APIError: If the API request fails. + """ + return _test_pipeline(self, pipeline, input_logs) + def list_rules( self, view: str | None = "FULL", diff --git a/src/secops/chronicle/log_processing_pipelines.py b/src/secops/chronicle/log_processing_pipelines.py new file mode 100644 index 00000000..3d2779f7 --- /dev/null +++ b/src/secops/chronicle/log_processing_pipelines.py @@ -0,0 +1,391 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Provides log processing pipeline management for Chronicle.""" + +from typing import Any + +from secops.exceptions import APIError + + +def list_log_processing_pipelines( + client: "ChronicleClient", + page_size: int | None = None, + page_token: str | None = None, + filter_expr: str | None = None, +) -> dict[str, Any]: + """Lists log processing pipelines. + + Args: + client: ChronicleClient instance. + page_size: Maximum number of pipelines to return. If not + specified, server determines the number. 
+ page_token: Page token from a previous list call to retrieve + the next page. + filter_expr: Filter expression (AIP-160) to restrict results. + + Returns: + Dictionary containing: + - logProcessingPipelines: List of pipeline dicts + - nextPageToken: Token for next page (if more results exist) + + Raises: + APIError: If the API request fails. + """ + url = f"{client.base_url}/{client.instance_id}/logProcessingPipelines" + + params: dict[str, Any] = {} + if page_size is not None: + params["pageSize"] = page_size + if page_token: + params["pageToken"] = page_token + if filter_expr: + params["filter"] = filter_expr + + response = client.session.get(url, params=params) + if response.status_code != 200: + raise APIError( + f"Failed to list log processing pipelines: {response.text}" + ) + + return response.json() + + +def get_log_processing_pipeline( + client: "ChronicleClient", pipeline_id: str +) -> dict[str, Any]: + """Gets a log processing pipeline by ID. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to retrieve. + + Returns: + Dictionary containing pipeline information. + + Raises: + APIError: If the API request fails. + """ + if not pipeline_id.startswith("projects/"): + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}" + ) + else: + url = f"{client.base_url}/{pipeline_id}" + + response = client.session.get(url) + if response.status_code != 200: + raise APIError( + f"Failed to get log processing pipeline: {response.text}" + ) + + return response.json() + + +def create_log_processing_pipeline( + client: "ChronicleClient", + pipeline: dict[str, Any], + pipeline_id: str | None = None, +) -> dict[str, Any]: + """Creates a new log processing pipeline. + + Args: + client: ChronicleClient instance. + pipeline: LogProcessingPipeline configuration dict containing: + - displayName: Display name for the pipeline + - description: Optional description + - processors: List of processor configurations + - customMetadata: Optional custom metadata list + pipeline_id: Optional ID for the pipeline. If omitted, server + assigns a unique ID. + + Returns: + Dictionary containing the created pipeline. + + Raises: + APIError: If the API request fails. + """ + url = f"{client.base_url}/{client.instance_id}/logProcessingPipelines" + + params: dict[str, Any] = {} + if pipeline_id: + params["logProcessingPipelineId"] = pipeline_id + + response = client.session.post(url, json=pipeline, params=params) + if response.status_code != 200: + raise APIError( + f"Failed to create log processing pipeline: {response.text}" + ) + + return response.json() + + +def update_log_processing_pipeline( + client: "ChronicleClient", + pipeline_id: str, + pipeline: dict[str, Any], + update_mask: str | None = None, +) -> dict[str, Any]: + """Updates a log processing pipeline. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to update. + pipeline: LogProcessingPipeline configuration dict with fields + to update. + update_mask: Optional comma-separated list of fields to update + (e.g., "displayName,description"). If not included, all + fields with default/non-default values will be overwritten. + + Returns: + Dictionary containing the updated pipeline. + + Raises: + APIError: If the API request fails. 
+ """ + if not pipeline_id.startswith("projects/"): + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}" + ) + else: + url = f"{client.base_url}/{pipeline_id}" + + params: dict[str, Any] = {} + if update_mask: + params["updateMask"] = update_mask + + response = client.session.patch(url, json=pipeline, params=params) + if response.status_code != 200: + raise APIError( + f"Failed to patch log processing pipeline: {response.text}" + ) + + return response.json() + + +def delete_log_processing_pipeline( + client: "ChronicleClient", pipeline_id: str, etag: str | None = None +) -> dict[str, Any]: + """Deletes a log processing pipeline. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to delete. + etag: Optional etag value. If provided, deletion only succeeds + if the resource's current etag matches this value. + + Returns: + Empty dictionary on successful deletion. + + Raises: + APIError: If the API request fails. + """ + if not pipeline_id.startswith("projects/"): + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}" + ) + else: + url = f"{client.base_url}/{pipeline_id}" + + params: dict[str, Any] = {} + if etag: + params["etag"] = etag + + response = client.session.delete(url, params=params) + if response.status_code != 200: + raise APIError( + f"Failed to delete log processing pipeline: {response.text}" + ) + + return response.json() + + +def associate_streams( + client: "ChronicleClient", pipeline_id: str, streams: list[dict[str, Any]] +) -> dict[str, Any]: + """Associates streams with a log processing pipeline. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to associate streams with. + streams: List of stream dicts. Each stream can be: + - {"logType": "LOG_TYPE_NAME"} or + - {"feedId": "FEED_ID"} + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. + """ + if not pipeline_id.startswith("projects/"): + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}:associateStreams" + ) + else: + url = f"{client.base_url}/{pipeline_id}:associateStreams" + body = {"streams": streams} + + response = client.session.post(url, json=body) + if response.status_code != 200: + raise APIError(f"Failed to associate streams: {response.text}") + + return response.json() + + +def dissociate_streams( + client: "ChronicleClient", pipeline_id: str, streams: list[dict[str, Any]] +) -> dict[str, Any]: + """Dissociates streams from a log processing pipeline. + + Args: + client: ChronicleClient instance. + pipeline_id: ID of the pipeline to dissociate streams from. + streams: List of stream dicts. Each stream can be: + - {"logType": "LOG_TYPE_NAME"} or + - {"feedId": "FEED_ID"} + + Returns: + Empty dictionary on success. + + Raises: + APIError: If the API request fails. + """ + if not pipeline_id.startswith("projects/"): + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines/{pipeline_id}:dissociateStreams" + ) + else: + url = f"{client.base_url}/{pipeline_id}:dissociateStreams" + + body = {"streams": streams} + + response = client.session.post(url, json=body) + if response.status_code != 200: + raise APIError(f"Failed to dissociate streams: {response.text}") + + return response.json() + + +def fetch_associated_pipeline( + client: "ChronicleClient", stream: dict[str, Any] +) -> dict[str, Any]: + """Fetches the pipeline associated with a specific stream. 
+ + Args: + client: ChronicleClient instance. + stream: Stream dict, can be: + - {"logType": "LOG_TYPE_NAME"} or + - {"feedId": "FEED_ID"} + + Returns: + Dictionary containing the associated pipeline. + + Raises: + APIError: If the API request fails. + """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines:fetchAssociatedPipeline" + ) + + # Pass stream fields as separate query parameters with stream. prefix + params = {} + for key, value in stream.items(): + params[f"stream.{key}"] = value + + response = client.session.get(url, params=params) + if response.status_code != 200: + raise APIError(f"Failed to fetch associated pipeline: {response.text}") + + return response.json() + + +def fetch_sample_logs_by_streams( + client: "ChronicleClient", + streams: list[dict[str, Any]], + sample_logs_count: int | None = None, +) -> dict[str, Any]: + """Fetches sample logs for specified streams. + + Args: + client: ChronicleClient instance. + streams: List of stream dicts. Each stream can be: + - {"logType": "LOG_TYPE_NAME"} or + - {"feedId": "FEED_ID"} + sample_logs_count: Number of sample logs to fetch per stream. + Default is 100. Max is 1000 or 4MB per stream. + + Returns: + Dictionary containing: + - logs: List of log objects + - sampleLogs: List of base64-encoded log strings (deprecated) + + Raises: + APIError: If the API request fails. + """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines:fetchSampleLogsByStreams" + ) + + body = {"streams": streams} + if sample_logs_count is not None: + body["sampleLogsCount"] = sample_logs_count + + response = client.session.post(url, json=body) + if response.status_code != 200: + raise APIError( + f"Failed to fetch sample logs by streams: {response.text}" + ) + + return response.json() + + +def test_pipeline( + client: "ChronicleClient", + pipeline: dict[str, Any], + input_logs: list[dict[str, Any]], +) -> dict[str, Any]: + """Tests a log processing pipeline with input logs. + + Args: + client: ChronicleClient instance. + pipeline: LogProcessingPipeline configuration to test. + input_logs: List of log objects to process through the pipeline. + + Returns: + Dictionary containing: + - logs: List of processed log objects + + Raises: + APIError: If the API request fails. 
+ """ + url = ( + f"{client.base_url}/{client.instance_id}/" + f"logProcessingPipelines:testPipeline" + ) + + body = {"logProcessingPipeline": pipeline, "inputLogs": input_logs} + + response = client.session.post(url, json=body) + if response.status_code != 200: + raise APIError(f"Failed to test pipeline: {response.text}") + + return response.json() diff --git a/src/secops/cli/cli_client.py b/src/secops/cli/cli_client.py index 499949ec..f48a3f82 100644 --- a/src/secops/cli/cli_client.py +++ b/src/secops/cli/cli_client.py @@ -22,6 +22,9 @@ from secops.cli.commands.help import setup_help_command from secops.cli.commands.iocs import setup_iocs_command from secops.cli.commands.log import setup_log_command +from secops.cli.commands.log_processing import ( + setup_log_processing_command, +) from secops.cli.commands.parser import setup_parser_command from secops.cli.commands.parser_extension import setup_parser_extension_command from secops.cli.commands.reference_list import setup_reference_list_command @@ -158,6 +161,7 @@ def build_parser() -> argparse.ArgumentParser: setup_entity_command(subparsers) setup_iocs_command(subparsers) setup_log_command(subparsers) + setup_log_processing_command(subparsers) setup_parser_command(subparsers) setup_parser_extension_command(subparsers) setup_feed_command(subparsers) diff --git a/src/secops/cli/commands/log_processing.py b/src/secops/cli/commands/log_processing.py new file mode 100644 index 00000000..489c0710 --- /dev/null +++ b/src/secops/cli/commands/log_processing.py @@ -0,0 +1,342 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Google SecOps CLI log processing pipeline commands""" + +import sys + +from secops.cli.utils.formatters import output_formatter +from secops.cli.utils.input_utils import load_json_or_file + + +def setup_log_processing_command(subparsers): + """Set up the log-processing command parser.""" + log_processing_parser = subparsers.add_parser( + "log-processing", help="Manage log processing pipelines" + ) + log_processing_subparsers = log_processing_parser.add_subparsers( + dest="log_processing_command", help="Log processing command" + ) + log_processing_parser.set_defaults( + func=lambda args, _: log_processing_parser.print_help() + ) + + # List pipelines command + list_parser = log_processing_subparsers.add_parser( + "list", help="List log processing pipelines" + ) + list_parser.add_argument( + "--page-size", + "--page_size", + dest="page_size", + type=int, + help="Maximum number of pipelines to return", + ) + list_parser.add_argument( + "--page-token", + "--page_token", + dest="page_token", + help="Page token for pagination", + ) + list_parser.add_argument( + "--filter", help="Filter expression to restrict results" + ) + list_parser.set_defaults(func=handle_list_command) + + # Get pipeline command + get_parser = log_processing_subparsers.add_parser( + "get", help="Get a log processing pipeline" + ) + get_parser.add_argument("--id", required=True, help="Pipeline ID") + get_parser.set_defaults(func=handle_get_command) + + # Create pipeline command + create_parser = log_processing_subparsers.add_parser( + "create", help="Create a log processing pipeline" + ) + create_parser.add_argument( + "--pipeline", + required=True, + help="Pipeline config as JSON string or file path", + ) + create_parser.add_argument("--id", help="Optional pipeline ID") + create_parser.set_defaults(func=handle_create_command) + + # Update pipeline command + update_parser = log_processing_subparsers.add_parser( + "update", help="Update a log processing pipeline" + ) + update_parser.add_argument("--id", required=True, help="Pipeline ID") + update_parser.add_argument( + "--pipeline", + required=True, + help="Pipeline config as JSON string or file path", + ) + update_parser.add_argument( + "--update-mask", + "--update_mask", + dest="update_mask", + help="Comma-separated list of fields to update", + ) + update_parser.set_defaults(func=handle_update_command) + + # Delete pipeline command + delete_parser = log_processing_subparsers.add_parser( + "delete", help="Delete a log processing pipeline" + ) + delete_parser.add_argument("--id", required=True, help="Pipeline ID") + delete_parser.add_argument( + "--etag", help="Optional etag for concurrency control" + ) + delete_parser.set_defaults(func=handle_delete_command) + + # Associate streams command + associate_streams_parser = log_processing_subparsers.add_parser( + "associate-streams", help="Associate streams with a pipeline" + ) + associate_streams_parser.add_argument( + "--id", required=True, help="Pipeline ID" + ) + associate_streams_parser.add_argument( + "--streams", + required=True, + help="JSON array of stream objects or file path", + ) + associate_streams_parser.set_defaults(func=handle_associate_streams_command) + + # Dissociate streams command + dissociate_streams_parser = log_processing_subparsers.add_parser( + "dissociate-streams", help="Dissociate streams from a pipeline" + ) + dissociate_streams_parser.add_argument( + "--id", required=True, help="Pipeline ID" + ) + dissociate_streams_parser.add_argument( + "--streams", + required=True, + help="JSON array of stream 
objects or file path", + ) + dissociate_streams_parser.set_defaults( + func=handle_dissociate_streams_command + ) + + # Fetch associated pipeline command + fetch_associated_parser = log_processing_subparsers.add_parser( + "fetch-associated", help="Fetch pipeline associated with a stream" + ) + fetch_associated_parser.add_argument( + "--stream", + required=True, + help="Stream object as JSON string or file path", + ) + fetch_associated_parser.set_defaults(func=handle_fetch_associated_command) + + # Fetch sample logs command + fetch_sample_logs_parser = log_processing_subparsers.add_parser( + "fetch-sample-logs", help="Fetch sample logs by streams" + ) + fetch_sample_logs_parser.add_argument( + "--streams", + required=True, + help="JSON array of stream objects or file path", + ) + fetch_sample_logs_parser.add_argument( + "--count", type=int, help="Number of sample logs per stream (max 1000)" + ) + fetch_sample_logs_parser.set_defaults(func=handle_fetch_sample_logs_command) + + # Test pipeline command + test_parser = log_processing_subparsers.add_parser( + "test", help="Test a pipeline with input logs" + ) + test_parser.add_argument( + "--pipeline", + required=True, + help="Pipeline config as JSON or file path", + ) + test_parser.add_argument( + "--input-logs", + "--input_logs", + dest="input_logs", + required=True, + help="Input logs as JSON array or file path", + ) + test_parser.set_defaults(func=handle_test_command) + + +def handle_list_command(args, chronicle): + """Handle list log processing pipelines command.""" + try: + result = chronicle.list_log_processing_pipelines( + page_size=args.page_size, + page_token=args.page_token, + filter_expr=args.filter, + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_get_command(args, chronicle): + """Handle get log processing pipeline command.""" + try: + result = chronicle.get_log_processing_pipeline(args.id) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_create_command(args, chronicle): + """Handle create log processing pipeline command.""" + try: + pipeline_config = load_json_or_file(args.pipeline) + + if not isinstance(pipeline_config, dict): + print("Error: pipeline must be a JSON object", file=sys.stderr) + sys.exit(1) + + result = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config, pipeline_id=args.id + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_update_command(args, chronicle): + """Handle update log processing pipeline command.""" + try: + pipeline_config = load_json_or_file(args.pipeline) + + if not isinstance(pipeline_config, dict): + print("Error: pipeline must be a JSON object", file=sys.stderr) + sys.exit(1) + + result = chronicle.update_log_processing_pipeline( + pipeline_id=args.id, + pipeline=pipeline_config, + update_mask=args.update_mask, + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_delete_command(args, chronicle): + """Handle delete log processing pipeline command.""" + try: + result = chronicle.delete_log_processing_pipeline( + pipeline_id=args.id, etag=args.etag + ) + output_formatter(result, 
args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_associate_streams_command(args, chronicle): + """Handle associate streams command.""" + try: + streams = load_json_or_file(args.streams) + + if not isinstance(streams, list): + print("Error: streams must be a JSON array", file=sys.stderr) + sys.exit(1) + + result = chronicle.associate_streams( + pipeline_id=args.id, streams=streams + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_dissociate_streams_command(args, chronicle): + """Handle dissociate streams command.""" + try: + streams = load_json_or_file(args.streams) + + if not isinstance(streams, list): + print("Error: streams must be a JSON array", file=sys.stderr) + sys.exit(1) + + result = chronicle.dissociate_streams( + pipeline_id=args.id, streams=streams + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_fetch_associated_command(args, chronicle): + """Handle fetch associated pipeline command.""" + try: + stream = load_json_or_file(args.stream) + + if not isinstance(stream, dict): + print("Error: stream must be a JSON object", file=sys.stderr) + sys.exit(1) + + result = chronicle.fetch_associated_pipeline(stream=stream) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_fetch_sample_logs_command(args, chronicle): + """Handle fetch sample logs by streams command.""" + try: + streams = load_json_or_file(args.streams) + + if not isinstance(streams, list): + print("Error: streams must be a JSON array", file=sys.stderr) + sys.exit(1) + + result = chronicle.fetch_sample_logs_by_streams( + streams=streams, sample_logs_count=args.count + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_test_command(args, chronicle): + """Handle test pipeline command.""" + try: + pipeline = load_json_or_file(args.pipeline) + input_logs = load_json_or_file(args.input_logs) + + if not isinstance(pipeline, dict): + print("Error: pipeline must be a JSON object", file=sys.stderr) + sys.exit(1) + + if not isinstance(input_logs, list): + print("Error: input_logs must be a JSON array", file=sys.stderr) + sys.exit(1) + + result = chronicle.test_pipeline( + pipeline=pipeline, input_logs=input_logs + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) diff --git a/src/secops/cli/utils/input_utils.py b/src/secops/cli/utils/input_utils.py new file mode 100644 index 00000000..8e666a6a --- /dev/null +++ b/src/secops/cli/utils/input_utils.py @@ -0,0 +1,79 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Google SecOps CLI input utilities"""
+
+import json
+import sys
+from pathlib import Path
+from typing import Any
+
+
+def load_json_or_file(value: str) -> Any:
+    """Load JSON from string or file path.
+
+    Args:
+        value: JSON string or file path
+
+    Returns:
+        Parsed JSON object (dict, list, etc.)
+
+    Raises:
+        SystemExit: If the file cannot be read or JSON parsing fails
+    """
+    try:
+        file_path = Path(value)
+        # Path checks can raise OSError (e.g. name too long) when the value
+        # is a long inline JSON string rather than a path, so fall back to
+        # treating such values as raw JSON instead of exiting.
+        is_file = file_path.exists() and file_path.is_file()
+    except (OSError, ValueError):
+        is_file = False
+
+    if is_file:
+        try:
+            with open(file_path, encoding="utf-8") as f:
+                return json.load(f)
+        except json.JSONDecodeError as e:
+            print(f"Error parsing JSON from file: {e}", file=sys.stderr)
+            sys.exit(1)
+        except Exception as e:  # pylint: disable=broad-exception-caught
+            print(f"Error reading file: {e}", file=sys.stderr)
+            sys.exit(1)
+
+    try:
+        return json.loads(value)
+    except json.JSONDecodeError as e:
+        print(
+            f"Error: Not a valid JSON string or file path: {value}",
+            file=sys.stderr,
+        )
+        print(f"JSON parse error: {e}", file=sys.stderr)
+        sys.exit(1)
+
+
+def load_string_or_file(value: str) -> str:
+    """Load string content from direct value or file path.
+
+    Args:
+        value: String content or file path
+
+    Returns:
+        String content
+
+    Raises:
+        SystemExit: If file exists but cannot be read
+    """
+    try:
+        file_path = Path(value)
+        is_file = file_path.exists() and file_path.is_file()
+    except (OSError, ValueError):
+        # Values that cannot be interpreted as paths are direct content.
+        return value
+
+    if is_file:
+        try:
+            with open(file_path, encoding="utf-8") as f:
+                return f.read()
+        except Exception as e:  # pylint: disable=broad-exception-caught
+            print(f"Error reading file: {e}", file=sys.stderr)
+            sys.exit(1)
+
+    return value
diff --git a/tests/chronicle/test_log_processing_pipeline.py b/tests/chronicle/test_log_processing_pipeline.py
new file mode 100644
index 00000000..15a427ae
--- /dev/null
+++ b/tests/chronicle/test_log_processing_pipeline.py
@@ -0,0 +1,706 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# +"""Tests for Chronicle log processing pipeline functions.""" + +import pytest +from unittest.mock import Mock, patch + +from secops.chronicle.client import ChronicleClient +from secops.chronicle.log_processing_pipelines import ( + list_log_processing_pipelines, + get_log_processing_pipeline, + create_log_processing_pipeline, + update_log_processing_pipeline, + delete_log_processing_pipeline, + associate_streams, + dissociate_streams, + fetch_associated_pipeline, + fetch_sample_logs_by_streams, + test_pipeline as pipeline_test_function, +) +from secops.exceptions import APIError + + +@pytest.fixture +def chronicle_client(): + """Create a Chronicle client for testing.""" + with patch("secops.auth.SecOpsAuth") as mock_auth: + mock_session = Mock() + mock_session.headers = {} + mock_auth.return_value.session = mock_session + return ChronicleClient( + customer_id="test-customer", project_id="test-project" + ) + + +@pytest.fixture +def mock_response(): + """Create a mock API response.""" + mock = Mock() + mock.status_code = 200 + mock.json.return_value = { + "name": "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345", + "displayName": "Test Pipeline", + "description": "Test pipeline description", + "processors": [{"filterProcessor": {"include": {}}}], + } + return mock + + +@pytest.fixture +def mock_error_response(): + """Create a mock error API response.""" + mock = Mock() + mock.status_code = 400 + mock.text = "Error message" + return mock + + +def test_list_log_processing_pipelines(chronicle_client, mock_response): + """Test list_log_processing_pipelines function.""" + mock_response.json.return_value = { + "logProcessingPipelines": [ + {"name": "pipeline1"}, + {"name": "pipeline2"}, + ] + } + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = list_log_processing_pipelines(chronicle_client) + + mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines", + params={}, + ) + assert result == mock_response.json.return_value + + +def test_list_log_processing_pipelines_with_params( + chronicle_client, mock_response +): + """Test list_log_processing_pipelines with pagination and filter.""" + mock_response.json.return_value = { + "logProcessingPipelines": [{"name": "pipeline1"}], + "nextPageToken": "token123", + } + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = list_log_processing_pipelines( + chronicle_client, + page_size=50, + page_token="prev_token", + filter_expr='displayName="Test"', + ) + + mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines", + params={ + "pageSize": 50, + "pageToken": "prev_token", + "filter": 'displayName="Test"', + }, + ) + assert result == mock_response.json.return_value + + +def test_list_log_processing_pipelines_error( + chronicle_client, mock_error_response +): + """Test list_log_processing_pipelines with error response.""" + with patch.object( + chronicle_client.session, "get", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + list_log_processing_pipelines(chronicle_client) + + assert "Failed to list log processing pipelines" in str(exc_info.value) + + +def test_get_log_processing_pipeline(chronicle_client, mock_response): + """Test get_log_processing_pipeline function.""" + pipeline_id = "pipeline_12345" + + with patch.object( + 
chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = get_log_processing_pipeline(chronicle_client, pipeline_id) + + mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}" + ) + assert result == mock_response.json.return_value + + +def test_get_log_processing_pipeline_error( + chronicle_client, mock_error_response +): + """Test get_log_processing_pipeline with error response.""" + pipeline_id = "pipeline_12345" + + with patch.object( + chronicle_client.session, "get", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + get_log_processing_pipeline(chronicle_client, pipeline_id) + + assert "Failed to get log processing pipeline" in str(exc_info.value) + + +def test_create_log_processing_pipeline(chronicle_client, mock_response): + """Test create_log_processing_pipeline function.""" + pipeline_config = { + "displayName": "Test Pipeline", + "description": "Test description", + "processors": [{"filterProcessor": {"include": {}}}], + } + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = create_log_processing_pipeline( + chronicle_client, pipeline_config + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines", + json=pipeline_config, + params={}, + ) + assert result == mock_response.json.return_value + + +def test_create_log_processing_pipeline_with_id( + chronicle_client, mock_response +): + """Test create_log_processing_pipeline with custom pipeline ID.""" + pipeline_config = { + "displayName": "Test Pipeline", + "processors": [{"filterProcessor": {"include": {}}}], + } + pipeline_id = "custom_pipeline_id" + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = create_log_processing_pipeline( + chronicle_client, pipeline_config, pipeline_id=pipeline_id + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines", + json=pipeline_config, + params={"logProcessingPipelineId": pipeline_id}, + ) + assert result == mock_response.json.return_value + + +def test_create_log_processing_pipeline_error( + chronicle_client, mock_error_response +): + """Test create_log_processing_pipeline with error response.""" + pipeline_config = {"displayName": "Test Pipeline"} + + with patch.object( + chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + create_log_processing_pipeline(chronicle_client, pipeline_config) + + assert "Failed to create log processing pipeline" in str(exc_info.value) + + +def test_update_log_processing_pipeline(chronicle_client, mock_response): + """Test update_log_processing_pipeline function.""" + pipeline_id = "pipeline_12345" + pipeline_config = { + "name": "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345", + "displayName": "Updated Pipeline", + "processors": [{"filterProcessor": {"include": {}}}], + } + + with patch.object( + chronicle_client.session, "patch", return_value=mock_response + ) as mock_patch: + result = update_log_processing_pipeline( + chronicle_client, pipeline_id, pipeline_config + ) + + mock_patch.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}", + json=pipeline_config, + params={}, 
+ ) + assert result == mock_response.json.return_value + + +def test_update_log_processing_pipeline_with_update_mask( + chronicle_client, mock_response +): + """Test update_log_processing_pipeline with update mask.""" + pipeline_id = "pipeline_12345" + pipeline_config = { + "name": "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345", + "displayName": "Updated Pipeline", + } + update_mask = "displayName,description" + + with patch.object( + chronicle_client.session, "patch", return_value=mock_response + ) as mock_patch: + result = update_log_processing_pipeline( + chronicle_client, + pipeline_id, + pipeline_config, + update_mask=update_mask, + ) + + mock_patch.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}", + json=pipeline_config, + params={"updateMask": update_mask}, + ) + assert result == mock_response.json.return_value + + +def test_update_log_processing_pipeline_with_full_name( + chronicle_client, mock_response +): + """Test update_log_processing_pipeline with full resource name.""" + full_name = "projects/test-project/locations/us/instances/test-customer/logProcessingPipelines/pipeline_12345" + pipeline_config = { + "name": full_name, + "displayName": "Updated Pipeline", + } + + with patch.object( + chronicle_client.session, "patch", return_value=mock_response + ) as mock_patch: + result = update_log_processing_pipeline( + chronicle_client, full_name, pipeline_config + ) + + mock_patch.assert_called_once_with( + f"{chronicle_client.base_url}/{full_name}", + json=pipeline_config, + params={}, + ) + assert result == mock_response.json.return_value + + +def test_update_log_processing_pipeline_error( + chronicle_client, mock_error_response +): + """Test update_log_processing_pipeline with error response.""" + pipeline_id = "pipeline_12345" + pipeline_config = {"displayName": "Updated Pipeline"} + + with patch.object( + chronicle_client.session, "patch", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + update_log_processing_pipeline( + chronicle_client, pipeline_id, pipeline_config + ) + + assert "Failed to patch log processing pipeline" in str(exc_info.value) + + +def test_delete_log_processing_pipeline(chronicle_client, mock_response): + """Test delete_log_processing_pipeline function.""" + pipeline_id = "pipeline_12345" + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "delete", return_value=mock_response + ) as mock_delete: + result = delete_log_processing_pipeline(chronicle_client, pipeline_id) + + mock_delete.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}", + params={}, + ) + assert result == {} + + +def test_delete_log_processing_pipeline_with_etag( + chronicle_client, mock_response +): + """Test delete_log_processing_pipeline with etag.""" + pipeline_id = "pipeline_12345" + etag = "etag_value_123" + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "delete", return_value=mock_response + ) as mock_delete: + result = delete_log_processing_pipeline( + chronicle_client, pipeline_id, etag=etag + ) + + mock_delete.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}", + params={"etag": etag}, + ) + assert result == {} + + +def test_delete_log_processing_pipeline_error( + chronicle_client, 
mock_error_response +): + """Test delete_log_processing_pipeline with error response.""" + pipeline_id = "pipeline_12345" + + with patch.object( + chronicle_client.session, "delete", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + delete_log_processing_pipeline(chronicle_client, pipeline_id) + + assert "Failed to delete log processing pipeline" in str(exc_info.value) + + +def test_associate_streams(chronicle_client, mock_response): + """Test associate_streams function.""" + pipeline_id = "pipeline_12345" + streams = [{"logType": "WINEVTLOG"}, {"feedId": "feed_123"}] + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = associate_streams(chronicle_client, pipeline_id, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}:associateStreams", + json={"streams": streams}, + ) + assert result == {} + + +def test_associate_streams_error(chronicle_client, mock_error_response): + """Test associate_streams with error response.""" + pipeline_id = "pipeline_12345" + streams = [{"logType": "WINEVTLOG"}] + + with patch.object( + chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + associate_streams(chronicle_client, pipeline_id, streams) + + assert "Failed to associate streams" in str(exc_info.value) + + +def test_associate_streams_empty_list(chronicle_client, mock_response): + """Test associate_streams with empty streams list.""" + pipeline_id = "pipeline_12345" + streams = [] + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = associate_streams(chronicle_client, pipeline_id, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}:associateStreams", + json={"streams": []}, + ) + assert result == {} + + +def test_dissociate_streams(chronicle_client, mock_response): + """Test dissociate_streams function.""" + pipeline_id = "pipeline_12345" + streams = [{"logType": "WINEVTLOG"}, {"feedId": "feed_123"}] + mock_response.json.return_value = {} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = dissociate_streams(chronicle_client, pipeline_id, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines/{pipeline_id}:dissociateStreams", + json={"streams": streams}, + ) + assert result == {} + + +def test_dissociate_streams_error(chronicle_client, mock_error_response): + """Test dissociate_streams with error response.""" + pipeline_id = "pipeline_12345" + streams = [{"logType": "WINEVTLOG"}] + + with patch.object( + chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + dissociate_streams(chronicle_client, pipeline_id, streams) + + assert "Failed to dissociate streams" in str(exc_info.value) + + +def test_fetch_associated_pipeline_with_log_type( + chronicle_client, mock_response +): + """Test fetch_associated_pipeline with logType.""" + stream = {"logType": "WINEVTLOG"} + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = fetch_associated_pipeline(chronicle_client, stream) + 
+ mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchAssociatedPipeline", + params={"stream.logType": "WINEVTLOG"}, + ) + assert result == mock_response.json.return_value + + +def test_fetch_associated_pipeline_with_feed_id( + chronicle_client, mock_response +): + """Test fetch_associated_pipeline with feedId.""" + stream = {"feedId": "feed_123"} + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = fetch_associated_pipeline(chronicle_client, stream) + + mock_get.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchAssociatedPipeline", + params={"stream.feedId": "feed_123"}, + ) + assert result == mock_response.json.return_value + + +def test_fetch_associated_pipeline_with_multiple_fields( + chronicle_client, mock_response +): + """Test fetch_associated_pipeline with multiple stream fields.""" + stream = {"logType": "WINEVTLOG", "namespace": "test"} + + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = fetch_associated_pipeline(chronicle_client, stream) + + mock_get.assert_called_once() + call_args = mock_get.call_args + assert "stream.logType" in call_args[1]["params"] + assert "stream.namespace" in call_args[1]["params"] + assert result == mock_response.json.return_value + + +def test_fetch_associated_pipeline_error(chronicle_client, mock_error_response): + """Test fetch_associated_pipeline with error response.""" + stream = {"logType": "WINEVTLOG"} + + with patch.object( + chronicle_client.session, "get", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + fetch_associated_pipeline(chronicle_client, stream) + + assert "Failed to fetch associated pipeline" in str(exc_info.value) + + +def test_fetch_sample_logs_by_streams(chronicle_client, mock_response): + """Test fetch_sample_logs_by_streams function.""" + streams = [{"logType": "WINEVTLOG"}, {"feedId": "feed_123"}] + mock_response.json.return_value = { + "logs": [{"data": "log1"}, {"data": "log2"}] + } + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = fetch_sample_logs_by_streams(chronicle_client, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchSampleLogsByStreams", + json={"streams": streams}, + ) + assert result == mock_response.json.return_value + + +def test_fetch_sample_logs_by_streams_with_count( + chronicle_client, mock_response +): + """Test fetch_sample_logs_by_streams with sample count.""" + streams = [{"logType": "WINEVTLOG"}] + sample_logs_count = 50 + mock_response.json.return_value = {"logs": []} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = fetch_sample_logs_by_streams( + chronicle_client, streams, sample_logs_count=sample_logs_count + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchSampleLogsByStreams", + json={"streams": streams, "sampleLogsCount": sample_logs_count}, + ) + assert result == mock_response.json.return_value + + +def test_fetch_sample_logs_by_streams_error( + chronicle_client, mock_error_response +): + """Test fetch_sample_logs_by_streams with error response.""" + streams = [{"logType": "WINEVTLOG"}] + + with 
patch.object( + chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + fetch_sample_logs_by_streams(chronicle_client, streams) + + assert "Failed to fetch sample logs by streams" in str(exc_info.value) + + +def test_fetch_sample_logs_by_streams_empty_streams( + chronicle_client, mock_response +): + """Test fetch_sample_logs_by_streams with empty streams list.""" + streams = [] + mock_response.json.return_value = {"logs": []} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = fetch_sample_logs_by_streams(chronicle_client, streams) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:fetchSampleLogsByStreams", + json={"streams": []}, + ) + assert result == mock_response.json.return_value + + +def test_test_pipeline(chronicle_client, mock_response): + """Test test_pipeline function.""" + pipeline_config = { + "displayName": "Test Pipeline", + "processors": [{"filterProcessor": {"include": {}}}], + } + input_logs = [ + {"data": "bG9nMQ==", "logEntryTime": "2024-01-01T00:00:00Z"}, + {"data": "bG9nMg==", "logEntryTime": "2024-01-01T00:00:01Z"}, + ] + mock_response.json.return_value = {"logs": input_logs} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = pipeline_test_function( + chronicle_client, pipeline_config, input_logs + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:testPipeline", + json={ + "logProcessingPipeline": pipeline_config, + "inputLogs": input_logs, + }, + ) + assert result == mock_response.json.return_value + + +def test_test_pipeline_error(chronicle_client, mock_error_response): + """Test test_pipeline with error response.""" + pipeline_config = {"displayName": "Test Pipeline"} + input_logs = [{"data": "bG9nMQ=="}] + + with patch.object( + chronicle_client.session, "post", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + pipeline_test_function( + chronicle_client, pipeline_config, input_logs + ) + + assert "Failed to test pipeline" in str(exc_info.value) + + +def test_test_pipeline_empty_logs(chronicle_client, mock_response): + """Test test_pipeline with empty input logs.""" + pipeline_config = { + "displayName": "Test Pipeline", + "processors": [{"filterProcessor": {"include": {}}}], + } + input_logs = [] + mock_response.json.return_value = {"logs": []} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = pipeline_test_function( + chronicle_client, pipeline_config, input_logs + ) + + mock_post.assert_called_once_with( + f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logProcessingPipelines:testPipeline", + json={ + "logProcessingPipeline": pipeline_config, + "inputLogs": [], + }, + ) + assert result == mock_response.json.return_value + + +def test_test_pipeline_with_complex_processors(chronicle_client, mock_response): + """Test test_pipeline with complex processor configuration.""" + pipeline_config = { + "displayName": "Complex Pipeline", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*error.*"], + } + } + }, + { + "transformProcessor": { + "fields": [{"field": "message", "transformation": "upper"}] + } + }, + ], + } + input_logs = [{"data": "bG9nMQ==", "logEntryTime": 
"2024-01-01T00:00:00Z"}] + mock_response.json.return_value = {"logs": input_logs} + + with patch.object( + chronicle_client.session, "post", return_value=mock_response + ) as mock_post: + result = pipeline_test_function( + chronicle_client, pipeline_config, input_logs + ) + + mock_post.assert_called_once() + call_kwargs = mock_post.call_args[1] + assert call_kwargs["json"]["logProcessingPipeline"] == pipeline_config + assert result == mock_response.json.return_value diff --git a/tests/chronicle/test_log_processing_pipeline_integration.py b/tests/chronicle/test_log_processing_pipeline_integration.py new file mode 100644 index 00000000..8781b9a6 --- /dev/null +++ b/tests/chronicle/test_log_processing_pipeline_integration.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Integration tests for log processing pipeline endpoints. + +These tests require valid credentials and API access. +""" +import pytest +import time +import uuid +import base64 +from datetime import datetime, timezone +from secops import SecOpsClient +from ..config import CHRONICLE_CONFIG, SERVICE_ACCOUNT_JSON +from secops.exceptions import APIError + + +@pytest.mark.integration +def test_log_processing_pipeline_crud_workflow(): + """Test CRUD workflow for log processing pipelines.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Generate unique display name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Integration test pipeline", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + created_pipeline = None + + try: + # Test CREATE + print(f"Creating pipeline: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + # Extract pipeline ID from the name + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + + assert created_pipeline is not None + assert "name" in created_pipeline + assert created_pipeline.get("displayName") == display_name + print(f"Pipeline created: {created_pipeline['name']}") + + # Wait for pipeline to be fully created + time.sleep(2) + + # Test GET + print(f"Getting pipeline: {pipeline_id}") + retrieved_pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + assert retrieved_pipeline is not None + assert retrieved_pipeline.get("displayName") == display_name + print(f"Pipeline retrieved: {retrieved_pipeline['name']}") + + # Test LIST + print("Listing pipelines") + list_result = chronicle.list_log_processing_pipelines(page_size=10) + assert "logProcessingPipelines" in list_result + pipelines = list_result["logProcessingPipelines"] + pipeline_ids = [p["name"].split("/")[-1] for p in pipelines] + assert pipeline_id in pipeline_ids + print(f"Found 
{len(pipelines)} pipelines") + + # Test PATCH + updated_display_name = f"Updated Pipeline {unique_id}" + updated_config = { + "name": created_pipeline.get("name"), + "displayName": updated_display_name, + "description": "Updated description", + "processors": created_pipeline.get("processors"), + } + print(f"Updating pipeline: {pipeline_id}") + updated_pipeline = chronicle.update_log_processing_pipeline( + pipeline_id=pipeline_id, + pipeline=updated_config, + update_mask="displayName,description", + ) + assert updated_pipeline is not None + assert updated_pipeline.get("displayName") == updated_display_name + print(f"Pipeline updated: {updated_pipeline['displayName']}") + + # Verify update + time.sleep(2) + verified_pipeline = chronicle.get_log_processing_pipeline(pipeline_id) + assert verified_pipeline.get("displayName") == updated_display_name + print("Pipeline update verified") + + except APIError as e: + print(f"Pipeline CRUD test failed: {str(e)}") + pytest.fail(f"Pipeline CRUD test failed due to API error: {str(e)}") + + finally: + # Test DELETE - cleanup + if created_pipeline: + try: + print(f"Deleting pipeline: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + + # Verify deletion + time.sleep(2) + try: + chronicle.get_log_processing_pipeline(pipeline_id) + pytest.fail("Pipeline still exists after deletion") + except APIError: + print("Pipeline deletion verified") + + except APIError as e: + print(f"Warning: Failed to delete test pipeline: {str(e)}") + + +@pytest.mark.integration +def test_log_processing_pipeline_stream_operations(): + """Test stream association and dissociation workflow.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Generate unique display name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Stream Test Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Integration test for stream operations", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + created_pipeline = None + + try: + # Create pipeline + print(f"Creating pipeline for stream test: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + + # Extract pipeline ID from the name + pipeline_id = created_pipeline.get("name", "").split("/")[-1] + assert created_pipeline is not None + print(f"Pipeline created: {created_pipeline['name']}") + time.sleep(2) + + # Test ASSOCIATE STREAMS + streams = [{"logType": "WINEVTLOG"}] + print(f"Associating streams to pipeline: {pipeline_id}") + associate_result = chronicle.associate_streams( + pipeline_id=pipeline_id, streams=streams + ) + assert associate_result is not None + print("Streams associated successfully") + time.sleep(2) + + # Test FETCH ASSOCIATED PIPELINE + print("Fetching associated pipeline by stream") + stream_query = {"logType": "WINEVTLOG"} + associated_pipeline = chronicle.fetch_associated_pipeline( + stream=stream_query + ) + assert associated_pipeline is not None + print(f"Associated pipeline: {associated_pipeline.get('name', 'N/A')}") + + # Test DISSOCIATE STREAMS + print(f"Dissociating streams from pipeline: {pipeline_id}") + dissociate_result = chronicle.dissociate_streams( + pipeline_id=pipeline_id, streams=streams + ) + assert dissociate_result is not None + 
print("Streams dissociated successfully") + + except APIError as e: + print(f"Stream operations test failed: {str(e)}") + pytest.fail(f"Stream operations test failed due to API error: {str(e)}") + + finally: + # Cleanup + if created_pipeline: + try: + print(f"Deleting pipeline: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Pipeline deleted successfully") + except APIError as e: + print(f"Warning: Failed to delete test pipeline: {str(e)}") + + +@pytest.mark.integration +def test_fetch_sample_logs_by_streams(): + """Test fetching sample logs by streams.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Generate unique display name + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Sample Logs Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": display_name, + "description": "Pipeline for testing sample logs", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + pipeline_id = None + try: + # Create the pipeline + print(f"Creating pipeline: {display_name}") + created_pipeline = chronicle.create_log_processing_pipeline( + pipeline=pipeline_config + ) + pipeline_id = created_pipeline["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + # Associate CS_EDR log type with the pipeline + streams = [{"logType": "CS_EDR"}] + print(f"Associating streams: {streams}") + chronicle.associate_streams(pipeline_id=pipeline_id, streams=streams) + print("Streams associated successfully") + + # Wait briefly for association to propagate + time.sleep(10) + + # Fetch sample logs for the log type + print(f"Fetching sample logs for streams: {streams}") + result = chronicle.fetch_sample_logs_by_streams( + streams=streams, sample_logs_count=5 + ) + + assert result is not None + if not result or ("logs" not in result and "sampleLogs" not in result): + pytest.skip("No sample logs found for CS_EDR log type") + + logs = result.get("logs", result.get("sampleLogs", [])) + print(f"Fetched sample logs: {len(logs)} logs") + assert len(logs) > 0, "Expected at least one sample log" + + except APIError as e: + print(f"Fetch sample logs test failed: {str(e)}") + pytest.skip( + f"Fetch sample logs test skipped due to API error: {str(e)}" + ) + + finally: + # Cleanup: Delete the created pipeline + if pipeline_id: + try: + print(f"Deleting pipeline: {pipeline_id}") + chronicle.delete_log_processing_pipeline(pipeline_id) + print("Test pipeline deleted successfully") + except APIError as e: + print(f"Warning: Failed to delete test pipeline: {str(e)}") + + +@pytest.mark.integration +def test_pipeline_testing_functionality(): + """Test the test_pipeline functionality.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Pipeline configuration for testing + pipeline_config = { + "displayName": "Test Pipeline Config", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + # Create test input logs + current_time = datetime.now(timezone.utc).isoformat() + log_data_1 = base64.b64encode(b"Sample log line 1").decode("utf-8") + log_data_2 = base64.b64encode(b"Sample log line 2").decode("utf-8") + + input_logs = [ + { + "data": log_data_1, + "logEntryTime": current_time, + 
"collectionTime": current_time, + }, + { + "data": log_data_2, + "logEntryTime": current_time, + "collectionTime": current_time, + }, + ] + + try: + print("Testing pipeline with input logs") + print(f"Pipeline: {pipeline_config['displayName']}") + print(f"Number of input logs: {len(input_logs)}") + + result = chronicle.test_pipeline( + pipeline=pipeline_config, input_logs=input_logs + ) + + assert result is not None + assert "logs" in result + + processed_logs = result.get("logs", []) + print(f"Pipeline test completed: {len(processed_logs)} logs processed") + + if processed_logs: + print("First processed log data present") + assert len(processed_logs) > 0 + + except APIError as e: + print(f"Test pipeline functionality failed: {str(e)}") + pytest.skip( + f"Test pipeline functionality skipped due to API error: {str(e)}" + ) + + +@pytest.mark.integration +def test_list_pipelines_with_pagination(): + """Test listing pipelines with pagination.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + try: + # Get first page with small page size + print("Fetching first page of pipelines") + first_page = chronicle.list_log_processing_pipelines(page_size=1) + + assert first_page is not None + assert "logProcessingPipelines" in first_page + pipelines = first_page.get("logProcessingPipelines", []) + print(f"First page: {len(pipelines)} pipelines") + + # If there's a next page token, fetch next page + next_token = first_page.get("nextPageToken") + if next_token: + print("Fetching second page of pipelines") + second_page = chronicle.list_log_processing_pipelines( + page_size=1, page_token=next_token + ) + assert second_page is not None + pipelines_2 = second_page.get("logProcessingPipelines", []) + print(f"Second page: {len(pipelines_2)} pipelines") + + # Verify pagination works correctly + if pipelines and pipelines_2: + assert pipelines[0].get("name") != pipelines_2[0].get("name") + print("Pagination verified successfully") + else: + print("No second page available for pagination test") + + except APIError as e: + print(f"List pipelines pagination test failed: {str(e)}") + pytest.skip( + f"List pipelines pagination test skipped due to API error: {str(e)}" + ) diff --git a/tests/cli/test_log_processing_integration.py b/tests/cli/test_log_processing_integration.py new file mode 100644 index 00000000..1493ef4b --- /dev/null +++ b/tests/cli/test_log_processing_integration.py @@ -0,0 +1,655 @@ +"""Integration tests for the SecOps CLI log processing commands.""" + +import base64 +import json +import os +import subprocess +import time +import uuid +from datetime import datetime, timezone + +import pytest + +from tests.config import CHRONICLE_CONFIG + + +@pytest.mark.integration +def test_cli_log_processing_crud_workflow(cli_env, common_args, tmp_path): + """Test the log processing pipeline create, update, and delete.""" + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test Pipeline {unique_id}" + + pipeline_config = { + "displayName": display_name, + "description": "CLI integration test pipeline", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + config_file = tmp_path / "pipeline_config.json" + config_file.write_text(json.dumps(pipeline_config)) + + pipeline_id = None + + try: + create_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "create", + "--pipeline", + str(config_file), + ] + ) + + 
create_result = subprocess.run( + create_cmd, env=cli_env, capture_output=True, text=True + ) + + assert create_result.returncode == 0 + + pipeline_data = json.loads(create_result.stdout) + assert "name" in pipeline_data + pipeline_id = pipeline_data["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + time.sleep(2) + + updated_display_name = f"Updated Pipeline {unique_id}" + updated_config = { + "name": pipeline_data.get("name"), + "displayName": updated_display_name, + "description": "Updated CLI integration test pipeline", + "processors": pipeline_data.get("processors"), + } + + updated_config_file = tmp_path / "updated_pipeline_config.json" + updated_config_file.write_text(json.dumps(updated_config)) + + update_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "update", + "--id", + pipeline_id, + "--pipeline", + str(updated_config_file), + "--update-mask", + "displayName,description", + ] + ) + + update_result = subprocess.run( + update_cmd, env=cli_env, capture_output=True, text=True + ) + + assert update_result.returncode == 0 + + updated_pipeline = json.loads(update_result.stdout) + assert updated_pipeline["displayName"] == updated_display_name + print(f"Updated pipeline to: {updated_display_name}") + + finally: + if pipeline_id: + delete_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "delete", + "--id", + pipeline_id, + ] + ) + + delete_result = subprocess.run( + delete_cmd, env=cli_env, capture_output=True, text=True + ) + + if delete_result.returncode == 0: + print(f"Successfully deleted pipeline: {pipeline_id}") + else: + print(f"Failed to delete test pipeline: {delete_result.stderr}") + + +@pytest.mark.integration +def test_cli_log_processing_stream_operations(cli_env, common_args, tmp_path): + """Test stream association and dissociation commands.""" + unique_id = str(uuid.uuid4())[:8] + display_name = f"Stream Test Pipeline {unique_id}" + + pipeline_config = { + "displayName": display_name, + "description": "CLI test for stream operations", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + config_file = tmp_path / "pipeline_config.json" + config_file.write_text(json.dumps(pipeline_config)) + + pipeline_id = None + + try: + create_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "create", + "--pipeline", + str(config_file), + ] + ) + + create_result = subprocess.run( + create_cmd, env=cli_env, capture_output=True, text=True + ) + + assert create_result.returncode == 0 + + pipeline_data = json.loads(create_result.stdout) + pipeline_id = pipeline_data["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + time.sleep(2) + + streams = [{"logType": "WINEVTLOG"}] + streams_file = tmp_path / "streams.json" + streams_file.write_text(json.dumps(streams)) + + associate_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "associate-streams", + "--id", + pipeline_id, + "--streams", + str(streams_file), + ] + ) + + associate_result = subprocess.run( + associate_cmd, env=cli_env, capture_output=True, text=True + ) + + assert associate_result.returncode == 0 + print("Streams associated successfully") + + time.sleep(2) + + dissociate_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "dissociate-streams", + "--id", + pipeline_id, + "--streams", + str(streams_file), + ] + ) + + dissociate_result = subprocess.run( + 
dissociate_cmd, env=cli_env, capture_output=True, text=True + ) + + assert dissociate_result.returncode == 0 + print("Streams dissociated successfully") + + finally: + if pipeline_id: + delete_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "delete", + "--id", + pipeline_id, + ] + ) + + delete_result = subprocess.run( + delete_cmd, env=cli_env, capture_output=True, text=True + ) + + if delete_result.returncode == 0: + print(f"Successfully deleted pipeline: {pipeline_id}") + else: + print(f"Failed to delete test pipeline: {delete_result.stderr}") + + +@pytest.mark.integration +def test_cli_log_processing_fetch_associated(cli_env, common_args, tmp_path): + """Test fetch associated pipeline command.""" + unique_id = str(uuid.uuid4())[:8] + display_name = f"Fetch Test Pipeline {unique_id}" + + pipeline_config = { + "displayName": display_name, + "description": "CLI test for fetch associated", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + config_file = tmp_path / "pipeline_config.json" + config_file.write_text(json.dumps(pipeline_config)) + + pipeline_id = None + + try: + create_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "create", + "--pipeline", + str(config_file), + ] + ) + + create_result = subprocess.run( + create_cmd, env=cli_env, capture_output=True, text=True + ) + + assert create_result.returncode == 0 + + pipeline_data = json.loads(create_result.stdout) + pipeline_id = pipeline_data["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + time.sleep(2) + + streams = [{"logType": "WINEVTLOG"}] + streams_file = tmp_path / "streams.json" + streams_file.write_text(json.dumps(streams)) + + associate_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "associate-streams", + "--id", + pipeline_id, + "--streams", + str(streams_file), + ] + ) + + associate_result = subprocess.run( + associate_cmd, env=cli_env, capture_output=True, text=True + ) + + assert associate_result.returncode == 0 + print("Streams associated successfully") + + time.sleep(2) + + stream_query = {"logType": "WINEVTLOG"} + stream_file = tmp_path / "stream_query.json" + stream_file.write_text(json.dumps(stream_query)) + + fetch_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "fetch-associated", + "--stream", + str(stream_file), + ] + ) + + fetch_result = subprocess.run( + fetch_cmd, env=cli_env, capture_output=True, text=True + ) + + assert fetch_result.returncode == 0 + + associated_pipeline = json.loads(fetch_result.stdout) + assert "name" in associated_pipeline + print(f"Fetched associated pipeline: {associated_pipeline['name']}") + + finally: + if pipeline_id: + delete_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "delete", + "--id", + pipeline_id, + ] + ) + + delete_result = subprocess.run( + delete_cmd, env=cli_env, capture_output=True, text=True + ) + + if delete_result.returncode == 0: + print(f"Successfully deleted pipeline: {pipeline_id}") + else: + print(f"Failed to delete test pipeline: {delete_result.stderr}") + + +@pytest.mark.integration +def test_cli_log_processing_fetch_sample_logs(cli_env, common_args, tmp_path): + """Test fetch sample logs command.""" + # Generate unique display name + unique_id = str(uuid.uuid4())[:8] + display_name = f"CLI Test Sample Logs Pipeline {unique_id}" + + # Pipeline configuration + pipeline_config = { + "displayName": 
display_name, + "description": "CLI test pipeline for sample logs", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + pipeline_config_file = tmp_path / "pipeline_config.json" + pipeline_config_file.write_text(json.dumps(pipeline_config)) + + pipeline_id = None + try: + # Create pipeline + create_cmd = ( + ["secops"] + + common_args + + [ + "log-processing", + "create", + "--pipeline", + str(pipeline_config_file), + ] + ) + + print(f"Creating pipeline: {display_name}") + create_result = subprocess.run( + create_cmd, env=cli_env, capture_output=True, text=True + ) + + if create_result.returncode != 0: + pytest.skip(f"Failed to create pipeline: {create_result.stderr}") + + created_pipeline = json.loads(create_result.stdout) + pipeline_id = created_pipeline["name"].split("/")[-1] + print(f"Created pipeline with ID: {pipeline_id}") + + # Associate CS_EDR log type with pipeline + streams = [{"logType": "CS_EDR"}] + streams_file = tmp_path / "streams.json" + streams_file.write_text(json.dumps(streams)) + + associate_cmd = ( + ["secops"] + + common_args + + [ + "log-processing", + "associate-streams", + "--id", + pipeline_id, + "--streams", + str(streams_file), + ] + ) + + print(f"Associating streams: {streams}") + associate_result = subprocess.run( + associate_cmd, env=cli_env, capture_output=True, text=True + ) + + if associate_result.returncode != 0: + pytest.skip( + f"Failed to associate streams: {associate_result.stderr}" + ) + + print("Streams associated successfully") + + # Wait for association to propagate + time.sleep(10) + + # Fetch sample logs + fetch_cmd = ( + ["secops"] + + common_args + + [ + "log-processing", + "fetch-sample-logs", + "--streams", + str(streams_file), + "--count", + "5", + ] + ) + + print(f"Fetching sample logs for streams: {streams}") + result = subprocess.run( + fetch_cmd, env=cli_env, capture_output=True, text=True + ) + + if result.returncode == 0: + output = json.loads(result.stdout) + if not output or ( + "logs" not in output and "sampleLogs" not in output + ): + pytest.skip("No sample logs available for CS_EDR log type") + + logs = output.get("logs", output.get("sampleLogs", [])) + print(f"Fetched sample logs: {len(logs)} logs") + assert len(logs) > 0, "Expected at least one sample log" + else: + pytest.skip(f"Fetch sample logs command skipped: {result.stderr}") + + finally: + # Cleanup: Delete the created pipeline + if pipeline_id: + delete_cmd = ( + ["secops"] + + common_args + + ["log-processing", "delete", "--id", pipeline_id] + ) + + print(f"Deleting pipeline: {pipeline_id}") + delete_result = subprocess.run( + delete_cmd, env=cli_env, capture_output=True, text=True + ) + + if delete_result.returncode == 0: + print("Test pipeline deleted successfully") + else: + print( + f"Warning: Failed to delete test pipeline: " + f"{delete_result.stderr}" + ) + + +@pytest.mark.integration +def test_cli_log_processing_test_pipeline(cli_env, common_args, tmp_path): + """Test the test pipeline command.""" + pipeline_config = { + "displayName": "Test Pipeline Config", + "processors": [ + { + "filterProcessor": { + "include": { + "logMatchType": "REGEXP", + "logBodies": [".*"], + }, + "errorMode": "IGNORE", + } + } + ], + } + + current_time = datetime.now(timezone.utc).isoformat() + log_data_1 = base64.b64encode(b"Sample log line 1").decode("utf-8") + log_data_2 = base64.b64encode(b"Sample log line 2").decode("utf-8") + + input_logs = [ + { + "data": log_data_1, 
+ "logEntryTime": current_time, + "collectionTime": current_time, + }, + { + "data": log_data_2, + "logEntryTime": current_time, + "collectionTime": current_time, + }, + ] + + config_file = tmp_path / "pipeline_config.json" + config_file.write_text(json.dumps(pipeline_config)) + + logs_file = tmp_path / "input_logs.json" + logs_file.write_text(json.dumps(input_logs)) + + cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "test", + "--pipeline", + str(config_file), + "--input-logs", + str(logs_file), + ] + ) + + result = subprocess.run(cmd, env=cli_env, capture_output=True, text=True) + + if result.returncode == 0: + output = json.loads(result.stdout) + assert "logs" in output + print( + f"Pipeline test completed: {len(output.get('logs', []))} processed" + ) + else: + pytest.skip(f"Test pipeline command skipped: {result.stderr}") + + +@pytest.mark.integration +def test_cli_log_processing_list_with_pagination(cli_env, common_args): + """Test listing pipelines with pagination.""" + cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "list", + "--page-size", + "1", + ] + ) + + result = subprocess.run(cmd, env=cli_env, capture_output=True, text=True) + + assert result.returncode == 0 + + output = json.loads(result.stdout) + assert "logProcessingPipelines" in output + pipelines = output.get("logProcessingPipelines", []) + print(f"First page: {len(pipelines)} pipelines") + + if "nextPageToken" in output: + next_page_token = output["nextPageToken"] + + next_cmd = ( + [ + "secops", + ] + + common_args + + [ + "log-processing", + "list", + "--page-size", + "1", + "--page-token", + next_page_token, + ] + ) + + next_result = subprocess.run( + next_cmd, env=cli_env, capture_output=True, text=True + ) + + assert next_result.returncode == 0 + + next_output = json.loads(next_result.stdout) + assert "logProcessingPipelines" in next_output + next_pipelines = next_output.get("logProcessingPipelines", []) + print(f"Second page: {len(next_pipelines)} pipelines")