Skip to content

Add MISP Advisories #1807

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from vulnerabilities.pipelines import alpine_linux_importer
from vulnerabilities.pipelines import github_importer
from vulnerabilities.pipelines import gitlab_importer
from vulnerabilities.pipelines import misp_importer
from vulnerabilities.pipelines import nginx_importer
from vulnerabilities.pipelines import npm_importer
from vulnerabilities.pipelines import nvd_importer
Expand Down Expand Up @@ -78,6 +79,7 @@
nvd_importer.NVDImporterPipeline,
pysec_importer.PyPIImporterPipeline,
alpine_linux_importer.AlpineLinuxImporterPipeline,
misp_importer.MISPImporterPipeline,
]

IMPORTERS_REGISTRY = {
Expand Down
196 changes: 196 additions & 0 deletions vulnerabilities/pipelines/misp_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
import logging
import re
from datetime import timezone
from typing import Iterable

import requests
from bs4 import BeautifulSoup
from dateutil import parser as dateparser
from packageurl import PackageURL
from univers.version_range import GenericVersionRange
from univers.version_range import VersionRange

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
from vulnerabilities.severity_systems import CVSSV3
from vulnerabilities.severity_systems import CVSSV31
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import get_item

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MISPImporterPipeline(VulnerableCodeBaseImporterPipeline):
"""Collect Advisories from MISP"""

pipeline_id = "misp_importer"
spdx_license_expression = "CC BY-SA 3.0"
license_url = "https://www.misp-project.org/license/"
root_url = "https://www.misp-project.org/security/"
importer_name = "MISP Importer"

def __init__(self):
super().__init__()

@classmethod
def steps(cls):
return (
cls.collect_and_store_advisories,
cls.import_new_advisories,
)

# num of advisories
def advisories_count(self) -> int:
return len(fetch_advisory_links(self.root_url))

# parse the response data
def collect_advisories(self) -> Iterable[AdvisoryData]:
advisory_links = fetch_advisory_links(self.root_url)

for link in advisory_links:
advisory_data = fetch_advisory_data(link)
yield to_advisory_data(advisory_data)


def fetch_advisory_links(url):
"""Fetches the advisory links listed on the URL,returns a list"""
r = fetch_response(url).content
soup = BeautifulSoup(r, "html.parser")

h2 = soup.find(id="advisories")
# Find the <ul> element
ul_element = h2.find_next_sibling()
# Extract all <li> elements
li_elements = ul_element.find_all("li")

advisory_links = []
# Extract and print the text content of each <li>
for li in li_elements:
link_text = li.find("a").text.lower()

# Extract the rest of the text
description = li.text.replace(link_text, "").strip()
advisory_links.append(f"https://cve.circl.lu/vuln/fkie_{link_text}")

return advisory_links


def fetch_advisory_data(url):
"""Fetches advisory data,returns a dict"""
r = fetch_response(url).content
soup = BeautifulSoup(r, "html.parser")

# Find the <pre> element containing the JSON data
pre_element = soup.find("pre", {"class": "json-container"})

# Extract the text content
json_text = pre_element.text

# Parse the cleaned text as JSON
json_data = json.loads(json_text)

# data
description = json_data["descriptions"][0]["value"]
cve_id = json_data["id"]
date_published = json_data["published"]
references = json_data["references"][0]["url"]

# metrics
metrics = json_data["metrics"]
metrics_keys = list(metrics.keys())
if "cvssMetricV31" in metrics_keys:
cve_score = {
"version": "cvssMetricV31",
"score": metrics["cvssMetricV31"][0]["cvssData"]["baseScore"],
}
else:
cve_score = {
"version": "cvssMetricV30",
"score": metrics["cvssMetricV30"][0]["cvssData"]["baseScore"],
}

# affected version
match = re.search(r"\b\d+\.\d+\.\d+\b", description)
affected_version = match.group(0)

return {
"description": description,
"alias": cve_id,
"date_published": date_published,
"references": references,
"cve_score": cve_score,
"affected_version": affected_version,
"url": url,
}


def to_advisory_data(raw_data) -> AdvisoryData:
"""Parses extracted data to Advisory Data"""
# alias
alias = get_item(raw_data, "alias")

# affected packages
affected_packages = []
affected_version = get_item(raw_data, "affected_version") # list of list of affected versions
affected_packages.append(
AffectedPackage(
package=PackageURL(type="misp", name="MISP"),
affected_version_range=VersionRange.from_string(f"vers:generic/={affected_version}"),
)
)

# score
if raw_data["cve_score"]["version"] == "cvssMetricV31":
severity = VulnerabilitySeverity(
system=CVSSV31,
value=raw_data["cve_score"]["score"],
scoring_elements="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H",
)
else:
severity = VulnerabilitySeverity(
system=CVSSV3,
value=raw_data["cve_score"]["score"],
scoring_elements="CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
)

# Reference
references = []
references.append(
Reference(
severities=[severity],
reference_id=alias,
url=get_item(raw_data, "references"),
)
)

# description
description = get_item(raw_data, "description")

# date published
date_published = get_item(raw_data, "date_published")
date_published = dateparser.parse(date_published, yearfirst=True).replace(tzinfo=timezone.utc)

# url
url = get_item(raw_data, "url")

return AdvisoryData(
aliases=alias,
summary=description,
affected_packages=affected_packages,
references=references,
url=url,
date_published=date_published,
)
174 changes: 174 additions & 0 deletions vulnerabilities/tests/pipelines/test_misp_importer_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import datetime
import json
from pathlib import Path
from unittest import mock
from unittest.mock import patch

from packageurl import PackageURL
from univers.version_constraint import VersionConstraint
from univers.version_range import GenericVersionRange
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import misp_importer
from vulnerabilities.severity_systems import Cvssv3ScoringSystem

EXPECTED_DATA = Path(__file__).parent.parent / "test_data" / "misp" / "misp_expected.json"
HTML_DATA = Path(__file__).parent.parent / "test_data" / "misp" / "misp_test.html"
ADVISORY_HTML_DATA = Path(__file__).parent.parent / "test_data" / "misp" / "misp_advisory_test.html"


def load_expected_data(file):
with open(file) as f:
return json.load(f)


@mock.patch("requests.get")
def test_fetch_advisory_links(mock_get):
"""Test fetching advisory links from MISP security advisories page."""

# Read mock HTML content
with open(HTML_DATA, "r", encoding="utf-8") as file:
mock_html_content = file.read()

# Mock HTTP response
mock_response = mock.Mock()
mock_response.status_code = 200
mock_response.content = mock_html_content.encode()

mock_get.return_value = mock_response

# Call function under test
links = misp_importer.fetch_advisory_links("https://www.misp-project.org/security/")

# Ensure the links are extracted correctly
assert isinstance(links, list)
assert len(links) > 0
assert "https://cve.circl.lu/vuln/fkie_cve-2015-5719" in links


@mock.patch("requests.get")
def test_fetch_advisory_data(mock_get):
"""Test fetching and parsing advisory data from the CVE page."""

# Load expected advisory data from JSON file
expected_data = load_expected_data(EXPECTED_DATA)

# Read mock HTML content
with open(ADVISORY_HTML_DATA, "r", encoding="utf-8") as file:
mock_html_content = file.read()

# Create a proper mock response object
mock_response = mock.Mock()
mock_response.status_code = 200
mock_response.text = mock_html_content
mock_response.content = mock_html_content.encode("utf-8")

mock_get.return_value = mock_response

# Call function under test
advisory_data = misp_importer.fetch_advisory_data(
"https://cve.circl.lu/vuln/fkie_cve-2015-5719"
)

# Validate extracted advisory data
assert advisory_data["alias"] == expected_data["aliases"]
assert advisory_data["description"] == expected_data["summary"]
assert advisory_data["date_published"] == expected_data["date_published"]


@mock.patch("requests.get")
def test_misp_importer_pipeline_collect_advisories(mock_get):
"""Test the `collect_advisories` method in `MISPImporterPipeline`."""

with open(ADVISORY_HTML_DATA, "r", encoding="utf-8") as file:
mock_html_content = file.read()

# Mock HTTP Response
mock_response = mock.Mock()
mock_response.status_code = 200
mock_response.text = mock_html_content
mock_response.content = mock_html_content.encode("utf-8")
mock_get.return_value = mock_response

# Initialize the pipeline
pipeline = misp_importer.MISPImporterPipeline()

with mock.patch(
"vulnerabilities.pipelines.misp_importer.fetch_advisory_links"
) as mock_links, mock.patch(
"vulnerabilities.pipelines.misp_importer.fetch_advisory_data"
) as mock_data:
mock_links.return_value = ["https://cve.circl.lu/vuln/fkie_cve-2015-5719"]
mock_data.return_value = {
"description": "app/Controller/TemplatesController.php in Malware Information Sharing Platform (MISP) before 2.3.92 does not properly restrict filenames under the tmp/files/ directory, which has unspecified impact and attack vectors.",
"alias": "CVE-2015-5719",
"date_published": "2016-09-03T20:59:00.153",
"references": "http://www.securityfocus.com/bid/92740",
"cve_score": {"version": "cvssMetricV30", "score": 9.8},
"affected_version": "2.3.92",
}
generator = pipeline.collect_advisories()
advisories = list(generator)

assert len(advisories) == 1
advisory = advisories[0]

assert advisory.aliases == advisory_data.aliases
assert advisory.date_published == advisory_data.date_published
assert advisory.summary == advisory_data.summary
assert advisory.affected_packages == advisory.affected_packages


advisory_data = AdvisoryData(
aliases="CVE-2015-5719",
summary="app/Controller/TemplatesController.php in Malware Information Sharing Platform (MISP) before 2.3.92 does not properly restrict filenames under the tmp/files/ directory, which has unspecified impact and attack vectors.",
affected_packages=[
AffectedPackage(
package=PackageURL(
type="misp", namespace=None, name="MISP", version=None, qualifiers={}, subpath=None
),
affected_version_range=GenericVersionRange(
constraints=(
VersionConstraint(comparator="=", version=SemverVersion(string="2.3.92")),
)
),
fixed_version=None,
)
],
references=[
Reference(
reference_id="CVE-2015-5719",
reference_type="",
url="http://www.securityfocus.com/bid/92740",
severities=[
VulnerabilitySeverity(
system=Cvssv3ScoringSystem(
identifier="cvssv3",
name="CVSSv3 Base Score",
url="https://www.first.org/cvss/v3-0/",
notes="CVSSv3 base score and vector",
),
value=9.8,
scoring_elements="CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
published_at=None,
)
],
)
],
date_published=datetime.datetime(2016, 9, 3, 20, 59, 0, 153000, tzinfo=datetime.timezone.utc),
weaknesses=[],
url="https://cve.circl.lu/vuln/fkie_cve-2015-5719",
)
Loading