Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:MISP/misp-stix
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisr3d committed Feb 5, 2024
2 parents daaee3a + 01fa04a commit e4dbf9d
Showing 1 changed file with 45 additions and 26 deletions.
71 changes: 45 additions & 26 deletions misp_stix_converter/stix2misp/stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@
from pymisp import (
AbstractMISP, MISPEvent, MISPAttribute, MISPGalaxy, MISPGalaxyCluster,
MISPObject, MISPSighting)
from stix2 import TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE
from stix2.parsing import parse as stix2_parser
from stix2.v20.bundle import Bundle as Bundle_v20
from stix2.v20.common import MarkingDefinition as MarkingDefinition_v20
from stix2.v20.observables import NetworkTraffic as NetworkTraffic_v20
from stix2.v20.sdo import (
AttackPattern as AttackPattern_v20, Campaign as Campaign_v20,
CourseOfAction as CourseOfAction_v20, Identity as Identity_v20,
Indicator as Indicator_v20, IntrusionSet as IntrusionSet_v20,
Malware as Malware_v20, ObservedData as ObservedData_v20,
Report as Report_v20, ThreatActor as ThreatActor_v20, Tool as Tool_v20,
CourseOfAction as CourseOfAction_v20, CustomObject as CustomObject_v20,
Identity as Identity_v20, Indicator as Indicator_v20,
IntrusionSet as IntrusionSet_v20, Malware as Malware_v20,
ObservedData as ObservedData_v20, Report as Report_v20,
ThreatActor as ThreatActor_v20, Tool as Tool_v20,
Vulnerability as Vulnerability_v20)
from stix2.v20.sro import (
Relationship as Relationship_v20, Sighting as Sighting_v20)
Expand All @@ -53,13 +55,14 @@
EmailMessage, File, IPv4Address, IPv6Address, MACAddress, Mutex,
NetworkTraffic as NetworkTraffic_v21, Process, Software, URL, UserAccount,
WindowsRegistryKey, X509Certificate)
from stix2.v21.sdo import Grouping, MalwareAnalysis, Note, Opinion
from stix2.v21.sdo import Grouping, Location, MalwareAnalysis, Note, Opinion
from stix2.v21.sdo import (
AttackPattern as AttackPattern_v21, Campaign as Campaign_v21,
CourseOfAction as CourseOfAction_v21, Identity as Identity_v21,
Indicator as Indicator_v21, IntrusionSet as IntrusionSet_v21, Location,
Malware as Malware_v21, ObservedData as ObservedData_v21,
Report as Report_v21, ThreatActor as ThreatActor_v21, Tool as Tool_v21,
CourseOfAction as CourseOfAction_v21, CustomObject as CustomObject_v21,
Identity as Identity_v21, Indicator as Indicator_v21,
IntrusionSet as IntrusionSet_v21, Malware as Malware_v21,
ObservedData as ObservedData_v21, Report as Report_v21,
ThreatActor as ThreatActor_v21, Tool as Tool_v21,
Vulnerability as Vulnerability_v21)
from stix2.v21.sro import (
Relationship as Relationship_v21, Sighting as Sighting_v21)
Expand Down Expand Up @@ -171,8 +174,12 @@
Report_v20, Report_v21
]
_SDO_TYPING = Union[
Campaign_v20, Campaign_v21,
CustomObject_v20, CustomObject_v21,
Grouping,
Indicator_v20, Indicator_v21,
ObservedData_v20, ObservedData_v21,
Report_v20, Report_v21,
Vulnerability_v20, Vulnerability_v21
]
_SIGHTING_TYPING = Union[
Expand Down Expand Up @@ -587,14 +594,8 @@ def _handle_object(self, object_type: str, object_ref: str):

def _handle_misp_event_tags(
self, misp_event: MISPEvent, stix_object: _GROUPING_REPORT_TYPING):
if hasattr(stix_object, 'object_marking_refs'):
for marking_ref in stix_object.object_marking_refs:
try:
misp_event.add_tag(self._marking_definition[marking_ref])
except KeyError:
self._unknown_marking_ref_warning(marking_ref)
except AttributeError:
self._unknown_marking_object_warning(marking_ref)
for tag in self._handle_tags_from_stix_fields(stix_object):
misp_event.add_tag(tag)
if hasattr(stix_object, 'labels'):
self._fetch_tags_from_labels(misp_event, stix_object.labels)

Expand Down Expand Up @@ -1035,19 +1036,15 @@ def _add_misp_attribute(self, attribute: dict,
stix_object: _SDO_TYPING) -> MISPAttribute:
misp_attribute = MISPAttribute()
misp_attribute.from_dict(**attribute)
tags = tuple(self._handle_tags_from_stix_fields(stix_object))
if tags:
for tag in tags:
misp_attribute.add_tag(tag)
for tag in self._handle_tags_from_stix_fields(stix_object):
misp_attribute.add_tag(tag)
return self.misp_event.add_attribute(**misp_attribute)

def _add_misp_object(self, misp_object: MISPObject,
stix_object: _SDO_TYPING) -> MISPObject:
tags = tuple(self._handle_tags_from_stix_fields(stix_object))
if tags:
for tag in self._handle_tags_from_stix_fields(stix_object):
for attribute in misp_object.attributes:
for tag in tags:
attribute.add_tag(tag)
attribute.add_tag(tag)
return self.misp_event.add_object(misp_object)

def _create_attribute_dict(self, stix_object: _SDO_TYPING) -> dict:
Expand Down Expand Up @@ -1117,7 +1114,22 @@ def _handle_tags_from_stix_fields(self, stix_object: _SDO_TYPING):
if hasattr(stix_object, 'confidence'):
yield self._parse_confidence_level(stix_object.confidence)
if hasattr(stix_object, 'object_marking_refs'):
yield from self._parse_markings(stix_object.object_marking_refs)
for marking_ref in stix_object.object_marking_refs:
try:
marking_definition = self._get_stix_object(marking_ref)
except ObjectTypeLoadingError as error:
if self._is_tlp_marking(marking_ref):
yield self._get_stix_object(marking_ref)
else:
self._object_type_loading_error(error)
continue
except ObjectRefLoadingError as error:
if self._is_tlp_marking(marking_ref):
yield self._get_stix_object(marking_ref)
else:
self._object_ref_loading_error(error)
continue
yield marking_definition

############################################################################
# UTILITY METHODS. #
Expand All @@ -1134,6 +1146,13 @@ def _fetch_tags_from_labels(
if label.lower() != 'threat-report'):
misp_feature.add_tag(label)

def _is_tlp_marking(self, marking_ref: str) -> bool:
for marking in (TLP_WHITE, TLP_GREEN, TLP_AMBER, TLP_RED):
if marking_ref == marking.id:
self._load_marking_definition(marking)
return True
return False

@staticmethod
def _parse_AS_value(number: Union[int, str]) -> str:
if isinstance(number, int) or not number.startswith('AS'):
Expand Down

0 comments on commit e4dbf9d

Please sign in to comment.