diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3b8944c3..2892fdf4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.22.0] - 2025-10-30
+### Added
+- Support for entity import method
+
 ## [0.21.2] - 2025-10-15
 ### Added
 - Support for filter in list rule deployments method
diff --git a/CLI.md b/CLI.md
index 567093e9..e9e207d7 100644
--- a/CLI.md
+++ b/CLI.md
@@ -905,6 +905,12 @@ secops search --query "metadata.event_type = \"USER_LOGIN\" AND security_result.
 secops entity --value "192.168.1.100" --time-window 72
 ```
 
+### Import Entities
+
+```bash
+secops entity import --type "CUSTOM_LOG_TYPE" --file "/path/to/entities.json"
+```
+
 ### Check for Critical IoCs
 
 ```bash
diff --git a/README.md b/README.md
index b0302910..59bf1b20 100644
--- a/README.md
+++ b/README.md
@@ -559,6 +559,47 @@ result = chronicle.ingest_udm(udm_events=[network_event, process_event])
 print("Multiple events ingested successfully")
 ```
 
+Import entities into Chronicle:
+
+```python
+# Create a sample entity
+entity = {
+    "metadata": {
+        "collected_timestamp": "2025-01-01T00:00:00Z",
+        "vendor_name": "TestVendor",
+        "product_name": "TestProduct",
+        "entity_type": "USER",
+    },
+    "entity": {
+        "user": {
+            "userid": "testuser",
+        }
+    },
+}
+
+# Import a single entity
+result = chronicle.import_entities(entities=entity, log_type="TEST_LOG_TYPE")
+print(f"Imported entity: {result}")
+
+# Import multiple entities
+entity2 = {
+    "metadata": {
+        "collected_timestamp": "2025-01-01T00:00:00Z",
+        "vendor_name": "TestVendor",
+        "product_name": "TestProduct",
+        "entity_type": "ASSET",
+    },
+    "entity": {
+        "asset": {
+            "hostname": "testhost",
+        }
+    },
+}
+entities = [entity, entity2]
+result = chronicle.import_entities(entities=entities, log_type="TEST_LOG_TYPE")
+print(f"Imported entities: {result}")
+```
+
 ### Data Export
 
 > **Note**: The Data Export API features are currently under test and review. We welcome your feedback and encourage you to submit any issues or unexpected behavior to the issue tracker so we can improve this functionality.
diff --git a/api_module_mapping.md b/api_module_mapping.md
index ab89dfd9..9ce0296c 100644
--- a/api_module_mapping.md
+++ b/api_module_mapping.md
@@ -147,7 +147,7 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/
 |enrichmentControls.get |v1alpha| | |
 |enrichmentControls.list |v1alpha| | |
 |entities.get |v1alpha| | |
-|entities.import |v1alpha| | |
+|entities.import |v1alpha|chronicle.log_ingest.import_entities |secops entity import |
 |entities.modifyEntityRiskScore |v1alpha| | |
 |entities.queryEntityRiskScoreModifications |v1alpha| | |
 |entityRiskScores.query |v1alpha| | |
diff --git a/examples/entity_example.py b/examples/entity_example.py
new file mode 100644
index 00000000..1334294d
--- /dev/null
+++ b/examples/entity_example.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python3
+
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Example demonstrating entity import functionality with Chronicle."""
+
+import argparse
+import uuid
+from typing import Any, Dict
+
+from secops import SecOpsClient
+from secops.exceptions import APIError
+
+
+def create_sample_user_entity() -> Dict[str, Any]:
+    """Create a sample user entity.
+
+    Returns:
+        A dictionary representing a user entity in Chronicle format
+    """
+
+    # Generate a unique ID for this entity
+    user_id = f"user_{uuid.uuid4().hex[:8]}"
+
+    # Create sample user entity
+    return {
+        "metadata": {
+            "collectedTimestamp": "1970-01-01T03:25:45.000000124Z",
+            "vendorName": "vendor",
+            "productName": "product",
+            "entityType": "USER",
+        },
+        "entity": {
+            "user": {"userid": user_id, "productObjectId": "dev google"}
+        },
+    }
+
+
+def create_sample_file_entity() -> Dict[str, Any]:
+    """Create a sample file entity.
+
+    Returns:
+        A dictionary representing a file entity in Chronicle format
+    """
+    # Create sample file entity
+    return {
+        "metadata": {
+            "collected_timestamp": "1970-01-01T03:25:45.000000124Z",
+            "entity_type": "FILE",
+            "vendor_name": "Sample Vendor",
+            "product_name": "Entity Import Example",
+        },
+        "entity": {
+            "file": {
+                "md5": "d41d8cd98f00b204e9800998ecf8427e",  # MD5 of empty file
+                "sha1": "da39a3ee5e6b4b0d3255bfef95601890afd80709",  # SHA1 of empty file
+                "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",  # SHA256 of empty file
+                "full_path": "/path/to/example.txt",
+                "size": "0",
+                "mimeType": "text/plain",
+            }
+        },
+    }
+
+
+def import_single_entity(chronicle_client):
+    """Demonstrate importing a single entity.
+
+    Args:
+        chronicle_client: Initialized Chronicle client
+    """
+    print("\n=== Importing a Single Entity (User) ===")
+
+    # Create a sample user entity
+    user_entity = create_sample_user_entity()
+    user_id = user_entity["entity"]["user"]["userid"]
+
+    print(f"Entity ID: {user_id}")
+
+    try:
+        # Import the entity
+        result = chronicle_client.import_entities(
+            entities=user_entity, log_type="OKTA"
+        )
+
+        print("Entity successfully imported!")
+        print(f"API Response: {result}")
+
+    except APIError as e:
+        print(f"Error importing entity: {e}")
+
+
+def import_multiple_entities(chronicle_client):
+    """Demonstrate importing multiple entities of different types.
+
+    Args:
+        chronicle_client: Initialized Chronicle client
+    """
+    print("\n=== Importing Multiple Entities (Different Types) ===")
+
+    # Create sample entities of different types
+    user_entity = create_sample_user_entity()
+    file_entity = create_sample_file_entity()
+
+    entities = [user_entity, file_entity]
+
+    print(f"Number of entities: {len(entities)}")
+    print("Entity Types: USER, FILE")
+
+    try:
+        # Import multiple entities in a single API call
+        result = chronicle_client.import_entities(
+            entities=entities, log_type="OKTA"
+        )
+
+        print("All entities successfully imported!")
+        print(f"API Response: {result}")
+
+    except APIError as e:
+        print(f"Error importing entities: {e}")
+
+
+def main():
+    """Run the example."""
+    parser = argparse.ArgumentParser(
+        description="Example of entity import with Chronicle"
+    )
+    parser.add_argument(
+        "--customer_id", required=True, help="Chronicle instance ID"
+    )
+    parser.add_argument("--project_id", required=True, help="GCP project ID")
+    parser.add_argument("--region", default="us", help="Chronicle API region")
+
+    args = parser.parse_args()
+
+    # Initialize the client
+    client = SecOpsClient()
+
+    # Configure Chronicle client
+    chronicle = client.chronicle(
+        customer_id=args.customer_id,
+        project_id=args.project_id,
+        region=args.region,
+    )
+
+    # Run examples
+    import_single_entity(chronicle)
+    import_multiple_entities(chronicle)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/pyproject.toml b/pyproject.toml
index 2cba40d9..bc681962 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "secops"
-version = "0.21.2"
+version = "0.22.0"
 description = "Python SDK for wrapping the Google SecOps API for common use cases"
 readme = "README.md"
 requires-python = ">=3.7"
diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py
index 37f157f6..58ea12bf 100644
--- a/src/secops/chronicle/__init__.py
+++ b/src/secops/chronicle/__init__.py
@@ -66,6 +66,7 @@
     extract_forwarder_id,
     get_forwarder,
     get_or_create_forwarder,
+    import_entities,
     ingest_log,
     list_forwarders,
     update_forwarder,
@@ -142,10 +143,8 @@
 )
 from secops.chronicle.udm_search import (
     fetch_udm_search_csv,
-    find_udm_field_values,
-)
-from secops.chronicle.udm_search import (
     fetch_udm_search_view,
+    find_udm_field_values,
 )
 from secops.chronicle.validate import validate_query
 
@@ -164,6 +163,7 @@
     # Natural Language Search
     "translate_nl_to_udm",
     # Entity
+    "import_entities",
     "summarize_entity",
     # IoC
     "list_iocs",
diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py
index 35ef6254..a9f1d640 100644
--- a/src/secops/chronicle/client.py
+++ b/src/secops/chronicle/client.py
@@ -101,6 +101,7 @@
 )
 from secops.chronicle.log_ingest import ingest_log as _ingest_log
 from secops.chronicle.log_ingest import ingest_udm as _ingest_udm
+from secops.chronicle.log_ingest import import_entities as _import_entities
 from secops.chronicle.log_ingest import list_forwarders as _list_forwarders
 from secops.chronicle.log_ingest import update_forwarder as _update_forwarder
 from secops.chronicle.log_types import LogType
@@ -1920,6 +1921,30 @@ def ingest_log(
             labels=labels,
         )
 
+    def import_entities(
+        self,
+        entities: Union[Dict[str, Any], List[Dict[str, Any]]],
+        log_type: str,
+    ) -> Dict[str, Any]:
+        """Import entities into Chronicle.
+
+        Args:
+            entities: An entity dictionary or a list of entity dictionaries
+            log_type: The log type of the log from which this entity is created
+
+        Returns:
+            Dictionary containing the operation details for the ingestion
+
+        Raises:
+            ValueError: If any required fields are missing or entities malformed
+            APIError: If the API request fails
+        """
+        return _import_entities(
+            self,
+            entities=entities,
+            log_type=log_type,
+        )
+
     def create_forwarder(
         self,
         display_name: str,
diff --git a/src/secops/chronicle/log_ingest.py b/src/secops/chronicle/log_ingest.py
index 070a0ae4..262e4a43 100644
--- a/src/secops/chronicle/log_ingest.py
+++ b/src/secops/chronicle/log_ingest.py
@@ -1043,3 +1043,59 @@ def ingest_udm(
         response_data = {"raw_response": response.text}
 
     return response_data
+
+
+def import_entities(
+    client: "ChronicleClient",
+    entities: Union[Dict[str, Any], List[Dict[str, Any]]],
+    log_type: str,
+) -> Dict[str, Any]:
+    """Import entities into Chronicle.
+
+    Args:
+        client: ChronicleClient instance
+        entities: An entity dictionary or a list of entity dictionaries
+        log_type: The log type of the log from which this entity is created
+
+    Returns:
+        Dictionary containing the operation details for the ingestion
+
+    Raises:
+        ValueError: If any required fields are missing or entities malformed
+        APIError: If the API request fails
+    """
+    # Ensure we have a list of entities
+    if isinstance(entities, dict):
+        entities = [entities]
+
+    if not entities:
+        raise ValueError("No entities provided")
+
+    if not log_type:
+        raise ValueError("No log type provided")
+
+    # Prepare the request
+    url = f"{client.base_url}/{client.instance_id}/entities:import"
+
+    # Format the request body
+    body = {"inline_source": {"entities": entities, "log_type": log_type}}
+
+    # Make the API request
+    response = client.session.post(url, json=body)
+
+    # Check for errors
+    if response.status_code >= 400:
+        error_message = f"Failed to import entities: {response.text}"
+        raise APIError(error_message)
+
+    response_data = {}
+
+    # Parse response if it has content
+    if response.text.strip():
+        try:
+            response_data = response.json()
+        except ValueError:
+            # If JSON parsing fails, provide the raw text in the return value
+            response_data = {"raw_response": response.text}
+
+    return response_data
diff --git a/src/secops/cli.py b/src/secops/cli.py
index 0b4c2cc9..eb116987 100644
--- a/src/secops/cli.py
+++ b/src/secops/cli.py
@@ -691,8 +691,16 @@ def setup_entity_command(subparsers):
     entity_parser = subparsers.add_parser(
         "entity", help="Get entity information"
     )
+
+    # Create a subparser object
+    entity_subparsers = entity_parser.add_subparsers(
+        dest="entity_subcommand", help="Entity subcommands"
+    )
+
+    # Add arguments to the main entity parser
+    # These arguments are now optional since we'll check for them in the handler
     entity_parser.add_argument(
-        "--value", required=True, help="Entity value (IP, domain, hash, etc.)"
+        "--value", help="Entity value (IP, domain, hash, etc.)"
     )
     entity_parser.add_argument(
         "--entity-type",
@@ -703,9 +711,46 @@
     add_time_range_args(entity_parser)
     entity_parser.set_defaults(func=handle_entity_command)
 
+    # Ingest entities command as a subcommand
+    entities_import_parser = entity_subparsers.add_parser(
+        "import", help="Import entities"
+    )
+    entities_import_parser.add_argument(
+        "--file",
+        required=True,
+        help="File containing entity(s) (in JSON format)",
+    )
+    entities_import_parser.add_argument(
+        "--type", required=True, help="Log type"
+    )
+    entities_import_parser.set_defaults(func=handle_import_entities_command)
+
 
 def handle_entity_command(args, chronicle):
-    """Handle the entity command."""
+    """Handle the entity command.
+
+    This function will check if a subcommand is used or if --value is provided
+    when using the entity command directly.
+    """
+    # If a subcommand is specified, this function should not be called.
+    # However, if it is called with a subcommand, we should exit gracefully.
+    if hasattr(args, "entity_subcommand") and args.entity_subcommand:
+        print(
+            "Error: Unexpected command handling for subcommand "
+            f"{args.entity_subcommand}",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+
+    # If no subcommand, --value is required
+    if not args.value:
+        print(
+            "Error: --value is required when using the entity "
+            "command without a subcommand",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+
     start_time, end_time = get_time_range(args)
 
     try:
@@ -947,6 +992,21 @@ def handle_udm_ingest_command(args, chronicle):
     sys.exit(1)
 
 
+def handle_import_entities_command(args, chronicle):
+    """Handle import entities command."""
+    try:
+        with open(args.file, "r", encoding="utf-8") as f:
+            entities = json.load(f)
+
+        result = chronicle.import_entities(
+            entities=entities, log_type=args.type
+        )
+        output_formatter(result, args.output)
+    except Exception as e:  # pylint: disable=broad-exception-caught
+        print(f"Error: {e}", file=sys.stderr)
+        sys.exit(1)
+
+
 def handle_log_types_command(args, chronicle):
     """Handle listing log types command."""
     try:
diff --git a/tests/chronicle/test_integration.py b/tests/chronicle/test_integration.py
index 0dcf89e7..5bc47d8b 100644
--- a/tests/chronicle/test_integration.py
+++ b/tests/chronicle/test_integration.py
@@ -27,6 +27,7 @@
 import json
 import re
 import time
+import uuid
 
 
 @pytest.mark.integration
@@ -1828,3 +1829,81 @@ def test_find_udm_field_values():
     except Exception as e:
         print(f"Unexpected error in find_udm_field_values test: {str(e)}")
         raise
+
+
+@pytest.mark.integration
+def test_import_entities():
+    """Test entity import method of chronicle."""
+    # Create a SecOps client instance
+    client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON)
+    chronicle = client.chronicle(**CHRONICLE_CONFIG)
+
+    # Get current time for entity metadata
+    current_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
+
+    # Create a test entity with a unique identifier
+    entity_id = f"test_user_{uuid.uuid4().hex[:8]}"
+    test_entity = {
+        "metadata": {
+            "collected_timestamp": current_time,
+            "entity_type": "USER",
+            "vendor_name": "SecOps SDK Test",
+            "product_name": "Entity Integration Test",
+        },
+        "entity": {
+            "user": {
+                "userid": entity_id,
+                "product_object_id": f"test_object_{uuid.uuid4().hex[:8]}",
+            }
+        },
+    }
+
+    try:
+        # Import the entity
+        print(f"\nImporting test entity with ID: {entity_id}")
+        result = chronicle.import_entities(
+            entities=test_entity, log_type="OKTA"
+        )
+
+        # Verify response
+        assert result is not None
+        assert result == {}
+        # An empty dict response indicates success
+        print(f"Import response: {result}")
+
+        # Test with multiple entities
+        entity_id2 = f"test_user_{uuid.uuid4().hex[:8]}"
+        test_entity2 = {
+            "metadata": {
+                "collected_timestamp": current_time,
+                "entity_type": "USER",
+                "vendor_name": "SecOps SDK Test",
+                "product_name": "Entity Integration Test",
+            },
+            "entity": {
+                "user": {
+                    "userid": entity_id2,
+                    "product_object_id": f"test_object_{uuid.uuid4().hex[:8]}",
+                }
+            },
+        }
+
+        # Import multiple entities
+        print(
f"\nImporting multiple test entities: {entity_id} and {entity_id2}" + ) + multi_result = chronicle.import_entities( + entities=[test_entity, test_entity2], log_type="OKTA" + ) + + # Verify multiple entity response + assert multi_result is not None + assert multi_result == {} + print(f"Multiple entity import response: {multi_result}") + + except APIError as e: + print(f"\nAPI Error details: {str(e)}") + # Skip the test rather than fail if permissions are not available + if "permission" in str(e).lower() or "not authorized" in str(e).lower(): + pytest.skip("Insufficient permissions to import entities") + raise diff --git a/tests/chronicle/test_log_ingest.py b/tests/chronicle/test_log_ingest.py index 8753c71a..3cadf89e 100644 --- a/tests/chronicle/test_log_ingest.py +++ b/tests/chronicle/test_log_ingest.py @@ -29,6 +29,7 @@ extract_forwarder_id, ingest_udm, delete_forwarder, + import_entities, ) from secops.exceptions import APIError @@ -969,3 +970,118 @@ def test_delete_forwarder_error(chronicle_client): with patch.object(chronicle_client.session, "delete", return_value=error_response): with pytest.raises(APIError, match="Failed to delete forwarder"): delete_forwarder(client=chronicle_client, forwarder_id="test-forwarder-id") + + +@pytest.fixture +def mock_entity(): + """Create a sample entity for testing.""" + return { + "metadata": { + "collected_timestamp": "2025-01-01T00:00:00Z", + "vendor_name": "TestVendor", + "product_name": "TestProduct", + "entity_type": "USER", + }, + "entity": { + "user": { + "userid": "testuser", + } + }, + } + + +@pytest.fixture +def mock_import_entities_response(): + """Create a mock import entities API response.""" + mock = Mock() + mock.status_code = 200 + mock.text = "{}" + mock.json.return_value = {} + return mock + + +def test_import_entities_single_entity( + chronicle_client, mock_entity, mock_import_entities_response +): + """Test importing a single entity.""" + with patch.object( + chronicle_client.session, "post", return_value=mock_import_entities_response + ): + result = import_entities( + client=chronicle_client, entities=mock_entity, log_type="TEST_LOG_TYPE" + ) + + call_args = chronicle_client.session.post.call_args + assert call_args is not None + + url = call_args[0][0] + assert ( + "projects/test-project/locations/us/instances/test-customer/entities:import" + in url + ) + + payload = call_args[1]["json"] + assert "inline_source" in payload + assert "entities" in payload["inline_source"] + assert len(payload["inline_source"]["entities"]) == 1 + assert ( + payload["inline_source"]["entities"][0]["entity"]["user"]["userid"] + == "testuser" + ) + assert payload["inline_source"]["log_type"] == "TEST_LOG_TYPE" + + assert isinstance(result, dict) + + +def test_import_entities_multiple_entities( + chronicle_client, mock_entity, mock_import_entities_response +): + """Test importing multiple entities.""" + entity2 = { + "metadata": { + "collected_timestamp": "2025-01-01T00:00:00Z", + "vendor_name": "TestVendor", + "product_name": "TestProduct", + "entity_type": "ASSET", + }, + "entity": { + "asset": { + "hostname": "testhost", + } + }, + } + entities = [mock_entity, entity2] + + with patch.object( + chronicle_client.session, "post", return_value=mock_import_entities_response + ): + import_entities( + client=chronicle_client, entities=entities, log_type="TEST_LOG_TYPE" + ) + + call_args = chronicle_client.session.post.call_args + assert call_args is not None + + payload = call_args[1]["json"] + assert len(payload["inline_source"]["entities"]) == 2 + + +def 
+def test_import_entities_api_error(chronicle_client, mock_entity):
+    """Test error handling when the API request fails."""
+    error_response = Mock()
+    error_response.status_code = 400
+    error_response.text = "Invalid request"
+
+    with patch.object(chronicle_client.session, "post", return_value=error_response):
+        with pytest.raises(APIError, match="Failed to import entities"):
+            import_entities(
+                client=chronicle_client,
+                entities=mock_entity,
+                log_type="TEST_LOG_TYPE",
+            )
+
+
+def test_import_entities_validation_error_empty_entities(chronicle_client):
+    """Test validation error when no entities are provided."""
+    with pytest.raises(ValueError, match="No entities provided"):
+        import_entities(client=chronicle_client, entities=[], log_type="TEST_LOG_TYPE")
diff --git a/tests/cli/test_integration.py b/tests/cli/test_integration.py
index b1734741..b730a976 100644
--- a/tests/cli/test_integration.py
+++ b/tests/cli/test_integration.py
@@ -6,6 +6,7 @@
 import os
 import tempfile
 import time
+import uuid
 from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from unittest.mock import patch
@@ -2758,3 +2759,99 @@ def test_cli_udm_field_values(cli_env, common_args):
     except json.JSONDecodeError:
         # If not valid JSON, fail the test
         assert False, f"Output is not valid JSON: {result.stdout}"
+
+@pytest.mark.integration
+def test_cli_entity_import(cli_env, common_args):
+    """Test the entity import command using the CLI."""
+    # Get current time for entity metadata
+    current_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
+
+    # Create unique entity IDs for this test run
+    entity_id1 = f"test_user_{uuid.uuid4().hex[:8]}"
+    entity_id2 = f"test_user_{uuid.uuid4().hex[:8]}"
+
+    # Create test entities
+    test_entities = [
+        {
+            "metadata": {
+                "collected_timestamp": current_time,
+                "entity_type": "USER",
+                "vendor_name": "CLI Test",
+                "product_name": "Entity Import Test",
+            },
+            "entity": {
+                "user": {
+                    "userid": entity_id1,
+                    "product_object_id": f"test_obj_{uuid.uuid4().hex[:8]}",
+                }
+            },
+        },
+        {
+            "metadata": {
+                "collected_timestamp": current_time,
+                "entity_type": "USER",
+                "vendor_name": "CLI Test",
+                "product_name": "Entity Import Test",
+            },
+            "entity": {
+                "user": {
+                    "userid": entity_id2,
+                    "product_object_id": f"test_obj_{uuid.uuid4().hex[:8]}",
+                }
+            },
+        },
+    ]
+
+    # Create a temporary file for the entities
+    entity_file_path = None
+
+    try:
+        # Write entities to temporary file
+        with tempfile.NamedTemporaryFile(
+            suffix=".json", mode="w+", delete=False
+        ) as temp_file:
+            json.dump(test_entities, temp_file, indent=2)
+            entity_file_path = temp_file.name
+
+        # Execute the entity import CLI command
+        cmd = (
+            [
+                "secops",
+            ]
+            + common_args
+            + [
+                "entity",
+                "import",
+                "--file",
+                entity_file_path,
+                "--type",
+                "OKTA",
+            ]
+        )
+
+        print("\nRunning entity import command")
+        result = subprocess.run(
+            cmd, env=cli_env, capture_output=True, text=True
+        )
+
+        # Check that the command executed successfully
+        assert result.returncode == 0, f"Command failed: {result.stderr}"
+
+        # Check output format - should be JSON
+        try:
+            output = json.loads(result.stdout)
+            print(f"Command output: {output}")
+            # Empty dict response indicates success
+            assert output == {}
+        except json.JSONDecodeError:
+            # If not valid JSON, check for error messages
+            assert "Error:" not in result.stdout
+            assert "Error:" not in result.stderr
+
+        print("Entity import command executed successfully")
+
+    finally:
+        # Clean up the temporary entity file
+        if entity_file_path and os.path.exists(entity_file_path):
+            os.unlink(entity_file_path)
+            print(f"Cleaned up temporary entity file: {entity_file_path}")