#
# Copyright 2019-Present Sonatype Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# encoding: utf-8
import time
from argparse import ArgumentParser
import certifi
from cyclonedx.model.bom import Bom
from cyclonedx.output import make_outputter
from cyclonedx.schema import OutputFormat, SchemaVersion
from rich.progress import Progress
from typing import Optional
from sonatype_iq_api_client import ApiClient, ApiThirdPartyScanResultDTO, ApplicationsApi, Configuration, \
ThirdPartyAnalysisApi
from sonatype_iq_api_client.exceptions import NotFoundException
from . import BaseCommand
from . import parser_selector
[docs]
class IqCommand(BaseCommand):
[docs]
def handle_args(self) -> int:
exit_code: int = 0
input_source_msg = "your python environment" if self.arguments.sbom_input_type == "ENV" else "provided specs"
with Progress() as progress:
task_validate_iq = progress.add_task(
description="[yellow]Checking out your Sonatype Lifecycle Server", start=True, total=10
)
task_parser = progress.add_task(
description=f"[yellow]Collecting packages in {input_source_msg}", start=True, total=10
)
task_query_iq = progress.add_task(
description="[yellow]Submitting to Sonatype Lifecycle for Policy Evaluation", start=True, total=10
)
config = Configuration(
host=self.arguments.iq_server_url,
username=self.arguments.iq_username,
password=self.arguments.iq_password,
ssl_ca_cert=certifi.where()
)
with ApiClient(config) as api_client:
apps_api = ApplicationsApi(api_client)
scan_api = ThirdPartyAnalysisApi(api_client)
progress.update(
task_validate_iq, completed=10,
description=f"[green]Sonatype Lifecycle Server at {self.arguments.iq_server_url} is up and "
f"accessible"
)
# task_parser
parser = parser_selector.get_parser(
self.arguments.sbom_input_type, self.arguments.sbom_input_source
)
total_packages_collected = len(parser.get_components())
progress.update(
task_parser, completed=10,
description=f'[green]Collected {total_packages_collected} packages from {input_source_msg}'
)
# Look up internal application ID
app_list = apps_api.get_applications(public_id=[self.arguments.iq_application_id])
if not app_list.applications or len(app_list.applications) != 1:
raise ValueError(
'There were {} matching Applications found in Sonatype Lifecycle for {}'.format(
len(app_list.applications) if app_list.applications else 0,
self.arguments.iq_application_id
)
)
internal_id = app_list.applications[0].id
if not internal_id:
raise ValueError('Application in Sonatype Lifecycle has no internal ID')
# Build BOM and serialise to XML
bom = Bom(components=set(parser.get_components()))
bom_xml = make_outputter(bom, OutputFormat.XML, SchemaVersion.V1_4).output_as_string()
# Submit scan
progress.start_task(task_query_iq)
ticket = scan_api.scan_components(
internal_id, 'cyclonedx', self.arguments.iq_scan_stage, bom_xml,
_content_type='application/xml'
)
# Extract scan ID from the last path segment of status_url
if not ticket.status_url:
raise RuntimeError('Scan returned no status URL')
scan_id = ticket.status_url.rstrip('/').split('/')[-1]
# Poll for results — IQ returns 404 while the scan is still processing
result = IqCommand._poll_scan_result(scan_api, internal_id, scan_id)
if result is None:
raise RuntimeError('Timed out waiting for Sonatype Lifecycle scan results after 300 seconds')
if result.policy_action == 'Failure':
progress.update(
task_query_iq, completed=10,
description='[red]Policy failures detected from Sonatype Lifecycle.'
)
exit_code = 1
elif result.policy_action == 'Warning':
progress.update(
task_query_iq, completed=10,
description='[yellow]Policy warnings detected from Sonatype Lifecycle.'
)
else:
progress.update(
task_query_iq, completed=10,
description='[green]Sonatype Lifecycle Policy Evaluation complete with no policy violations.'
)
print('')
print('Your Sonatype Lifecycle Report is available here:')
print(' HTML: {}/{}'.format(self.arguments.iq_server_url, result.report_html_url))
print(' PDF: {}/{}'.format(self.arguments.iq_server_url, result.report_pdf_url))
print('')
return exit_code
@staticmethod
[docs]
def _poll_scan_result(
scan_api: ThirdPartyAnalysisApi, internal_id: str, scan_id: str
) -> Optional[ApiThirdPartyScanResultDTO]:
for _ in range(30):
try:
result = scan_api.get_scan_status(internal_id, scan_id)
if result.is_error is not None:
return result
except NotFoundException:
pass
time.sleep(2)
return None
[docs]
def get_argument_parser_name(self) -> str:
return 'iq'
[docs]
def get_argument_parser_help(self) -> str:
return 'perform a scan backed by Sonatype Lifecycle'
[docs]
def setup_argument_parser(self, arg_parser: ArgumentParser) -> None:
parser_selector.add_parser_selector_arguments(arg_parser)
arg_parser.add_argument('-s', '--server-url',
help='Full http(s):// URL to your Sonatype Lifecycle server',
metavar='https://localhost:8070', required=True, dest='iq_server_url')
arg_parser.add_argument('-i', '--application-id',
help='Public Application ID in Sonatype Lifecycle',
metavar='APP_ID', required=True, dest='iq_application_id')
arg_parser.add_argument('-u', '--username',
help='Username for authentication to Sonatype Lifecycle',
metavar='USER_ID', required=True, dest='iq_username')
arg_parser.add_argument('-p', '--password',
help='Password for authentication to Sonatype Lifecycle',
metavar='PASSWORD', required=True, dest='iq_password')
arg_parser.add_argument('-st', '--stage', help='The stage for the report',
metavar='STAGE', required=False, dest='iq_scan_stage', default='source')