# encoding: utf-8
#
# Copyright 2019-Present Sonatype Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import os
from argparse import ArgumentParser
import certifi
from decimal import Decimal
from pathlib import Path
from typing import Dict, Iterable, List, Set
from cyclonedx.model import XsUri
from cyclonedx.model.bom import Bom
from cyclonedx.model.component import Component
from cyclonedx.model.impact_analysis import ImpactAnalysisAffectedStatus
from cyclonedx.model.vulnerability import (
BomTarget, BomTargetVersionRange, Vulnerability, VulnerabilityAdvisory,
VulnerabilityRating, VulnerabilityReference, VulnerabilityScoreSource,
VulnerabilitySeverity, VulnerabilitySource
)
from cyclonedx.output import make_outputter
from cyclonedx.schema import OutputFormat, SchemaVersion
from sonatype_guide_api_client import ApiClient, Configuration, OSSIndexCompatibilityApi, OssiVulnerabilityPost, \
PurlRequestPost, ComponentReportPost
from sonatype_guide_api_client.exceptions import UnauthorizedException
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, TaskID
from rich.table import Table
from rich.tree import Tree
from . import BaseCommand
from . import parser_selector
from jake._internal.parsers import BaseJakeParser
[docs]
_SONATYPE_GUIDE_SOURCE = 'Sonatype Guide'
[docs]
class OssCommand(BaseCommand):
[docs]
def handle_args(self) -> int:
self._console = Console()
try:
components, vulnerabilities, guide_results = self._perform_scan()
except UnauthorizedException:
self._console.print(
'[red]Authentication failed: Sonatype Guide requires a username and token.\n'
'Set SONATYPE_GUIDE_USERNAME and SONATYPE_GUIDE_TOKEN environment variables, '
'or pass -u / --token on the command line.'
)
return 1
print('')
self._print_oss_index_report(components=components, vulnerabilities=vulnerabilities)
if self.arguments.oss_output_file:
cyclonedx_output = make_outputter(
OssCommand._build_bom(components=components, vulnerabilities=vulnerabilities),
OutputFormat[str(self.arguments.oss_output_format).upper()],
SchemaVersion.from_version(str(self.arguments.oss_schema_version))
)
output_filename = os.path.realpath(self.arguments.oss_output_file)
cyclonedx_output.output_to_file(filename=output_filename, allow_overwrite=True)
print('')
print('CycloneDX has been written to {}'.format(output_filename))
exit_code: int = 0
if not self.arguments.warn_only:
for report in guide_results:
if report.vulnerabilities:
exit_code = 1
break
return exit_code
[docs]
def _apply_whitelist(self, guide_results: List[ComponentReportPost]) -> None:
if not self.arguments.oss_whitelist_json_file:
return
with open(self.arguments.oss_whitelist_json_file) as f:
json_data = json.load(f)
whitelisted_ids = {entry["id"] for entry in json_data.get("ignore", [])}
if whitelisted_ids:
for guide_report in guide_results:
if guide_report.vulnerabilities:
guide_report.vulnerabilities = [
v for v in guide_report.vulnerabilities if v.id not in whitelisted_ids
]
[docs]
def _process_components(
self,
parser: BaseJakeParser,
guide_results: List[ComponentReportPost],
progress: Progress,
task: TaskID,
) -> tuple[List[Component], List[Vulnerability]]:
components: List[Component] = []
vulnerabilities: List[Vulnerability] = []
for component in parser.get_components():
if not component.purl:
continue
purl_str = str(component.purl)
matching = [r for r in guide_results if r.coordinates == purl_str]
if not matching:
progress.update(task, advance=1)
components.append(component)
continue
report: ComponentReportPost = matching[0]
if report.vulnerabilities:
for vuln in report.vulnerabilities:
vulnerabilities.append(OssCommand._build_vulnerability(component, vuln))
components.append(component)
progress.update(task, advance=1)
return components, vulnerabilities
@staticmethod
[docs]
def _build_ratings(vuln: OssiVulnerabilityPost) -> List[VulnerabilityRating]:
if not vuln.cvss_score:
return []
return [
VulnerabilityRating(
source=VulnerabilitySource(
name=_SONATYPE_GUIDE_SOURCE, url=XsUri(vuln.reference or '')
),
score=Decimal(str(vuln.cvss_score)),
severity=VulnerabilitySeverity.get_from_cvss_scores((vuln.cvss_score,)),
method=VulnerabilityScoreSource.get_from_vector(
vector=vuln.cvss_vector
) if vuln.cvss_vector else None,
vector=vuln.cvss_vector
)
]
@staticmethod
[docs]
def _build_vulnerability(component: Component, vuln: OssiVulnerabilityPost) -> Vulnerability:
ratings = OssCommand._build_ratings(vuln)
cwes = None
if vuln.cwe:
try:
cwes = [int(vuln.cwe[4:])]
except ValueError:
pass # ignore cases where conversion to int fails
vulnerability: Vulnerability = Vulnerability(
bom_ref=vuln.id,
id=vuln.id,
source=VulnerabilitySource(
name=_SONATYPE_GUIDE_SOURCE, url=XsUri(vuln.reference or '')
),
cwes=cwes,
description=vuln.title,
detail=vuln.description,
ratings=ratings,
references=[
VulnerabilityReference(
id=vuln.display_name or '', source=VulnerabilitySource(
name=_SONATYPE_GUIDE_SOURCE, url=XsUri(vuln.reference or '')
)
)
]
)
if vuln.external_references:
advisories: Set[VulnerabilityAdvisory] = set()
for ext_ref_url in vuln.external_references:
advisories.add(VulnerabilityAdvisory(url=XsUri(uri=ext_ref_url)))
vulnerability.advisories = advisories
vulnerability.affects.add(
BomTarget(
ref=str(component.bom_ref),
versions=[
BomTargetVersionRange(
version=component.version, status=ImpactAnalysisAffectedStatus.AFFECTED
)
]
)
)
return vulnerability
[docs]
def get_argument_parser_name(self) -> str:
return 'guide'
[docs]
def get_argument_parser_help(self) -> str:
return 'perform a scan backed by Sonatype Guide'
[docs]
def setup_argument_parser(self, arg_parser: ArgumentParser) -> None:
parser_selector.add_parser_selector_arguments(arg_parser)
default_username = os.environ.get('SONATYPE_GUIDE_USERNAME') or os.environ.get('OSS_INDEX_USERNAME')
default_token = os.environ.get('SONATYPE_GUIDE_TOKEN') or os.environ.get('OSS_INDEX_TOKEN')
arg_parser.add_argument('-u', '--username',
help='Sonatype Guide username/email '
'(env var: SONATYPE_GUIDE_USERNAME; '
'OSS_INDEX_USERNAME accepted as a fallback)',
metavar='USERNAME', dest='oss_username',
default=default_username)
arg_parser.add_argument('--token',
help='Sonatype Guide API token '
'(env var: SONATYPE_GUIDE_TOKEN; '
'OSS_INDEX_TOKEN accepted as a fallback)',
metavar='TOKEN', dest='oss_token',
default=default_token)
arg_parser.add_argument('-o', '--output-file',
help='Specify a file to output the SBOM to. If not specified the '
'report will be output to the console. '
'STDOUT is not supported.',
metavar='PATH/TO/FILE', dest='oss_output_file', default=None)
arg_parser.add_argument('--output-format', help='SBOM output format (default = xml)', choices={'json', 'xml'},
default='xml', dest='oss_output_format')
arg_parser.add_argument('--schema-version',
help=f'CycloneDX schema version to use (default = '
f'{SchemaVersion.V1_6.to_version()})',
choices={'1.6', '1.5', '1.4', '1.3', '1.2', '1.1', '1.0'},
default=f'{SchemaVersion.V1_6.to_version()}',
dest='oss_schema_version')
arg_parser.add_argument('--whitelist', help='Set path to whitelist json file', type=Path,
dest='oss_whitelist_json_file')
@staticmethod
[docs]
def _build_bom(components: Iterable[Component], vulnerabilities: Iterable[Vulnerability]) -> Bom:
return Bom(components=set(components), vulnerabilities=set(vulnerabilities))
[docs]
def _print_oss_index_report(self, components: List[Component], vulnerabilities: List[Vulnerability]) -> None:
vuln_map: Dict[str, List[Vulnerability]] = {str(c.bom_ref): [] for c in components}
for v in vulnerabilities:
for target in v.affects:
if target.ref in vuln_map:
vuln_map[target.ref].append(v)
total_vulnerabilities = 0
total_packages = len(components)
component: Component
i: int = 1
for component in components:
comp_vulns = vuln_map.get(str(component.bom_ref), [])
if bool(comp_vulns):
self._console.print(
f"[{i}/{total_packages}] - {component.name}@{component.version} [VULNERABLE]",
style=OssCommand._get_color_for_cvss_score(
cvss_score=OssCommand._get_max_cvss_score(
component=component, vulnerabilities=comp_vulns)
)
)
total_vulnerabilities += len(comp_vulns)
tree = Tree(f'Vulnerability Details for [bright_white]{component.name}@{component.version}[white]')
for v in comp_vulns:
OssCommand._print_vulnerability(tree=tree, v=v)
self._console.print(tree)
i += 1
self._console.print('')
table = Table(title='Summary')
table.add_column("Audited Dependencies", justify="left", no_wrap=True)
table.add_column("Vulnerabilities Found", justify="left", no_wrap=True)
table.add_row('{}'.format(len(components)), f'{total_vulnerabilities}')
self._console.print(table)
@staticmethod
[docs]
def _get_max_cvss_score_for_vulnerability(vulnerability: Vulnerability) -> float:
max_score: float = 0.0
for rating in vulnerability.ratings:
if rating.score and float(rating.score) > max_score:
max_score = float(rating.score)
return max_score
@staticmethod
[docs]
def _get_max_cvss_score(component: Component, vulnerabilities: List[Vulnerability]) -> float:
max_cvss_score: float = 0.0
for v in vulnerabilities:
score = OssCommand._get_max_cvss_score_for_vulnerability(vulnerability=v)
if score > max_cvss_score:
max_cvss_score = score
return max_cvss_score
@staticmethod
[docs]
def _print_vulnerability(tree: Tree, v: Vulnerability) -> None:
b = tree.add(
f'[bright_red](!) ID: {v.id}'
)
severity_color = OssCommand._get_color_for_cvss_score(
OssCommand._get_max_cvss_score_for_vulnerability(vulnerability=v)
)
ratings_text = os.linesep.join([
f' - [{severity_color}]{rating.score:.1f} {rating.severity.name if rating.severity else ""} - '
f'Vector: {rating.vector if rating.vector else "Unknown"}, '
f'CWEs: {",".join(list(map(lambda cwe: str(cwe), v.cwes))) if v.cwes else "None Recorded"}'
f'[bright_white]'
for rating in v.ratings
])
references_text = os.linesep.join([
f' - {reference.source.name if reference.source and reference.source.name else ""} '
f'[Ref: {reference.id}]{os.linesep}'
f' URL: {reference.source.url if reference.source and reference.source.url else "None"}'
for reference in v.references
])
content = f"""
[bright_white]{v.description}
{v.detail}
Ratings:
{ratings_text}
References:
{references_text}
"""
b.add(Panel(content, title=f'[bright_white]{v.id}', title_align="left"))
@staticmethod
[docs]
def _get_color_for_cvss_score(cvss_score: float = 0.0) -> str:
if cvss_score >= 9.0:
return 'bright_red'
elif cvss_score >= 7.0:
return 'bright_yellow'
elif cvss_score >= 4.0:
return 'yellow3'
elif cvss_score > 0.0:
return 'bright_cyan'
else:
return 'bright_green'
@staticmethod
[docs]
def _get_severity_for_cvss_score(cvss_score: float) -> str:
if cvss_score >= 9.0:
return 'Critical'
elif cvss_score >= 7.0:
return 'High'
elif cvss_score >= 4.0:
return 'Medium'
elif cvss_score > 0.0:
return 'Low'
else:
return 'None'
[docs]
class DdtCommand(OssCommand):
"""Deprecated alias for OssCommand that registers as the 'ddt' subcommand."""
[docs]
def get_argument_parser_name(self) -> str:
return 'ddt'
[docs]
def get_argument_parser_help(self) -> str:
return '(DEPRECATED: use guide instead) perform a scan backed by Sonatype Guide'