From 5717c32d0f302e36569e693ab77ff6be287cf21c Mon Sep 17 00:00:00 2001 From: Dan Dye Date: Thu, 21 Aug 2025 22:31:49 -0400 Subject: [PATCH] demo a fix for TargetEntity --- .../marketplace/googlechronicle.py | 224 ++++++++++-------- 1 file changed, 119 insertions(+), 105 deletions(-) diff --git a/server/secops-soar/secops_soar_mcp/marketplace/googlechronicle.py b/server/secops-soar/secops_soar_mcp/marketplace/googlechronicle.py index 5dcf31a5..f2732e7a 100644 --- a/server/secops-soar/secops_soar_mcp/marketplace/googlechronicle.py +++ b/server/secops-soar/secops_soar_mcp/marketplace/googlechronicle.py @@ -24,7 +24,7 @@ def register_tools(mcp: FastMCP): # This function registers all tools (actions) for the GoogleChronicle integration. @mcp.tool() - async def google_chronicle_get_rule_details(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], rule_id: Annotated[str, Field(..., description="Specify the ID of the rule for which you want to fetch details.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_get_rule_details(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], rule_id: Annotated[str, Field(..., description="Specify the ID of the rule for which you want to fetch details.")], target_entities: Annotated[ List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Fetch information about a rule in Google Chronicle. Returns: @@ -33,8 +33,9 @@ async def google_chronicle_get_rule_details(case_id: Annotated[str, Field(..., d final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -50,7 +51,7 @@ async def google_chronicle_get_rule_details(case_id: Annotated[str, Field(..., d final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -60,15 +61,15 @@ async def google_chronicle_get_rule_details(case_id: Annotated[str, Field(..., d except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} script_params["Rule ID"] = rule_id - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -84,7 +85,7 @@ async def google_chronicle_get_rule_details(case_id: Annotated[str, Field(..., d "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -99,7 +100,7 @@ async def google_chronicle_get_rule_details(case_id: Annotated[str, Field(..., d return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_get_detection_details(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], rule_id: Annotated[str, Field(..., description="Specify the ID of the rule, which is related to the detection.")], detection_id: Annotated[str, Field(..., description="Specify the ID of the detection for which you want to fetch details.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_get_detection_details(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], rule_id: Annotated[str, Field(..., description="Specify the ID of the rule, which is related to the detection.")], detection_id: Annotated[str, Field(..., description="Specify the ID of the detection for which you want to fetch details.")], target_entities: Annotated[ List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Fetch information about a detection in Google Chronicle. Returns: @@ -108,8 +109,9 @@ async def google_chronicle_get_detection_details(case_id: Annotated[str, Field(. final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -125,7 +127,7 @@ async def google_chronicle_get_detection_details(case_id: Annotated[str, Field(. final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -135,16 +137,16 @@ async def google_chronicle_get_detection_details(case_id: Annotated[str, Field(. except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} script_params["Rule ID"] = rule_id script_params["Detection ID"] = detection_id - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -160,7 +162,7 @@ async def google_chronicle_get_detection_details(case_id: Annotated[str, Field(. "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -175,7 +177,7 @@ async def google_chronicle_get_detection_details(case_id: Annotated[str, Field(. return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_execute_udm_query(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], query: Annotated[str, Field(..., description="Specify the query that needs to be executed in Chronicle.")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \"Alert Time Till Now\" is selected, action will use start time of the alert as start time for the search and end time will be current time. If \"30 Minutes Around Alert Time\" is selected, action will search the alerts 30 minutes before the alert happened till the 30 minutes after the alert has happened. Same idea applies to \"1 Hour Around Alert Time\" and \"5 Minutes Around Alert Time\". If \"Custom\" is selected, you also need to provide \"Start Time\".")], start_time: Annotated[str, Field(default=None, description="Specify the start time for the results. This parameter is mandatory, if \"Custom\" is selected for the \"Time Frame\" parameter. Format: ISO 8601. Note: The maximum time window (start time to end time) is 90 days.")], end_time: Annotated[str, Field(default=None, description="Specify the end time for the results. Format: ISO 8601. If nothing is provided and \"Custom\" is selected for the \"Time Frame\" parameter then this parameter will use current time. Note: The maximum time window (start time to end time) is 90 days.")], max_results_to_return: Annotated[str, Field(default=None, description="Specify how many results to return for the query. Default: 50. Maximum: 10000.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_execute_udm_query(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], query: Annotated[str, Field(..., description="Specify the query that needs to be executed in Chronicle.")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \"Alert Time Till Now\" is selected, action will use start time of the alert as start time for the search and end time will be current time. If \"30 Minutes Around Alert Time\" is selected, action will search the alerts 30 minutes before the alert happened till the 30 minutes after the alert has happened. Same idea applies to \"1 Hour Around Alert Time\" and \"5 Minutes Around Alert Time\". If \"Custom\" is selected, you also need to provide \"Start Time\".")], start_time: Annotated[str, Field(default=None, description="Specify the start time for the results. This parameter is mandatory, if \"Custom\" is selected for the \"Time Frame\" parameter. Format: ISO 8601. Note: The maximum time window (start time to end time) is 90 days.")], end_time: Annotated[str, Field(default=None, description="Specify the end time for the results. Format: ISO 8601. If nothing is provided and \"Custom\" is selected for the \"Time Frame\" parameter then this parameter will use current time. Note: The maximum time window (start time to end time) is 90 days.")], max_results_to_return: Annotated[str, Field(default=None, description="Specify how many results to return for the query. Default: 50. Maximum: 10000.")], target_entities: Annotated[ List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Execute custom UDM query in Google Chronicle. Note: 120 action executions are allowed per hour. Returns: @@ -184,8 +186,9 @@ async def google_chronicle_execute_udm_query(case_id: Annotated[str, Field(..., final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -201,7 +204,7 @@ async def google_chronicle_execute_udm_query(case_id: Annotated[str, Field(..., final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -211,12 +214,12 @@ async def google_chronicle_execute_udm_query(case_id: Annotated[str, Field(..., except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} script_params["Query"] = query if time_frame is not None: @@ -227,7 +230,7 @@ async def google_chronicle_execute_udm_query(case_id: Annotated[str, Field(..., script_params["End Time"] = end_time if max_results_to_return is not None: script_params["Max Results To Return"] = max_results_to_return - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -243,7 +246,7 @@ async def google_chronicle_execute_udm_query(case_id: Annotated[str, Field(..., "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -258,7 +261,7 @@ async def google_chronicle_execute_udm_query(case_id: Annotated[str, Field(..., return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_remove_values_from_reference_list(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], reference_list_name: Annotated[str, Field(..., description="Specify the display name of the reference list that needs to be updated.")], values: Annotated[str, Field(..., description="Specify a comma-separated list of values that need to be removed from a reference list.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_remove_values_from_reference_list(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], reference_list_name: Annotated[str, Field(..., description="Specify the display name of the reference list that needs to be updated.")], values: Annotated[str, Field(..., description="Specify a comma-separated list of values that need to be removed from a reference list.")], target_entities: Annotated[ List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Remove values from a reference list in Google Chronicle. Returns: @@ -267,7 +270,7 @@ async def google_chronicle_remove_values_from_reference_list(case_id: Annotated[ final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: # Specific target entities provided, ignore scope parameter final_target_entities = target_entities @@ -284,7 +287,7 @@ async def google_chronicle_remove_values_from_reference_list(case_id: Annotated[ final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -294,16 +297,16 @@ async def google_chronicle_remove_values_from_reference_list(case_id: Annotated[ except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} script_params["Reference List Name"] = reference_list_name script_params["Values"] = values - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -319,7 +322,7 @@ async def google_chronicle_remove_values_from_reference_list(case_id: Annotated[ "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -334,7 +337,7 @@ async def google_chronicle_remove_values_from_reference_list(case_id: Annotated[ return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_add_values_to_reference_list(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], reference_list_name: Annotated[str, Field(..., description="Specify the display name of the reference list that needs to be updated")], values: Annotated[str, Field(..., description="Specify a comma-separated list of values that need to be added to a reference list")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_add_values_to_reference_list(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], reference_list_name: Annotated[str, Field(..., description="Specify the display name of the reference list that needs to be updated")], values: Annotated[str, Field(..., description="Specify a comma-separated list of values that need to be added to a reference list")], target_entities: Annotated[ List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Add values to a reference list in Google Chronicle. Returns: @@ -343,8 +346,9 @@ async def google_chronicle_add_values_to_reference_list(case_id: Annotated[str, final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -360,7 +364,7 @@ async def google_chronicle_add_values_to_reference_list(case_id: Annotated[str, final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -370,16 +374,16 @@ async def google_chronicle_add_values_to_reference_list(case_id: Annotated[str, except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} script_params["Reference List Name"] = reference_list_name script_params["Values"] = values - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -395,7 +399,7 @@ async def google_chronicle_add_values_to_reference_list(case_id: Annotated[str, "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -410,7 +414,7 @@ async def google_chronicle_add_values_to_reference_list(case_id: Annotated[str, return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_list_io_cs(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], start_time: Annotated[str, Field(default=None, description="Fetches IOC Domain from the specified time. Value should be in RFC 3339 format (e.g. 2018-11-05T12:00:00Z). If not supplied, the default is the UTC time corresponding to 3 days earlier than current time.")], max_io_cs_to_fetch: Annotated[str, Field(default=None, description="Specify the maximum number of IoCs to return. You can specify between 1 and 10,000. The default is 50.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_list_io_cs(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], start_time: Annotated[str, Field(default=None, description="Fetches IOC Domain from the specified time. Value should be in RFC 3339 format (e.g. 2018-11-05T12:00:00Z). If not supplied, the default is the UTC time corresponding to 3 days earlier than current time.")], max_io_cs_to_fetch: Annotated[str, Field(default=None, description="Specify the maximum number of IoCs to return. You can specify between 1 and 10,000. The default is 50.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """List all of the IoCs discovered within your enterprise within the specified time range. Returns: @@ -419,8 +423,9 @@ async def google_chronicle_list_io_cs(case_id: Annotated[str, Field(..., descrip final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -436,7 +441,7 @@ async def google_chronicle_list_io_cs(case_id: Annotated[str, Field(..., descrip final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -446,18 +451,18 @@ async def google_chronicle_list_io_cs(case_id: Annotated[str, Field(..., descrip except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} if start_time is not None: script_params["Start Time"] = start_time if max_io_cs_to_fetch is not None: script_params["Max IoCs to Fetch"] = max_io_cs_to_fetch - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -473,7 +478,7 @@ async def google_chronicle_list_io_cs(case_id: Annotated[str, Field(..., descrip "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -488,7 +493,7 @@ async def google_chronicle_list_io_cs(case_id: Annotated[str, Field(..., descrip return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_enrich_ip(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], lowest_suspicious_severity: Annotated[List[Any], Field(..., description="Specify the lowest severity that should be associated with IP in order to mark it suspicious.")], mark_suspicious_n_a_severity: Annotated[bool, Field(..., description="If enabled, action will mark the entity as suspicious, if information about severity is not available.")], create_insight: Annotated[bool, Field(default=None, description="If enabled, action will create an insight containing information about the entities.")], only_suspicious_insight: Annotated[bool, Field(default=None, description="If enabled, action will only create an insight for entities that are marked as suspicious. Note: \"Create Insight\" parameter needs to be enabled.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_enrich_ip(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], lowest_suspicious_severity: Annotated[List[Any], Field(..., description="Specify the lowest severity that should be associated with IP in order to mark it suspicious.")], mark_suspicious_n_a_severity: Annotated[bool, Field(..., description="If enabled, action will mark the entity as suspicious, if information about severity is not available.")], create_insight: Annotated[bool, Field(default=None, description="If enabled, action will create an insight containing information about the entities.")], only_suspicious_insight: Annotated[bool, Field(default=None, description="If enabled, action will only create an insight for entities that are marked as suspicious. Note: \"Create Insight\" parameter needs to be enabled.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Enrich IP entities using information from IOCs in Google Chronicle. Supported entities: IP address. Returns: @@ -497,8 +502,9 @@ async def google_chronicle_enrich_ip(case_id: Annotated[str, Field(..., descript final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -514,7 +520,7 @@ async def google_chronicle_enrich_ip(case_id: Annotated[str, Field(..., descript final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -524,12 +530,12 @@ async def google_chronicle_enrich_ip(case_id: Annotated[str, Field(..., descript except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} if create_insight is not None: script_params["Create Insight"] = create_insight @@ -537,7 +543,7 @@ async def google_chronicle_enrich_ip(case_id: Annotated[str, Field(..., descript script_params["Only Suspicious Insight"] = only_suspicious_insight script_params["Lowest Suspicious Severity"] = lowest_suspicious_severity script_params["Mark Suspicious N/A Severity"] = mark_suspicious_n_a_severity - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -553,7 +559,7 @@ async def google_chronicle_enrich_ip(case_id: Annotated[str, Field(..., descript "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -568,7 +574,7 @@ async def google_chronicle_enrich_ip(case_id: Annotated[str, Field(..., descript return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_ping(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_ping(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Test connectivity to the Google Chronicle with parameters provided at the integration configuration page on the Marketplace tab. Returns: @@ -577,8 +583,9 @@ async def google_chronicle_ping(case_id: Annotated[str, Field(..., description=" final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -594,7 +601,7 @@ async def google_chronicle_ping(case_id: Annotated[str, Field(..., description=" final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -604,14 +611,14 @@ async def google_chronicle_ping(case_id: Annotated[str, Field(..., description=" except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -627,7 +634,7 @@ async def google_chronicle_ping(case_id: Annotated[str, Field(..., description=" "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -642,7 +649,7 @@ async def google_chronicle_ping(case_id: Annotated[str, Field(..., description=" return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_list_events(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], output: Annotated[List[Any], Field(..., description="Specify what should be the output for this action.")], event_types: Annotated[str, Field(default=None, description="Specify a comma-separated list of the event types that need to be returned. If nothing is provided, action will fetch all event types. Possible values: EVENTTYPE_UNSPECIFIED, PROCESS_UNCATEGORIZED, PROCESS_LAUNCH, PROCESS_INJECTION, PROCESS_PRIVILEGE_ESCALATION, PROCESS_TERMINATION, PROCESS_OPEN, PROCESS_MODULE_LOAD, REGISTRY_UNCATEGORIZED, REGISTRY_CREATION, REGISTRY_MODIFICATION, REGISTRY_DELETION, SETTING_UNCATEGORIZED, SETTING_CREATION, SETTING_MODIFICATION, SETTING_DELETION, MUTEX_UNCATEGORIZED, MUTEX_CREATION, FILE_UNCATEGORIZED, FILE_CREATION, FILE_DELETION, FILE_MODIFICATION, FILE_READ, FILE_COPY, FILE_OPEN, FILE_MOVE, FILE_SYNC, USER_UNCATEGORIZED, USER_LOGIN, USER_LOGOUT, USER_CREATION, USER_CHANGE_PASSWORD, USER_CHANGE_PERMISSIONS, USER_STATS, USER_BADGE_IN, USER_DELETION, USER_RESOURCE_CREATION, USER_RESOURCE_UPDATE_CONTENT, USER_RESOURCE_UPDATE_PERMISSIONS, USER_COMMUNICATION, USER_RESOURCE_ACCESS, USER_RESOURCE_DELETION, GROUP_UNCATEGORIZED, GROUP_CREATION, GROUP_DELETION, GROUP_MODIFICATION, EMAIL_UNCATEGORIZED, EMAIL_TRANSACTION, EMAIL_URL_CLICK, NETWORK_UNCATEGORIZED, NETWORK_FLOW, NETWORK_CONNECTION, NETWORK_FTP, NETWORK_DHCP, NETWORK_DNS, NETWORK_HTTP, NETWORK_SMTP, STATUS_UNCATEGORIZED, STATUS_HEARTBEAT, STATUS_STARTUP, STATUS_SHUTDOWN, STATUS_UPDATE, SCAN_UNCATEGORIZED, SCAN_FILE, SCAN_PROCESS_BEHAVIORS, SCAN_PROCESS, SCAN_HOST, SCAN_VULN_HOST, SCAN_VULN_NETWORK, SCAN_NETWORK, SCHEDULED_TASK_UNCATEGORIZED, SCHEDULED_TASK_CREATION, SCHEDULED_TASK_DELETION, SCHEDULED_TASK_ENABLE, SCHEDULED_TASK_DISABLE, SCHEDULED_TASK_MODIFICATION, SYSTEM_AUDIT_LOG_UNCATEGORIZED, SYSTEM_AUDIT_LOG_WIPE, SERVICE_UNSPECIFIED, SERVICE_CREATION, SERVICE_DELETION, SERVICE_START, SERVICE_STOP, SERVICE_MODIFICATION, GENERIC_EVENT, RESOURCE_CREATION, RESOURCE_DELETION, RESOURCE_PERMISSIONS_CHANGE, RESOURCE_READ, RESOURCE_WRITTEN, ANALYST_UPDATE_VERDICT, ANALYST_UPDATE_REPUTATION, ANALYST_UPDATE_SEVERITY_SCORE, ANALYST_UPDATE_STATUS, ANALYST_ADD_COMMENT")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \"Custom\" is selected, you also need to provide \"Start Time\".")], start_time: Annotated[str, Field(default=None, description="Specify the start time for the results. This parameter is mandatory, if \"Custom\" is selected for the \"Time Frame\" parameter. Format: ISO 8601")], end_time: Annotated[str, Field(default=None, description="Specify the end time for the results. Format: ISO 8601. If nothing is provided and \"Custom\" is selected for the \"Time Frame\" parameter then this parameter will use current time. Note: value \"now\" can also be used.")], reference_time: Annotated[str, Field(default=None, description="Specify the reference time for the event search. Format: YYYY-MM-DDThh:mmTZD. Note: if nothing is provided, action will use end time as reference time.")], max_events_to_return: Annotated[str, Field(default=None, description="Specify how many events to process per entity type. Default: 100.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_list_events(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], output: Annotated[List[Any], Field(..., description="Specify what should be the output for this action.")], event_types: Annotated[str, Field(default=None, description="Specify a comma-separated list of the event types that need to be returned. If nothing is provided, action will fetch all event types. Possible values: EVENTTYPE_UNSPECIFIED, PROCESS_UNCATEGORIZED, PROCESS_LAUNCH, PROCESS_INJECTION, PROCESS_PRIVILEGE_ESCALATION, PROCESS_TERMINATION, PROCESS_OPEN, PROCESS_MODULE_LOAD, REGISTRY_UNCATEGORIZED, REGISTRY_CREATION, REGISTRY_MODIFICATION, REGISTRY_DELETION, SETTING_UNCATEGORIZED, SETTING_CREATION, SETTING_MODIFICATION, SETTING_DELETION, MUTEX_UNCATEGORIZED, MUTEX_CREATION, FILE_UNCATEGORIZED, FILE_CREATION, FILE_DELETION, FILE_MODIFICATION, FILE_READ, FILE_COPY, FILE_OPEN, FILE_MOVE, FILE_SYNC, USER_UNCATEGORIZED, USER_LOGIN, USER_LOGOUT, USER_CREATION, USER_CHANGE_PASSWORD, USER_CHANGE_PERMISSIONS, USER_STATS, USER_BADGE_IN, USER_DELETION, USER_RESOURCE_CREATION, USER_RESOURCE_UPDATE_CONTENT, USER_RESOURCE_UPDATE_PERMISSIONS, USER_COMMUNICATION, USER_RESOURCE_ACCESS, USER_RESOURCE_DELETION, GROUP_UNCATEGORIZED, GROUP_CREATION, GROUP_DELETION, GROUP_MODIFICATION, EMAIL_UNCATEGORIZED, EMAIL_TRANSACTION, EMAIL_URL_CLICK, NETWORK_UNCATEGORIZED, NETWORK_FLOW, NETWORK_CONNECTION, NETWORK_FTP, NETWORK_DHCP, NETWORK_DNS, NETWORK_HTTP, NETWORK_SMTP, STATUS_UNCATEGORIZED, STATUS_HEARTBEAT, STATUS_STARTUP, STATUS_SHUTDOWN, STATUS_UPDATE, SCAN_UNCATEGORIZED, SCAN_FILE, SCAN_PROCESS_BEHAVIORS, SCAN_PROCESS, SCAN_HOST, SCAN_VULN_HOST, SCAN_VULN_NETWORK, SCAN_NETWORK, SCHEDULED_TASK_UNCATEGORIZED, SCHEDULED_TASK_CREATION, SCHEDULED_TASK_DELETION, SCHEDULED_TASK_ENABLE, SCHEDULED_TASK_DISABLE, SCHEDULED_TASK_MODIFICATION, SYSTEM_AUDIT_LOG_UNCATEGORIZED, SYSTEM_AUDIT_LOG_WIPE, SERVICE_UNSPECIFIED, SERVICE_CREATION, SERVICE_DELETION, SERVICE_START, SERVICE_STOP, SERVICE_MODIFICATION, GENERIC_EVENT, RESOURCE_CREATION, RESOURCE_DELETION, RESOURCE_PERMISSIONS_CHANGE, RESOURCE_READ, RESOURCE_WRITTEN, ANALYST_UPDATE_VERDICT, ANALYST_UPDATE_REPUTATION, ANALYST_UPDATE_SEVERITY_SCORE, ANALYST_UPDATE_STATUS, ANALYST_ADD_COMMENT")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \"Custom\" is selected, you also need to provide \"Start Time\".")], start_time: Annotated[str, Field(default=None, description="Specify the start time for the results. This parameter is mandatory, if \"Custom\" is selected for the \"Time Frame\" parameter. Format: ISO 8601")], end_time: Annotated[str, Field(default=None, description="Specify the end time for the results. Format: ISO 8601. If nothing is provided and \"Custom\" is selected for the \"Time Frame\" parameter then this parameter will use current time. Note: value \"now\" can also be used.")], reference_time: Annotated[str, Field(default=None, description="Specify the reference time for the event search. Format: YYYY-MM-DDThh:mmTZD. Note: if nothing is provided, action will use end time as reference time.")], max_events_to_return: Annotated[str, Field(default=None, description="Specify how many events to process per entity type. Default: 100.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """List events on the particular asset in the specified time frame. Supported entities: IP Address, Mac Address, Hostname. Note: action can only fetch 10000 events. Make sure to narrow down the timeframe for better results. Returns: @@ -651,8 +658,9 @@ async def google_chronicle_list_events(case_id: Annotated[str, Field(..., descri final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -668,7 +676,7 @@ async def google_chronicle_list_events(case_id: Annotated[str, Field(..., descri final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -678,12 +686,12 @@ async def google_chronicle_list_events(case_id: Annotated[str, Field(..., descri except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} if event_types is not None: script_params["Event Types"] = event_types @@ -698,7 +706,7 @@ async def google_chronicle_list_events(case_id: Annotated[str, Field(..., descri script_params["Output"] = output if max_events_to_return is not None: script_params["Max Events To Return"] = max_events_to_return - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -714,7 +722,7 @@ async def google_chronicle_list_events(case_id: Annotated[str, Field(..., descri "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -729,7 +737,7 @@ async def google_chronicle_list_events(case_id: Annotated[str, Field(..., descri return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_execute_retrohunt(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], rule_id: Annotated[str, Field(..., description="Specify the ID of the rule for which you want to run retrohunt.")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \u201cAlert Time Till Now\u201d is selected, action will use start time of the alert as start time for the search and end time will be current time. If \u201c30 Minutes Around Alert Time\u201d is selected, action will search the alerts 30 minutes before the alert happened till the 30 minutes after the alert has happened. Same idea applies to \u201c1 Hour Around Alert Time\u201d and \u201c5 Minutes Around Alert Time\u201d. If \u201cCustom\u201d is selected, you also need to provide \u201cStart Time\u201d.")], start_time: Annotated[str, Field(default=None, description="Specify the start time for the results. This parameter is mandatory, if \u201cCustom\u201d is selected for the \u201cTime Frame\u201d parameter. Format: ISO 8601.")], end_time: Annotated[str, Field(default=None, description="Specify the end time for the results. Format: ISO 8601. If nothing is provided and \u201cCustom\u201d is selected for the \u201cTime Frame\u201d parameter then this parameter will use current time.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_execute_retrohunt(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], rule_id: Annotated[str, Field(..., description="Specify the ID of the rule for which you want to run retrohunt.")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \u201cAlert Time Till Now\u201d is selected, action will use start time of the alert as start time for the search and end time will be current time. If \u201c30 Minutes Around Alert Time\u201d is selected, action will search the alerts 30 minutes before the alert happened till the 30 minutes after the alert has happened. Same idea applies to \u201c1 Hour Around Alert Time\u201d and \u201c5 Minutes Around Alert Time\u201d. If \u201cCustom\u201d is selected, you also need to provide \u201cStart Time\u201d.")], start_time: Annotated[str, Field(default=None, description="Specify the start time for the results. This parameter is mandatory, if \u201cCustom\u201d is selected for the \u201cTime Frame\u201d parameter. Format: ISO 8601.")], end_time: Annotated[str, Field(default=None, description="Specify the end time for the results. Format: ISO 8601. If nothing is provided and \u201cCustom\u201d is selected for the \u201cTime Frame\u201d parameter then this parameter will use current time.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Execute a rule retrohunt in Google Chronicle. Returns: @@ -738,8 +746,9 @@ async def google_chronicle_execute_retrohunt(case_id: Annotated[str, Field(..., final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -755,7 +764,7 @@ async def google_chronicle_execute_retrohunt(case_id: Annotated[str, Field(..., final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -765,12 +774,12 @@ async def google_chronicle_execute_retrohunt(case_id: Annotated[str, Field(..., except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} script_params["Rule ID"] = rule_id if time_frame is not None: @@ -779,7 +788,7 @@ async def google_chronicle_execute_retrohunt(case_id: Annotated[str, Field(..., script_params["Start Time"] = start_time if end_time is not None: script_params["End Time"] = end_time - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -795,7 +804,7 @@ async def google_chronicle_execute_retrohunt(case_id: Annotated[str, Field(..., "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -810,7 +819,7 @@ async def google_chronicle_execute_retrohunt(case_id: Annotated[str, Field(..., return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_is_value_in_reference_list(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], reference_list_names: Annotated[str, Field(..., description="Specify a comma-separated list of display names of the reference list that needs to be searched.")], values: Annotated[str, Field(..., description="Specify a comma-separated list of values that need to be searched in reference lists.")], case_insensitive_search: Annotated[bool, Field(default=None, description="If enabled, action will perform case insensitive matching.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_is_value_in_reference_list(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], reference_list_names: Annotated[str, Field(..., description="Specify a comma-separated list of display names of the reference list that needs to be searched.")], values: Annotated[str, Field(..., description="Specify a comma-separated list of values that need to be searched in reference lists.")], case_insensitive_search: Annotated[bool, Field(default=None, description="If enabled, action will perform case insensitive matching.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Check, if provided values are found in reference lists in Google Chronicle. Returns: @@ -819,8 +828,9 @@ async def google_chronicle_is_value_in_reference_list(case_id: Annotated[str, Fi final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -836,7 +846,7 @@ async def google_chronicle_is_value_in_reference_list(case_id: Annotated[str, Fi final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -846,18 +856,18 @@ async def google_chronicle_is_value_in_reference_list(case_id: Annotated[str, Fi except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} script_params["Reference List Names"] = reference_list_names script_params["Values"] = values if case_insensitive_search is not None: script_params["Case Insensitive Search"] = case_insensitive_search - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -873,7 +883,7 @@ async def google_chronicle_is_value_in_reference_list(case_id: Annotated[str, Fi "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -888,7 +898,7 @@ async def google_chronicle_is_value_in_reference_list(case_id: Annotated[str, Fi return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_list_assets(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], max_hours_backwards: Annotated[str, Field(default=None, description="Specify how many hours backwards to fetch the assets. Default: 1 hour.")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \"Custom\" is selected, you also need to provide \"Start Time\". If the \"Max Hours Backwards\" parameter is provided then action will use the \"Max Hours Backwards\" parameter to provide a time filter. This is done for backwards compatibility.")], start_time: Annotated[str, Field(default=None, description="Specify the start time for the results. This parameter is mandatory, if \"Custom\" is selected for the \"Time Frame\" parameter. Format: ISO 8601")], end_time: Annotated[str, Field(default=None, description="Specify the end time for the results. Format: ISO 8601. If nothing is provided and \"Custom\" is selected for the \"Time Frame\" parameter then this parameter will use current time.")], max_assets_to_return: Annotated[str, Field(default=None, description="Specify how many assets to return in the response.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_list_assets(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], max_hours_backwards: Annotated[str, Field(default=None, description="Specify how many hours backwards to fetch the assets. Default: 1 hour.")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \"Custom\" is selected, you also need to provide \"Start Time\". If the \"Max Hours Backwards\" parameter is provided then action will use the \"Max Hours Backwards\" parameter to provide a time filter. This is done for backwards compatibility.")], start_time: Annotated[str, Field(default=None, description="Specify the start time for the results. This parameter is mandatory, if \"Custom\" is selected for the \"Time Frame\" parameter. Format: ISO 8601")], end_time: Annotated[str, Field(default=None, description="Specify the end time for the results. Format: ISO 8601. If nothing is provided and \"Custom\" is selected for the \"Time Frame\" parameter then this parameter will use current time.")], max_assets_to_return: Annotated[str, Field(default=None, description="Specify how many assets to return in the response.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """List assets in Google Chronicle based on the related entities in the specified time frame. Supported entities: URL, IP Address, File hash. Only MD5, SHA-1 or SHA-256 hashes are supported. Returns: @@ -897,8 +907,9 @@ async def google_chronicle_list_assets(case_id: Annotated[str, Field(..., descri final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -914,7 +925,7 @@ async def google_chronicle_list_assets(case_id: Annotated[str, Field(..., descri final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -924,12 +935,12 @@ async def google_chronicle_list_assets(case_id: Annotated[str, Field(..., descri except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} if max_hours_backwards is not None: script_params["Max Hours Backwards"] = max_hours_backwards @@ -941,7 +952,7 @@ async def google_chronicle_list_assets(case_id: Annotated[str, Field(..., descri script_params["End Time"] = end_time if max_assets_to_return is not None: script_params["Max Assets To Return"] = max_assets_to_return - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -957,7 +968,7 @@ async def google_chronicle_list_assets(case_id: Annotated[str, Field(..., descri "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -972,7 +983,7 @@ async def google_chronicle_list_assets(case_id: Annotated[str, Field(..., descri return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_lookup_similar_alerts(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], io_cs_assets: Annotated[str, Field(..., description="Specify a comma-separated list of IOCs or assets that you want to find in the alerts. Note: action will perform a different search for each item provided.")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \"Alert Time Till Now\" is selected, action will use start time of the alert as start time for the search and end time will be current time. If \"30 Minutes Around Alert Time\" is selected, action will search the alerts 30 minutes before the alert happened till the 30 minutes after the alert has happened. Same idea applies to \"1 Hour Around Alert Time\" and \"5 Minutes Around Alert Time\".")], similarity_by: Annotated[List[Any], Field(default=None, description="Specify what attributes need to be used, when the action is to search for similar alerts. If \"Alert Name and Alert Type\" is selected, action will try to find all of the alerts that have the same alert name and IOCs/Assets for the underlying alert type. If \"Product\" is selected, then action will try to find all of the alerts that originate from the same product and have the same IOCs/Assets, action will search among both \"EXTERNAL\" and \"Rule\" alerts. If \"Only IOCs/Assets\" is enabled, action will match the similarity only based upon the items provided in the parameter \"IOCs/Assets\", action will search among both \"EXTERNAL\" and \"Rule\" alerts.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_lookup_similar_alerts(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], io_cs_assets: Annotated[str, Field(..., description="Specify a comma-separated list of IOCs or assets that you want to find in the alerts. Note: action will perform a different search for each item provided.")], time_frame: Annotated[List[Any], Field(default=None, description="Specify a time frame for the results. If \"Alert Time Till Now\" is selected, action will use start time of the alert as start time for the search and end time will be current time. If \"30 Minutes Around Alert Time\" is selected, action will search the alerts 30 minutes before the alert happened till the 30 minutes after the alert has happened. Same idea applies to \"1 Hour Around Alert Time\" and \"5 Minutes Around Alert Time\".")], similarity_by: Annotated[List[Any], Field(default=None, description="Specify what attributes need to be used, when the action is to search for similar alerts. If \"Alert Name and Alert Type\" is selected, action will try to find all of the alerts that have the same alert name and IOCs/Assets for the underlying alert type. If \"Product\" is selected, then action will try to find all of the alerts that originate from the same product and have the same IOCs/Assets, action will search among both \"EXTERNAL\" and \"Rule\" alerts. If \"Only IOCs/Assets\" is enabled, action will match the similarity only based upon the items provided in the parameter \"IOCs/Assets\", action will search among both \"EXTERNAL\" and \"Rule\" alerts.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Lookup similar alerts in Google Chronicle. Supported Chronicle alert types: RULE, EXTERNAL, IOC. Note: this action can only work with alerts that come from the "Chronicle Alerts Connector". Note: action can only fetch 10000 alerts. Make sure to narrow down the timeframe for better results. Returns: @@ -981,8 +992,9 @@ async def google_chronicle_lookup_similar_alerts(case_id: Annotated[str, Field(. final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -998,7 +1010,7 @@ async def google_chronicle_lookup_similar_alerts(case_id: Annotated[str, Field(. final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -1008,19 +1020,19 @@ async def google_chronicle_lookup_similar_alerts(case_id: Annotated[str, Field(. except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} if time_frame is not None: script_params["Time Frame"] = time_frame script_params["IOCs / Assets"] = io_cs_assets if similarity_by is not None: script_params["Similarity By"] = similarity_by - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -1036,7 +1048,7 @@ async def google_chronicle_lookup_similar_alerts(case_id: Annotated[str, Field(. "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -1051,7 +1063,7 @@ async def google_chronicle_lookup_similar_alerts(case_id: Annotated[str, Field(. return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_enrich_domain(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], lowest_suspicious_severity: Annotated[List[Any], Field(..., description="Specify the lowest severity that should be associated with domain in order to mark it suspicious.")], mark_suspicious_n_a_severity: Annotated[bool, Field(..., description="If enabled, action will mark the entity as suspicious, if information about severity is not available.")], create_insight: Annotated[bool, Field(default=None, description="If enabled, action will create an insight containing information about the entities.")], only_suspicious_insight: Annotated[bool, Field(default=None, description="If enabled, action will only create an insight for entities that are marked as suspicious. Note: \"Create Insight\" parameter needs to be enabled.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_enrich_domain(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], lowest_suspicious_severity: Annotated[List[Any], Field(..., description="Specify the lowest severity that should be associated with domain in order to mark it suspicious.")], mark_suspicious_n_a_severity: Annotated[bool, Field(..., description="If enabled, action will mark the entity as suspicious, if information about severity is not available.")], create_insight: Annotated[bool, Field(default=None, description="If enabled, action will create an insight containing information about the entities.")], only_suspicious_insight: Annotated[bool, Field(default=None, description="If enabled, action will only create an insight for entities that are marked as suspicious. Note: \"Create Insight\" parameter needs to be enabled.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Enrich domains using information from IOCs in Google Chronicle. Supported entities: Hostname, URL (action extracts domain part). Returns: @@ -1060,8 +1072,9 @@ async def google_chronicle_enrich_domain(case_id: Annotated[str, Field(..., desc final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -1077,7 +1090,7 @@ async def google_chronicle_enrich_domain(case_id: Annotated[str, Field(..., desc final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -1087,12 +1100,12 @@ async def google_chronicle_enrich_domain(case_id: Annotated[str, Field(..., desc except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} if create_insight is not None: script_params["Create Insight"] = create_insight @@ -1100,7 +1113,7 @@ async def google_chronicle_enrich_domain(case_id: Annotated[str, Field(..., desc script_params["Only Suspicious Insight"] = only_suspicious_insight script_params["Lowest Suspicious Severity"] = lowest_suspicious_severity script_params["Mark Suspicious N/A Severity"] = mark_suspicious_n_a_severity - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -1116,7 +1129,7 @@ async def google_chronicle_enrich_domain(case_id: Annotated[str, Field(..., desc "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION, @@ -1131,7 +1144,7 @@ async def google_chronicle_enrich_domain(case_id: Annotated[str, Field(..., desc return {"Status": "Failed", "Message": "No active instance found."} @mcp.tool() - async def google_chronicle_get_reference_lists(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], filter_key: Annotated[List[Any], Field(default=None, description="Specify the key that needs to be used to filter reference lists. Name option refers to a display name of the reference list.")], filter_logic: Annotated[List[Any], Field(default=None, description="Specify what filter logic should be applied.")], filter_value: Annotated[str, Field(default=None, description="Specify what value should be used in the filter. If \u201cEqual\u201c is selected, action will try to find the exact match among results and if \u201cContains\u201c is selected, action will try to find results that contain that substring. \u201cEqual\u201d works with \u201ctitle\u201d parameter, while \u201cContains\u201d works with all values in response. If nothing is provided in this parameter, the filter will not be applied.")], expanded_details: Annotated[bool, Field(default=None, description="If enabled, action will return detailed information about the reference lists.")], max_reference_lists_to_return: Annotated[str, Field(default=None, description="Specify how many reference lists to return. Default: 100.")], target_entities: Annotated[List[TargetEntity], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: + async def google_chronicle_get_reference_lists(case_id: Annotated[str, Field(..., description="The ID of the case.")], alert_group_identifiers: Annotated[List[str], Field(..., description="Identifiers for the alert groups.")], filter_key: Annotated[List[Any], Field(default=None, description="Specify the key that needs to be used to filter reference lists. Name option refers to a display name of the reference list.")], filter_logic: Annotated[List[Any], Field(default=None, description="Specify what filter logic should be applied.")], filter_value: Annotated[str, Field(default=None, description="Specify what value should be used in the filter. If \u201cEqual\u201c is selected, action will try to find the exact match among results and if \u201cContains\u201c is selected, action will try to find results that contain that substring. \u201cEqual\u201d works with \u201ctitle\u201d parameter, while \u201cContains\u201d works with all values in response. If nothing is provided in this parameter, the filter will not be applied.")], expanded_details: Annotated[bool, Field(default=None, description="If enabled, action will return detailed information about the reference lists.")], max_reference_lists_to_return: Annotated[str, Field(default=None, description="Specify how many reference lists to return. Default: 100.")], target_entities: Annotated[List[Dict[str, str]], Field(default_factory=list, description="Optional list of specific target entities (Identifier, EntityType) to run the action on.")], scope: Annotated[str, Field(default="All entities", description="Defines the scope for the action.")]) -> dict: """Get available reference lists in Google Chronicle. Returns: @@ -1140,8 +1153,9 @@ async def google_chronicle_get_reference_lists(case_id: Annotated[str, Field(... final_target_entities: Optional[List[TargetEntity]] = None final_scope: Optional[str] = None is_predefined_scope: Optional[bool] = None - + if target_entities: + target_entities = [TargetEntity(**entity) for entity in target_entities] # Specific target entities provided, ignore scope parameter final_target_entities = target_entities final_scope = None @@ -1157,7 +1171,7 @@ async def google_chronicle_get_reference_lists(case_id: Annotated[str, Field(... final_target_entities = [] # Pass empty list for entities when using scope final_scope = scope is_predefined_scope = True - + # Fetch integration instance identifier try: instance_response = await bindings.http_client.get( @@ -1167,12 +1181,12 @@ async def google_chronicle_get_reference_lists(case_id: Annotated[str, Field(... except Exception as e: print(f"Error fetching instance for GoogleChronicle: {e}") return {"Status": "Failed", "Message": f"Error fetching instance: {e}"} - + if instances: instance_identifier = instances[0].get("identifier") if not instance_identifier: return {"Status": "Failed", "Message": "Instance found but identifier is missing."} - + script_params = {} if filter_key is not None: script_params["Filter Key"] = filter_key @@ -1184,7 +1198,7 @@ async def google_chronicle_get_reference_lists(case_id: Annotated[str, Field(... script_params["Expanded Details"] = expanded_details if max_reference_lists_to_return is not None: script_params["Max Reference Lists To Return"] = max_reference_lists_to_return - + # Prepare data model for the API request action_data = ApiManualActionDataModel( alertGroupIdentifiers=alert_group_identifiers, @@ -1200,7 +1214,7 @@ async def google_chronicle_get_reference_lists(case_id: Annotated[str, Field(... "ScriptParametersEntityFields": json.dumps(script_params) } ) - + try: execution_response = await bindings.http_client.post( Endpoints.EXECUTE_MANUAL_ACTION,