|  | #!/usr/bin/env python3 | 
|  |  | 
|  | # Copyright (c) 2013 Intel Corporation | 
|  | # | 
|  | # This program is free software; you can redistribute it and/or modify | 
|  | # it under the terms of the GNU General Public License version 2 as | 
|  | # published by the Free Software Foundation. | 
|  | # | 
|  | # This program is distributed in the hope that it will be useful, | 
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | 
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
|  | # GNU General Public License for more details. | 
|  | # | 
|  | # You should have received a copy of the GNU General Public License along | 
|  | # with this program; if not, write to the Free Software Foundation, Inc., | 
|  | # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | 
|  |  | 
|  | # DESCRIPTION | 
|  | # This script runs tests defined in meta/lib/oeqa/selftest/ | 
|  | # Its purpose is to automate the testing of different bitbake tools. | 
|  | # To use it you just need to source your build environment setup script and | 
|  | # add the meta-selftest layer to your BBLAYERS. | 
|  | # Call the script as: "oe-selftest -a" to run all the tests in meta/lib/oeqa/selftest/ | 
|  | # Call the script as: "oe-selftest -r <module>.<Class>.<method>" to run just a single test | 
|  | # E.g: "oe-selftest -r bblayers.BitbakeLayers" will run just the BitbakeLayers class from meta/lib/oeqa/selftest/bblayers.py | 
|  |  | 
|  |  | 
|  | import os | 
|  | import sys | 
|  | import unittest | 
|  | import logging | 
|  | import argparse | 
|  | import subprocess | 
|  | import time as t | 
|  | import re | 
|  | import fnmatch | 
|  | import collections | 
|  | import imp | 
|  |  | 
|  | sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib') | 
|  | import scriptpath | 
|  | scriptpath.add_bitbake_lib_path() | 
|  | scriptpath.add_oe_lib_path() | 
|  | import argparse_oe | 
|  |  | 
|  | import oeqa.selftest | 
|  | import oeqa.utils.ftools as ftools | 
|  | from oeqa.utils.commands import runCmd, get_bb_var, get_test_layer | 
|  | from oeqa.selftest.base import oeSelfTest, get_available_machines | 
|  |  | 
|  | try: | 
|  | import xmlrunner | 
|  | from xmlrunner.result import _XMLTestResult as TestResult | 
|  | from xmlrunner import XMLTestRunner as _TestRunner | 
|  | except ImportError: | 
|  | # use the base runner instead | 
|  | from unittest import TextTestResult as TestResult | 
|  | from unittest import TextTestRunner as _TestRunner | 
|  |  | 
# Timestamped prefix used for the log file name created by logger_create().
log_prefix = "oe-selftest-" + t.strftime("%Y%m%d-%H%M%S")


def logger_create():
    """Create and return the "selftest" logger.

    A timestamped log file (log_prefix + ".log") is created in the current
    working directory and the "oe-selftest.log" symlink is re-pointed at it.
    The logger writes DEBUG and above to that file and INFO and above to
    stdout, both with the same timestamped format.
    """
    log_file = log_prefix + ".log"
    # lexists() is also true for a dangling symlink (e.g. the previous log
    # file was deleted); exists() would report False there and the symlink()
    # call below would then fail with FileExistsError.
    if os.path.lexists("oe-selftest.log"):
        os.remove("oe-selftest.log")
    os.symlink(log_file, "oe-selftest.log")

    log = logging.getLogger("selftest")
    log.setLevel(logging.DEBUG)

    fh = logging.FileHandler(filename=log_file, mode='w')
    fh.setLevel(logging.DEBUG)

    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    log.addHandler(fh)
    log.addHandler(ch)

    return log


# Module-wide logger used by every function in this script.
log = logger_create()
|  |  | 
def get_args_parser():
    """Build the command-line parser for oe-selftest.

    The run/list actions are registered in a single required, mutually
    exclusive group, so exactly one of them has to be given. The coverage
    and --machine options are ordinary optional arguments.

    Returns the configured argparse_oe.ArgumentParser instance.
    """
    summary = ("Script that runs unit tests agains bitbake and other Yocto related tools. "
               "The goal is to validate tools functionality and metadata integrity. "
               "Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information.")
    arg_parser = argparse_oe.ArgumentParser(description=summary)
    actions = arg_parser.add_mutually_exclusive_group(required=True)

    # Registration order is kept as-is because it drives the --help layout.
    actions.add_argument(
        '-r', '--run-tests',
        required=False, action='store', nargs='*', dest="run_tests", default=None,
        help='Select what tests to run (modules, classes or test methods). Format should be: <module>.<class>.<test_method>')
    actions.add_argument(
        '-a', '--run-all-tests',
        required=False, action="store_true", dest="run_all_tests", default=False,
        help='Run all (unhidden) tests')
    actions.add_argument(
        '-m', '--list-modules',
        required=False, action="store_true", dest="list_modules", default=False,
        help='List all available test modules.')
    actions.add_argument(
        '--list-classes',
        required=False, action="store_true", dest="list_allclasses", default=False,
        help='List all available test classes.')

    arg_parser.add_argument(
        '--coverage', action="store_true",
        help="Run code coverage when testing")
    arg_parser.add_argument(
        '--coverage-source', dest="coverage_source", nargs="+",
        help="Specifiy the directories to take coverage from")
    arg_parser.add_argument(
        '--coverage-include', dest="coverage_include", nargs="+",
        help="Specify extra patterns to include into the coverage measurement")
    arg_parser.add_argument(
        '--coverage-omit', dest="coverage_omit", nargs="+",
        help="Specify with extra patterns to exclude from the coverage measurement")

    actions.add_argument(
        '--run-tests-by',
        required=False, dest='run_tests_by', default=False, nargs='*',
        help='run-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    actions.add_argument(
        '--list-tests-by',
        required=False, dest='list_tests_by', default=False, nargs='*',
        help='list-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    actions.add_argument(
        '-l', '--list-tests',
        required=False, action="store_true", dest="list_tests", default=False,
        help='List all available tests.')
    actions.add_argument(
        '--list-tags',
        required=False, dest='list_tags', default=False, action="store_true",
        help='List all tags that have been set to test cases.')

    arg_parser.add_argument(
        '--machine',
        required=False, dest='machine', choices=['random', 'all'], default=None,
        help='Run tests on different machines (random/all).')
    return arg_parser
|  |  | 
|  |  | 
def preflight_check():
    """Verify the build environment before any test is run.

    Checks that BUILDDIR is set, changes the working directory to it when
    needed, checks that "meta-selftest" appears in BBLAYERS and finally
    runs "bitbake -p".

    Returns False when BUILDDIR is missing or the layer is not configured,
    True once all steps have completed.
    """
    log.info("Checking that everything is in order before running the tests")

    build_directory = os.environ.get("BUILDDIR")
    if not build_directory:
        log.error("BUILDDIR isn't set. Did you forget to source your build environment setup script?")
        return False

    if build_directory != os.getcwd():
        log.info("Changing cwd to %s" % build_directory)
        os.chdir(build_directory)

    configured_layers = get_bb_var("BBLAYERS")
    if "meta-selftest" not in configured_layers:
        log.error("You don't seem to have the meta-selftest layer in BBLAYERS")
        return False

    log.info("Running bitbake -p")
    runCmd("bitbake -p")

    return True
|  |  | 
def add_include():
    """Append the oe-selftest include lines to local.conf and bblayers.conf.

    Each file under $BUILDDIR/conf is only touched when it does not already
    contain the "#include added by oe-selftest.py" marker.
    """
    marker = "#include added by oe-selftest.py"
    build_directory = os.environ.get("BUILDDIR")

    # (file relative to BUILDDIR, log message, text appended to the file)
    additions = (
        ("conf/local.conf",
         "Adding: \"include selftest.inc\" in local.conf",
         "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc"),
        ("conf/bblayers.conf",
         "Adding: \"include bblayers.inc\" in bblayers.conf",
         "\n#include added by oe-selftest.py\ninclude bblayers.inc"),
    )

    for relative_path, message, snippet in additions:
        conf_path = os.path.join(build_directory, relative_path)
        if marker in ftools.read_file(conf_path):
            continue
        log.info(message)
        ftools.append_file(conf_path, snippet)
|  |  | 
def remove_include():
    """Strip the oe-selftest include lines from local.conf and bblayers.conf.

    Does nothing when BUILDDIR is not set. A file is only modified when it
    contains the "#include added by oe-selftest.py" marker.
    """
    build_directory = os.environ.get("BUILDDIR")
    if build_directory is None:
        return

    marker = "#include added by oe-selftest.py"

    # (file relative to BUILDDIR, log message, text removed from the file)
    removals = (
        ("conf/local.conf",
         "Removing the include from local.conf",
         "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc"),
        ("conf/bblayers.conf",
         "Removing the include from bblayers.conf",
         "\n#include added by oe-selftest.py\ninclude bblayers.inc"),
    )

    for relative_path, message, snippet in removals:
        conf_path = os.path.join(build_directory, relative_path)
        if marker in ftools.read_file(conf_path):
            log.info(message)
            ftools.remove_from_file(conf_path, snippet)
|  |  | 
def remove_inc_files():
    """Best-effort removal of the .inc files generated during a selftest run.

    Removes $BUILDDIR/conf/selftest.inc, every 'test_recipe.inc' found under
    the test layer, and the conf/bblayers.inc and conf/machine.inc files.
    Missing files are ignored. Nothing is done when BUILDDIR is not set.
    """
    builddir = os.environ.get("BUILDDIR")
    if builddir is None:
        # On Python 3 os.path.join(None, ...) raises TypeError, which the
        # handlers below do not (and should not) swallow; bail out early.
        return

    try:
        os.remove(os.path.join(builddir, "conf/selftest.inc"))
        # Only reached when selftest.inc existed; otherwise the OSError above
        # skips the test layer lookup altogether.
        for root, _, files in os.walk(get_test_layer()):
            for name in files:
                if name == 'test_recipe.inc':
                    os.remove(os.path.join(root, name))
    except (AttributeError, OSError):
        pass

    for incl_file in ['conf/bblayers.inc', 'conf/machine.inc']:
        try:
            os.remove(os.path.join(builddir, incl_file))
        except OSError:
            # The file was never created or is already gone.
            pass
|  |  | 
|  |  | 
def get_tests_modules(include_hidden=False):
    """Return the dotted names of the test modules found under oeqa.selftest.

    Every directory below each entry of oeqa.selftest.__path__ is scanned
    for '.py' files. 'base.py' and names starting with '__' are always
    skipped; names starting with a single '_' (hidden modules) are only
    listed when include_hidden is true. Each module name appears once.
    """
    def is_test_file(filename):
        if not filename.endswith('.py'):
            return False
        if filename.startswith('__') or filename == 'base.py':
            return False
        return include_hidden or not filename.startswith('_')

    discovered = []
    for package_dir in oeqa.selftest.__path__:
        for dirpath, _dirnames, _filenames in os.walk(package_dir):
            # Whatever follows the last "selftest" in the path is the
            # sub-package part, e.g. "/sub/dir" -> ".sub.dir".
            package = 'oeqa.selftest' + dirpath.split("selftest")[-1].replace("/", ".")
            candidates = sorted(entry for entry in os.listdir(dirpath) if is_test_file(entry))
            for filename in candidates:
                dotted_name = package + "." + filename.split('.py')[0]
                if dotted_name not in discovered:
                    discovered.append(dotted_name)
    return discovered
|  |  | 
|  |  | 
def get_tests(exclusive_modules=None, include_hidden=False):
    """Return the fully qualified names of the tests to load.

    exclusive_modules: optional iterable of names relative to
        'oeqa.selftest' (modules, classes or methods). When given and
        non-empty, only these are returned, each prefixed with
        'oeqa.selftest.'.
    include_hidden: forwarded to get_tests_modules() when no explicit
        selection is given.
    """
    # None replaces the former mutable [] default; both mean "no selection".
    test_modules = ['oeqa.selftest.' + name for name in (exclusive_modules or [])]
    if not test_modules:
        test_modules = get_tests_modules(include_hidden)

    return test_modules
|  |  | 
|  |  | 
class Tc:
    """Plain record describing one selftest test case."""

    def __init__(self, tcname, tcclass, tcmodule, tcid=None, tctag=None):
        # Dotted path that can be handed to the unittest loader.
        self.fullpath = '.'.join(('oeqa', 'selftest', tcmodule, tcclass, tcname))
        self.tcmodule = tcmodule
        self.tcclass = tcclass
        self.tcname = tcname
        self.tcid = tcid
        # Either a single tag (str) or several tags (tuple).
        self.tctag = tctag
|  |  | 
|  |  | 
def get_tests_from_module(tmod):
    """Return a Tc entry for every test method defined in module tmod.

    tmod is the dotted module name to import. Every class in it that
    derives from oeSelfTest (oeSelfTest itself excluded) is inspected and
    each callable attribute named 'test_*' that the class defines itself
    is recorded together with its 'test_case' id and 'tag__feature' tag
    (None when not set).

    Failures while importing or inspecting the module are not fatal: the
    tests collected so far are returned and the problem is written to the
    debug log.
    """
    import importlib

    tlist = []
    prefix = 'oeqa.selftest.'

    try:
        modlib = importlib.import_module(tmod)
        for cls in list(vars(modlib).values()):
            if not (isinstance(cls, type(oeSelfTest)) and issubclass(cls, oeSelfTest)):
                continue
            if cls is oeSelfTest:
                continue

            own_members = vars(cls)
            for test in dir(cls):
                if not test.startswith('test_'):
                    continue
                # dir() also lists inherited names, which are absent from
                # vars(cls); indexing used to raise KeyError there and
                # silently cut the listing short for the whole module.
                member = own_members.get(test)
                if not callable(member):
                    continue

                # Get test case id and feature tag
                try:
                    tid = member.test_case
                except AttributeError:
                    print('DEBUG: tc id missing for ' + str(test))
                    tid = None
                ttag = getattr(member, 'tag__feature', None)

                # NOTE: lstrip() strips a character set, not a prefix, hence replace()
                tlist.append(Tc(test, cls.__name__, cls.__module__.replace(prefix, ''), tid, ttag))
    except Exception:
        # Listing is best-effort, but leave a trace instead of hiding the error.
        log.debug("Failed to collect tests from %s", tmod, exc_info=True)

    return tlist
|  |  | 
|  |  | 
def get_all_tests():
    """Return the Tc entries of every non-hidden test module.

    Modules are processed in sorted name order.
    """
    collected = []
    for module_name in sorted(get_tests_modules()):
        collected.extend(get_tests_from_module(module_name))
    return collected
|  |  | 
|  |  | 
def get_testsuite_by(criteria, keyword):
    """Select test cases by name, class, module, id or tag.

    criteria: one of 'name', 'class', 'module', 'id', 'tag'
    keyword: list of tests, classes, modules, ids or tags to look for;
        an entry that is not an exact match is tried as a shell-style
        wildcard. Entries matching nothing are reported through log.error.

    Returns the list of matching Tc objects (empty for an unknown criteria).
    """
    all_tests = get_all_tests()

    def get_matches(values):
        # Return the items of 'values' selected by the keyword list.
        matched = []
        pool = list(values)
        for key in keyword:
            if key in pool:
                # Exact item
                matched.append(key)
                pool.remove(key)
                continue

            # Wildcard matching against what has not been selected yet
            glob = re.compile(fnmatch.translate(r"%s" % key))
            hits = [item for item in pool if glob.match(item)]
            if not hits:
                log.error("Failed to find test: %s" % key)
                continue
            matched.extend(hits)
            pool = [item for item in pool if item not in hits]

        return matched

    # Criteria that compare one plain attribute of the test case.
    field_of = {
        'name': lambda tc: tc.tcname,
        'class': lambda tc: tc.tcclass,
        'module': lambda tc: tc.tcmodule,
        'id': lambda tc: str(tc.tcid),
    }

    selected = []
    if criteria in field_of:
        field = field_of[criteria]
        wanted = get_matches([field(tc) for tc in all_tests])
        selected = [tc for tc in all_tests if field(tc) in wanted]

    elif criteria == 'tag':
        known_tags = set()
        for tc in all_tests:
            # tc can have multiple tags (as tuple) otherwise str will suffice
            if isinstance(tc.tctag, tuple):
                known_tags.update(str(tag) for tag in tc.tctag)
            else:
                known_tags.add(str(tc.tctag))

        wanted = get_matches(list(known_tags))

        for tc in all_tests:
            for tag in wanted:
                if (isinstance(tc.tctag, tuple) and tag in tc.tctag) or tag == tc.tctag:
                    selected.append(tc)

        # A test case carrying several matching tags was appended more than once
        selected = list(set(selected))

    return selected
|  |  | 
|  |  | 
def list_testsuite_by(criteria, keyword):
    """Print a table of the test cases selected by get_testsuite_by().

    criteria: name, class, module, id, tag
    keyword: a list of tests, classes, modules, ids, tags

    Rows are ordered by id, tag, name, class and module, followed by a
    short summary of the filter and the number of matches.
    """
    def orderable(value):
        # Python 3 cannot order None against int/str, nor str against tuple,
        # which the raw (id, tag, ...) tuples may mix. Map every field to a
        # uniformly comparable key: ints numerically first, the rest as text.
        if isinstance(value, int):
            return (0, value, '')
        if isinstance(value, (tuple, list)):
            return (1, 0, ', '.join(str(item) for item in value))
        return (1, 0, str(value))

    rows = [(tc.tcid, tc.tctag, tc.tcname, tc.tcclass, tc.tcmodule)
            for tc in get_testsuite_by(criteria, keyword)]
    rows.sort(key=lambda row: tuple(orderable(field) for field in row))

    row_format = '%-4s\t%-20s\t%-60s\t%-25s\t%-20s'
    print(row_format % ('id', 'tag', 'name', 'class', 'module'))
    print('_' * 150)
    for row in rows:
        tag = row[1]
        if isinstance(tag, (tuple, list)):
            tag = ', '.join(str(item) for item in tag)
        print(row_format % (row[0], tag, row[2], row[3], row[4]))
    print('_' * 150)
    print('Filtering by:\t %s' % criteria)
    print('Looking for:\t %s' % ', '.join(str(x) for x in keyword))
    print('Total found:\t %s' % len(rows))
|  |  | 
|  |  | 
def list_tests():
    """Print id, tag(s) and dotted path of every available oe-selftest test."""
    row_format = '%-4s\t%-10s\t%-50s'
    ruler = '_' * 80

    testcases = get_all_tests()

    print(row_format % ('id', 'tag', 'test'))
    print(ruler)
    for tc in testcases:
        # Several tags are shown as one comma separated cell
        tag_cell = ', '.join(tc.tctag) if isinstance(tc.tctag, (tuple, list)) else tc.tctag
        test_path = '.'.join([tc.tcmodule, tc.tcclass, tc.tcname])
        print(row_format % (tc.tcid, tag_cell, test_path))
    print(ruler)
    print('Total found:\t %s' % len(testcases))
|  |  | 
def list_tags():
    """Print the distinct tags carried by the available test cases.

    Handy when tagging new test cases: the list of tags should be kept as
    minimal as possible.
    """
    seen = set()
    for tc in get_all_tests():
        if isinstance(tc.tctag, (tuple, list)):
            seen.update(tc.tctag)
        else:
            seen.add(tc.tctag)

    print('Tags:\t%s' % ', '.join(str(tag) for tag in seen))
|  |  | 
def coverage_setup(coverage_source, coverage_include, coverage_omit):
    """ Set up the coverage measurement for the testcases to be run

    Writes $BUILDDIR/.coveragerc with a [run] section (timestamped data
    file, branch coverage, source directories and optional include/omit
    patterns).

    coverage_source: directories to measure; when empty, the BBLAYERS
        entries, this script's directory and the sibling 'bitbake'
        directory are used instead.
    coverage_include / coverage_omit: optional lists of extra patterns.

    Returns the path of the generated configuration file.
    """
    import datetime
    import subprocess

    builddir = os.environ.get("BUILDDIR")
    scriptsdir = os.path.dirname(os.path.realpath(__file__))
    pokydir = os.path.dirname(scriptsdir)

    # The commit is only recorded in a header comment, so a missing git
    # binary or a non-git tree must not abort the coverage setup.
    try:
        curcommit = subprocess.check_output(
            ["git", "--git-dir", os.path.join(pokydir, ".git"), "rev-parse", "HEAD"]).decode('utf-8').strip()
    except (OSError, subprocess.CalledProcessError):
        curcommit = "unknown"

    coveragerc = "%s/.coveragerc" % builddir
    data_file = "%s/.coverage." % builddir
    data_file += datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
    if os.path.isfile(data_file):
        os.remove(data_file)

    lines = [
        "# Generated with command '%s'\n" % " ".join(sys.argv),
        "# HEAD commit %s\n" % curcommit,
        "[run]\n",
        "data_file = %s\n" % data_file,
        "branch = True\n",
        "source = \n",
    ]
    if coverage_source:
        for directory in coverage_source:
            if not os.path.isdir(directory):
                # Only warn: the entry is still written, as before.
                log.warning("Directory %s is not valid.", directory)
            lines.append("    %s\n" % directory)
    else:
        # Measure just BBLAYERS, scripts and bitbake folders
        for layer in get_bb_var('BBLAYERS').split():
            lines.append("    %s\n" % layer)
        lines.append("    %s\n" % scriptsdir)
        lines.append("    %s\n" % os.path.join(pokydir, 'bitbake'))

    if coverage_include:
        lines.append("include = \n")
        lines.extend("    %s\n" % pattern for pattern in coverage_include)
    if coverage_omit:
        lines.append("omit = \n")
        lines.extend("    %s\n" % pattern for pattern in coverage_omit)

    with open(coveragerc, 'w') as cps:
        cps.writelines(lines)

    return coveragerc
|  |  | 
def coverage_report():
    """ Loads the coverage data gathered and reports it back

    The coverage configuration is read from the file named by the
    COVERAGE_PROCESS_START environment variable. The textual report is
    sent to the logger at INFO level; a CoverageException raised while
    loading or reporting is logged as a warning instead.
    """
    import io

    try:
        # Coverage4 uses coverage.Coverage
        from coverage import Coverage
    except ImportError:
        # Coverage under version 4 uses coverage.coverage
        from coverage import coverage as Coverage

    try:
        from coverage.misc import CoverageException
    except ImportError:
        # NOTE(review): newer coverage releases are expected to provide the
        # exception from coverage.exceptions -- confirm against the
        # installed version.
        from coverage.exceptions import CoverageException

    # Creating the coverage data with the setting from the configuration file
    cov = Coverage(config_file=os.environ.get('COVERAGE_PROCESS_START'))
    with io.StringIO() as cov_output:
        try:
            # Load data from the data file specified in the configuration
            cov.load()
            # Store report data in the in-memory buffer
            cov.report(file=cov_output, show_missing=False)
            log.info("\n%s" % cov_output.getvalue())
        except CoverageException as e:
            # Show problems with the reporting. Since Coverage4 not finding
            # any data to report raises an exception
            log.warning("%s" % str(e))
|  |  | 
def main():
    """Entry point: parse the command line and list or run selftests.

    Returns 1 on invalid options, failed pre-flight checks, load errors or
    failing tests, and 0 when the requested tests ran successfully.
    Listing actions fall through and return None.
    """
    import importlib

    parser = get_args_parser()
    args = parser.parse_args()

    # Add <layer>/lib to sys.path, so layers can add selftests
    log.info("Running bitbake -e to get BBPATH")
    bbpath = get_bb_var('BBPATH').split(':')
    layer_libdirs = [p for p in (os.path.join(l, 'lib') for l in bbpath) if os.path.exists(p)]
    sys.path.extend(layer_libdirs)
    # importlib.reload is the supported replacement for the deprecated imp.reload
    importlib.reload(oeqa.selftest)

    valid_options = ['name', 'class', 'module', 'id', 'tag']

    if args.run_tests_by:
        if len(args.run_tests_by) < 2:
            # A criteria without keywords used to reach the run section with
            # 'ts' unbound and crash there.
            print('--run-tests-by requires a criteria <name|class|module|id|tag> followed by at least one keyword.')
            return 1
        if args.run_tests_by[0] not in valid_options:
            print('--run-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % args.run_tests_by[0])
            return 1
        criteria = args.run_tests_by[0]
        keyword = args.run_tests_by[1:]
        ts = sorted([ tc.fullpath for tc in get_testsuite_by(criteria, keyword) ])
        if not ts:
            return 1

    if args.list_tests_by and len(args.list_tests_by) >= 2:
        if args.list_tests_by[0] not in valid_options:
            print('--list-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % args.list_tests_by[0])
            return 1
        criteria = args.list_tests_by[0]
        keyword = args.list_tests_by[1:]
        list_testsuite_by(criteria, keyword)

    if args.list_tests:
        list_tests()

    if args.list_tags:
        list_tags()

    if args.list_allclasses:
        # Listing classes is the module listing plus per-module details
        args.list_modules = True

    if args.list_modules:
        log.info('Listing all available test modules:')
        testslist = get_tests(include_hidden=True)
        for test in testslist:
            module = test.split('oeqa.selftest.')[-1]
            info = ''
            if module.startswith('_'):
                info = ' (hidden)'
            print(module + info)
            if args.list_allclasses:
                try:
                    modlib = importlib.import_module(test)
                    for v in vars(modlib):
                        obj = vars(modlib)[v]
                        if isinstance(obj, type(oeSelfTest)) and issubclass(obj, oeSelfTest) and obj != oeSelfTest:
                            print(" --", v)
                            for method in dir(obj):
                                # callable() replaces collections.Callable (removed in
                                # Python 3.10); .get() tolerates inherited names that
                                # dir() reports but vars() does not hold.
                                if method.startswith("test_") and callable(vars(obj).get(method)):
                                    print(" --  --", method)

                except (AttributeError, ImportError) as e:
                    print(e)

    if args.run_tests or args.run_all_tests or args.run_tests_by:
        if not preflight_check():
            return 1

        if args.run_tests_by:
            testslist = ts
        else:
            testslist = get_tests(exclusive_modules=(args.run_tests or []), include_hidden=False)

        suite = unittest.TestSuite()
        loader = unittest.TestLoader()
        loader.sortTestMethodsUsing = None
        runner = TestRunner(verbosity=2,
                            resultclass=buildResultClass(args))
        # we need to do this here, otherwise just loading the tests
        # will take 2 minutes (bitbake -e calls)
        oeSelfTest.testlayer_path = get_test_layer()
        for test in testslist:
            log.info("Loading tests from: %s" % test)
            try:
                suite.addTests(loader.loadTestsFromName(test))
            except AttributeError as e:
                log.error("Failed to import %s" % test)
                log.error(e)
                return 1
        add_include()

        if args.machine:
            # Custom machine sets only weak default values (??=) for MACHINE in machine.inc
            # This let test cases that require a specific MACHINE to be able to override it, using (?= or =)
            log.info('Custom machine mode enabled. MACHINE set to %s' % args.machine)
            if args.machine == 'random':
                os.environ['CUSTOMMACHINE'] = 'random'
                successful = runner.run(suite).wasSuccessful()
            else:  # all
                machines = get_available_machines()
                if not machines:
                    log.error("No machines available to run the tests on")
                # Every machine's result counts, not only the last one's
                successful = bool(machines)
                for m in machines:
                    log.info('Run tests with custom MACHINE set to: %s' % m)
                    os.environ['CUSTOMMACHINE'] = m
                    if not runner.run(suite).wasSuccessful():
                        successful = False
        else:
            successful = runner.run(suite).wasSuccessful()

        log.info("Finished")

        return 0 if successful else 1
|  |  | 
def buildResultClass(args):
    """Build a Result Class to use in the testcase execution

    args: the parsed command line; its coverage, coverage_source,
        coverage_include and coverage_omit attributes decide whether
        coverage is set up before the run and reported after it.

    Returns the StampedResult class (not an instance).
    """
    import site

    def coverage_requested():
        # Any of the coverage options switches coverage on
        return args.coverage or args.coverage_source or args.coverage_include or args.coverage_omit

    class StampedResult(TestResult):
        """
        Custom TestResult that prints the time when a test starts.  As oe-selftest
        can take a long time (ie a few hours) to run, timestamps help us understand
        what tests are taking a long time to execute.
        If coverage is required, this class executes the coverage setup and reporting.
        """
        def startTest(self, test):
            import time
            self.stream.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " - ")
            super(StampedResult, self).startTest(test)

        def startTestRun(self):
            """ Setup coverage before running any testcase """

            # variable holding the coverage configuration file allowing subprocess to be measured
            self.coveragepth = None

            # indicates the system if coverage is currently installed
            self.coverage_installed = True

            if coverage_requested():
                try:
                    # check if user can do coverage
                    import coverage
                except ImportError:
                    log.warning("python coverage is not installed. More info on https://pypi.python.org/pypi/coverage")
                    self.coverage_installed = False

                if self.coverage_installed:
                    log.info("Coverage is enabled")

                    # Parse the whole leading component, not just the first
                    # character, so two-digit major versions are read correctly.
                    major_version = int(coverage.version.__version__.split('.')[0])
                    if major_version < 4:
                        log.error("python coverage %s installed. Require version 4 or greater." % coverage.version.__version__)
                        self.stop()
                    # In case the user has not set the variable COVERAGE_PROCESS_START,
                    # create a default one and export it. The COVERAGE_PROCESS_START
                    # value indicates where the coverage configuration file resides
                    # More info on https://pypi.python.org/pypi/coverage
                    if not os.environ.get('COVERAGE_PROCESS_START'):
                        os.environ['COVERAGE_PROCESS_START'] = coverage_setup(args.coverage_source, args.coverage_include, args.coverage_omit)

                    # Use default site.USER_SITE and write corresponding config file
                    site.ENABLE_USER_SITE = True
                    if not os.path.exists(site.USER_SITE):
                        os.makedirs(site.USER_SITE)
                    self.coveragepth = os.path.join(site.USER_SITE, "coverage.pth")
                    with open(self.coveragepth, 'w') as cps:
                        cps.write('import sys,site; sys.path.extend(site.getsitepackages()); import coverage; coverage.process_startup();')

        def stopTestRun(self):
            """ Report coverage data after the testcases are run """

            if coverage_requested():
                if self.coverage_installed:
                    with open(os.environ['COVERAGE_PROCESS_START']) as ccf:
                        log.info("Coverage configuration file (%s)" % os.environ.get('COVERAGE_PROCESS_START'))
                        log.info("===========================")
                        log.info("\n%s" % "".join(ccf.readlines()))

                    log.info("Coverage Report")
                    log.info("===============")
                    try:
                        coverage_report()
                    finally:
                        # remove the pth file
                        try:
                            os.remove(self.coveragepth)
                        except OSError:
                            log.warning("Expected temporal file from coverage is missing, ignoring removal.")

    return StampedResult
|  |  | 
class TestRunner(_TestRunner):
    """Test runner class aware of exporting tests."""

    def __init__(self, *args, **kwargs):
        # 'output' is specific to XMLTestRunner; when adding a new runner,
        # also add the logic for that runner's own arguments here.
        export_kwargs = dict(
            kwargs,
            output=os.path.join(os.getcwd(), log_prefix),
            descriptions=False,
        )
        try:
            # First try telling the runner where to export the results
            super(TestRunner, self).__init__(*args, **export_kwargs)
        except TypeError:
            # The base runner rejected the export arguments: plain unittest init
            log.info("test runner init'ed like unittest")
            super(TestRunner, self).__init__(*args, **kwargs)
|  |  | 
if __name__ == "__main__":
    import traceback

    # Run the tool, then always undo the configuration changes made for the
    # run, even when main() raised.
    try:
        ret = main()
    except Exception:
        traceback.print_exc()
        ret = 1
    finally:
        remove_include()
        remove_inc_files()
    sys.exit(ret)