From 6ffc5d0bdae1d312d541bfd2e71022cbfbca9519 Mon Sep 17 00:00:00 2001 From: Mughees Ur Rehman <108094512+Mughees2001@users.noreply.github.com> Date: Sat, 8 Mar 2025 17:39:19 +0500 Subject: [PATCH 1/4] fix: update sources.rst - Changed label from anchore to anchore_nvd_overrides to fix duplicate label issue - Added anchore entry to the importer table Signed-off-by: Mughees Ur Rehman <108094512+Mughees2001@users.noreply.github.com> --- SOURCES.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/SOURCES.rst b/SOURCES.rst index bc0963a10..fdd4b091c 100644 --- a/SOURCES.rst +++ b/SOURCES.rst @@ -51,3 +51,5 @@ +----------------+------------------------------------------------------------------------------------------------------+----------------------------------------------------+ |mattermost | https://mattermost.com/security-updates/ |mattermost server, desktop and mobile apps | +----------------+------------------------------------------------------------------------------------------------------+----------------------------------------------------+ +|anchore | https://github.com/anchore/nvd-data-overrides |generic packages | ++----------------+------------------------------------------------------------------------------------------------------+----------------------------------------------------+ From 97e0f72675750b7d663d7de14ad8d00c6d500fe1 Mon Sep 17 00:00:00 2001 From: Mughees Ur Rehman <108094512+Mughees2001@users.noreply.github.com> Date: Sat, 8 Mar 2025 17:42:00 +0500 Subject: [PATCH 2/4] feat: added imports Signed-off-by: Mughees Ur Rehman <108094512+Mughees2001@users.noreply.github.com> --- vulnerabilities/importers/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 3f429f669..93112ff50 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -42,6 +42,7 @@ from vulnerabilities.pipelines import nvd_importer from vulnerabilities.pipelines import pypa_importer from vulnerabilities.pipelines import pysec_importer +from vulnerabilities.pipelines import anchore_importer IMPORTERS_REGISTRY = [ openssl.OpensslImporter, @@ -78,6 +79,7 @@ nvd_importer.NVDImporterPipeline, pysec_importer.PyPIImporterPipeline, alpine_linux_importer.AlpineLinuxImporterPipeline, + anchore_importer.AnchoreImporterPipeline, ] IMPORTERS_REGISTRY = { From 03ab167a700311887c643ade87ae3c52d4d37f4a Mon Sep 17 00:00:00 2001 From: Mughees Ur Rehman <108094512+Mughees2001@users.noreply.github.com> Date: Sat, 8 Mar 2025 17:43:57 +0500 Subject: [PATCH 3/4] feat: added anchore_importer Signed-off-by: Mughees Ur Rehman <108094512+Mughees2001@users.noreply.github.com> --- vulnerabilities/pipelines/anchore_importer.py | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 vulnerabilities/pipelines/anchore_importer.py diff --git a/vulnerabilities/pipelines/anchore_importer.py b/vulnerabilities/pipelines/anchore_importer.py new file mode 100644 index 000000000..e816d9bb4 --- /dev/null +++ b/vulnerabilities/pipelines/anchore_importer.py @@ -0,0 +1,85 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from datetime import datetime +from typing import Iterable + +import requests +import yaml +from packageurl import PackageURL +from univers.versions import SemverVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackage +from vulnerabilities.importer import Reference +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline + + +class AnchoreImporterPipeline(VulnerableCodeBaseImporterPipeline): + + pipeline_id = "anchore_importer" + root_url = "https://github.com/anchore/nvd-data-overrides" + license_url = "https://github.com/anchore/nvd-data-overrides/blob/main/LICENSE" + spdx_license_expression = "CC0-1.0" + importer_name = "Anchore NVD Overrides Importer" + + @classmethod + def steps(cls): + return ( + cls.collect_and_store_advisories, + cls.import_new_advisories, + ) + + def advisories_count(self) -> int: + raw_data = self.fetch_data() + return len(raw_data) + + def collect_advisories(self) -> Iterable[AdvisoryData]: + raw_data = self.fetch_data() + for entry in raw_data: + yield self.parse_advisory_data(entry) + + def fetch_data(self): + url = "https://raw.githubusercontent.com/anchore/nvd-data-overrides/main/overrides.yaml" + response = requests.get(url) + response.raise_for_status() + return yaml.safe_load(response.text) + + def parse_advisory_data(self, raw_data) -> AdvisoryData: + if not all(key in raw_data for key in ["cve_id", "package_name", "affected_versions"]): + return None + + purl = PackageURL(type="generic", name=raw_data["package_name"]) + affected_version_range = raw_data["affected_versions"] + fixed_version = ( + SemverVersion(raw_data["fixed_version"]) if raw_data.get("fixed_version") else None + ) + + affected_package = AffectedPackage( + package=purl, + affected_version_range=affected_version_range, + fixed_version=fixed_version, + ) + + references = [ + Reference(url=url) for url in raw_data.get("references", []) if url + ] + date_published = ( + datetime.strptime(raw_data["published_date"], "%Y-%m-%d") + if raw_data.get("published_date") + else None + ) + + return AdvisoryData( + aliases=[raw_data["cve_id"]], + summary=raw_data.get("description", ""), + affected_packages=[affected_package], + references=references, + date_published=date_published, + ) From 2b31b86e324e7e58e637adead9fd152cfec740f7 Mon Sep 17 00:00:00 2001 From: Mughees2001 Date: Mon, 10 Mar 2025 02:44:10 -0400 Subject: [PATCH 4/4] feat: updated anchore_importer.py --- vulnerabilities/pipelines/anchore_importer.py | 201 ++++++++++++++---- 1 file changed, 163 insertions(+), 38 deletions(-) diff --git a/vulnerabilities/pipelines/anchore_importer.py b/vulnerabilities/pipelines/anchore_importer.py index e816d9bb4..68c05a24c 100644 --- a/vulnerabilities/pipelines/anchore_importer.py +++ b/vulnerabilities/pipelines/anchore_importer.py @@ -8,12 +8,12 @@ # from datetime import datetime -from typing import Iterable +from typing import Iterable, List, Dict, Any, Optional +import logging import requests -import yaml from packageurl import PackageURL -from univers.versions import SemverVersion +from univers.version_range import VersionRange from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage @@ -21,6 +21,9 @@ from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline +logger = logging.getLogger(__name__) + + class AnchoreImporterPipeline(VulnerableCodeBaseImporterPipeline): pipeline_id = "anchore_importer" @@ -37,49 +40,171 @@ def steps(cls): ) def advisories_count(self) -> int: - raw_data = self.fetch_data() - return len(raw_data) + """Return the count of all advisories available from the data source.""" + data_dirs = self._get_data_directories() + count = 0 + for dir_url in data_dirs: + files = self._get_json_files(dir_url) + count += len(files) + return count def collect_advisories(self) -> Iterable[AdvisoryData]: - raw_data = self.fetch_data() - for entry in raw_data: - yield self.parse_advisory_data(entry) + """Collect advisory data from the Anchore NVD Data Overrides repository.""" + data_dirs = self._get_data_directories() + for dir_url in data_dirs: + files = self._get_json_files(dir_url) + for file_url in files: + try: + raw_data = self._fetch_json_data(file_url) + if raw_data: + advisory = self.parse_advisory_data(raw_data) + if advisory: + yield advisory + except Exception as e: + logger.error(f"Error processing file {file_url}: {e}") - def fetch_data(self): - url = "https://raw.githubusercontent.com/anchore/nvd-data-overrides/main/overrides.yaml" - response = requests.get(url) + def _get_data_directories(self) -> List[str]: + """Get the list of year directories in the data folder.""" + contents_url = "https://api.github.com/repos/anchore/nvd-data-overrides/contents/data" + response = requests.get(contents_url) response.raise_for_status() - return yaml.safe_load(response.text) + + contents = response.json() + return [ + item["url"] + for item in contents + if item["type"] == "dir" + ] - def parse_advisory_data(self, raw_data) -> AdvisoryData: - if not all(key in raw_data for key in ["cve_id", "package_name", "affected_versions"]): - return None + def _get_json_files(self, dir_url: str) -> List[str]: + """Get the list of JSON files in a directory.""" + response = requests.get(dir_url) + response.raise_for_status() + + contents = response.json() + return [ + item["download_url"] + for item in contents + if item["type"] == "file" and item["name"].endswith(".json") + ] - purl = PackageURL(type="generic", name=raw_data["package_name"]) - affected_version_range = raw_data["affected_versions"] - fixed_version = ( - SemverVersion(raw_data["fixed_version"]) if raw_data.get("fixed_version") else None - ) + def _fetch_json_data(self, file_url: str) -> Dict[str, Any]: + """Fetch and parse JSON data from a file URL.""" + response = requests.get(file_url) + response.raise_for_status() + return response.json() - affected_package = AffectedPackage( - package=purl, - affected_version_range=affected_version_range, - fixed_version=fixed_version, - ) + def _extract_cpe_details(self, cpe_string: str) -> Optional[Dict[str, str]]: + """Extract vendor and product information from a CPE string. + + Example CPE: cpe:2.3:a:apache:http_server:*:*:*:*:*:*:*:* + """ + parts = cpe_string.split(":") + if len(parts) < 6: + return None + + return { + "vendor": parts[3], + "product": parts[4], + } - references = [ - Reference(url=url) for url in raw_data.get("references", []) if url - ] - date_published = ( - datetime.strptime(raw_data["published_date"], "%Y-%m-%d") - if raw_data.get("published_date") - else None - ) + def parse_advisory_data(self, raw_data: Dict[str, Any]) -> Optional[AdvisoryData]: + """Parse advisory data from the JSON data structure.""" + # Extract CVE ID from _annotation + annotation = raw_data.get("_annotation", {}) + cve_id = annotation.get("cve_id") + if not cve_id: + return None + # Extract summary/reason from _annotation + summary = annotation.get("reason", "") + + # Extract source reference + references = [] + source_url = annotation.get("generated_from") + if source_url: + references.append(Reference(url=source_url)) + + # Add repository URL as a reference + references.append(Reference(url=self.root_url)) + + # Extract affected packages from CPE matching information + affected_packages = [] + + try: + # Navigate through the nested structure to get to CPE matches + for config in raw_data.get("cve", {}).get("configurations", []): + for node in config.get("nodes", []): + for cpe_match in node.get("cpeMatch", []): + # Only process if marked as vulnerable + if not cpe_match.get("vulnerable", False): + continue + + # Extract CPE information + criteria = cpe_match.get("criteria") + if not criteria: + continue + + cpe_details = self._extract_cpe_details(criteria) + if not cpe_details: + continue + + # Create package URL + purl = PackageURL( + type="generic", + namespace=cpe_details["vendor"], + name=cpe_details["product"] + ) + + # Extract version constraints + version_constraints = {} + if "versionStartIncluding" in cpe_match: + version_constraints["min"] = cpe_match["versionStartIncluding"] + version_constraints["min_included"] = True + elif "versionStartExcluding" in cpe_match: + version_constraints["min"] = cpe_match["versionStartExcluding"] + version_constraints["min_included"] = False + + if "versionEndIncluding" in cpe_match: + version_constraints["max"] = cpe_match["versionEndIncluding"] + version_constraints["max_included"] = True + elif "versionEndExcluding" in cpe_match: + version_constraints["max"] = cpe_match["versionEndExcluding"] + version_constraints["max_included"] = False + + # Create version range string based on constraints + if version_constraints: + range_parts = [] + + if "min" in version_constraints: + operator = ">=" if version_constraints.get("min_included", False) else ">" + range_parts.append(f"{operator}{version_constraints['min']}") + + if "max" in version_constraints: + operator = "<=" if version_constraints.get("max_included", False) else "<" + range_parts.append(f"{operator}{version_constraints['max']}") + + affected_version_range = ",".join(range_parts) + + affected_package = AffectedPackage( + package=purl, + affected_version_range=affected_version_range, + fixed_version=None, # No explicit fixed version in this format + ) + affected_packages.append(affected_package) + except Exception as e: + logger.error(f"Error parsing CPE data for {cve_id}: {e}") + + # If we couldn't extract any package information, return None + if not affected_packages: + logger.warning(f"No affected packages found for {cve_id}") + return None + + # Create and return the advisory data return AdvisoryData( - aliases=[raw_data["cve_id"]], - summary=raw_data.get("description", ""), - affected_packages=[affected_package], + aliases=[cve_id], + summary=summary, + affected_packages=affected_packages, references=references, - date_published=date_published, - ) + date_published=None, # No publication date in this format + ) \ No newline at end of file