@@ -136,6 +136,14 @@ class CVEDB:
136
136
UNIQUE(purl,cpe)
137
137
)
138
138
""" ,
139
+ "cve_cwe" : """
140
+ CREATE TABLE IF NOT EXISTS cve_cwe (
141
+ cve_number TEXT,
142
+ cwe TEXT,
143
+ data_source TEXT,
144
+ FOREIGN KEY(cve_number, data_source) REFERENCES cve_severity(cve_number, data_source)
145
+ )
146
+ """ ,
139
147
}
140
148
141
149
# This is mostly to make bandit happier because we won't be
@@ -148,6 +156,7 @@ class CVEDB:
148
156
"metrics" : "DROP TABLE metrics" ,
149
157
"mismatch" : "DROP TABLE mismatch" ,
150
158
"purl2cpe" : "DROP TABLE purl2cpe" ,
159
+ "cve_cwe" : "DROP TABLE cve_cwe" ,
151
160
}
152
161
153
162
INDEXES = {
@@ -163,6 +172,7 @@ class CVEDB:
163
172
"metrics" : "SELECT * FROM metrics WHERE 1=0" ,
164
173
"mismatch" : "SELECT * FROM mismatch WHERE 1=0" ,
165
174
"purl2cpe" : "SELECT * FROM purl2cpe WHERE 1=0" ,
175
+ "cve_cwe" : "SELECT * FROM cve_cwe WHERE 1=0"
166
176
}
167
177
168
178
INSERT_QUERIES = {
@@ -217,6 +227,14 @@ class CVEDB:
217
227
)
218
228
VALUES (?, ?)
219
229
""" ,
230
+ "insert_cve_cwe" : """
231
+ INSERT or REPLACE INTO cve_cwe (
232
+ cve_number,
233
+ cwe,
234
+ data_source
235
+ )
236
+ VALUES (?,?,?)
237
+ """ ,
220
238
}
221
239
222
240
def __init__ (
@@ -333,6 +351,7 @@ def get_cvelist_if_stale(self) -> None:
333
351
or not self .latest_schema (
334
352
"cve_exploited" , self .TABLE_SCHEMAS ["cve_exploited" ]
335
353
)
354
+ or not self .latest_schema ("cve_cwe" , self .TABLE_SCHEMAS ["cve_cwe" ])
336
355
):
337
356
self .refresh_cache_and_update_db ()
338
357
self .time_of_last_update = datetime .datetime .today ()
@@ -513,6 +532,7 @@ def populate_db(self) -> None:
513
532
severity_data , cursor , data_source = source_name
514
533
)
515
534
self .populate_cve_metrics (severity_data , cursor )
535
+ self .populate_cve_cwe (severity_data , cursor ,data_source = source_name )
516
536
if affected_data is not None :
517
537
self .populate_affected (
518
538
affected_data ,
@@ -599,6 +619,27 @@ def populate_cve_metrics(self, severity_data, cursor):
599
619
except Exception as e :
600
620
LOGGER .info (f"Unable to insert data for { e } \n { cve } " )
601
621
622
+ def populate_cve_cwe (self , severity_data , cursor , data_source ):
623
+ """Adds data into CVE CWE table."""
624
+ insert_cve_cwe = self .INSERT_QUERIES ["insert_cve_cwe" ]
625
+
626
+ for cve in severity_data :
627
+ try :
628
+ if "cwes" in cve and len (cve ["cwes" ]) > 0 :
629
+ cursor .executemany (
630
+ insert_cve_cwe ,
631
+ [
632
+ (
633
+ cve ["ID" ],
634
+ cwe ,
635
+ data_source ,
636
+ )
637
+ for cwe in cve ["cwes" ]
638
+ ],
639
+ )
640
+ except Exception as e :
641
+ LOGGER .info (f"Unable to insert data for { e } \n { cve } " )
642
+
602
643
def populate_affected (self , affected_data , cursor , data_source ):
603
644
"""Populate database with affected versions."""
604
645
insert_cve_range = self .INSERT_QUERIES ["insert_cve_range" ]
@@ -899,6 +940,7 @@ def delete_old_files_if_exists(self, path):
899
940
"cve_severity" ,
900
941
"cve_metrics" ,
901
942
"metrics" ,
943
+ "cve_cwe" ,
902
944
]
903
945
for directory in DIRECTORIES :
904
946
if (path / directory ).exists ():
@@ -1058,6 +1100,8 @@ def json_to_db(self, cursor, db_column, json_data):
1058
1100
cursor .executemany (self .INSERT_QUERIES ["insert_cve_metrics" ], values )
1059
1101
elif db_column == "metrics" :
1060
1102
cursor .executemany (self .INSERT_QUERIES ["insert_metrics" ], values )
1103
+ elif db_column == "cve_cwe" :
1104
+ cursor .executemany (self .INSERT_QUERIES ["insert_cve_cwe" ], values )
1061
1105
1062
1106
def json_to_db_wrapper (self , path , pubkey , ignore_signature , log_signature_error ):
1063
1107
"""Initialize the process wrapper to insert records into database from JSON."""
@@ -1083,6 +1127,7 @@ def json_to_db_wrapper(self, path, pubkey, ignore_signature, log_signature_error
1083
1127
cursor .execute (self .TABLE_SCHEMAS ["cve_exploited" ])
1084
1128
cursor .execute (self .TABLE_SCHEMAS ["cve_metrics" ])
1085
1129
cursor .execute (self .TABLE_SCHEMAS ["metrics" ])
1130
+ cursor .execute (self .TABLE_SCHEMAS ["cve_cwe" ])
1086
1131
index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
1087
1132
cursor .execute (index_range )
1088
1133
metadata_fd = open (path / "metadata.json" )
0 commit comments