forked from MISP/misp-modules
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcluster25_expand.py
executable file
·230 lines (202 loc) · 11.1 KB
/
cluster25_expand.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import json
import requests
import uuid
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
moduleinfo = {'version': '0.1',
'author': 'Milo Volpicelli',
'description': 'Module to query Cluster25CTI',
'module-type': ['expansion', 'hover']}
moduleconfig = ['api_id', 'apikey', 'base_url']
misperrors = {'error': 'Error'}
misp_type_in = ['domain', 'email-src', 'email-dst', 'filename', 'md5', 'sha1', 'sha256', 'ip-src', 'ip-dst', 'url',
'vulnerability', 'btc', 'xmr', 'ja3-fingerprint-md5']
mapping_out = { # mapping between the MISP attributes type and the compatible Cluster25 indicator types.
'domain': {'type': 'domain', 'to_ids': True},
'email-src': {'type': 'email-src', 'to_ids': True},
'email-dst': {'type': 'email-dst', 'to_ids': True},
'filename': {'type': 'filename', 'to_ids': True},
'md5': {'type': 'md5', 'to_ids': True},
'sha1': {'type': 'sha1', 'to_ids': True},
'sha256': {'type': 'sha256', 'to_ids': True},
'ip-src': {'type': 'ip-src', 'to_ids': True},
'ip-dst': {'type': 'ip-dst', 'to_ids': True},
'url': {'type': 'url', 'to_ids': True},
'cve': {'type': 'vulnerability', 'to_ids': True},
'btcaddress': {'type': 'btc', 'to_ids': True},
'xmraddress': {'type': 'xmr', 'to_ids': True},
'ja3': {'type': 'ja3-fingerprint-md5', 'to_ids': True},
}
misp_type_out = [item['type'] for item in mapping_out.values()]
misp_attributes = {'input': misp_type_in, 'format': 'misp_standard'}
def handler(q=False):
if q is False:
return False
request = json.loads(q)
# validate Cluster25 params
if request.get('config'):
if request['config'].get('apikey') is None:
misperrors['error'] = 'Cluster25 apikey is missing'
return misperrors
if request['config'].get('api_id') is None:
misperrors['error'] = 'Cluster25 api_id is missing'
return misperrors
if request['config'].get('base_url') is None:
misperrors['error'] = 'Cluster25 base_url is missing'
return misperrors
# validate attribute
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request.get('attribute')
if not any(input_type == attribute.get('type') for input_type in misp_type_in):
return {'error': 'Unsupported attribute type.'}
client = Cluster25CTI(request['config']['api_id'], request['config']['apikey'], request['config']['base_url'])
return lookup_indicator(client, request.get('attribute'))
def format_content(content):
if isinstance(content, str) or isinstance(content, bool) or isinstance(content, int) or isinstance(content, float):
return content
ret = ""
tmp_ret = []
if content is None:
return ret
is_dict = isinstance(content, dict)
is_list = isinstance(content, list)
for index, key in enumerate(content):
if is_dict:
if isinstance(content[key], dict):
ret = format_content(content[key])
elif isinstance(content[key], list):
for list_item in content[key]:
tmp_ret.append(format_content(list_item))
else:
tmp_ret.append(f"{key}: {content[key]}")
elif is_list:
if isinstance(content[index], str):
ret = ", ".join(content)
else:
ret = format_content(content)
if tmp_ret:
ret = " ".join(tmp_ret)
return ret
def lookup_indicator(client, attr):
result = client.investigate(attr)
if result.get('error'):
return result
misp_event = MISPEvent()
attribute = MISPAttribute()
attribute.from_dict(**attr)
misp_event.add_attribute(**attribute)
misp_object_g = MISPObject('c25_generic_info')
misp_object_g.template_uuid = uuid.uuid4()
misp_object_g.description = 'c25_generic_info'
setattr(misp_object_g, 'meta-category', 'network')
misp_objects = []
for ind, entry in enumerate(result):
if isinstance(result[entry], dict):
tmp_obj = MISPObject(f"c25_{entry}")
tmp_obj.template_uuid = uuid.uuid4()
tmp_obj.description = f"c25_{entry}"
setattr(tmp_obj, 'meta-category', 'network')
tmp_obj.add_reference(attribute['uuid'], 'related-to')
for key in result[entry]:
if isinstance(result[entry][key], dict):
for index, item in enumerate(result[entry][key]):
if result[entry][key][item]:
tmp_obj.add_attribute(f"{entry}_{key}_{item}", **{'type': 'text', 'value': format_content(
result[entry][key][item])})
elif isinstance(result[entry][key], list):
for index, item in enumerate(result[entry][key]):
if isinstance(item, dict):
tmp_obj_2 = MISPObject(f"c25_{entry}_{key}_{index + 1}")
tmp_obj_2.template_uuid = uuid.uuid4()
tmp_obj_2.description = f"c25_{entry}_{key}"
setattr(tmp_obj_2, 'meta-category', 'network')
tmp_obj_2.add_reference(attribute['uuid'], 'related-to')
for sub_key in item:
if isinstance(item[sub_key], list):
for sub_item in item[sub_key]:
if isinstance(sub_item, dict):
tmp_obj_3 = MISPObject(f"c25_{entry}_{sub_key}_{index + 1}")
tmp_obj_3.template_uuid = uuid.uuid4()
tmp_obj_3.description = f"c25_{entry}_{sub_key}"
setattr(tmp_obj_3, 'meta-category', 'network')
tmp_obj_3.add_reference(attribute['uuid'], 'related-to')
for sub_sub_key in sub_item:
if isinstance(sub_item[sub_sub_key], list):
for idx, sub_sub_item in enumerate(sub_item[sub_sub_key]):
if sub_sub_item.get("name"):
sub_sub_item = sub_sub_item.get("name")
tmp_obj_3.add_attribute(f"{sub_sub_key}_{idx + 1}",
**{'type': 'text',
'value': format_content(
sub_sub_item)})
else:
tmp_obj_3.add_attribute(sub_sub_key,
**{'type': 'text',
'value': format_content(
sub_item[sub_sub_key])})
misp_objects.append(tmp_obj_3)
else:
tmp_obj_2.add_attribute(sub_key, **{'type': 'text',
'value': format_content(sub_item)})
elif item[sub_key]:
tmp_obj_2.add_attribute(sub_key,
**{'type': 'text', 'value': format_content(item[sub_key])})
misp_objects.append(tmp_obj_2)
elif item is not None:
tmp_obj.add_attribute(f"{entry}_{key}", **{'type': 'text', 'value': format_content(item)})
elif result[entry][key] is not None:
tmp_obj.add_attribute(key, **{'type': 'text', 'value': result[entry][key]})
if tmp_obj.attributes:
misp_objects.append(tmp_obj)
elif isinstance(result[entry], list):
for index, key in enumerate(result[entry]):
if isinstance(key, dict):
tmp_obj = MISPObject(f"c25_{entry}_{index + 1}")
tmp_obj.template_uuid = uuid.uuid4()
tmp_obj.description = f"c25_{entry}_{index + 1}"
setattr(tmp_obj, 'meta-category', 'network')
tmp_obj.add_reference(attribute['uuid'], 'related-to')
for item in key:
if key[item]:
tmp_obj.add_attribute(item, **{'type': 'text', 'value': format_content(key[item])})
tmp_obj.add_reference(attribute['uuid'], 'related-to')
misp_objects.append(tmp_obj)
elif key is not None:
misp_object_g.add_attribute(f"{entry}_{index + 1}",
**{'type': 'text', 'value': format_content(key)})
else:
if result[entry]:
misp_object_g.add_attribute(entry, **{'type': 'text', 'value': result[entry]})
misp_object_g.add_reference(attribute['uuid'], 'related-to')
misp_event.add_object(misp_object_g)
for misp_object in misp_objects:
misp_event.add_object(misp_object)
event = json.loads(misp_event.to_json())
results = {key: event[key] for key in ('Attribute', 'Object')}
return {'results': results}
def introspection():
return misp_attributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo
class Cluster25CTI:
def __init__(self, customer_id=None, customer_key=None, base_url=None):
self.client_id = customer_id
self.client_secret = customer_key
self.base_url = base_url
self.current_token = self._get_cluster25_token()
self.headers = {"Authorization": f"Bearer {self.current_token}"}
def _get_cluster25_token(self):
payload = {"client_id": self.client_id, "client_secret": self.client_secret}
r = requests.post(url=f"{self.base_url}/token", json=payload, headers={"Content-Type": "application/json"})
if r.status_code != 200:
return {'error': f"Unable to retrieve the token from C25 platform, status {r.status_code}"}
return r.json()["data"]["token"]
def investigate(self, indicator) -> dict:
params = {'indicator': indicator.get('value')}
r = requests.get(url=f"{self.base_url}/investigate", params=params, headers=self.headers)
if r.status_code != 200:
return {'error': f"Unable to retrieve investigate result for indicator '{indicator.get('value')}' "
f"from C25 platform, status {r.status_code}"}
return r.json()["data"]