Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.22.0] - 2025-10-30
### Added
- Support for entity import method

## [0.21.2] - 2025-10-15
### Added
- Support for filter in list rule deployments method
Expand Down
6 changes: 6 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,12 @@ secops search --query "metadata.event_type = \"USER_LOGIN\" AND security_result.
secops entity --value "192.168.1.100" --time-window 72
```

### Import entities:

```bash
secops entity import --type "CUSTOM_LOG_TYPE" --file "/path/to/entities.json"
```

### Check for Critical IoCs

```bash
Expand Down
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,47 @@ result = chronicle.ingest_udm(udm_events=[network_event, process_event])
print("Multiple events ingested successfully")
```

Import entities into Chronicle:

```python
# Create a sample entity
entity = {
"metadata": {
"collected_timestamp": "2025-01-01T00:00:00Z",
"vendor_name": "TestVendor",
"product_name": "TestProduct",
"entity_type": "USER",
},
"entity": {
"user": {
"userid": "testuser",
}
},
}

# Import a single entity
result = chronicle.import_entities(entities=entity, log_type="TEST_LOG_TYPE")
print(f"Imported entity: {result}")

# Import multiple entities
entity2 = {
"metadata": {
"collected_timestamp": "2025-01-01T00:00:00Z",
"vendor_name": "TestVendor",
"product_name": "TestProduct",
"entity_type": "ASSET",
},
"entity": {
"asset": {
"hostname": "testhost",
}
},
}
entities = [entity, entity2]
result = chronicle.import_entities(entities=entities, log_type="TEST_LOG_TYPE")
print(f"Imported entities: {result}")
```

### Data Export

> **Note**: The Data Export API features are currently under test and review. We welcome your feedback and encourage you to submit any issues or unexpected behavior to the issue tracker so we can improve this functionality.
Expand Down
2 changes: 1 addition & 1 deletion api_module_mapping.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/
|enrichmentControls.get |v1alpha| | |
|enrichmentControls.list |v1alpha| | |
|entities.get |v1alpha| | |
|entities.import |v1alpha| | |
|entities.import |v1alpha|chronicle.log_ingest.import_entities |secops entity import |
|entities.modifyEntityRiskScore |v1alpha| | |
|entities.queryEntityRiskScoreModifications |v1alpha| | |
|entityRiskScores.query |v1alpha| | |
Expand Down
164 changes: 164 additions & 0 deletions examples/entity_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Example demonstrating entity import functionality with Chronicle."""

import argparse
import uuid
from typing import Any, Dict

from secops import SecOpsClient
from secops.exceptions import APIError


def create_sample_user_entity() -> Dict[str, Any]:
"""Create a sample user entity.

Returns:
A dictionary representing a user entity in Chronicle format
"""

# Generate a unique ID for this entity
user_id = f"user_{uuid.uuid4().hex[:8]}"

# Create sample user entity
return {
"metadata": {
"collectedTimestamp": "1970-01-01T03:25:45.000000124Z",
"vendorName": "vendor",
"productName": "product",
"entityType": "USER",
},
"entity": {
"user": {"userid": user_id, "productObjectId": "dev google"}
},
}


def create_sample_file_entity() -> Dict[str, Any]:
"""Create a sample file entity.

Returns:
A dictionary representing a file entity in Chronicle format
"""
# Create sample file entity
return {
"metadata": {
"collected_timestamp": "1970-01-01T03:25:45.000000124Z",
"entity_type": "FILE",
"vendor_name": "Sample Vendor",
"product_name": "Entity Import Example",
},
"entity": {
"file": {
"md5": "d41d8cd98f00b204e9800998ecf8427e", # MD5 of empty file
"sha1": "da39a3ee5e6b4b0d3255bfef95601890afd80709", # SHA1 of empty file
"sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", # SHA256 of empty file
"full_path": "/path/to/example.txt",
"size": "0",
"mimeType": "text/plain",
}
},
}


def import_single_entity(chronicle_client):
"""Demonstrate importing a single entity.

Args:
chronicle_client: Initialized Chronicle client
"""
print("\n=== Importing a Single Entity (User) ===")

# Create a sample user entity
user_entity = create_sample_user_entity()
user_id = user_entity["entity"]["user"]["userid"]

print(f"Entity ID: {user_id}")

try:
# Import the entity
result = chronicle_client.import_entities(
entities=user_entity, log_type="OKTA"
)

print("Entity successfully imported!")
print(f"API Response: {result}")

except APIError as e:
print(f"Error importing entity: {e}")


def import_multiple_entities(chronicle_client):
"""Demonstrate importing multiple entities of different types.

Args:
chronicle_client: Initialized Chronicle client
"""
print("\n=== Importing Multiple Entities (Different Types) ===")

# Create sample entities of different types
user_entity = create_sample_user_entity()
file_entity = create_sample_file_entity()

entities = [user_entity, file_entity]

print(f"Number of entities: {len(entities)}")
print(f"Entity Types: USER, FILE")

try:
# Import multiple entities in a single API call
result = chronicle_client.import_entities(
entities=entities, log_type="OKTA"
)

print("All entities successfully imported!")
print(f"API Response: {result}")

except APIError as e:
print(f"Error importing entities: {e}")


def main():
"""Run the example."""
parser = argparse.ArgumentParser(
description="Example of entity import with Chronicle"
)
parser.add_argument(
"--customer_id", required=True, help="Chronicle instance ID"
)
parser.add_argument("--project_id", required=True, help="GCP project ID")
parser.add_argument("--region", default="us", help="Chronicle API region")

args = parser.parse_args()

# Initialize the client
client = SecOpsClient()

# Configure Chronicle client
chronicle = client.chronicle(
customer_id=args.customer_id,
project_id=args.project_id,
region=args.region,
)

# Run examples
import_single_entity(chronicle)
import_multiple_entities(chronicle)


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "secops"
version = "0.21.2"
version = "0.22.0"
description = "Python SDK for wrapping the Google SecOps API for common use cases"
readme = "README.md"
requires-python = ">=3.7"
Expand Down
6 changes: 3 additions & 3 deletions src/secops/chronicle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
extract_forwarder_id,
get_forwarder,
get_or_create_forwarder,
import_entities,
ingest_log,
list_forwarders,
update_forwarder,
Expand Down Expand Up @@ -142,10 +143,8 @@
)
from secops.chronicle.udm_search import (
fetch_udm_search_csv,
find_udm_field_values,
)
from secops.chronicle.udm_search import (
fetch_udm_search_view,
find_udm_field_values,
)
from secops.chronicle.validate import validate_query

Expand All @@ -164,6 +163,7 @@
# Natural Language Search
"translate_nl_to_udm",
# Entity
"import_entities",
"summarize_entity",
# IoC
"list_iocs",
Expand Down
26 changes: 26 additions & 0 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
)
from secops.chronicle.log_ingest import ingest_log as _ingest_log
from secops.chronicle.log_ingest import ingest_udm as _ingest_udm
from secops.chronicle.log_ingest import import_entities as _import_entities
from secops.chronicle.log_ingest import list_forwarders as _list_forwarders
from secops.chronicle.log_ingest import update_forwarder as _update_forwarder
from secops.chronicle.log_types import LogType
Expand Down Expand Up @@ -1920,6 +1921,31 @@ def ingest_log(
labels=labels,
)

def import_entities(
self,
entities: Union[Dict[str, Any], List[Dict[str, Any]]],
log_type: str,
) -> Dict[str, Any]:
"""Import entities into Chronicle.

Args:
client: ChronicleClient instance
entities: An entity dictionary or a list of entity dictionaries
log_type: The log type of the log from which this entity is created

Returns:
Dictionary containing the operation details for the ingestion

Raises:
ValueError: If any required fields are missing or entities malformed
APIError: If the API request fails
"""
return _import_entities(
self,
entities=entities,
log_type=log_type,
)

def create_forwarder(
self,
display_name: str,
Expand Down
56 changes: 56 additions & 0 deletions src/secops/chronicle/log_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,3 +1043,59 @@ def ingest_udm(
response_data = {"raw_response": response.text}

return response_data


def import_entities(
client: "ChronicleClient",
entities: Union[Dict[str, Any], List[Dict[str, Any]]],
log_type: str,
) -> Dict[str, Any]:
"""Import entities into Chronicle.

Args:
client: ChronicleClient instance
entities: An entity dictionary or a list of entity dictionaries
log_type: The log type of the log from which this entity is created

Returns:
Dictionary containing the operation details for the ingestion

Raises:
ValueError: If any required fields are missing or entities malformed
APIError: If the API request fails
"""
# Ensure we have a list of entities
if isinstance(entities, dict):
entities = [entities]

if not entities:
raise ValueError("No entities provided")

if not log_type:
raise ValueError("No log type provided")

# Prepare the request
url = f"{client.base_url}/{client.instance_id}/entities:import"

# Format the request body
body = {"inline_source": {"entities": entities, "log_type": log_type}}

# Make the API request
response = client.session.post(url, json=body)

# Check for errors
if response.status_code >= 400:
error_message = f"Failed to import entities: {response.text}"
raise APIError(error_message)

response_data = {}

# Parse response if it has content
if response.text.strip():
try:
response_data = response.json()
except ValueError:
# If JSON parsing fails, provide the raw text in the return value
response_data = {"raw_response": response.text}

return response_data
Loading