| #!/usr/bin/env python3 |
| |
| # Copyright (c) 2013 Intel Corporation |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License version 2 as |
| # published by the Free Software Foundation. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License along |
| # with this program; if not, write to the Free Software Foundation, Inc., |
| # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| |
| # DESCRIPTION |
| # This script runs tests defined in meta/lib/oeqa/selftest/ |
| # Its purpose is to automate the testing of different bitbake tools. |
| # To use it you just need to source your build environment setup script and |
| # add the meta-selftest layer to your BBLAYERS. |
| # Call the script as: "oe-selftest -a" to run all the tests in meta/lib/oeqa/selftest/ |
| # Call the script as: "oe-selftest -r <module>.<Class>.<method>" to run just a single test |
| # E.g: "oe-selftest -r bblayers.BitbakeLayers" will run just the BitbakeLayers class from meta/lib/oeqa/selftest/bblayers.py |
| |
| |
| import os |
| import sys |
| import unittest |
| import logging |
| import argparse |
| import subprocess |
| import time as t |
| import re |
| import fnmatch |
| import collections |
| import imp |
| |
| sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib') |
| import scriptpath |
| scriptpath.add_bitbake_lib_path() |
| scriptpath.add_oe_lib_path() |
| import argparse_oe |
| |
| import oeqa.selftest |
| import oeqa.utils.ftools as ftools |
| from oeqa.utils.commands import runCmd, get_bb_var, get_test_layer |
| from oeqa.utils.metadata import metadata_from_bb, write_metadata_file |
| from oeqa.selftest.base import oeSelfTest, get_available_machines |
| |
| try: |
| import xmlrunner |
| from xmlrunner.result import _XMLTestResult as TestResult |
| from xmlrunner import XMLTestRunner as _TestRunner |
| except ImportError: |
| # use the base runner instead |
| from unittest import TextTestResult as TestResult |
| from unittest import TextTestRunner as _TestRunner |
| |
# Timestamped stem shared by the log file name and the results directory.
log_prefix = "oe-selftest-" + t.strftime("%Y%m%d-%H%M%S")


def logger_create():
    """Create the "selftest" logger.

    A fresh timestamped log file is opened in the current directory and the
    "oe-selftest.log" symlink is re-pointed at it.  The file receives DEBUG
    and above, stdout receives INFO and above; both use the same format.
    """
    symlink_name = "oe-selftest.log"
    target_name = log_prefix + ".log"
    if os.path.lexists(symlink_name):
        os.remove(symlink_name)
    os.symlink(target_name, symlink_name)

    selftest_logger = logging.getLogger("selftest")
    selftest_logger.setLevel(logging.DEBUG)

    file_handler = logging.FileHandler(filename=target_name, mode='w')
    console_handler = logging.StreamHandler(sys.stdout)
    line_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    for handler, level in ((file_handler, logging.DEBUG),
                           (console_handler, logging.INFO)):
        handler.setLevel(level)
        handler.setFormatter(line_format)
        selftest_logger.addHandler(handler)

    return selftest_logger


log = logger_create()
| |
def get_args_parser():
    """Build the command line parser for oe-selftest.

    Exactly one of the "action" options (run/list variants) must be given;
    they live in a required mutually exclusive group.  The coverage,
    machine and repository options are independent modifiers.

    Returns the configured argparse_oe.ArgumentParser.
    """
    description = "Script that runs unit tests against bitbake and other Yocto related tools. The goal is to validate tools functionality and metadata integrity. Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information."
    parser = argparse_oe.ArgumentParser(description=description)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-r', '--run-tests', required=False, action='store', nargs='*', dest="run_tests", default=None, help='Select what tests to run (modules, classes or test methods). Format should be: <module>.<class>.<test_method>')
    group.add_argument('-a', '--run-all-tests', required=False, action="store_true", dest="run_all_tests", default=False, help='Run all (unhidden) tests')
    group.add_argument('-m', '--list-modules', required=False, action="store_true", dest="list_modules", default=False, help='List all available test modules.')
    group.add_argument('--list-classes', required=False, action="store_true", dest="list_allclasses", default=False, help='List all available test classes.')
    parser.add_argument('--coverage', action="store_true", help="Run code coverage when testing")
    parser.add_argument('--coverage-source', dest="coverage_source", nargs="+", help="Specify the directories to take coverage from")
    parser.add_argument('--coverage-include', dest="coverage_include", nargs="+", help="Specify extra patterns to include into the coverage measurement")
    parser.add_argument('--coverage-omit', dest="coverage_omit", nargs="+", help="Specify extra patterns to exclude from the coverage measurement")
    # The *-tests-by options default to False (not None/[]) so that "option
    # not given" can be told apart from "option given without values".
    group.add_argument('--run-tests-by', required=False, dest='run_tests_by', default=False, nargs='*',
                       help='run-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    group.add_argument('--list-tests-by', required=False, dest='list_tests_by', default=False, nargs='*',
                       help='list-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    group.add_argument('-l', '--list-tests', required=False, action="store_true", dest="list_tests", default=False,
                       help='List all available tests.')
    group.add_argument('--list-tags', required=False, dest='list_tags', default=False, action="store_true",
                       help='List all tags that have been set to test cases.')
    parser.add_argument('--machine', required=False, dest='machine', choices=['random', 'all'], default=None,
                        help='Run tests on different machines (random/all).')
    parser.add_argument('--repository', required=False, dest='repository', default='', action='store',
                        help='Submit test results to a repository')
    return parser
| |
# Build directory taken from $BUILDDIR; stays None until preflight_check()
# has run, which the cleanup helpers use to know there is nothing to undo.
builddir = None


def preflight_check():
    """Verify the build environment is usable before any test is run.

    Sets the module-level ``builddir`` from $BUILDDIR, changes into it,
    makes sure the meta-selftest layer is in BBLAYERS (adding it when it can
    be found under BBLAYERS_FETCH_DIR), rejects configurations with
    buildhistory, PRSERV_HOST or SANITY_TESTED_DISTROS set, and finally runs
    "bitbake -p".

    Returns True when everything is in order, False (after logging the
    reason) otherwise.
    """
    global builddir

    log.info("Checking that everything is in order before running the tests")

    env_builddir = os.environ.get("BUILDDIR")
    if not env_builddir:
        log.error("BUILDDIR isn't set. Did you forget to source your build environment setup script?")
        return False

    builddir = env_builddir
    if os.getcwd() != builddir:
        log.info("Changing cwd to %s" % builddir)
        os.chdir(builddir)

    # get_bb_var() results are treated as possibly empty (the PRSERV_HOST
    # check below already relies on a falsy value for "unset"), so guard the
    # substring tests instead of letting them raise TypeError.
    if "meta-selftest" not in (get_bb_var("BBLAYERS") or ""):
        log.warning("meta-selftest layer not found in BBLAYERS, adding it")
        fetch_dir = get_bb_var("BBLAYERS_FETCH_DIR")
        meta_selftestdir = os.path.join(fetch_dir, 'meta-selftest') if fetch_dir else None
        if meta_selftestdir and os.path.isdir(meta_selftestdir):
            runCmd("bitbake-layers add-layer %s" % meta_selftestdir)
        else:
            log.error("could not locate meta-selftest in:\n%s"
                      % meta_selftestdir)
            return False

    if "buildhistory.bbclass" in (get_bb_var("BBINCLUDED") or ""):
        log.error("You have buildhistory enabled already and this isn't recommended for selftest, please disable it first.")
        return False

    if get_bb_var("PRSERV_HOST"):
        log.error("Please unset PRSERV_HOST in order to run oe-selftest")
        return False

    if get_bb_var("SANITY_TESTED_DISTROS"):
        log.error("Please unset SANITY_TESTED_DISTROS in order to run oe-selftest")
        return False

    log.info("Running bitbake -p")
    runCmd("bitbake -p")

    return True
| |
def add_include():
    """Append the selftest include lines to local.conf and bblayers.conf.

    Each file is only touched when it does not already contain the
    "#include added by oe-selftest.py" marker.
    """
    marker = "#include added by oe-selftest.py"

    local_conf = os.path.join(builddir, "conf/local.conf")
    if marker not in ftools.read_file(local_conf):
        log.info("Adding: \"include selftest.inc\" in local.conf")
        ftools.append_file(
            local_conf,
            "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc")

    bblayers_conf = os.path.join(builddir, "conf/bblayers.conf")
    if marker not in ftools.read_file(bblayers_conf):
        log.info("Adding: \"include bblayers.inc\" in bblayers.conf")
        ftools.append_file(
            bblayers_conf,
            "\n#include added by oe-selftest.py\ninclude bblayers.inc")
| |
def remove_include():
    """Strip the include lines written by add_include() from the conf files.

    Does nothing when ``builddir`` was never set.  A file is only rewritten
    when it contains the "#include added by oe-selftest.py" marker.
    """
    if builddir is None:
        return

    marker = "#include added by oe-selftest.py"

    local_conf = os.path.join(builddir, "conf/local.conf")
    if marker in ftools.read_file(local_conf):
        log.info("Removing the include from local.conf")
        ftools.remove_from_file(
            local_conf,
            "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc")

    bblayers_conf = os.path.join(builddir, "conf/bblayers.conf")
    if marker in ftools.read_file(bblayers_conf):
        log.info("Removing the include from bblayers.conf")
        ftools.remove_from_file(
            bblayers_conf,
            "\n#include added by oe-selftest.py\ninclude bblayers.inc")
| |
def remove_inc_files():
    """Best-effort removal of the .inc files generated for a selftest run.

    Does nothing when ``builddir`` was never set.  Removal failures
    (OSError, e.g. a file that does not exist) are ignored on purpose: this
    runs during cleanup and must not hide the real outcome of the run.
    """
    if builddir is None:
        return

    try:
        os.remove(os.path.join(builddir, "conf/selftest.inc"))
        # Only reached when selftest.inc existed and was removed; the test
        # layer is not queried or walked otherwise.
        for root, _, files in os.walk(get_test_layer()):
            for f in files:
                if f == 'test_recipe.inc':
                    os.remove(os.path.join(root, f))
    except OSError:
        pass

    for incl_file in ('conf/bblayers.inc', 'conf/machine.inc'):
        try:
            os.remove(os.path.join(builddir, incl_file))
        except OSError:
            pass
| |
| |
def get_tests_modules(include_hidden=False):
    """Return the dotted names of all selftest modules found on disk.

    Every directory below each entry of ``oeqa.selftest.__path__`` is
    scanned for ``*.py`` files.  ``base.py`` and names starting with ``__``
    are always skipped; names starting with a single ``_`` are only
    returned when *include_hidden* is true.  Duplicates are dropped, first
    occurrence wins.
    """
    found = []
    for base_path in oeqa.selftest.__path__:
        for dirpath, _dirnames, _filenames in os.walk(base_path):
            # Whatever follows the last "selftest" in the directory path
            # becomes the sub-package part of the module name.
            subpackage = dirpath.split("selftest")[-1]
            if subpackage:
                dotted_prefix = 'oeqa.selftest' + subpackage.replace("/", ".") + "."
            else:
                dotted_prefix = 'oeqa.selftest.'

            for entry in sorted(os.listdir(dirpath)):
                if not entry.endswith('.py'):
                    continue
                if entry.startswith('__') or entry == 'base.py':
                    continue
                if entry.startswith('_') and not include_hidden:
                    continue
                dotted = dotted_prefix + entry.split('.py')[0]
                if dotted not in found:
                    found.append(dotted)
    return found
| |
| |
def get_tests(exclusive_modules=None, include_hidden=False):
    """Return the list of test names to load.

    exclusive_modules: iterable of names relative to ``oeqa.selftest``
        (module, module.Class or module.Class.method).  Each one is
        returned prefixed with ``oeqa.selftest.``.  When it is None or
        empty, every module reported by get_tests_modules() is returned.
    include_hidden: forwarded to get_tests_modules() in the latter case.
    """
    # None instead of a mutable [] default; both mean "no restriction".
    test_modules = ['oeqa.selftest.' + name for name in (exclusive_modules or ())]
    if not test_modules:
        test_modules = get_tests_modules(include_hidden)

    return test_modules
| |
| |
class Tc:
    """Plain record describing one discovered test case."""

    def __init__(self, tcname, tcclass, tcmodule, tcid=None, tctag=None):
        self.tcname, self.tcclass, self.tcmodule = tcname, tcclass, tcmodule
        self.tcid = tcid
        # Either a single tag (str) or a tuple when several tags were set.
        self.tctag = tctag
        # Dotted name rooted at the oeqa.selftest package.
        self.fullpath = '.'.join(('oeqa', 'selftest', tcmodule, tcclass, tcname))
| |
| |
def get_tests_from_module(tmod):
    """Collect a Tc record for every test method defined in module *tmod*.

    tmod: dotted module name to import.

    Only classes deriving from oeSelfTest (excluding oeSelfTest itself) are
    inspected, and only ``test_*`` callables defined directly in the class
    body are reported.  Any failure while importing or inspecting the
    module is logged at debug level; whatever was collected up to that
    point is returned.
    """
    import importlib

    tlist = []
    prefix = 'oeqa.selftest.'

    try:
        modlib = importlib.import_module(tmod)
        for mod in list(vars(modlib).values()):
            if not (isinstance(mod, type(oeSelfTest)) and issubclass(mod, oeSelfTest)
                    and mod is not oeSelfTest):
                continue
            own_attrs = vars(mod)
            for test in dir(mod):
                # dir() also lists inherited names; those are absent from
                # the class __dict__ and are skipped here instead of raising
                # KeyError and aborting the whole module.
                func = own_attrs.get(test)
                if not test.startswith('test_') or not callable(func):
                    continue

                # The id and feature tag are set by decorators and may be
                # missing.
                try:
                    tid = func.test_case
                except AttributeError:
                    print('DEBUG: tc id missing for ' + str(test))
                    tid = None
                ttag = getattr(func, 'tag__feature', None)

                # NOTE: replace() is used because lstrip() strips a set of
                # characters, not a prefix.
                tlist.append(Tc(test, mod.__name__, mod.__module__.replace(prefix, ''), tid, ttag))
    except Exception:
        # Listing stays best-effort: a broken module must not abort it.
        log.debug("Could not collect tests from %s" % tmod, exc_info=True)

    return tlist
| |
| |
def get_all_tests():
    """Return the Tc records of every non-hidden test module.

    Modules are visited in sorted name order.
    """
    collected = []
    for module_name in sorted(get_tests_modules()):
        collected.extend(get_tests_from_module(module_name))
    return collected
| |
| |
def get_testsuite_by(criteria, keyword):
    """Select test cases whose *criteria* attribute matches *keyword*.

    criteria: one of 'name', 'class', 'module', 'id', 'tag'
    keyword: list of names/classes/modules/ids/tags; each entry is tried as
        an exact value first and as a shell-style wildcard otherwise.

    Returns a list of Tc objects (empty for an unknown criteria).  An error
    is logged for every keyword that matches nothing.
    """
    all_tests = get_all_tests()

    def get_matches(values):
        # Return the entries of 'values' selected by the keywords.  A value
        # consumed by one keyword is no longer available to later ones.
        matched = []
        pool = values[:]
        for key in keyword:
            if key in pool:
                # Exact match
                matched.append(key)
                pool.remove(key)
                continue
            # Wildcard match
            glob = re.compile(fnmatch.translate(r"%s" % key))
            hits = [item for item in pool if glob.match(item)]
            if not hits:
                log.error("Failed to find test: %s" % key)
                continue
            matched.extend(hits)
            pool = [item for item in pool if item not in hits]
        return matched

    # Criteria that map directly onto a single Tc attribute.
    extractors = {
        'name': lambda tc: tc.tcname,
        'class': lambda tc: tc.tcclass,
        'module': lambda tc: tc.tcmodule,
        'id': lambda tc: str(tc.tcid),
    }
    if criteria in extractors:
        pick = extractors[criteria]
        wanted = get_matches([pick(tc) for tc in all_tests])
        return [tc for tc in all_tests if pick(tc) in wanted]

    if criteria != 'tag':
        return []

    # A test case carries either one tag or a tuple of tags.
    values = set()
    for tc in all_tests:
        if isinstance(tc.tctag, tuple):
            values |= {str(tag) for tag in tc.tctag}
        else:
            values.add(str(tc.tctag))

    tags = get_matches(list(values))

    selected = []
    for tc in all_tests:
        for tag in tags:
            if isinstance(tc.tctag, tuple) and tag in tc.tctag:
                selected.append(tc)
            elif tag == tc.tctag:
                selected.append(tc)

    # A test matching several tags was appended several times.
    return list(set(selected))
| |
| |
def list_testsuite_by(criteria, keyword):
    """Print the test cases selected by get_testsuite_by() as a table.

    criteria: name, class, module, id, tag
    keyword: a list of tests, classes, modules, ids, tags

    Rows are sorted by (id, tag, name, class, module) and followed by a
    short summary of the query.
    """
    def tag_key(tctag):
        # Tags may be None, a single value or a tuple/list.  Python 3
        # refuses to order such heterogeneous values, so map all of them
        # onto a tuple of strings.
        if tctag is None:
            return ()
        if isinstance(tctag, (tuple, list)):
            return tuple(str(tag) for tag in tctag)
        return (str(tctag),)

    def tc_key(row):
        # tcid may be None if no ID was assigned; sort those as id 0.
        tcid = 0 if row[0] is None else row[0]
        return (tcid, tag_key(row[1])) + row[2:]

    ts = sorted([(tc.tcid, tc.tctag, tc.tcname, tc.tcclass, tc.tcmodule)
                 for tc in get_testsuite_by(criteria, keyword)], key=tc_key)

    row_format = '%-4s\t%-20s\t%-60s\t%-25s\t%-20s'
    print('_' * 150)
    for row in ts:
        if isinstance(row[1], (tuple, list)):
            tags = ', '.join(str(tag) for tag in row[1])
            print(row_format % (row[0], tags, row[2], row[3], row[4]))
        else:
            print(row_format % row)
    print('_' * 150)
    print('Filtering by:\t %s' % criteria)
    print('Looking for:\t %s' % ', '.join(str(x) for x in keyword))
    print('Total found:\t %s' % len(ts))
| |
| |
def list_tests():
    """Print a table (id, tag, dotted test name) of all available tests."""
    cases = get_all_tests()
    row_format = '%-4s\t%-10s\t%-50s'

    print(row_format % ('id', 'tag', 'test'))
    print('_' * 80)
    for case in cases:
        tag = case.tctag
        if isinstance(tag, (tuple, list)):
            # Several tags: show them comma separated in a single column.
            tag = ', '.join(tag)
        print(row_format % (case.tcid, tag, '.'.join([case.tcmodule, case.tcclass, case.tcname])))
    print('_' * 80)
    print('Total found:\t %s' % len(cases))
| |
def list_tags():
    """Print every distinct tag set on any test case.

    Handy when tagging new test cases: the list of tags should be kept as
    small as possible.
    """
    unique_tags = set()

    for case in get_all_tests():
        tag = case.tctag
        if not isinstance(tag, (tuple, list)):
            unique_tags.add(tag)
        else:
            unique_tags.update(set(tag))

    print('Tags:\t%s' % ', '.join(map(str, unique_tags)))
| |
def coverage_setup(coverage_source, coverage_include, coverage_omit):
    """Write the coverage configuration used for the test run.

    coverage_source: directories to measure, or None to measure every
        BBLAYERS entry plus the scripts and bitbake directories.
    coverage_include / coverage_omit: optional lists of extra patterns
        written to the "include" / "omit" settings.

    The file is written to <builddir>/.coveragerc and points the data file
    at a timestamped <builddir>/.coverage.<timestamp>.

    Returns the path of the configuration file.
    """
    import datetime
    import subprocess

    scriptsdir = os.path.dirname(os.path.realpath(__file__))
    pokydir = os.path.dirname(scriptsdir)
    try:
        curcommit = subprocess.check_output(
            ["git", "--git-dir", os.path.join(pokydir, ".git"), "rev-parse", "HEAD"]).decode('utf-8').strip()
    except (subprocess.CalledProcessError, OSError):
        # The commit only ends up in a comment line of the generated file,
        # so a missing git or checkout should not abort the setup.
        curcommit = "unknown"

    coveragerc = "%s/.coveragerc" % builddir
    data_file = "%s/.coverage." % builddir
    data_file += datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
    if os.path.isfile(data_file):
        os.remove(data_file)

    with open(coveragerc, 'w') as cps:
        cps.write("# Generated with command '%s'\n" % " ".join(sys.argv))
        cps.write("# HEAD commit %s\n" % curcommit)
        cps.write("[run]\n")
        cps.write("data_file = %s\n" % data_file)
        cps.write("branch = True\n")
        # Measure just BBLAYERS, scripts and bitbake folders
        cps.write("source = \n")
        if coverage_source:
            for directory in coverage_source:
                if not os.path.isdir(directory):
                    # Warn only; the entry is still written as requested.
                    log.warning("Directory %s is not valid.", directory)
                cps.write("    %s\n" % directory)
        else:
            for layer in get_bb_var('BBLAYERS').split():
                cps.write("    %s\n" % layer)
            cps.write("    %s\n" % scriptsdir)
            cps.write("    %s\n" % os.path.join(pokydir, 'bitbake'))

        if coverage_include:
            cps.write("include = \n")
            for pattern in coverage_include:
                cps.write("    %s\n" % pattern)
        if coverage_omit:
            cps.write("omit = \n")
            for pattern in coverage_omit:
                cps.write("    %s\n" % pattern)

    return coveragerc
| |
def coverage_report():
    """Load the gathered coverage data and log the textual report.

    The configuration file is taken from $COVERAGE_PROCESS_START.  Problems
    raised by coverage as CoverageException are logged as a warning
    instead of propagating.
    """
    try:
        # Coverage4 uses coverage.Coverage
        from coverage import Coverage
    except ImportError:
        # Coverage under version 4 uses coverage.coverage
        from coverage import coverage as Coverage

    try:
        from coverage import CoverageException
    except ImportError:
        # NOTE(review): fallback for releases that only expose the
        # exception in coverage.misc - confirm against supported versions.
        from coverage.misc import CoverageException

    import io

    # Creating the coverage data with the setting from the configuration file
    cov = Coverage(config_file=os.environ.get('COVERAGE_PROCESS_START'))
    with io.StringIO() as cov_output:
        try:
            # Load data from the data file specified in the configuration
            cov.load()
            # Collect the report text in memory so it can go through the logger
            cov.report(file=cov_output, show_missing=False)
            log.info("\n%s" % cov_output.getvalue())
        except CoverageException as e:
            # Show problems with the reporting. Since Coverage4 not finding
            # any data to report raises an exception
            log.warning("%s" % str(e))
| |
| |
# Criteria accepted as first value of --run-tests-by / --list-tests-by.
_TESTS_BY_CRITERIA = ('name', 'class', 'module', 'id', 'tag')


def _parse_tests_by(option, values):
    """Split the values of a ``--<option>`` argument into (criteria, keywords).

    Prints the reason and returns None when fewer than two values were
    given or the criteria is not a known one.
    """
    if len(values) < 2:
        print('--%s requires one of <name|class|module|id|tag> followed by at least one keyword.' % option)
        return None
    if values[0] not in _TESTS_BY_CRITERIA:
        print('--%s %s not a valid option. Choose one of <name|class|module|id|tag>.' % (option, values[0]))
        return None
    return values[0], values[1:]


def _list_modules(list_classes):
    """Print all test modules; with *list_classes* also classes and methods."""
    import importlib

    log.info('Listing all available test modules:')
    for test in get_tests(include_hidden=True):
        module = test.split('oeqa.selftest.')[-1]
        info = ' (hidden)' if module.startswith('_') else ''
        print(module + info)
        if not list_classes:
            continue
        try:
            modlib = importlib.import_module(test)
            for name, obj in vars(modlib).items():
                if isinstance(obj, type(oeSelfTest)) and issubclass(obj, oeSelfTest) and obj is not oeSelfTest:
                    print(" --", name)
                    for method in dir(obj):
                        # dir() includes inherited names which are not in
                        # the class __dict__; .get() skips them instead of
                        # raising KeyError.
                        if method.startswith("test_") and callable(vars(obj).get(method)):
                            print(" -- --", method)
        except (AttributeError, ImportError) as e:
            print(e)


def _publish_results(repository):
    """Commit the result files of this run to the results git repository.

    The working copy lives in ./selftest and is cloned from *repository*
    when it is not a git repository yet.  Results go to a branch named
    <hostname>/<meta branch>/<MACHINE>.

    Returns False when pushing failed, True otherwise (including the case
    where nothing is pushed because the working copy is dirty).
    """
    import git

    metadata = metadata_from_bb()
    git_dir = os.path.join(os.getcwd(), 'selftest')
    if not os.path.isdir(git_dir):
        os.mkdir(git_dir)

    log.debug('Checking for git repository in %s' % git_dir)
    try:
        repo = git.Repo(git_dir)
    except git.exc.InvalidGitRepositoryError:
        log.debug("Couldn't find git repository %s; "
                  "cloning from %s" % (git_dir, repository))
        repo = git.Repo.clone_from(repository, git_dir)

    r_branches = repo.git.branch(r=True)
    r_branches = set(r_branches.replace('origin/', '').split())
    l_branches = {str(branch) for branch in repo.branches}
    branch = '%s/%s/%s' % (metadata['hostname'],
                           metadata['layers']['meta'].get('branch', '(nogit)'),
                           metadata['config']['MACHINE'])

    if branch in l_branches:
        log.debug('Found branch in local repository, checking out')
        repo.git.checkout(branch)
    elif branch in r_branches:
        log.debug('Found branch in remote repository, checking'
                  ' out and pulling')
        repo.git.checkout(branch)
        repo.git.pull()
    else:
        log.debug('New branch %s' % branch)
        repo.git.checkout('master')
        repo.git.checkout(b=branch)

    cleanResultsDir(repo)
    xml_dir = os.path.join(os.getcwd(), log_prefix)
    copyResultFiles(xml_dir, git_dir, repo)
    metadata_file = os.path.join(git_dir, 'metadata.xml')
    write_metadata_file(metadata_file, metadata)
    repo.index.add([metadata_file])
    repo.index.write()

    # Get information for commit message
    layer_info = ''
    for layer, values in metadata['layers'].items():
        layer_info = '%s%-17s = %s:%s\n' % (layer_info, layer,
                                            values.get('branch', '(nogit)'),
                                            values.get('commit', '0' * 40))
    # log_prefix[12:] drops the leading "oe-selftest-", leaving the timestamp
    msg = 'Selftest for build %s of %s for machine %s on %s\n\n%s' % (
        log_prefix[12:], metadata['distro']['pretty_name'],
        metadata['config']['MACHINE'], metadata['hostname'], layer_info)

    log.debug('Committing results to local repository')
    repo.index.commit(msg)
    if repo.is_dirty():
        log.error('Local repository is dirty, not pushing commits')
        return True

    try:
        if branch in r_branches:
            log.debug('Pushing changes to remote repository')
            repo.git.push()
        else:
            log.debug('Pushing changes to remote repository '
                      'creating new branch')
            repo.git.push('-u', 'origin', branch)
    except git.exc.GitCommandError:
        log.error('Failed to push to remote repository')
        return False
    return True


def main():
    """Entry point: parse the command line and list or run the tests.

    Returns 0 on success (listing done, or every test run was successful)
    and 1 on any failure.
    """
    import importlib

    parser = get_args_parser()
    args = parser.parse_args()

    # Add <layer>/lib to sys.path, so layers can add selftests
    log.info("Running bitbake -e to get BBPATH")
    bbpath = (get_bb_var('BBPATH') or '').split(':')
    layer_libdirs = [libdir for libdir in (os.path.join(entry, 'lib') for entry in bbpath if entry)
                     if os.path.exists(libdir)]
    sys.path.extend(layer_libdirs)
    importlib.reload(oeqa.selftest)

    # act like bitbake and enforce en_US.UTF-8 locale
    os.environ["LC_ALL"] = "en_US.UTF-8"

    # The *-tests-by options default to False; a list (possibly too short)
    # means the option was given.
    ts = []
    if args.run_tests_by is not False:
        selection = _parse_tests_by('run-tests-by', args.run_tests_by)
        if selection is None:
            return 1
        ts = sorted(tc.fullpath for tc in get_testsuite_by(*selection))
        if not ts:
            return 1

    if args.list_tests_by is not False:
        selection = _parse_tests_by('list-tests-by', args.list_tests_by)
        if selection is None:
            return 1
        list_testsuite_by(*selection)

    if args.list_tests:
        list_tests()

    if args.list_tags:
        list_tags()

    if args.list_modules or args.list_allclasses:
        _list_modules(list_classes=args.list_allclasses)

    if not (args.run_tests or args.run_all_tests or args.run_tests_by):
        return 0

    if not preflight_check():
        return 1

    if args.run_tests_by:
        testslist = ts
    else:
        testslist = get_tests(exclusive_modules=(args.run_tests or []), include_hidden=False)

    suite = unittest.TestSuite()
    loader = unittest.TestLoader()
    loader.sortTestMethodsUsing = None
    runner = TestRunner(verbosity=2,
                        resultclass=buildResultClass(args))
    # we need to do this here, otherwise just loading the tests
    # will take 2 minutes (bitbake -e calls)
    oeSelfTest.testlayer_path = get_test_layer()
    for test in testslist:
        log.info("Loading tests from: %s" % test)
        try:
            suite.addTests(loader.loadTestsFromName(test))
        except AttributeError as e:
            log.error("Failed to import %s" % test)
            log.error(e)
            return 1
    add_include()

    results = []
    if args.machine:
        # Custom machine sets only weak default values (??=) for MACHINE in machine.inc
        # This let test cases that require a specific MACHINE to be able to override it, using (?= or =)
        log.info('Custom machine mode enabled. MACHINE set to %s' % args.machine)
        if args.machine == 'random':
            os.environ['CUSTOMMACHINE'] = 'random'
            results.append(runner.run(suite))
        else:  # all
            for machine in get_available_machines():
                log.info('Run tests with custom MACHINE set to: %s' % machine)
                os.environ['CUSTOMMACHINE'] = machine
                results.append(runner.run(suite))
    else:
        results.append(runner.run(suite))

    if not results:
        # Only possible with "--machine all" and no machine available.
        log.error('No machine available to run the tests on')
        return 1

    log.info("Finished")

    if args.repository and not _publish_results(args.repository):
        return 1

    # Every run has to pass, not just the last one.
    return 0 if all(result.wasSuccessful() for result in results) else 1
| |
def buildResultClass(args):
    """Build a Result Class to use in the testcase execution.

    args: parsed command line; the coverage* attributes decide whether
        coverage measurement is set up around the test run.

    Returns the StampedResult class (not an instance).
    """
    import site

    coverage_requested = bool(args.coverage or args.coverage_source or
                              args.coverage_include or args.coverage_omit)

    class StampedResult(TestResult):
        """
        Custom TestResult that prints the time when a test starts.  As oe-selftest
        can take a long time (ie a few hours) to run, timestamps help us understand
        what tests are taking a long time to execute.
        If coverage is required, this class executes the coverage setup and reporting.
        """
        def startTest(self, test):
            import time
            self.stream.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " - ")
            super(StampedResult, self).startTest(test)

        def startTestRun(self):
            """ Setup coverage before running any testcase """

            # path of the .pth file that lets subprocesses be measured
            self.coveragepth = None

            # whether the coverage package could be imported
            self.coverage_installed = True

            if not coverage_requested:
                return

            try:
                # check if user can do coverage
                import coverage
            except ImportError:
                log.warning("python coverage is not installed. More info on https://pypi.python.org/pypi/coverage")
                self.coverage_installed = False
                return

            log.info("Coverage is enabled")

            # Parse the whole major component, not just the first character,
            # so two-digit major versions are read correctly.
            major_version = int(coverage.version.__version__.split('.')[0])
            if major_version < 4:
                log.error("python coverage %s installed. Require version 4 or greater." % coverage.version.__version__)
                # Only requests the run to stop; the setup below still
                # happens so that stopTestRun() finds what it expects.
                self.stop()

            # In case the user has not set the variable COVERAGE_PROCESS_START,
            # create a default one and export it. The COVERAGE_PROCESS_START
            # value indicates where the coverage configuration file resides
            # More info on https://pypi.python.org/pypi/coverage
            if not os.environ.get('COVERAGE_PROCESS_START'):
                os.environ['COVERAGE_PROCESS_START'] = coverage_setup(args.coverage_source, args.coverage_include, args.coverage_omit)

            # Use default site.USER_SITE and write corresponding config file
            site.ENABLE_USER_SITE = True
            os.makedirs(site.USER_SITE, exist_ok=True)
            self.coveragepth = os.path.join(site.USER_SITE, "coverage.pth")
            with open(self.coveragepth, 'w') as cps:
                cps.write('import sys,site; sys.path.extend(site.getsitepackages()); import coverage; coverage.process_startup();')

        def stopTestRun(self):
            """ Report coverage data after the testcases are run """

            if not (coverage_requested and self.coverage_installed):
                return

            with open(os.environ['COVERAGE_PROCESS_START']) as ccf:
                log.info("Coverage configuration file (%s)" % os.environ.get('COVERAGE_PROCESS_START'))
                log.info("===========================")
                log.info("\n%s" % ccf.read())

            log.info("Coverage Report")
            log.info("===============")
            try:
                coverage_report()
            finally:
                # remove the pth file
                try:
                    os.remove(self.coveragepth)
                except OSError:
                    log.warning("Expected temporal file from coverage is missing, ignoring removal.")

    return StampedResult
| |
def cleanResultsDir(repo):
    """ Remove result files from directory

    Every regular ``*.xml`` file at the top level of the working tree of
    *repo* is removed from the index and from disk.  Sub-directories are
    not visited.
    """
    directory = repo.working_tree_dir
    xml_files = [name for name in os.listdir(directory)
                 if name.endswith('.xml') and os.path.isfile(os.path.join(directory, name))]
    # Nothing to remove (e.g. a fresh branch): do not issue a removal with
    # an empty path list.
    if xml_files:
        repo.index.remove(xml_files, working_tree=True)
| |
def copyResultFiles(src, dst, repo):
    """ Copy result files from src to dst removing the time stamp.

    The directory layout below *src* is reproduced below *dst*.  Every
    "-<digits>" sequence is dropped from the copied file names, and all
    copied files are added to the index of *repo*.
    """
    import shutil

    re_time = re.compile("-[0-9]+")
    file_list = []

    for root, subdirs, files in os.walk(src):
        # Path of 'root' relative to src; only the leading src is stripped.
        rel_dir = root[len(src):].lstrip(os.sep)
        for s in subdirs:
            # dst may still hold the directory from a previous run.
            os.makedirs(os.path.join(dst, rel_dir, s), exist_ok=True)
        for f in files:
            file_name = os.path.join(dst, rel_dir, re_time.sub("", f))
            shutil.copy2(os.path.join(root, f), file_name)
            file_list.append(file_name)
    repo.index.add(file_list)
| |
class TestRunner(_TestRunner):
    """Test runner class aware of exporting tests."""

    def __init__(self, *args, **kwargs):
        try:
            # 'output' is specific to XMLTestRunner; if adding a new runner
            # then also add logic to use that runner's arguments.
            export_kwargs = dict(
                kwargs,
                output=os.path.join(os.getcwd(), log_prefix),
                descriptions=False)
            # First attempt: tell the runner where to export the results.
            super(TestRunner, self).__init__(*args, **export_kwargs)
        except TypeError:
            # The base runner rejected the extra arguments; retry with the
            # caller's arguments only.
            log.info("test runner init'ed like unittest")
            super(TestRunner, self).__init__(*args, **kwargs)
| |
if __name__ == "__main__":
    # Defined up front so the exit status exists even when main() leaves
    # through something other than a normal return or an Exception.
    ret = 1
    try:
        ret = main()
    except Exception:
        ret = 1
        import traceback
        traceback.print_exc()
    finally:
        # Always undo the changes made to the build configuration.
        remove_include()
        remove_inc_files()
    sys.exit(ret)