Skip to content

Commit cbda0ca

Browse files
keshav-space and TG1999 authored
Fast content ID migration (#1795)
Reference: #1796 Recompute unique content ID for advisories and dedupe the advisories * Add new content ID function Signed-off-by: Tushar Goel <[email protected]> * Add tests and address review comments Signed-off-by: Tushar Goel <[email protected]> * New content ID pipeline Signed-off-by: Tushar Goel <[email protected]> * New content ID pipeline Signed-off-by: Tushar Goel <[email protected]> * Address review comments Signed-off-by: Tushar Goel <[email protected]> * Address review comments Signed-off-by: Tushar Goel <[email protected]> * Address review comments Signed-off-by: Tushar Goel <[email protected]> * Address review comments Signed-off-by: Tushar Goel <[email protected]> * Address review comments Signed-off-by: Tushar Goel <[email protected]> * Address review comments Signed-off-by: Tushar Goel <[email protected]> * Address review comments Signed-off-by: Tushar Goel <[email protected]> * Address review comments Signed-off-by: Tushar Goel <[email protected]> * Remove unique content ID from unique together Signed-off-by: Tushar Goel <[email protected]> * Remove unique together from advisories Signed-off-by: Tushar Goel <[email protected]> * Fix migrations Signed-off-by: Tushar Goel <[email protected]> * Fix pipeline errors Signed-off-by: Tushar Goel <[email protected]> * Add filter for fast iteration Signed-off-by: Tushar Goel <[email protected]> * Increase batch size Signed-off-by: Tushar Goel <[email protected]> * Fix error Signed-off-by: Tushar Goel <[email protected]> * Add logs Signed-off-by: Tushar Goel <[email protected]> * Defer db indexing for content id Signed-off-by: Keshav Priyadarshi <[email protected]> * Ensure the reference id is always a string Signed-off-by: Keshav Priyadarshi <[email protected]> * Alternate content id migration pipeline Signed-off-by: Keshav Priyadarshi <[email protected]> * Keep the oldest advisory while deduping Signed-off-by: Keshav Priyadarshi <[email protected]> * Use iterator() instead of paginated() for fetching
advisories - paginated() performs poorly when iterating over large records compared to the built-in iterator() Signed-off-by: Keshav Priyadarshi <[email protected]> * Move pipeline test to test/pipelines/ Signed-off-by: Keshav Priyadarshi <[email protected]> * Update test for the new dedupe pipeline Signed-off-by: Keshav Priyadarshi <[email protected]> --------- Signed-off-by: Tushar Goel <[email protected]> Signed-off-by: Keshav Priyadarshi <[email protected]> Co-authored-by: Tushar Goel <[email protected]> Co-authored-by: Tushar Goel <[email protected]>
1 parent 432a7d4 commit cbda0ca

12 files changed

+821
-258
lines changed

vulnerabilities/importer.py

+47-18
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import dataclasses
1111
import datetime
12+
import functools
1213
import logging
1314
import os
1415
import shutil
@@ -46,7 +47,8 @@
4647
logger = logging.getLogger(__name__)
4748

4849

49-
@dataclasses.dataclass(order=True)
50+
@dataclasses.dataclass(eq=True)
51+
@functools.total_ordering
5052
class VulnerabilitySeverity:
5153
# FIXME: this should be named scoring_system, like in the model
5254
system: ScoringSystem
@@ -55,15 +57,26 @@ class VulnerabilitySeverity:
5557
published_at: Optional[datetime.datetime] = None
5658

5759
def to_dict(self):
58-
published_at_dict = (
59-
{"published_at": self.published_at.isoformat()} if self.published_at else {}
60-
)
61-
return {
60+
data = {
6261
"system": self.system.identifier,
6362
"value": self.value,
6463
"scoring_elements": self.scoring_elements,
65-
**published_at_dict,
6664
}
65+
if self.published_at:
66+
if isinstance(self.published_at, datetime.datetime):
67+
data["published_at"] = self.published_at.isoformat()
68+
else:
69+
data["published_at"] = self.published_at
70+
return data
71+
72+
def __lt__(self, other):
73+
if not isinstance(other, VulnerabilitySeverity):
74+
return NotImplemented
75+
return self._cmp_key() < other._cmp_key()
76+
77+
# TODO: Add cache
78+
def _cmp_key(self):
79+
return (self.system.identifier, self.value, self.scoring_elements, self.published_at)
6780

6881
@classmethod
6982
def from_dict(cls, severity: dict):
@@ -79,7 +92,8 @@ def from_dict(cls, severity: dict):
7992
)
8093

8194

82-
@dataclasses.dataclass(order=True)
95+
@dataclasses.dataclass(eq=True)
96+
@functools.total_ordering
8397
class Reference:
8498
reference_id: str = ""
8599
reference_type: str = ""
@@ -90,27 +104,28 @@ def __post_init__(self):
90104
if not self.url:
91105
raise TypeError("Reference must have a url")
92106

93-
def normalized(self):
94-
severities = sorted(self.severities)
95-
return Reference(
96-
reference_id=self.reference_id,
97-
url=self.url,
98-
severities=severities,
99-
reference_type=self.reference_type,
100-
)
107+
def __lt__(self, other):
108+
if not isinstance(other, Reference):
109+
return NotImplemented
110+
return self._cmp_key() < other._cmp_key()
111+
112+
# TODO: Add cache
113+
def _cmp_key(self):
114+
return (self.reference_id, self.reference_type, self.url, tuple(self.severities))
101115

102116
def to_dict(self):
117+
"""Return a normalized dictionary representation"""
103118
return {
104119
"reference_id": self.reference_id,
105120
"reference_type": self.reference_type,
106121
"url": self.url,
107-
"severities": [severity.to_dict() for severity in self.severities],
122+
"severities": [severity.to_dict() for severity in sorted(self.severities)],
108123
}
109124

110125
@classmethod
111126
def from_dict(cls, ref: dict):
112127
return cls(
113-
reference_id=ref["reference_id"],
128+
reference_id=str(ref["reference_id"]),
114129
reference_type=ref.get("reference_type") or "",
115130
url=ref["url"],
116131
severities=[
@@ -140,7 +155,8 @@ class NoAffectedPackages(Exception):
140155
"""
141156

142157

143-
@dataclasses.dataclass(order=True, frozen=True)
158+
@functools.total_ordering
159+
@dataclasses.dataclass(eq=True)
144160
class AffectedPackage:
145161
"""
146162
Relate a Package URL with a range of affected versions and a fixed version.
@@ -170,6 +186,19 @@ def get_fixed_purl(self):
170186
raise ValueError(f"Affected Package {self.package!r} does not have a fixed version")
171187
return update_purl_version(purl=self.package, version=str(self.fixed_version))
172188

189+
def __lt__(self, other):
190+
if not isinstance(other, AffectedPackage):
191+
return NotImplemented
192+
return self._cmp_key() < other._cmp_key()
193+
194+
# TODO: Add cache
195+
def _cmp_key(self):
196+
return (
197+
str(self.package),
198+
str(self.affected_version_range or ""),
199+
str(self.fixed_version or ""),
200+
)
201+
173202
@classmethod
174203
def merge(
175204
cls, affected_packages: Iterable

vulnerabilities/improvers/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from vulnerabilities.pipelines import enhance_with_kev
1919
from vulnerabilities.pipelines import enhance_with_metasploit
2020
from vulnerabilities.pipelines import flag_ghost_packages
21+
from vulnerabilities.pipelines import remove_duplicate_advisories
2122

2223
IMPROVERS_REGISTRY = [
2324
valid_versions.GitHubBasicImprover,
@@ -45,6 +46,7 @@
4546
compute_package_version_rank.ComputeVersionRankPipeline,
4647
collect_commits.CollectFixCommitsPipeline,
4748
add_cvss31_to_CVEs.CVEAdvisoryMappingPipeline,
49+
remove_duplicate_advisories.RemoveDuplicateAdvisoriesPipeline,
4850
]
4951

5052
IMPROVERS_REGISTRY = {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Generated by Django 4.2.17 on 2025-02-27 07:47
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("vulnerabilities", "0088_fix_alpine_purl_type"),
10+
]
11+
12+
operations = [
13+
migrations.AlterField(
14+
model_name="advisory",
15+
name="unique_content_id",
16+
field=models.CharField(
17+
blank=True,
18+
help_text="A 64 character unique identifier for the content of the advisory since we use sha256 as hex",
19+
max_length=64,
20+
),
21+
),
22+
]

vulnerabilities/models.py

+5-11
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from vulnerabilities import utils
5454
from vulnerabilities.severity_systems import EPSS
5555
from vulnerabilities.severity_systems import SCORING_SYSTEMS
56+
from vulnerabilities.utils import compute_content_id
5657
from vulnerabilities.utils import normalize_purl
5758
from vulnerabilities.utils import purl_to_dict
5859
from vulnerablecode import __version__ as VULNERABLECODE_VERSION
@@ -1315,8 +1316,9 @@ class Advisory(models.Model):
13151316
"""
13161317

13171318
unique_content_id = models.CharField(
1318-
max_length=32,
1319+
max_length=64,
13191320
blank=True,
1321+
help_text="A 64 character unique identifier for the content of the advisory since we use sha256 as hex",
13201322
)
13211323
aliases = models.JSONField(blank=True, default=list, help_text="A list of alias strings")
13221324
summary = models.TextField(
@@ -1357,16 +1359,8 @@ class Meta:
13571359
ordering = ["aliases", "date_published", "unique_content_id"]
13581360

13591361
def save(self, *args, **kwargs):
1360-
checksum = hashlib.md5()
1361-
for field in (
1362-
self.summary,
1363-
self.affected_packages,
1364-
self.references,
1365-
self.weaknesses,
1366-
):
1367-
value = json.dumps(field, separators=(",", ":")).encode("utf-8")
1368-
checksum.update(value)
1369-
self.unique_content_id = checksum.hexdigest()
1362+
advisory_data = self.to_advisory_data()
1363+
self.unique_content_id = compute_content_id(advisory_data, include_metadata=False)
13701364
super().save(*args, **kwargs)
13711365

13721366
def to_advisory_data(self) -> "AdvisoryData":
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from aboutcode.pipeline import LoopProgress
11+
12+
from vulnerabilities.models import Advisory
13+
from vulnerabilities.pipelines import VulnerableCodePipeline
14+
from vulnerabilities.utils import compute_content_id
15+
16+
17+
class RemoveDuplicateAdvisoriesPipeline(VulnerableCodePipeline):
18+
"""Pipeline to compute new advisory content id and remove duplicate advisories based on their content."""
19+
20+
pipeline_id = "remove_duplicate_advisories"
21+
22+
@classmethod
23+
def steps(cls):
24+
return (cls.remove_duplicates,)
25+
26+
def remove_duplicates(self):
27+
"""
28+
Recompute the content ID and remove duplicate advisories, keeping the oldest one.
29+
"""
30+
31+
advisories_count = Advisory.objects.all().count()
32+
self.log(f"Computing new content id for {advisories_count} and removing duplicates.")
33+
34+
update_batch_size = 500
35+
delete_batch_size = 5000
36+
chunk_size = 5000
37+
deleted_advisories_count = 0
38+
updated_advisories_count = 0
39+
duplicate_advisory_ids = []
40+
advisories_to_update = []
41+
content_ids = set()
42+
43+
advisories = Advisory.objects.all().order_by("id").iterator(chunk_size=chunk_size)
44+
45+
progress = LoopProgress(
46+
total_iterations=advisories_count,
47+
logger=self.log,
48+
progress_step=1,
49+
)
50+
51+
for advisory in progress.iter(advisories):
52+
content_id = compute_content_id(advisory.to_advisory_data())
53+
54+
if content_id in content_ids:
55+
duplicate_advisory_ids.append(advisory.id)
56+
else:
57+
content_ids.add(content_id)
58+
if advisory.unique_content_id != content_id:
59+
advisory.unique_content_id = content_id
60+
advisories_to_update.append(advisory)
61+
62+
if len(duplicate_advisory_ids) > delete_batch_size:
63+
deleted_advisories_count += delete_advisories(
64+
advisory_ids=duplicate_advisory_ids,
65+
logger=self.log,
66+
)
67+
duplicate_advisory_ids.clear()
68+
69+
if len(advisories_to_update) > update_batch_size:
70+
updated_advisories_count += bulk_update_advisories(
71+
advisories=advisories_to_update,
72+
fields=["unique_content_id"],
73+
logger=self.log,
74+
)
75+
advisories_to_update.clear()
76+
77+
deleted_advisories_count += delete_advisories(
78+
advisory_ids=duplicate_advisory_ids,
79+
logger=self.log,
80+
)
81+
updated_advisories_count += bulk_update_advisories(
82+
advisories=advisories_to_update,
83+
fields=["unique_content_id"],
84+
logger=self.log,
85+
)
86+
87+
self.log(f"Removed {deleted_advisories_count} duplicates advisories.")
88+
self.log(f"Updated content id for {deleted_advisories_count} advisories.")
89+
90+
91+
def bulk_update_advisories(advisories, fields, logger):
92+
item_count = 0
93+
if advisories:
94+
try:
95+
Advisory.objects.bulk_update(objs=advisories, fields=fields)
96+
item_count += len(advisories)
97+
except Exception as e:
98+
logger(f"Error updating Advisory: {e}")
99+
return item_count
100+
101+
102+
def delete_advisories(advisory_ids, logger):
103+
item_count = 0
104+
if advisory_ids:
105+
try:
106+
Advisory.objects.filter(id__in=advisory_ids).delete()
107+
item_count += len(advisory_ids)
108+
except Exception as e:
109+
logger(f"Error deleting Advisory: {e}")
110+
return item_count

vulnerabilities/severity_systems.py

+3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ def compute(self, scoring_elements: str) -> str:
4242
def get(self, scoring_elements: str):
4343
return NotImplementedError
4444

45+
def __str__(self):
46+
return f"{self.identifier}"
47+
4548

4649
@dataclasses.dataclass(order=True)
4750
class Cvssv2ScoringSystem(ScoringSystem):

0 commit comments

Comments
 (0)