-
Notifications
You must be signed in to change notification settings - Fork 108
/
Copy pathrootfs.py
427 lines (350 loc) · 14.3 KB
/
rootfs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# SPDX-License-Identifier: Apache-2.0
#
# http://nexb.com and https://github.com/aboutcode-org/scancode.io
# The ScanCode.io software is licensed under the Apache License version 2.0.
# Data generated with ScanCode.io is provided as-is without warranties.
# ScanCode is a trademark of nexB Inc.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# ScanCode.io should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
#
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/aboutcode-org/scancode.io for support and download.
import fnmatch
import logging
import os
from collections import Counter
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
import attr
from commoncode.ignore import default_ignores
from container_inspector.distro import Distro
from packagedcode import plugin_package
from scanpipe import pipes
from scanpipe.pipes import flag
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Distro identifiers accepted by this module. The list gates the os-release
# fallback detection in RootFs.__attrs_post_init__ and is checked again in
# scan_rootfs_for_system_packages(), which raises DistroNotSupported for any
# identifier not listed here.
SUPPORTED_DISTROS = [
    "alpine",
    "debian",
    "ubuntu",
    "rhel",
    "centos",
    "fedora",
    "sles",
    "opensuse",
    "mariner",
    "opensuse-tumbleweed",
    "photon",
    "windows",
    "rocky",
]
class DistroNotFound(Exception):
    """Raised when a rootfs has no detected distro."""
class DistroNotSupported(Exception):
    """Raised when the distro of a rootfs is not one of the supported distros."""
@attr.attributes
class Resource:
    """A file or directory of a rootfs, known by its in-rootfs path and location."""

    # Attribute order is significant: it defines the positional order of the
    # generated initializer.
    rootfs_path = attr.attrib(
        metadata={"doc": "The rootfs root-relative path for this Resource."},
        default=None,
    )

    location = attr.attrib(
        metadata={"doc": "The absolute location for this Resource."},
        default=None,
    )
@attr.attributes
class RootFs:
    """
    A root filesystem.

    The ``distro`` attribute is always computed from ``location`` right after
    initialization (see ``__attrs_post_init__``), so any value passed for it
    at construction time is replaced.
    """

    location = attr.attrib(
        metadata=dict(doc="The root directory location where this rootfs lives.")
    )

    distro = attr.attrib(
        default=None, metadata=dict(doc="The Distro object for this rootfs.")
    )

    def __attrs_post_init__(self, *args, **kwargs):
        """
        Detect and set the ``distro`` of this rootfs.

        ``Distro.from_rootfs`` is tried first. When it returns nothing, fall
        back to reading ``etc/os-release`` under ``location``: a Distro is
        created from its ``ID`` and ``VERSION_ID`` fields only when ``ID`` is
        one of ``SUPPORTED_DISTROS``. Otherwise ``distro`` stays falsy.
        """
        self.distro = Distro.from_rootfs(self.location)
        if self.distro:
            return

        os_release_path = os.path.join(self.location, "etc", "os-release")
        if not os.path.exists(os_release_path):
            return

        data = self._parse_os_release(os_release_path)
        distro_id = data.get("ID", "")
        version_id = data.get("VERSION_ID", "")

        if distro_id in SUPPORTED_DISTROS:
            self.distro = Distro(
                identifier=distro_id,
                version=version_id,
            )
            logger.info(f"Fallback distro detection: {distro_id}")

    @staticmethod
    def _parse_os_release(os_release_path):
        """
        Return a mapping of field name to value parsed from the os-release
        file at ``os_release_path``.

        Blank lines, ``#`` comments and lines without ``=`` are skipped.
        Surrounding whitespace (including the line ending) and enclosing
        single or double quotes are removed from values, so that both
        ``ID=alpine`` and ``ID="ubuntu"`` yield a bare identifier.
        """
        data = {}
        with open(os_release_path, encoding="utf-8", errors="replace") as os_release:
            for line in os_release:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                # Strip whitespace first: a trailing newline would otherwise
                # shield the closing quote from being removed.
                data[key.strip()] = value.strip().strip("\"'")
        return data

    @classmethod
    def from_project_codebase(cls, project):
        """
        Yield RootFs objects collected from the project's "codebase" directory.
        Each direct sub-directory of the codebase directory is considered as
        the root of a root filesystem.
        """
        subdirs = [path for path in project.codebase_path.glob("*/") if path.is_dir()]
        for subdir in subdirs:
            rootfs_location = str(subdir.absolute())
            # Use ``cls`` so that subclasses get instances of their own type.
            yield cls(location=rootfs_location)

    def get_resources(self, with_dir=False):
        """
        Return a Resource for each file in this rootfs.
        Directories are included when ``with_dir`` is True.
        """
        return get_resources(location=self.location, with_dir=with_dir)

    def get_installed_packages(self, packages_getter):
        """
        Return tuples of (package_url, package) for installed packages found in
        this rootfs layer using the `packages_getter` function or callable.

        The `packages_getter()` function should:

        - Accept a first argument string that is the root directory of
          filesystem of this rootfs

        - Return tuples of (package_url, package) where package_url is a
          package_url string that uniquely identifies a package; while, a `package`
          is an object that represents a package (typically a scancode-
          toolkit packagedcode.models.Package class or some nested mapping with
          the same structure).

        The `packages_getter` function would typically query the system packages
        database, such as an RPM database or similar, to collect the list of
        installed system packages.
        """
        return packages_getter(self.location)
def get_resources(location, with_dir=False):
    """
    Yield a Resource for each file found by walking the `location` root
    directory of a rootfs. Directories are also yielded when `with_dir` is True.

    Each Resource carries its `location` on disk and a `rootfs_path`: the
    path relative to `location`, passed through `pipes.normalize_path`.
    """
    prefix_length = len(location)

    def get_res(parent, fname):
        loc = os.path.join(parent, fname)
        # Strip only the leading `location`: str.replace() would also remove
        # any later occurrence of the same text deeper in the path and yield
        # a wrong rootfs_path.
        relative_path = loc[prefix_length:] if loc.startswith(location) else loc
        return Resource(
            location=loc,
            rootfs_path=pipes.normalize_path(relative_path),
        )

    for top, dirs, files in os.walk(location):
        for f in files:
            yield get_res(parent=top, fname=f)
        if with_dir:
            for d in dirs:
                yield get_res(parent=top, fname=d)
def create_codebase_resources(project, rootfs):
    """
    Create a CodebaseResource in `project` for every resource of `rootfs`,
    directories included.
    """
    for rootfs_resource in rootfs.get_resources(with_dir=True):
        resource_data = {
            "location": rootfs_resource.location,
            "rootfs_path": rootfs_resource.rootfs_path,
        }
        pipes.make_codebase_resource(project=project, **resource_data)
def has_hash_diff(install_file, codebase_resource):
    """
    Return True when `install_file` and `codebase_resource` disagree on at
    least one checksum that both of them provide, False otherwise.

    Checksum types are inspected from strongest to weakest. A type is only
    compared when the attribute exists and is non-empty on both objects, as
    different package formats record different checksums (for example: Alpine
    uses SHA1 while Debian uses MD5).
    """
    for algorithm in ("sha512", "sha256", "sha1", "md5"):
        expected_sum = getattr(install_file, algorithm, None)
        actual_sum = getattr(codebase_resource, algorithm, None)

        # Nothing to compare unless both sides carry a value for this type.
        if not (expected_sum and actual_sum):
            continue

        if expected_sum != actual_sum:
            return True

    return False
def package_getter(root_dir, **kwargs):
    """
    Yield (purl, package) tuples for the installed packages reported by
    `plugin_package.get_installed_packages` for `root_dir`.
    Extra keyword arguments are accepted and ignored.
    """
    for installed_package in plugin_package.get_installed_packages(root_dir):
        yield installed_package.purl, installed_package
def _create_system_package(project, purl, package):
    """
    Create or update the package for `package` in `project`, then relate it to
    the codebase resources matching its installed files.

    Installed files without a matching codebase resource are recorded as
    missing resources of the package; installed files whose checksum differs
    from the matching codebase resource are recorded as modified resources.
    """
    created_package = pipes.update_or_create_package(project, package.to_dict())

    installed_files = getattr(package, "resources", [])
    if not installed_files:
        # Without installed files there is nothing to relate to the codebase.
        logger.info(f" No installed_files for: {purl}")
        return

    # Work on copies and save both lists in one update once the loop is over.
    missing = created_package.missing_resources[:]
    modified = created_package.modified_resources[:]
    project_resources = project.codebaseresources.all()

    for install_file in installed_files:
        rootfs_path = pipes.normalize_path(install_file.get_path(strip_root=True))
        logger.info(f" installed file rootfs_path: {rootfs_path}")

        try:
            resource = project_resources.get(rootfs_path=rootfs_path)
        except ObjectDoesNotExist:
            if rootfs_path not in missing:
                missing.append(rootfs_path)
            logger.info(f" installed file is missing: {rootfs_path}")
            continue

        if created_package not in resource.discovered_packages.all():
            resource.discovered_packages.add(created_package)
            resource.update(status=flag.SYSTEM_PACKAGE)
            logger.info(f" added as system-package to: {purl}")

        is_modified = has_hash_diff(install_file, resource)
        if is_modified and install_file.path not in modified:
            modified.append(install_file.path)

    created_package.update(
        missing_resources=missing,
        modified_resources=modified,
    )
def scan_rootfs_for_system_packages(project, rootfs):
    """
    Given a `project` Project and a `rootfs` RootFs, scan the `rootfs` for
    installed system packages, and create a DiscoveredPackage for each.

    Then for each installed DiscoveredPackage file, check if it exists
    as a CodebaseResource. If exists, relate that CodebaseResource to its
    DiscoveredPackage; otherwise, keep that as a missing file.

    When the packages do not all share the same namespace and the most
    frequent one is not the distro identifier, the namespace of every package
    that differs from the distro identifier is updated to the distro identifier.

    Raise DistroNotFound when `rootfs` has no distro, and DistroNotSupported
    when its distro identifier is not in SUPPORTED_DISTROS.
    """
    if not rootfs.distro:
        raise DistroNotFound("Distro not found.")

    distro_id = rootfs.distro.identifier
    if distro_id not in SUPPORTED_DISTROS:
        raise DistroNotSupported(f'Distro "{distro_id}" is not supported.')

    logger.info(f"rootfs location: {rootfs.location}")

    installed_packages = rootfs.get_installed_packages(package_getter)

    created_system_packages = []
    seen_namespaces = []
    for index, (purl, package) in enumerate(installed_packages):
        logger.info(f"Creating package #{index}: {purl}")
        created_system_packages.append(package)
        seen_namespaces.append(package.namespace)
        _create_system_package(project, purl, package)

    namespace_counts = Counter(seen_namespaces)

    # We overwrite namespaces only when there are multiple namespaces in the
    # packages.
    if len(namespace_counts) <= 1:
        return

    # Iterating a Counter yields its keys, so max(namespace_counts) would pick
    # the greatest namespace by ordering (and fail on mixed None/str keys)
    # rather than the most frequent one.
    most_seen_namespace, _count = namespace_counts.most_common(1)[0]

    # If the distro_id is different from the namespace most seen in packages,
    # we update all the package namespaces to the distro_id.
    if most_seen_namespace != distro_id:
        for package in created_system_packages:
            if package.namespace != distro_id:
                package.update(namespace=distro_id)
def get_resource_with_md5(project, status):
    """
    Return the CodebaseResource queryset of `project` limited to resources
    with the given `status`, a non-empty md5 and a size other than zero.
    """
    with_status = project.codebaseresources.status(status=status)
    with_md5 = with_status.exclude(md5__exact="")
    return with_md5.exclude(size__exact=0)
def match_not_analyzed(
    project,
    reference_status=flag.SYSTEM_PACKAGE,
    not_analyzed_status=flag.NOT_ANALYZED,
):
    """
    Given a `project` Project :
    1. Build an index, keyed by (MD5, size), of the resources that have a
       status of `reference_status`
    2. Attempt to match resources with status `not_analyzed_status` to that
       index
    3. Relate each matched CodebaseResource to a DiscoveredPackage of the
       matching reference resource and set its status to `reference_status`.

    Only resources with a non-empty md5 and a non-zero size take part in the
    matching. A match whose reference resource has no related package is
    skipped.
    """
    known_resources = get_resource_with_md5(project=project, status=reference_status)
    known_resources_by_md5_size = {
        (resource.md5, resource.size): resource for resource in known_resources
    }

    matchables = get_resource_with_md5(project=project, status=not_analyzed_status)
    for matchable in matchables:
        matched = known_resources_by_md5_size.get((matchable.md5, matchable.size))
        if matched is None:
            continue

        # first() returns None rather than raising IndexError when the
        # reference resource is not related to any package.
        package = matched.discovered_packages.first()
        if package is None:
            continue

        matchable.discovered_packages.add(package)
        matchable.update(status=reference_status)
def flag_uninteresting_codebase_resources(project):
    """
    Flag as not interesting the status-less resources of `project` whose
    rootfs path starts with one of a few well-known locations holding
    temporary, generated, configuration or runtime files.
    """
    transient_prefixes = (
        "/tmp/",  # noqa: S108
        "/etc/",
        "/proc/",
        "/dev/",
        "/run/",
        "/lib/apk/db/",  # alpine specific
    )

    # OR together one "starts with" lookup per prefix.
    path_filter = Q()
    for prefix in transient_prefixes:
        path_filter = path_filter | Q(rootfs_path__startswith=prefix)

    unflagged = project.codebaseresources.no_status()
    unflagged.filter(path_filter).update(status=flag.IGNORED_NOT_INTERESTING)
def flag_ignorable_codebase_resources(project):
    """
    Flag the status-less resources of `project` whose rootfs path matches one
    of the glob patterns of ignorable files/directories from commoncode.ignore.

    A path matches a pattern when it contains the pattern text
    (case-insensitive) or matches the regex translated from the glob.
    """
    path_filter = Q()
    for glob_pattern in default_ignores.keys():
        # Translate glob pattern to regex
        regex = fnmatch.translate(glob_pattern)
        # PostgreSQL does not like parts of Python regex
        if regex.startswith("(?s"):
            regex = regex.replace("(?s", "(?")

        contains_pattern = Q(rootfs_path__icontains=glob_pattern)
        matches_regex = Q(rootfs_path__iregex=regex)
        path_filter = path_filter | contains_pattern | matches_regex

    unflagged = project.codebaseresources.no_status()
    unflagged.filter(path_filter).update(status=flag.IGNORED_DEFAULT_IGNORES)
def flag_data_files_with_no_clues(project):
    """
    Flag as uninteresting the CodebaseResources of `project` that have a
    `data` file type and carry no detected clue: no copyright, holder, author,
    license, email or URL.
    """
    no_clues = {
        "copyrights": [],
        "holders": [],
        "authors": [],
        "license_detections": [],
        "detected_license_expression": "",
        "emails": [],
        "urls": [],
    }
    data_file_without_clues = Q(file_type="data", **no_clues)

    # NOTE(review): unlike the other flag_* functions of this module, the
    # queryset is not restricted with no_status() here - confirm this is
    # intended.
    resources = project.codebaseresources
    resources.filter(data_file_without_clues).update(
        status=flag.IGNORED_DATA_FILE_NO_CLUES
    )
def flag_media_files_as_uninteresting(project):
    """Flag the status-less media files of `project` as ignored media files."""
    media_files = project.codebaseresources.no_status().filter(is_media=True)
    media_files.update(status=flag.IGNORED_MEDIA_FILE)
def get_rootfs_data(root_fs):
    """
    Return a mapping describing ``root_fs``: its "name" is the base name of
    its location and its "distro" is the distro as a mapping, empty when the
    rootfs has no distro.
    """
    name = os.path.basename(root_fs.location)

    if root_fs.distro:
        distro_data = root_fs.distro.to_dict()
    else:
        distro_data = {}

    return {"name": name, "distro": distro_data}