| # Copyright (C) 2013 Intel Corporation |
| # |
| # Released under the MIT license (see COPYING.MIT) |
| |
| # Main unittest module used by testimage.bbclass |
| # This provides the oeRuntimeTest base class which is inherited by all tests in meta/lib/oeqa/runtime. |
| |
| # It also has some helper functions and it's responsible for actually starting the tests |
| |
| import os, re, mmap, sys |
| import unittest |
| import inspect |
| import subprocess |
| import signal |
| import shutil |
| import functools |
| try: |
| import bb |
| except ImportError: |
| pass |
| import logging |
| |
| import oeqa.runtime |
| # Exported test doesn't require sdkext |
| try: |
| import oeqa.sdkext |
| except ImportError: |
| pass |
| from oeqa.utils.decorators import LogResults, gettag, getResults |
| |
| logger = logging.getLogger("BitBake") |
| |
| def getVar(obj): |
| #extend form dict, if a variable didn't exists, need find it in testcase |
| class VarDict(dict): |
| def __getitem__(self, key): |
| return gettag(obj, key) |
| return VarDict() |
| |
| def checkTags(tc, tagexp): |
| return eval(tagexp, None, getVar(tc)) |
| |
| def filterByTagExp(testsuite, tagexp): |
| if not tagexp: |
| return testsuite |
| caseList = [] |
| for each in testsuite: |
| if not isinstance(each, unittest.BaseTestSuite): |
| if checkTags(each, tagexp): |
| caseList.append(each) |
| else: |
| caseList.append(filterByTagExp(each, tagexp)) |
| return testsuite.__class__(caseList) |
| |
| @LogResults |
| class oeTest(unittest.TestCase): |
| |
| pscmd = "ps" |
| longMessage = True |
| |
| @classmethod |
| def hasPackage(self, pkg): |
| """ |
| True if the full package name exists in the manifest, False otherwise. |
| """ |
| return pkg in oeTest.tc.pkgmanifest |
| |
| @classmethod |
| def hasPackageMatch(self, match): |
| """ |
| True if match exists in the manifest as a regular expression substring, |
| False otherwise. |
| """ |
| for s in oeTest.tc.pkgmanifest: |
| if re.match(match, s): |
| return True |
| return False |
| |
| @classmethod |
| def hasFeature(self,feature): |
| if feature in oeTest.tc.imagefeatures or \ |
| feature in oeTest.tc.distrofeatures: |
| return True |
| else: |
| return False |
| |
| class oeRuntimeTest(oeTest): |
| def __init__(self, methodName='runTest'): |
| self.target = oeRuntimeTest.tc.target |
| super(oeRuntimeTest, self).__init__(methodName) |
| |
| def setUp(self): |
| # Install packages in the DUT |
| self.tc.install_uninstall_packages(self.id()) |
| |
| # Check if test needs to run |
| if self.tc.sigterm: |
| self.fail("Got SIGTERM") |
| elif (type(self.target).__name__ == "QemuTarget"): |
| self.assertTrue(self.target.check(), msg = "Qemu not running?") |
| |
| self.setUpLocal() |
| |
| # a setup method before tests but after the class instantiation |
| def setUpLocal(self): |
| pass |
| |
| def tearDown(self): |
| # Uninstall packages in the DUT |
| self.tc.install_uninstall_packages(self.id(), False) |
| |
| res = getResults() |
| # If a test fails or there is an exception dump |
| # for QemuTarget only |
| if (type(self.target).__name__ == "QemuTarget" and |
| (self.id() in res.getErrorList() or |
| self.id() in res.getFailList())): |
| self.tc.host_dumper.create_dir(self._testMethodName) |
| self.tc.host_dumper.dump_host() |
| self.target.target_dumper.dump_target( |
| self.tc.host_dumper.dump_dir) |
| print ("%s dump data stored in %s" % (self._testMethodName, |
| self.tc.host_dumper.dump_dir)) |
| |
| self.tearDownLocal() |
| |
| # Method to be run after tearDown and implemented by child classes |
| def tearDownLocal(self): |
| pass |
| |
| def getmodule(pos=2): |
| # stack returns a list of tuples containg frame information |
| # First element of the list the is current frame, caller is 1 |
| frameinfo = inspect.stack()[pos] |
| modname = inspect.getmodulename(frameinfo[1]) |
| #modname = inspect.getmodule(frameinfo[0]).__name__ |
| return modname |
| |
| def skipModule(reason, pos=2): |
| modname = getmodule(pos) |
| if modname not in oeTest.tc.testsrequired: |
| raise unittest.SkipTest("%s: %s" % (modname, reason)) |
| else: |
| raise Exception("\nTest %s wants to be skipped.\nReason is: %s" \ |
| "\nTest was required in TEST_SUITES, so either the condition for skipping is wrong" \ |
| "\nor the image really doesn't have the required feature/package when it should." % (modname, reason)) |
| |
| def skipModuleIf(cond, reason): |
| |
| if cond: |
| skipModule(reason, 3) |
| |
| def skipModuleUnless(cond, reason): |
| |
| if not cond: |
| skipModule(reason, 3) |
| |
| _buffer_logger = "" |
| def custom_verbose(msg, *args, **kwargs): |
| global _buffer_logger |
| if msg[-1] != "\n": |
| _buffer_logger += msg |
| else: |
| _buffer_logger += msg |
| try: |
| bb.plain(_buffer_logger.rstrip("\n"), *args, **kwargs) |
| except NameError: |
| logger.info(_buffer_logger.rstrip("\n"), *args, **kwargs) |
| _buffer_logger = "" |
| |
| class TestContext(object): |
| def __init__(self, d, exported=False): |
| self.d = d |
| |
| self.testsuites = self._get_test_suites() |
| |
| if exported: |
| path = [os.path.dirname(os.path.abspath(__file__))] |
| extrapath = "" |
| else: |
| path = d.getVar("BBPATH").split(':') |
| extrapath = "lib/oeqa" |
| |
| self.testslist = self._get_tests_list(path, extrapath) |
| self.testsrequired = self._get_test_suites_required() |
| |
| self.filesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "runtime/files") |
| self.corefilesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files") |
| self.imagefeatures = d.getVar("IMAGE_FEATURES").split() |
| self.distrofeatures = d.getVar("DISTRO_FEATURES").split() |
| |
| # get testcase list from specified file |
| # if path is a relative path, then relative to build/conf/ |
| def _read_testlist(self, fpath, builddir): |
| if not os.path.isabs(fpath): |
| fpath = os.path.join(builddir, "conf", fpath) |
| if not os.path.exists(fpath): |
| bb.fatal("No such manifest file: ", fpath) |
| tcs = [] |
| for line in open(fpath).readlines(): |
| line = line.strip() |
| if line and not line.startswith("#"): |
| tcs.append(line) |
| return " ".join(tcs) |
| |
| # return test list by type also filter if TEST_SUITES is specified |
| def _get_tests_list(self, bbpath, extrapath): |
| testslist = [] |
| |
| type = self._get_test_namespace() |
| |
| # This relies on lib/ under each directory in BBPATH being added to sys.path |
| # (as done by default in base.bbclass) |
| for testname in self.testsuites: |
| if testname != "auto": |
| if testname.startswith("oeqa."): |
| testslist.append(testname) |
| continue |
| found = False |
| for p in bbpath: |
| if os.path.exists(os.path.join(p, extrapath, type, testname + ".py")): |
| testslist.append("oeqa." + type + "." + testname) |
| found = True |
| break |
| elif os.path.exists(os.path.join(p, extrapath, type, testname.split(".")[0] + ".py")): |
| testslist.append("oeqa." + type + "." + testname) |
| found = True |
| break |
| if not found: |
| bb.fatal('Test %s specified in TEST_SUITES could not be found in lib/oeqa/runtime under BBPATH' % testname) |
| |
| if "auto" in self.testsuites: |
| def add_auto_list(path): |
| files = sorted([f for f in os.listdir(path) if f.endswith('.py') and not f.startswith('_')]) |
| for f in files: |
| module = 'oeqa.' + type + '.' + f[:-3] |
| if module not in testslist: |
| testslist.append(module) |
| |
| for p in bbpath: |
| testpath = os.path.join(p, 'lib', 'oeqa', type) |
| bb.debug(2, 'Searching for tests in %s' % testpath) |
| if os.path.exists(testpath): |
| add_auto_list(testpath) |
| |
| return testslist |
| |
| def getTestModules(self): |
| """ |
| Returns all the test modules in the testlist. |
| """ |
| |
| import pkgutil |
| |
| modules = [] |
| for test in self.testslist: |
| if re.search("\w+\.\w+\.test_\S+", test): |
| test = '.'.join(t.split('.')[:3]) |
| module = pkgutil.get_loader(test) |
| modules.append(module) |
| |
| return modules |
| |
| def getModulefromID(self, test_id): |
| """ |
| Returns the test module based on a test id. |
| """ |
| |
| module_name = ".".join(test_id.split(".")[:3]) |
| modules = self.getTestModules() |
| for module in modules: |
| if module.name == module_name: |
| return module |
| |
| return None |
| |
| def getTests(self, test): |
| '''Return all individual tests executed when running the suite.''' |
| # Unfortunately unittest does not have an API for this, so we have |
| # to rely on implementation details. This only needs to work |
| # for TestSuite containing TestCase. |
| method = getattr(test, '_testMethodName', None) |
| if method: |
| # leaf case: a TestCase |
| yield test |
| else: |
| # Look into TestSuite. |
| tests = getattr(test, '_tests', []) |
| for t1 in tests: |
| for t2 in self.getTests(t1): |
| yield t2 |
| |
| def loadTests(self): |
| setattr(oeTest, "tc", self) |
| |
| testloader = unittest.TestLoader() |
| testloader.sortTestMethodsUsing = None |
| suites = [testloader.loadTestsFromName(name) for name in self.testslist] |
| suites = filterByTagExp(suites, getattr(self, "tagexp", None)) |
| |
| # Determine dependencies between suites by looking for @skipUnlessPassed |
| # method annotations. Suite A depends on suite B if any method in A |
| # depends on a method on B. |
| for suite in suites: |
| suite.dependencies = [] |
| suite.depth = 0 |
| for test in self.getTests(suite): |
| methodname = getattr(test, '_testMethodName', None) |
| if methodname: |
| method = getattr(test, methodname) |
| depends_on = getattr(method, '_depends_on', None) |
| if depends_on: |
| for dep_suite in suites: |
| if depends_on in [getattr(t, '_testMethodName', None) for t in self.getTests(dep_suite)]: |
| if dep_suite not in suite.dependencies and \ |
| dep_suite is not suite: |
| suite.dependencies.append(dep_suite) |
| break |
| else: |
| logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." % |
| (test, depends_on)) |
| |
| # Use brute-force topological sort to determine ordering. Sort by |
| # depth (higher depth = must run later), with original ordering to |
| # break ties. |
| def set_suite_depth(suite): |
| for dep in suite.dependencies: |
| new_depth = set_suite_depth(dep) + 1 |
| if new_depth > suite.depth: |
| suite.depth = new_depth |
| return suite.depth |
| |
| for index, suite in enumerate(suites): |
| set_suite_depth(suite) |
| suite.index = index |
| |
| def cmp(a, b): |
| return (a > b) - (a < b) |
| |
| def cmpfunc(a, b): |
| return cmp((a.depth, a.index), (b.depth, b.index)) |
| |
| suites.sort(key=functools.cmp_to_key(cmpfunc)) |
| |
| self.suite = testloader.suiteClass(suites) |
| |
| return self.suite |
| |
| def runTests(self): |
| logger.info("Test modules %s" % self.testslist) |
| if hasattr(self, "tagexp") and self.tagexp: |
| logger.info("Filter test cases by tags: %s" % self.tagexp) |
| logger.info("Found %s tests" % self.suite.countTestCases()) |
| runner = unittest.TextTestRunner(verbosity=2) |
| if 'bb' in sys.modules: |
| runner.stream.write = custom_verbose |
| |
| return runner.run(self.suite) |
| |
| class RuntimeTestContext(TestContext): |
| def __init__(self, d, target, exported=False): |
| super(RuntimeTestContext, self).__init__(d, exported) |
| |
| self.target = target |
| |
| self.pkgmanifest = {} |
| manifest = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"), |
| d.getVar("IMAGE_LINK_NAME") + ".manifest") |
| nomanifest = d.getVar("IMAGE_NO_MANIFEST") |
| if nomanifest is None or nomanifest != "1": |
| try: |
| with open(manifest) as f: |
| for line in f: |
| (pkg, arch, version) = line.strip().split() |
| self.pkgmanifest[pkg] = (version, arch) |
| except IOError as e: |
| bb.fatal("No package manifest file found. Did you build the image?\n%s" % e) |
| |
| def _get_test_namespace(self): |
| return "runtime" |
| |
| def _get_test_suites(self): |
| testsuites = [] |
| |
| manifests = (self.d.getVar("TEST_SUITES_MANIFEST") or '').split() |
| if manifests: |
| for manifest in manifests: |
| testsuites.extend(self._read_testlist(manifest, |
| self.d.getVar("TOPDIR")).split()) |
| |
| else: |
| testsuites = self.d.getVar("TEST_SUITES").split() |
| |
| return testsuites |
| |
| def _get_test_suites_required(self): |
| return [t for t in self.d.getVar("TEST_SUITES").split() if t != "auto"] |
| |
| def loadTests(self): |
| super(RuntimeTestContext, self).loadTests() |
| if oeTest.hasPackage("procps"): |
| oeRuntimeTest.pscmd = "ps -ef" |
| |
| def extract_packages(self): |
| """ |
| Find packages that will be needed during runtime. |
| """ |
| |
| modules = self.getTestModules() |
| bbpaths = self.d.getVar("BBPATH").split(":") |
| |
| shutil.rmtree(self.d.getVar("TEST_EXTRACTED_DIR")) |
| shutil.rmtree(self.d.getVar("TEST_PACKAGED_DIR")) |
| for module in modules: |
| json_file = self._getJsonFile(module) |
| if json_file: |
| needed_packages = self._getNeededPackages(json_file) |
| self._perform_package_extraction(needed_packages) |
| |
| def _perform_package_extraction(self, needed_packages): |
| """ |
| Extract packages that will be needed during runtime. |
| """ |
| |
| import oe.path |
| |
| extracted_path = self.d.getVar("TEST_EXTRACTED_DIR") |
| packaged_path = self.d.getVar("TEST_PACKAGED_DIR") |
| |
| for key,value in needed_packages.items(): |
| packages = () |
| if isinstance(value, dict): |
| packages = (value, ) |
| elif isinstance(value, list): |
| packages = value |
| else: |
| bb.fatal("Failed to process needed packages for %s; " |
| "Value must be a dict or list" % key) |
| |
| for package in packages: |
| pkg = package["pkg"] |
| rm = package.get("rm", False) |
| extract = package.get("extract", True) |
| if extract: |
| dst_dir = os.path.join(extracted_path, pkg) |
| else: |
| dst_dir = os.path.join(packaged_path) |
| |
| # Extract package and copy it to TEST_EXTRACTED_DIR |
| pkg_dir = self._extract_in_tmpdir(pkg) |
| if extract: |
| |
| # Same package used for more than one test, |
| # don't need to extract again. |
| if os.path.exists(dst_dir): |
| continue |
| oe.path.copytree(pkg_dir, dst_dir) |
| shutil.rmtree(pkg_dir) |
| |
| # Copy package to TEST_PACKAGED_DIR |
| else: |
| self._copy_package(pkg) |
| |
| def _getJsonFile(self, module): |
| """ |
| Returns the path of the JSON file for a module, empty if doesn't exitst. |
| """ |
| |
| module_file = module.path |
| json_file = "%s.json" % module_file.rsplit(".", 1)[0] |
| if os.path.isfile(module_file) and os.path.isfile(json_file): |
| return json_file |
| else: |
| return "" |
| |
| def _getNeededPackages(self, json_file, test=None): |
| """ |
| Returns a dict with needed packages based on a JSON file. |
| |
| |
| If a test is specified it will return the dict just for that test. |
| """ |
| |
| import json |
| |
| needed_packages = {} |
| |
| with open(json_file) as f: |
| test_packages = json.load(f) |
| for key,value in test_packages.items(): |
| needed_packages[key] = value |
| |
| if test: |
| if test in needed_packages: |
| needed_packages = needed_packages[test] |
| else: |
| needed_packages = {} |
| |
| return needed_packages |
| |
| def _extract_in_tmpdir(self, pkg): |
| """" |
| Returns path to a temp directory where the package was |
| extracted without dependencies. |
| """ |
| |
| from oeqa.utils.package_manager import get_package_manager |
| |
| pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg) |
| pm = get_package_manager(self.d, pkg_path) |
| extract_dir = pm.extract(pkg) |
| shutil.rmtree(pkg_path) |
| |
| return extract_dir |
| |
| def _copy_package(self, pkg): |
| """ |
| Copy the RPM, DEB or IPK package to dst_dir |
| """ |
| |
| from oeqa.utils.package_manager import get_package_manager |
| |
| pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg) |
| dst_dir = self.d.getVar("TEST_PACKAGED_DIR") |
| pm = get_package_manager(self.d, pkg_path) |
| pkg_info = pm.package_info(pkg) |
| file_path = pkg_info[pkg]["filepath"] |
| shutil.copy2(file_path, dst_dir) |
| shutil.rmtree(pkg_path) |
| |
| def install_uninstall_packages(self, test_id, pkg_dir, install): |
| """ |
| Check if the test requires a package and Install/Uninstall it in the DUT |
| """ |
| |
| test = test_id.split(".")[4] |
| module = self.getModulefromID(test_id) |
| json = self._getJsonFile(module) |
| if json: |
| needed_packages = self._getNeededPackages(json, test) |
| if needed_packages: |
| self._install_uninstall_packages(needed_packages, pkg_dir, install) |
| |
| def _install_uninstall_packages(self, needed_packages, pkg_dir, install=True): |
| """ |
| Install/Uninstall packages in the DUT without using a package manager |
| """ |
| |
| if isinstance(needed_packages, dict): |
| packages = [needed_packages] |
| elif isinstance(needed_packages, list): |
| packages = needed_packages |
| |
| for package in packages: |
| pkg = package["pkg"] |
| rm = package.get("rm", False) |
| extract = package.get("extract", True) |
| src_dir = os.path.join(pkg_dir, pkg) |
| |
| # Install package |
| if install and extract: |
| self.target.connection.copy_dir_to(src_dir, "/") |
| |
| # Uninstall package |
| elif not install and rm: |
| self.target.connection.delete_dir_structure(src_dir, "/") |
| |
| class ImageTestContext(RuntimeTestContext): |
| def __init__(self, d, target, host_dumper): |
| super(ImageTestContext, self).__init__(d, target) |
| |
| self.tagexp = d.getVar("TEST_SUITES_TAGS") |
| |
| self.host_dumper = host_dumper |
| |
| self.sigterm = False |
| self.origsigtermhandler = signal.getsignal(signal.SIGTERM) |
| signal.signal(signal.SIGTERM, self._sigterm_exception) |
| |
| def _sigterm_exception(self, signum, stackframe): |
| bb.warn("TestImage received SIGTERM, shutting down...") |
| self.sigterm = True |
| self.target.stop() |
| |
| def install_uninstall_packages(self, test_id, install=True): |
| """ |
| Check if the test requires a package and Install/Uninstall it in the DUT |
| """ |
| |
| pkg_dir = self.d.getVar("TEST_EXTRACTED_DIR") |
| super(ImageTestContext, self).install_uninstall_packages(test_id, pkg_dir, install) |
| |
| class ExportTestContext(RuntimeTestContext): |
| def __init__(self, d, target, exported=False, parsedArgs={}): |
| """ |
| This class is used when exporting tests and when are executed outside OE environment. |
| |
| parsedArgs can contain the following: |
| - tag: Filter test by tag. |
| """ |
| super(ExportTestContext, self).__init__(d, target, exported) |
| |
| tag = parsedArgs.get("tag", None) |
| self.tagexp = tag if tag != None else d.getVar("TEST_SUITES_TAGS") |
| |
| self.sigterm = None |
| |
| def install_uninstall_packages(self, test_id, install=True): |
| """ |
| Check if the test requires a package and Install/Uninstall it in the DUT |
| """ |
| |
| export_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) |
| extracted_dir = self.d.getVar("TEST_EXPORT_EXTRACTED_DIR") |
| pkg_dir = os.path.join(export_dir, extracted_dir) |
| super(ExportTestContext, self).install_uninstall_packages(test_id, pkg_dir, install) |