forked from MISP/misp-modules
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtaxii21.py
373 lines (299 loc) · 10.4 KB
/
taxii21.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
"""
Import content from a TAXII 2.1 server.
"""
import collections
import itertools
import json
import misp_modules.lib.stix2misp
from pathlib import Path
import re
import stix2.v20
import taxii2client
import taxii2client.exceptions
import requests
class ConfigError(Exception):
    """
    Raised when the config settings given for a single invocation of this
    module are missing or invalid.
    """
# Template for error responses: handler() fills in "error" with a specific
# message before returning it.
misperrors = {"error": "Error"}

# Static module metadata, returned by version().
moduleinfo = {
    "version": "0.1",
    "author": "Abc",
    "description": "Import content from a TAXII 2.1 server",
    "module-type": ["import"],
}

# Input/output description of this module, returned by introspection().
mispattributes = {
    "inputSource": [],
    "output": ["MISP objects"],
    "format": "misp_standard",
}
# Settings a MISP user may supply for each invocation of this module.  Each
# entry maps the setting's name to its type and the message describing it.
userConfig = {
    # Where to fetch from.
    "url": {
        "type": "String",
        "message": "A TAXII 2.1 collection URL",
    },

    # Filters applied to the TAXII query.
    "added_after": {
        "type": "String",
        "message": "Lower bound on time the object was uploaded to the TAXII server",
    },
    "stix_id": {
        "type": "String",
        "message": "STIX ID(s) of objects",
    },
    "spec_version": {  # TAXII 2.1 specific
        "type": "String",
        "message": "STIX version(s) of objects",
    },
    "type": {
        "type": "String",
        "message": "STIX type(s) of objects",
    },
    "version": {
        "type": "String",
        "message": 'Version timestamp(s), or "first"/"last"/"all"',
    },

    # Should we give some user control over this?  It will not be allowed to
    # exceed the admin setting.
    "STIX object limit": {
        "type": "Integer",
        "message": "Maximum number of STIX objects to process",
    },

    # Optional credentials.
    "username": {
        "type": "String",
        "message": "Username for TAXII server authentication, if necessary",
    },
    "password": {
        "type": "String",
        "message": "Password for TAXII server authentication, if necessary",
    },
}
# Paging is handled transparently by this module, so user-defined
# paging-related filtering parameters are not supported.

# Admin-level settings.  "stix_object_limit" caps the total number of STIX
# objects processed from a TAXII server in one module invocation (across all
# pages), to limit resource consumption.
moduleconfig = ["stix_object_limit"]

# Object limit used when neither an admin nor a user setting is given.
_DEFAULT_STIX_OBJECT_LIMIT = 1000

# Page size to use when paging TAXII results.  Trades off the amount of
# hammering on TAXII servers and the overhead of repeated requests against
# the resource consumption of a single page.  (Should be an admin setting
# too?)
_PAGE_SIZE = 100

# Location of the synonyms-to-tag-names JSON file, relative to this module;
# its path is handed to the STIX->MISP converter.
_synonymsToTagNames_path = Path(__file__).parent.joinpath(
    "../../lib/synonymsToTagNames.json"
)
# Immutable record of everything needed to perform one TAXII query, produced
# by combining the user, admin and default settings.
Config = collections.namedtuple(
    "Config",
    (
        "url",                # TAXII collection URL
        "added_after",        # TAXII "added_after" filter
        "id",                 # TAXII "id" filter
        "spec_version",       # TAXII "spec_version" filter
        "type",               # TAXII "type" filter
        "version",            # TAXII "version" filter
        "stix_object_limit",  # max number of STIX objects to process
        "username",           # credentials, if any
        "password",
    ),
)
def _pymisp_to_json_serializable(obj):
    """
    Convert a PyMISP object to a value the default JSON encoder can handle.

    This works around a possible bug with PyMISP's
    AbstractMisp.to_dict(json_format=True) method, which doesn't always
    produce a JSON-serializable value.  Instead, the object is serialized to
    a JSON string, which is then parsed straight back.

    :param obj: A PyMISP object
    :return: A JSON-serializable version of the object
    """
    return json.loads(obj.to_json())
def _normalize_multi_values(value):
    """
    Some TAXII filters may contain multiple values separated by commas,
    without spaces around the commas.  To give MISP users a little more
    flexibility, this function normalizes a possibly multi-valued value
    (e.g. multiple values delimited by commas and/or whitespace, all in the
    same string) to the TAXII-required format.

    Any run of commas and/or whitespace is treated as one delimiter, so
    leading/trailing delimiters and empty items are dropped.  Individual
    values are assumed to contain no embedded whitespace.

    :param value: A MISP config value (a string)
    :return: The values joined by single commas, with no surrounding
        whitespace; an empty string if there were no values at all
    """
    tokens = re.split(r"[\s,]+", value)

    # Splitting yields empty strings where the input starts or ends with a
    # delimiter (or is empty); those are not values.
    return ",".join(token for token in tokens if token)
def _parse_object_limit(raw_value, label):
    """
    Convert a raw object-limit setting to an int.

    :param raw_value: The raw config value, e.g. a numeric string
    :param label: Name of the setting, used in the error message
    :return: The limit as an int
    :raises ConfigError: if the value can't be interpreted as an integer
    """
    try:
        return int(raw_value)
    except (TypeError, ValueError) as e:
        raise ConfigError(
            "Invalid {}: must be an integer: {}".format(label, raw_value)
        ) from e


def _get_config(config):
    """
    Combine user, admin, and default config settings to produce a config
    object with all settings together.

    The given dict is not modified: whitespace is stripped from a copy of its
    string settings (the password is left as-is).

    :param config: The misp-modules request's "config" value.
    :return: A Config object
    :raises ConfigError: if any config errors are detected, including object
        limits which are not integers
    """
    # Strip whitespace from all config settings... except for password?
    settings = {
        key: val.strip() if isinstance(val, str) and key != "password" else val
        for key, val in (config or {}).items()
    }

    url = settings.get("url")
    added_after = settings.get("added_after")
    id_ = settings.get("stix_id")
    spec_version = settings.get("spec_version")
    type_ = settings.get("type")
    version_ = settings.get("version")
    username = settings.get("username")
    password = settings.get("password")
    raw_admin_limit = settings.get("stix_object_limit")
    raw_user_limit = settings.get("STIX object limit")

    # Unset/empty limits fall back: user -> admin -> built-in default.
    if raw_admin_limit:
        admin_stix_object_limit = _parse_object_limit(
            raw_admin_limit, "admin object limit"
        )
    else:
        admin_stix_object_limit = _DEFAULT_STIX_OBJECT_LIMIT

    if raw_user_limit:
        # The user may lower the limit, but never exceed the admin setting.
        stix_object_limit = min(
            _parse_object_limit(raw_user_limit, "object limit"),
            admin_stix_object_limit
        )
    else:
        stix_object_limit = admin_stix_object_limit

    # How much of this should we sanity-check here before passing it off to
    # the TAXII client (and thence, to the TAXII server)?
    if not url:
        raise ConfigError("A TAXII 2.1 collection URL is required.")

    if admin_stix_object_limit < 1:
        raise ConfigError(
            "Invalid admin object limit: must be positive: "
            + str(admin_stix_object_limit)
        )

    if stix_object_limit < 1:
        raise ConfigError(
            "Invalid object limit: must be positive: "
            + str(stix_object_limit)
        )

    # These filters may hold several values; bring them to TAXII's
    # comma-delimited form.
    if id_:
        id_ = _normalize_multi_values(id_)
    if spec_version:
        spec_version = _normalize_multi_values(spec_version)
    if type_:
        type_ = _normalize_multi_values(type_)
    if version_:
        version_ = _normalize_multi_values(version_)

    # STIX->MISP converter currently only supports STIX 2.0, so let's force
    # spec_version="2.0".
    if not spec_version:
        spec_version = "2.0"
    elif spec_version != "2.0":
        raise ConfigError('Only spec_version="2.0" is supported for now.')

    # Credentials only make sense as a pair.
    if bool(username) != bool(password):
        raise ConfigError(
            'Both or neither of "username" and "password" are required.'
        )

    return Config(
        url, added_after, id_, spec_version, type_, version_,
        stix_object_limit, username, password
    )
def _query_taxii(config):
    """
    Query the TAXII server according to the given config, convert the STIX
    results to MISP, and return a standard misp-modules response.

    :param config: Module config information as a Config object
    :return: A dict containing a misp-modules response, with the converted
        attributes, objects and tags under the "results" key
    """
    # No point in asking for more than our overall limit.
    kwargs = {
        "per_request": min(_PAGE_SIZE, config.stix_object_limit)
    }

    # Only pass along the filters which were actually set.
    for filter_name in ("spec_version", "version", "id", "type", "added_after"):
        filter_value = getattr(config, filter_name)
        if filter_value:
            kwargs[filter_name] = filter_value

    # Use the collection as a context manager so that its connection is
    # closed once all needed pages have been fetched, even on error.  Paging
    # is lazy, so the objects must be collected inside the "with" block.
    with taxii2client.Collection(
        config.url, user=config.username, password=config.password
    ) as collection:
        pages = taxii2client.as_pages(
            collection.get_objects,
            **kwargs
        )

        # Chain all the objects from all pages together...
        all_stix_objects = itertools.chain.from_iterable(
            taxii_envelope.get("objects", [])
            for taxii_envelope in pages
        )

        # And only take the first N objects from that.
        #
        # Collect into a list.  This is... unfortunate, but I don't think the
        # converter will work incrementally (will it?).  It expects all
        # objects to be given at once.
        #
        # It may also be desirable to have all objects available at once so
        # that cross-references can be made where possible, but it results in
        # increased memory usage.
        stix_objects = list(
            itertools.islice(all_stix_objects, config.stix_object_limit)
        )

    # The STIX 2.0 converter wants a 2.0 bundle.  (Hope the TAXII server isn't
    # returning 2.1 objects!)
    bundle20 = stix2.v20.Bundle(stix_objects, allow_custom=True)

    converter = misp_modules.lib.stix2misp.ExternalStixParser()
    converter.handler(
        bundle20, None, [0, "event", str(_synonymsToTagNames_path)]
    )

    misp_event = converter.misp_event

    return {
        "results": {
            "Attribute": [
                _pymisp_to_json_serializable(attr)
                for attr in misp_event.attributes
            ],
            "Object": [
                _pymisp_to_json_serializable(obj)
                for obj in misp_event.objects
            ],
            "Tag": [
                _pymisp_to_json_serializable(tag)
                for tag in misp_event.tags
            ]
        }
    }
def handler(q=False):
    """
    misp-modules entry point: run one TAXII import.

    :param q: The misp-modules request as a JSON string, or False
    :return: False if no request was given; otherwise a misp-modules response
        dict, which has either a "results" key (success) or an "error" key
        with a message (bad config, TAXII service error, or access denied)
    :raises requests.HTTPError: for HTTP errors other than 401/403
    """
    if q is False:
        return False

    request = json.loads(q)

    # Error responses are built as fresh dicts, so that a message from one
    # invocation can't leak into module-level state shared with later ones.
    try:
        # A request without a "config" is treated like an empty config, which
        # is reported as a config error instead of crashing on the lookup.
        config = _get_config(request.get("config") or {})
    except ConfigError as e:
        return {"error": str(e)}

    try:
        return _query_taxii(config)
    except taxii2client.exceptions.TAXIIServiceException as e:
        return {"error": str(e)}
    except requests.HTTPError as e:
        # Let's give a better error message for auth issues.  The exception
        # may not carry a response at all, hence the guarded lookup.
        status_code = getattr(e.response, "status_code", None)
        if status_code in (401, 403):
            return {"error": "Access was denied."}
        raise
def introspection():
    """
    Describe this module's inputs, outputs and user-configurable settings.

    :return: The module-level mispattributes dict, with the user config
        settings attached under its "userConfig" key
    """
    mispattributes.update(userConfig=userConfig)
    return mispattributes
def version():
    """
    Report this module's metadata.

    :return: The module-level moduleinfo dict, with the names of the admin
        settings attached under its "config" key
    """
    moduleinfo.update(config=moduleconfig)
    return moduleinfo