1010from typing import Iterable
1111
1212import requests
13- import yaml
1413from packageurl import PackageURL
1514from univers .versions import SemverVersion
1615
17- from vulnerabilities .importer import AdvisoryData
16+ from vulnerabilities .importer import AdvisoryDataV2
1817from vulnerabilities .pipelines .v2_importers .elixir_security_importer import (
1918 ElixirSecurityImporterPipeline ,
2019)
20+ from vulnerabilities .utils import fetch_yaml
2121
2222
2323class ElixirSecurityLiveImporterPipeline (ElixirSecurityImporterPipeline ):
@@ -53,34 +53,13 @@ def get_purl_inputs(self):
5353 f"PURL: { purl !s} is not among the supported package types { self .supported_types !r} "
5454 )
5555
56- if not purl .version :
57- raise ValueError (f"PURL: { purl !s} is expected to have a version" )
58-
5956 self .purl = purl
6057
6158 def advisories_count (self ) -> int :
62- if self .purl .type != "hex" :
63- return 0
64-
65- try :
66- directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{ self .purl .name } "
67- response = requests .get (directory_url )
68-
69- if response .status_code != 200 :
70- return 0
71-
72- yaml_files = [file for file in response .json () if file ["name" ].endswith (".yml" )]
73- return len (yaml_files )
74- except Exception :
75- return 0
76-
77- def collect_advisories (self ) -> Iterable [AdvisoryData ]:
78- if self .purl .type != "hex" :
79- self .log (f"PURL type { self .purl .type } is not supported by Elixir Security importer" )
80- return []
59+ return 0
8160
61+ def collect_advisories (self ) -> Iterable [AdvisoryDataV2 ]:
8262 package_name = self .purl .name
83-
8463 try :
8564 directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{ package_name } "
8665 response = requests .get (directory_url )
@@ -94,47 +73,46 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
9473 for entry in yaml_entries :
9574 # entry["path"] looks like: packages/<pkg>/<file>.yml
9675 file_path = entry ["path" ]
97- content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{ file_path } "
98- content_response = requests . get (
99- content_url , headers = {"Accept" : "application/vnd.github.v3.raw" }
76+ advisory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{ file_path } "
77+ advisory_text = fetch_yaml (
78+ advisory_url , headers = {"Accept" : "application/vnd.github.v3.raw" }
10079 )
10180
102- if content_response . status_code != 200 :
103- self . log ( f"Failed to fetch file content for { file_path } " )
104- continue
81+ path_segments = str ( file_path ). split ( "/" )
82+ # use the last two segments as the advisory ID
83+ advisory_id = "/" . join ( path_segments [ - 2 :]). replace ( ".yml" , "" )
10584
106- advisory_text = content_response .text
107-
108- try :
109- yaml_file = yaml .safe_load (advisory_text ) or {}
110- except Exception as e :
111- self .log (f"Failed to parse YAML for { file_path } : { e } " )
112- continue
113-
114- for advisory in self .build_advisory_from_yaml (
115- yaml_file = yaml_file , advisory_text = advisory_text , relative_path = file_path
85+ for advisory in self .build_advisory_from_text (
86+ advisory_id = advisory_id ,
87+ yaml_file = advisory_text ,
88+ advisory_url = advisory_url ,
11689 ):
117- if self .purl .version and not self ._advisory_affects_version (advisory ):
90+ if self .purl .version and not self .validate_advisory (advisory ):
11891 continue
11992 yield advisory
12093
12194 except Exception as e :
12295 self .log (f"Error fetching advisories for { self .purl } : { str (e )} " )
12396 return []
12497
125- def _advisory_affects_version (self , advisory : AdvisoryData ) -> bool :
98+ def validate_advisory (self , advisory : AdvisoryDataV2 ) -> bool :
12699 if not self .purl .version :
127100 return True
128101
129102 for affected_package in advisory .affected_packages :
130- if affected_package .affected_version_range :
131- try :
132- purl_version = SemverVersion (self .purl .version )
133-
134- if purl_version in affected_package .affected_version_range :
135- return True
136- except Exception as e :
137- self .log (f"Failed to parse version { self .purl .version } : { str (e )} " )
103+ try :
104+ purl_version = SemverVersion (self .purl .version )
105+ if (
106+ affected_package .affected_version_range
107+ and purl_version in affected_package .affected_version_range
108+ ) or (
109+ affected_package .fixed_version_range
110+ and purl_version in affected_package .fixed_version_range
111+ ):
138112 return True
139113
114+ except Exception as e :
115+ self .log (f"Failed to parse version { self .purl .version } : { str (e )} " )
116+ # Since we have a small package file, if we fail to parse the versions, we can just return all of them
117+ return True
140118 return False
0 commit comments