| # |
| # Copyright (C) 2016 Intel Corporation |
| # |
| # SPDX-License-Identifier: MIT |
| # |
| |
| from unittest import SkipTest |
| |
| from oeqa.core.exception import OEQADependency |
| |
| from . import OETestDiscover, registerDecorator |
| |
| def _add_depends(registry, case, depends): |
| module_name = case.__module__ |
| class_name = case.__class__.__name__ |
| |
| case_id = case.id() |
| |
| for depend in depends: |
| dparts = depend.split('.') |
| |
| if len(dparts) == 1: |
| depend_id = ".".join((module_name, class_name, dparts[0])) |
| elif len(dparts) == 2: |
| depend_id = ".".join((module_name, dparts[0], dparts[1])) |
| else: |
| depend_id = depend |
| |
| if not case_id in registry: |
| registry[case_id] = [] |
| if not depend_id in registry[case_id]: |
| registry[case_id].append(depend_id) |
| |
| def _validate_test_case_depends(cases, depends): |
| for case in depends: |
| if not case in cases: |
| continue |
| for dep in depends[case]: |
| if not dep in cases: |
| raise OEQADependency("TestCase %s depends on %s and isn't available"\ |
| ", cases available %s." % (case, dep, str(cases.keys()))) |
| |
| def _order_test_case_by_depends(cases, depends): |
| def _dep_resolve(graph, node, resolved, seen): |
| seen.append(node) |
| for edge in graph[node]: |
| if edge not in resolved: |
| if edge in seen: |
| raise OEQADependency("Test cases %s and %s have a circular" \ |
| " dependency." % (node, edge)) |
| _dep_resolve(graph, edge, resolved, seen) |
| resolved.append(node) |
| |
| dep_graph = {} |
| dep_graph['__root__'] = cases.keys() |
| for case in cases: |
| if case in depends: |
| dep_graph[case] = depends[case] |
| else: |
| dep_graph[case] = [] |
| |
| cases_ordered = [] |
| _dep_resolve(dep_graph, '__root__', cases_ordered, []) |
| cases_ordered.remove('__root__') |
| |
| return [cases[case_id] for case_id in cases_ordered] |
| |
| def _skipTestDependency(case, depends): |
| for dep in depends: |
| found = False |
| for test, _ in case.tc.results.successes: |
| if test.id() == dep: |
| found = True |
| break |
| if not found: |
| raise SkipTest("Test case %s depends on %s but it didn't pass/run." \ |
| % (case.id(), dep)) |
| |
| @registerDecorator |
| class OETestDepends(OETestDiscover): |
| attrs = ('depends',) |
| |
| def bind(self, registry, case): |
| super(OETestDepends, self).bind(registry, case) |
| if not registry.get('depends'): |
| registry['depends'] = {} |
| _add_depends(registry['depends'], case, self.depends) |
| |
| @staticmethod |
| def discover(registry): |
| if registry.get('depends'): |
| _validate_test_case_depends(registry['cases'], registry['depends']) |
| return _order_test_case_by_depends(registry['cases'], registry['depends']) |
| else: |
| return [registry['cases'][case_id] for case_id in registry['cases']] |
| |
| def setUpDecorator(self): |
| _skipTestDependency(self.case, self.depends) |