Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,6 @@
from soar_sdk.SiemplifyLogger import SiemplifyLogger


# Substrings searched for (case-sensitive) in the stderr text captured from a
# `git push` to detect a rejected/failed push — e.g. a server-side
# pre-receive hook decline, a permissions refusal, or a non-fast-forward
# rejection. Consumed by Git._raise_on_push_errors.
PUSH_FAILURE_INDICATORS = (
    "pre-receive hook declined",
    "not allowed to push",
    "push rejected",
    "failed to push",
    "error: failed to push",
    "! [rejected]",
    "! [remote rejected]",
)

class Git:
"""GitManager"""

Expand Down Expand Up @@ -232,26 +222,13 @@ def push(self, force_push=False) -> None:
equivalent to 'git push --force'. Defaults to False.

"""
error_content = ""
try:
error_buffer = StringIO()
tee_stream = TeeStream(sys.stderr, error_buffer)

try:
porcelain.push(
self.repo,
refspecs=[self.local_branch_ref],
force=force_push,
errstream=tee_stream,
**self.connection_args,
)
finally:
tee_stream.flush()
error_content = error_buffer.getvalue().strip()
tee_stream.close()

self._raise_on_push_errors(error_content)

porcelain.push(
self.repo,
refspecs=[self.local_branch_ref],
force=force_push,
**self.connection_args,
)
except porcelain.DivergedBranches:
self.logger.error("Could not push updates to remote repository!")
self.logger.warn(
Expand All @@ -264,15 +241,6 @@ def push(self, force_push=False) -> None:
"Updates will be pushed in the next python script execution",
)

def _raise_on_push_errors(self, error_content: str | None) -> None:
    """Raise when captured push stderr contains a known failure marker.

    Args:
        error_content: Stderr text captured during the push; ``None`` or an
            empty string means nothing was reported and no check is done.

    Raises:
        GitSyncException: If any ``PUSH_FAILURE_INDICATORS`` substring is
            present in ``error_content``.
    """
    if not error_content:
        return

    # Scan for the first known failure marker; one hit is enough to fail.
    for indicator in PUSH_FAILURE_INDICATORS:
        if indicator in error_content:
            message = f"Push operation failed: {error_content}"
            self.logger.error(message)
            raise GitSyncException(message)

def _checkout(self) -> None:
"""Checkout a branch

Expand Down Expand Up @@ -805,4 +773,4 @@ def _verify_and_decide(self, client, hostname, key) -> None:
else:
self.vendor.siemplify_logger.error("Fingerprint verification failed.")
raise paramiko.ssh_exception.SSHException(
f"Host key verification failed for {hostname}")
f"Host key verification failed for {hostname}")
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import uuid
from typing import TYPE_CHECKING, Any
from TIPCommon.types import SingleJson
from TIPCommon.rest.soar_api import install_integration

from jinja2 import Template

Expand Down Expand Up @@ -306,7 +307,7 @@ def install_connector(self, connector: Connector) -> None:
"Please upgrade the connector.",
)
connector.raw_data["isUpdateAvailable"] = True
if connector.environment not in self.api.get_environment_names():
if connector.environment not in self.api.get_environment_names(self._siemplify):
self.logger.warn(
f"Connector is set to non-existing environment {connector.environment}. "
f"Using Default Environment instead",
Expand Down Expand Up @@ -345,16 +346,16 @@ def install_workflows(self, workflows: list[Workflow]) -> None:
"""
# Validate all playbook environments exist as environments or environment groups
environments = (
self.api.get_environment_names()
self.api.get_environment_names(self._siemplify)
+ self.api.get_environment_group_names()
+ [ALL_ENVIRONMENTS_IDENTIFIER]
)
for p in workflows:
invalid_environments = [x for x in p.environments if x not in environments]
if invalid_environments:
raise Exception(
f"Playbook '{p.name}' is assigned to environment(s) that don't exist: "
f"{', '.join(invalid_environments)}. "
f"Playbook '{p.name}' is assigned to environment(s) that don't "
f"exist: {', '.join(invalid_environments)}. "
f"Available environments: {', '.join(environments)}"
)

Expand Down Expand Up @@ -415,7 +416,14 @@ def install_job(self, job: Job) -> None:
if job_def_id:
job.raw_data["jobDefinitionId"] = job_def_id.get("id")

job_id = next((x for x in self.api.get_jobs() if x["name"] == job.name), None)
job_id = next(
(
x
for x in self.api.get_jobs(chronicle_soar=self._siemplify)
if x.get("displayName", x.get("name")) == job.name
),
None,
)
if job_id:
job.raw_data["id"] = job_id.get("id")
self.api.add_job(job.raw_data)
Expand Down Expand Up @@ -546,10 +554,12 @@ def install_marketplace_integration(self, integration_name: str) -> bool:
)
return False
try:
self.api.install_integration(
integration_name,
store_integration["version"],
store_integration["isCertified"],
install_integration(
chronicle_soar=self._siemplify,
integration_identifier=integration_name,
integration_name="",
version=store_integration["version"],
is_certified=store_integration["isCertified"],
)
Comment on lines +557 to 563
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The install_integration function is called with integration_name="". This seems suspicious. While integration_identifier is provided, passing an empty string for integration_name might have unintended consequences or could be a bug. Please verify if this is the intended usage of the new API.

self.logger.info(f"{integration_name} installed successfully")
return True
Expand Down Expand Up @@ -583,7 +593,7 @@ class WorkflowInstaller:

def __init__(
self,
chronicle_soar: ChronicleSOAR,
chronicle_soar: ChronicleSOAR, # type: ignore
api: SiemplifyApiClient,
logger: SiemplifyLogger,
mod_time_cache: Cache[str, int],
Expand Down Expand Up @@ -789,7 +799,8 @@ def _installed_playbooks(self) -> dict[str, dict[str, Any]]:
"""Currently installed playbooks and blocks"""
if "playbooks" not in self._cache:
self._cache["playbooks"] = {
x.get("name"): x for x in self.api.get_playbooks()
x.get("name"): x
for x in self.api.get_playbooks(chronicle_soar=self.chronicle_soar)
}
return self._cache.get("playbooks")

Expand Down Expand Up @@ -945,16 +956,18 @@ def _find_integration_instances_for_step(
"""
cache_key = f"integration_instances_{environment}"
if cache_key not in self._cache:
self._cache[cache_key] = self.api.get_integrations_instances(environment)
self._cache[cache_key] = self.api.get_integrations_instances(
chronicle_soar=self.chronicle_soar,
environment=environment,
)

instances = self._cache.get(cache_key)
instances.sort(key=lambda x: x.get("instanceName"))

return [
x
for x in instances
if x.get("integrationIdentifier") == integration_name
and x.get("isConfigured")
if x.integration_identifier == integration_name and x.is_configured
]

@staticmethod
Expand Down
Loading
Loading