Skip to content

Commit c6a468e

Browse files
ambuj-1211Rishi-source
authored andcommitted
Add redundant code to utils
Signed-off-by: ambuj <[email protected]>
1 parent 38ff1df commit c6a468e

File tree

5 files changed

+40
-48
lines changed

5 files changed

+40
-48
lines changed

vulnerabilities/importers/apache_httpd.py

+8-20
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
import requests
1515
from bs4 import BeautifulSoup
16-
from cwe2.database import Database
1716
from packageurl import PackageURL
1817
from univers.version_constraint import VersionConstraint
1918
from univers.version_range import ApacheVersionRange
@@ -25,7 +24,8 @@
2524
from vulnerabilities.importer import Reference
2625
from vulnerabilities.importer import VulnerabilitySeverity
2726
from vulnerabilities.severity_systems import APACHE_HTTPD
28-
from vulnerabilities.utils import get_cwe_id
27+
from vulnerabilities.utils import create_weaknesses_list
28+
from vulnerabilities.utils import cwe_regex
2929
from vulnerabilities.utils import get_item
3030

3131
logger = logging.getLogger(__name__)
@@ -234,33 +234,21 @@ def get_weaknesses(cve_data):
234234
>>> get_weaknesses(mock_cve_data2)
235235
[190, 200]
236236
"""
237-
238237
alias = get_item(cve_data, "CVE_data_meta", "ID")
239-
cwe_id = []
240-
db = Database()
238+
cwe_strings = []
241239
if alias:
242240
problemtype_data = get_item(cve_data, "problemtype", "problemtype_data") or []
243241
for problem in problemtype_data:
244-
for desc in problem["description"]:
242+
for desc in problem.get("description", []):
245243
value = desc.get("value", "")
246-
cwe_pattern = r"CWE-\d+"
247-
cwe_id_string_list = re.findall(cwe_pattern, value)
248-
for cwe_id_string in cwe_id_string_list:
249-
cwe_id.append(get_cwe_id(cwe_id_string))
250-
244+
cwe_id_string_list = re.findall(cwe_regex, value)
245+
cwe_strings.extend(cwe_id_string_list)
251246
else:
252247
problemTypes = cve_data.get("containers", {}).get("cna", {}).get("problemTypes", [])
253248
descriptions = problemTypes[0].get("descriptions", []) if len(problemTypes) > 0 else []
254249
for description in descriptions:
255250
cwe_id_string = description.get("cweId", "")
256-
cwe_id.append(get_cwe_id(cwe_id_string))
257-
258-
weaknesses = []
259-
for cwe in cwe_id:
260-
try:
261-
db.get(cwe)
262-
weaknesses.append(cwe)
263-
except Exception:
264-
logger.error("Invalid CWE id")
251+
cwe_strings.append(cwe_id_string)
265252

253+
weaknesses = create_weaknesses_list(cwe_strings)
266254
return weaknesses

vulnerabilities/importers/debian.py

+2-11
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
from vulnerabilities.importer import AffectedPackage
2525
from vulnerabilities.importer import Importer
2626
from vulnerabilities.importer import Reference
27+
from vulnerabilities.utils import create_weaknesses_list
2728
from vulnerabilities.utils import dedupe
28-
from vulnerabilities.utils import get_cwe_id
2929
from vulnerabilities.utils import get_item
3030

3131
logger = logging.getLogger(__name__)
@@ -178,14 +178,5 @@ def get_cwe_from_debian_advisory(record):
178178
description = record.get("description") or ""
179179
pattern = r"CWE-\d+"
180180
cwe_strings = re.findall(pattern, description)
181-
weaknesses = []
182-
db = Database()
183-
for cwe_string in cwe_strings:
184-
if cwe_string:
185-
cwe_id = get_cwe_id(cwe_string)
186-
try:
187-
db.get(cwe_id)
188-
weaknesses.append(cwe_id)
189-
except Exception:
190-
logger.error("Invalid CWE id")
181+
weaknesses = create_weaknesses_list(cwe_strings)
191182
return weaknesses

vulnerabilities/importers/fireeye.py

+4-16
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,13 @@
1212
from typing import Iterable
1313
from typing import List
1414

15-
from cwe2.database import Database
16-
1715
from vulnerabilities.importer import AdvisoryData
1816
from vulnerabilities.importer import Importer
1917
from vulnerabilities.importer import Reference
2018
from vulnerabilities.utils import build_description
19+
from vulnerabilities.utils import create_weaknesses_list
20+
from vulnerabilities.utils import cwe_regex
2121
from vulnerabilities.utils import dedupe
22-
from vulnerabilities.utils import get_cwe_id
2322

2423
logger = logging.getLogger(__name__)
2524

@@ -160,19 +159,8 @@ def get_weaknesses(cwe_data):
160159
"""
161160
cwe_list = []
162161
for line in cwe_data:
163-
cwe_ids = re.findall(r"CWE-\d+", line)
162+
cwe_ids = re.findall(cwe_regex, line)
164163
cwe_list.extend(cwe_ids)
165164

166-
weaknesses = []
167-
db = Database()
168-
169-
for cwe_string in cwe_list:
170-
171-
if cwe_string:
172-
cwe_id = get_cwe_id(cwe_string)
173-
try:
174-
db.get(cwe_id)
175-
weaknesses.append(cwe_id)
176-
except Exception:
177-
logger.error("Invalid CWE id")
165+
weaknesses = create_weaknesses_list(cwe_list)
178166
return weaknesses

vulnerabilities/tests/test_fireeye.py

-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ def test_get_weaknesses(self):
226226
"CWE-362: Concurrent Execution using Shared Resource with Improper Synchronization ('Race Condition')",
227227
]
228228
) == [379, 362]
229-
230229
assert (
231230
get_weaknesses(
232231
[

vulnerabilities/utils.py

+26
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import saneyaml
3030
import toml
3131
import urllib3
32+
from cwe2.database import Database
33+
from cwe2.database import InvalidCWEError
3234
from packageurl import PackageURL
3335
from packageurl.contrib.django.utils import without_empty_values
3436
from univers.version_range import RANGE_CLASS_BY_SCHEMES
@@ -42,6 +44,7 @@
4244
cve_regex = re.compile(r"CVE-[0-9]{4}-[0-9]{4,19}", re.IGNORECASE)
4345
is_cve = cve_regex.match
4446
find_all_cve = cve_regex.findall
47+
cwe_regex = r"CWE-\d+"
4548

4649

4750
@dataclasses.dataclass(order=True, frozen=True)
@@ -399,6 +402,29 @@ def get_cwe_id(cwe_string: str) -> int:
399402
return int(cwe_id)
400403

401404

405+
def create_weaknesses_list(cwe_strings: str):
406+
"""
407+
Convert the CWE string to CWE ids and store them to weaknesses list.
408+
>>> create_weaknesses_list(["CWE-125","CWE-379"])
409+
[125, 379]
410+
"""
411+
weaknesses = []
412+
db = Database()
413+
for cwe_string in cwe_strings:
414+
if not cwe_string:
415+
continue
416+
cwe_id = get_cwe_id(cwe_string)
417+
if not cwe_id:
418+
logger.error("Invalid CWE id: No CWE ID found")
419+
continue
420+
try:
421+
db.get(cwe_id)
422+
weaknesses.append(cwe_id)
423+
except InvalidCWEError as e:
424+
logger.error(f"Error: {e}")
425+
return weaknesses
426+
427+
402428
def clean_nginx_git_tag(tag):
403429
"""
404430
Return a cleaned ``version`` string from an nginx git tag.

0 commit comments

Comments
 (0)