| # |
| # ISA_cve_plugin.py - CVE checker plugin, part of ISA FW |
| # |
| # Copyright (c) 2015 - 2016, Intel Corporation |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # |
| # * Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # * Neither the name of Intel Corporation nor the names of its contributors |
| # may be used to endorse or promote products derived from this software |
| # without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
| # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE |
| # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
| # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR |
| # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER |
| # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, |
| # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| import subprocess |
| import os, sys |
| import re |
| |
| CVEChecker = None |
| pkglist = "/cve_check_tool_pkglist" |
| |
| |
| class ISA_CVEChecker: |
| initialized = False |
| |
| def __init__(self, ISA_config): |
| self.cacert = ISA_config.cacert |
| self.reportdir = ISA_config.reportdir |
| self.timestamp = ISA_config.timestamp |
| self.logfile = ISA_config.logdir + "/isafw_cvelog" |
| self.report_name = ISA_config.reportdir + "/cve_report_" + \ |
| ISA_config.machine + "_" + ISA_config.timestamp |
| self.initialized = True |
| with open(self.logfile, 'a') as flog: |
| flog.write("\nPlugin ISA_CVEChecker initialized!\n") |
| output = "" |
| # check that cve-check-tool is installed |
| |
| def process_package(self, ISA_pkg): |
| if (self.initialized): |
| if (ISA_pkg.name and ISA_pkg.version and ISA_pkg.patch_files): |
| alias_pkgs_faux = [] |
| # need to compose faux format line for cve-check-tool |
| cve_patch_info = self.process_patch_list(ISA_pkg.patch_files) |
| pkgline_faux = ISA_pkg.name + "," + ISA_pkg.version + "," + cve_patch_info + ",\n" |
| if ISA_pkg.aliases: |
| for a in ISA_pkg.aliases: |
| alias_pkgs_faux.append( |
| a + "," + ISA_pkg.version + "," + cve_patch_info + ",\n") |
| pkglist_faux = pkglist + "_" + self.timestamp + ".faux" |
| with open(self.reportdir + pkglist_faux, 'a') as fauxfile: |
| fauxfile.write(pkgline_faux) |
| for a in alias_pkgs_faux: |
| fauxfile.write(a) |
| |
| with open(self.logfile, 'a') as flog: |
| flog.write("\npkg info: " + pkgline_faux) |
| else: |
| self.initialized = False |
| with open(self.logfile, 'a') as flog: |
| flog.write( |
| "Mandatory arguments such as pkg name, version and list of patches are not provided!\n") |
| flog.write("Not performing the call.\n") |
| else: |
| with open(self.logfile, 'a') as flog: |
| flog.write( |
| "Plugin hasn't initialized! Not performing the call.\n") |
| |
| def process_report(self): |
| if not os.path.isfile(self.reportdir + pkglist + "_" + self.timestamp + ".faux"): |
| return |
| if (self.initialized): |
| with open(self.logfile, 'a') as flog: |
| flog.write("Creating report in HTML format.\n") |
| result = self.process_report_type("html") |
| |
| with open(self.logfile, 'a') as flog: |
| flog.write("Creating report in CSV format.\n") |
| result = self.process_report_type("csv") |
| |
| pkglist_faux = pkglist + "_" + self.timestamp + ".faux" |
| os.remove(self.reportdir + pkglist_faux) |
| |
| with open(self.logfile, 'a') as flog: |
| flog.write("Creating report in XML format.\n") |
| self.write_report_xml(result) |
| |
| def write_report_xml(self, result): |
| try: |
| from lxml import etree |
| except ImportError: |
| try: |
| import xml.etree.cElementTree as etree |
| except ImportError: |
| import xml.etree.ElementTree as etree |
| num_tests = 0 |
| root = etree.Element('testsuite', name='CVE_Plugin', tests='1') |
| |
| if result : |
| num_tests = 1 |
| tcase = etree.SubElement( |
| root, 'testcase', classname='ISA_CVEChecker', name="Error in cve-check-tool") |
| etree.SubElement( tcase, 'failure', message=result, type='violation') |
| else: |
| with open(self.report_name + ".csv", 'r') as f: |
| for line in f: |
| num_tests += 1 |
| line = line.strip() |
| line_sp = line.split(',', 2) |
| if (len(line_sp) >= 3) and (line_sp[2].startswith('CVE')): |
| tcase = etree.SubElement( |
| root, 'testcase', classname='ISA_CVEChecker', name=line.split(',', 1)[0]) |
| etree.SubElement( |
| tcase, 'failure', message=line, type='violation') |
| else: |
| tcase = etree.SubElement( |
| root, 'testcase', classname='ISA_CVEChecker', name=line.split(',', 1)[0]) |
| |
| root.set('tests', str(num_tests)) |
| tree = etree.ElementTree(root) |
| output = self.report_name + '.xml' |
| try: |
| tree.write(output, encoding='UTF-8', |
| pretty_print=True, xml_declaration=True) |
| except TypeError: |
| tree.write(output, encoding='UTF-8', xml_declaration=True) |
| |
| def process_report_type(self, rtype): |
| # now faux file is ready and we can process it |
| args = "" |
| result = "" |
| tool_stderr_value = "" |
| args += "cve-check-tool " |
| if self.cacert: |
| args += "--cacert '%s' " % self.cacert |
| if rtype != "html": |
| args += "-c " |
| rtype = "csv" |
| pkglist_faux = pkglist + "_" + self.timestamp + ".faux" |
| args += "-a -t faux '" + self.reportdir + pkglist_faux + "'" |
| with open(self.logfile, 'a') as flog: |
| flog.write("Args: " + args) |
| try: |
| popen = subprocess.Popen( |
| args, shell=True, env=os.environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| result = popen.communicate() |
| except: |
| tool_stderr_value = "Error in executing cve-check-tool" + str(sys.exc_info()) |
| with open(self.logfile, 'a') as flog: |
| flog.write("Error in executing cve-check-tool: " + |
| str(sys.exc_info())) |
| else: |
| stdout_value = result[0] |
| tool_stderr_value = result[1].decode('utf-8') |
| if not tool_stderr_value and popen.returncode == 0: |
| report = self.report_name + "." + rtype |
| with open(report, 'wb') as freport: |
| freport.write(stdout_value) |
| else: |
| tool_stderr_value = tool_stderr_value + \ |
| "\ncve-check-tool terminated with exit code " + str(popen.returncode) |
| return tool_stderr_value |
| |
| def process_patch_list(self, patch_files): |
| patch_info = "" |
| for patch in patch_files: |
| patch1 = patch.partition("cve") |
| if (patch1[0] == patch): |
| # no cve substring, try CVE |
| patch1 = patch.partition("CVE") |
| if (patch1[0] == patch): |
| continue |
| patchstripped = patch1[2].split('-') |
| try: |
| patch_info += " CVE-" + \ |
| patchstripped[1] + "-" + re.findall('\d+', patchstripped[2])[0] |
| except IndexError: |
| # string parsing attempt failed, so just skip this patch |
| continue |
| return patch_info |
| |
| # ======== supported callbacks from ISA ============= # |
| |
| |
| def init(ISA_config): |
| global CVEChecker |
| CVEChecker = ISA_CVEChecker(ISA_config) |
| |
| |
| def getPluginName(): |
| return "ISA_CVEChecker" |
| |
| |
| def process_package(ISA_pkg): |
| global CVEChecker |
| return CVEChecker.process_package(ISA_pkg) |
| |
| |
| def process_report(): |
| global CVEChecker |
| return CVEChecker.process_report() |
| |
| # ==================================================== # |