Skip to content

Commit 662c1ee

Browse files
michaelehabziadhany
authored andcommitted
Add Elixir Security Live V2 Importer Pipeline #1933
* Add Elixir Security Live V2 Importer * Add tests for the Elixir Security Live V2 Importer * Tested functionally using the Live Evaluation API in #1969 Signed-off-by: Michael Ehab Mikhail <michael.ehab@hotmail.com>
1 parent c354ac0 commit 662c1ee

3 files changed

Lines changed: 247 additions & 0 deletions

File tree

vulnerabilities/importers/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@
8282
from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2
8383
from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2
8484
from vulnerabilities.utils import create_registry
85+
from vulnerabilities.pipelines.v2_importers import (
86+
elixir_security_live_importer as elixir_security_live_importer_v2,
87+
)
8588

8689
IMPORTERS_REGISTRY = create_registry(
8790
[
@@ -196,3 +199,9 @@
196199
for key, value in IMPORTERS_REGISTRY.items()
197200
if issubclass(value, VulnerableCodeBaseImporterPipelineV2) and value.exclude_from_package_todo
198201
]
202+
203+
LIVE_IMPORTERS_REGISTRY = create_registry(
204+
[
205+
elixir_security_live_importer_v2.ElixirSecurityLiveImporterPipeline,
206+
]
207+
)
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from typing import Iterable
11+
12+
import requests
13+
import yaml
14+
from packageurl import PackageURL
15+
from univers.versions import SemverVersion
16+
17+
from vulnerabilities.importer import AdvisoryData
18+
from vulnerabilities.pipelines.v2_importers.elixir_security_importer import (
19+
ElixirSecurityImporterPipeline,
20+
)
21+
22+
23+
class ElixirSecurityLiveImporterPipeline(ElixirSecurityImporterPipeline):
24+
"""
25+
Elixir Security Advisories Importer Pipeline
26+
27+
This pipeline imports security advisories for a single elixir PURL.
28+
"""
29+
30+
pipeline_id = "elixir_security_live_importer_v2"
31+
supported_types = ["hex"]
32+
33+
@classmethod
34+
def steps(cls):
35+
return (
36+
cls.get_purl_inputs,
37+
cls.collect_and_store_advisories,
38+
)
39+
40+
def get_purl_inputs(self):
41+
purl = self.inputs["purl"]
42+
if not purl:
43+
raise ValueError("PURL is required for ElixirSecurityLiveImporterPipeline")
44+
45+
if isinstance(purl, str):
46+
purl = PackageURL.from_string(purl)
47+
48+
if not isinstance(purl, PackageURL):
49+
raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")
50+
51+
if purl.type not in self.supported_types:
52+
raise ValueError(
53+
f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
54+
)
55+
56+
if not purl.version:
57+
raise ValueError(f"PURL: {purl!s} is expected to have a version")
58+
59+
self.purl = purl
60+
61+
def advisories_count(self) -> int:
62+
if self.purl.type != "hex":
63+
return 0
64+
65+
try:
66+
directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{self.purl.name}"
67+
response = requests.get(directory_url)
68+
69+
if response.status_code != 200:
70+
return 0
71+
72+
yaml_files = [file for file in response.json() if file["name"].endswith(".yml")]
73+
return len(yaml_files)
74+
except Exception:
75+
return 0
76+
77+
def collect_advisories(self) -> Iterable[AdvisoryData]:
78+
if self.purl.type != "hex":
79+
self.log(f"PURL type {self.purl.type} is not supported by Elixir Security importer")
80+
return []
81+
82+
package_name = self.purl.name
83+
84+
try:
85+
directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}"
86+
response = requests.get(directory_url)
87+
88+
if response.status_code != 200:
89+
self.log(f"No advisories found for {package_name} in Elixir Security Database")
90+
return []
91+
92+
yaml_entries = [file for file in response.json() if file["name"].endswith(".yml")]
93+
94+
for entry in yaml_entries:
95+
# entry["path"] looks like: packages/<pkg>/<file>.yml
96+
file_path = entry["path"]
97+
content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}"
98+
content_response = requests.get(
99+
content_url, headers={"Accept": "application/vnd.github.v3.raw"}
100+
)
101+
102+
if content_response.status_code != 200:
103+
self.log(f"Failed to fetch file content for {file_path}")
104+
continue
105+
106+
advisory_text = content_response.text
107+
108+
try:
109+
yaml_file = yaml.safe_load(advisory_text) or {}
110+
except Exception as e:
111+
self.log(f"Failed to parse YAML for {file_path}: {e}")
112+
continue
113+
114+
for advisory in self.build_advisory_from_yaml(
115+
yaml_file=yaml_file, advisory_text=advisory_text, relative_path=file_path
116+
):
117+
if self.purl.version and not self._advisory_affects_version(advisory):
118+
continue
119+
yield advisory
120+
121+
except Exception as e:
122+
self.log(f"Error fetching advisories for {self.purl}: {str(e)}")
123+
return []
124+
125+
def _advisory_affects_version(self, advisory: AdvisoryData) -> bool:
126+
if not self.purl.version:
127+
return True
128+
129+
for affected_package in advisory.affected_packages:
130+
if affected_package.affected_version_range:
131+
try:
132+
purl_version = SemverVersion(self.purl.version)
133+
134+
if purl_version in affected_package.affected_version_range:
135+
return True
136+
except Exception as e:
137+
self.log(f"Failed to parse version {self.purl.version}: {str(e)}")
138+
return True
139+
140+
return False
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import shutil
11+
from pathlib import Path
12+
from unittest.mock import MagicMock
13+
from unittest.mock import patch
14+
15+
import pytest
16+
from packageurl import PackageURL
17+
18+
from vulnerabilities.importer import AdvisoryData
19+
from vulnerabilities.pipelines.v2_importers.elixir_security_live_importer import (
20+
ElixirSecurityLiveImporterPipeline,
21+
)
22+
23+
24+
@pytest.fixture
25+
def test_data_dir():
26+
return Path(__file__).parent.parent.parent / "test_data" / "elixir_security"
27+
28+
29+
@patch("requests.get")
30+
def test_package_first_mode_with_version_filter(mock_get, test_data_dir):
31+
directory_response = MagicMock()
32+
directory_response.status_code = 200
33+
directory_response.json.return_value = [
34+
{"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
35+
]
36+
37+
advisory_file_path = test_data_dir / "test_file.yml"
38+
advisory_content = advisory_file_path.read_text()
39+
40+
content_response = MagicMock()
41+
content_response.status_code = 200
42+
content_response.text = advisory_content
43+
44+
mock_get.side_effect = [directory_response, content_response]
45+
46+
# Version affected
47+
purl = PackageURL(type="hex", name="coherence", version="0.5.1")
48+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
49+
importer.get_purl_inputs()
50+
advisories = list(importer.collect_advisories())
51+
assert len(advisories) == 1
52+
53+
# Version not affected
54+
mock_get.side_effect = [directory_response, content_response]
55+
purl = PackageURL(type="hex", name="coherence", version="0.5.2")
56+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
57+
importer.get_purl_inputs()
58+
advisories = list(importer.collect_advisories())
59+
assert len(advisories) == 0
60+
61+
62+
@patch("requests.get")
63+
def test_package_first_mode_no_advisories(mock_get):
64+
mock_response = MagicMock()
65+
mock_response.status_code = 404
66+
mock_get.return_value = mock_response
67+
68+
purl = PackageURL(type="hex", name="nonexistent-package")
69+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
70+
with pytest.raises(ValueError):
71+
importer.get_purl_inputs()
72+
73+
74+
@patch("requests.get")
75+
def test_package_first_mode_api_error(mock_get):
76+
directory_response = MagicMock()
77+
directory_response.status_code = 200
78+
directory_response.json.return_value = [
79+
{"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
80+
]
81+
82+
content_response = MagicMock()
83+
content_response.status_code = 500
84+
85+
mock_get.side_effect = [directory_response, content_response]
86+
87+
purl = PackageURL(type="hex", name="coherence", version="0.5.1")
88+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
89+
importer.get_purl_inputs()
90+
advisories = list(importer.collect_advisories())
91+
assert len(advisories) == 0
92+
93+
94+
def test_package_first_mode_non_hex_purl():
95+
purl = PackageURL(type="npm", name="some-package")
96+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
97+
with pytest.raises(ValueError):
98+
importer.get_purl_inputs()

0 commit comments

Comments
 (0)