blob: aabf4110cbe1edfcca9d7e2708b57acfc0aaf9b0 [file] [log] [blame]
Brad Bishopc342db32019-05-15 21:57:59 -04001#
Patrick Williamsc124f4f2015-09-15 14:41:29 -05002# Copyright (C) 2013 Intel Corporation
3#
Brad Bishopc342db32019-05-15 21:57:59 -04004# SPDX-License-Identifier: MIT
5#
Patrick Williamsc124f4f2015-09-15 14:41:29 -05006
# Some custom decorators that can be used by unittests.
# Most useful is skipUnlessPassed, which can be used for
# creating dependencies between two test methods.
10
11import os
12import logging
13import sys
14import unittest
15import threading
16import signal
17from functools import wraps
18
19#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame
20class getResults(object):
21 def __init__(self):
22 #dynamically determine the unittest.case frame and use it to get the name of the test method
23 ident = threading.current_thread().ident
24 upperf = sys._current_frames()[ident]
25 while (upperf.f_globals['__name__'] != 'unittest.case'):
26 upperf = upperf.f_back
27
28 def handleList(items):
29 ret = []
30 # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception())
31 for i in items:
32 s = i[0].id()
33 #Handle the _ErrorHolder objects from skipModule failures
34 if "setUpModule (" in s:
35 ret.append(s.replace("setUpModule (", "").replace(")",""))
36 else:
37 ret.append(s)
Patrick Williamsf1e5d692016-03-30 15:21:19 -050038 # Append also the test without the full path
39 testname = s.split('.')[-1]
40 if testname:
41 ret.append(testname)
Patrick Williamsc124f4f2015-09-15 14:41:29 -050042 return ret
43 self.faillist = handleList(upperf.f_locals['result'].failures)
44 self.errorlist = handleList(upperf.f_locals['result'].errors)
45 self.skiplist = handleList(upperf.f_locals['result'].skipped)
46
47 def getFailList(self):
48 return self.faillist
49
50 def getErrorList(self):
51 return self.errorlist
52
53 def getSkipList(self):
54 return self.skiplist
55
class skipIfFailure(object):
    """Decorator that skips a test when a named test has failed or errored.

    Args:
        testcase: id (full or short name) of the test this one depends on.

    The decorated function raises ``unittest.SkipTest`` when *testcase* is
    present in either the failure list or the error list reported by
    ``getResults()``; otherwise it runs normally.
    """

    def __init__(self, testcase):
        self.testcase = testcase

    def __call__(self, f):
        @wraps(f)
        def wrapped_f(*args, **kwargs):
            res = getResults()
            # Both lists must be checked independently: "a or b" on two
            # lists would only look at the errors when there are no
            # failures at all.
            if self.testcase in res.getFailList() or \
                    self.testcase in res.getErrorList():
                raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
            return f(*args, **kwargs)
        wrapped_f.__name__ = f.__name__
        return wrapped_f
70
class skipIfSkipped(object):
    """Decorator that skips a test when a named test was itself skipped.

    Args:
        testcase: id (full or short name) of the test this one depends on.
    """

    def __init__(self, testcase):
        self.testcase = testcase

    def __call__(self, f):
        @wraps(f)
        def wrapped_f(*args, **kwargs):
            skipped = getResults().getSkipList()
            if self.testcase not in skipped:
                return f(*args, **kwargs)
            raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)

        wrapped_f.__name__ = f.__name__
        return wrapped_f
85
class skipUnlessPassed(object):
    """Decorator that runs a test only if a named test has passed so far.

    Args:
        testcase: id (full or short name) of the test this one depends on.

    The decorated function raises ``unittest.SkipTest`` when *testcase* is
    recorded as skipped, failed or errored.  The dependency name is also
    exposed on the wrapper as ``_depends_on``.
    """

    def __init__(self, testcase):
        self.testcase = testcase

    def __call__(self, f):
        @wraps(f)
        def wrapped_f(*args, **kwargs):
            outcome = getResults()
            not_passed = (
                outcome.getSkipList(),
                outcome.getFailList(),
                outcome.getErrorList(),
            )
            for recorded in not_passed:
                if self.testcase in recorded:
                    raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
            return f(*args, **kwargs)

        wrapped_f.__name__ = f.__name__
        wrapped_f._depends_on = self.testcase
        return wrapped_f
103
class testcase(object):
    """Decorator that attaches a test-case identifier to a test function.

    Args:
        test_case: value stored on the wrapper as its ``test_case``
            attribute.
    """

    def __init__(self, test_case):
        self.test_case = test_case

    def __call__(self, func):
        @wraps(func)
        def wrapped_f(*call_args, **call_kwargs):
            # Pure pass-through: the wrapper only exists to carry metadata.
            return func(*call_args, **call_kwargs)

        setattr(wrapped_f, 'test_case', self.test_case)
        setattr(wrapped_f, '__name__', func.__name__)
        return wrapped_f
115
class NoParsingFilter(logging.Filter):
    """Logging filter that lets through only records whose level is 100."""

    def filter(self, record):
        """Return True when *record* was logged at level number 100."""
        level = record.levelno
        return level == 100
119
Patrick Williamsc0f7c042017-02-23 20:41:17 -0600120import inspect
121
def LogResults(original_class):
    """Class decorator that logs the outcome of every test that is run.

    The class's ``run`` method is replaced by a wrapper that first calls the
    original ``run`` and then writes one of ERROR / FAILED / SKIPPED /
    PASSED for the test to a log file, using a custom logging level 100
    named ``RESULTS``.

    The log file is ``results-<caller>.<timestamp>.log`` in the working
    directory at decoration time, where ``<caller>`` is the basename of
    ``sys.argv[0]``.  After each test, ``results-<caller>.log`` is
    (re)created as a symlink to it.

    Args:
        original_class: the test class to patch; it must provide ``run``.

    Returns:
        The same class object, with ``run`` replaced.
    """
    orig_method = original_class.run

    from time import strftime, gmtime
    caller = os.path.basename(sys.argv[0])
    timestamp = strftime('%Y%m%d%H%M%S', gmtime())
    logfile = os.path.join(os.getcwd(), 'results-' + caller + '.' + timestamp + '.log')
    linkfile = os.path.join(os.getcwd(), 'results-' + caller + '.log')

    # Rewrite the run method of unittest.TestCase to add testcase logging.
    def run(self, result, *args, **kws):
        # Keep whatever the original run() returns so callers relying on
        # its return value keep working through the wrapper.
        run_result = orig_method(self, result, *args, **kws)
        passed = True
        testMethod = getattr(self, self._testMethodName)
        # If the test case is decorated then use its number, else its name.
        try:
            test_case = testMethod.test_case
        except AttributeError:
            test_case = self._testMethodName

        # Create a custom logging level for filtering.
        custom_log_level = 100
        logging.addLevelName(custom_log_level, 'RESULTS')

        def results(self, message, *args, **kws):
            if self.isEnabledFor(custom_log_level):
                self.log(custom_log_level, message, *args, **kws)
        logging.Logger.results = results

        logging.basicConfig(filename=logfile,
                            filemode='w',
                            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                            datefmt='%H:%M:%S',
                            level=custom_log_level)
        for handler in logging.root.handlers:
            handler.addFilter(NoParsingFilter())
        local_log = logging.getLogger(caller)

        # Check the status of the test and record it.
        tcid = self.id()
        for (name, msg) in result.errors:
            if tcid == name.id():
                local_log.results("Testcase "+str(test_case)+": ERROR")
                local_log.results("Testcase "+str(test_case)+":\n"+msg)
                passed = False
        for (name, msg) in result.failures:
            if tcid == name.id():
                local_log.results("Testcase "+str(test_case)+": FAILED")
                local_log.results("Testcase "+str(test_case)+":\n"+msg)
                passed = False
        for (name, msg) in result.skipped:
            if tcid == name.id():
                local_log.results("Testcase "+str(test_case)+": SKIPPED")
                passed = False
        if passed:
            local_log.results("Testcase "+str(test_case)+": PASSED")

        # XXX: To avoid a race between testing for the link file and
        # replacing it, take bb.utils' lock file when bitbake is importable.
        # The best solution would be a unique name for the link file.
        try:
            import bb
            has_bb = True
            lockfilename = linkfile + '.lock'
        except ImportError:
            has_bb = False

        if has_bb:
            lf = bb.utils.lockfile(lockfilename, block=True)
        try:
            # Create symlink to the current log
            if os.path.lexists(linkfile):
                os.remove(linkfile)
            os.symlink(logfile, linkfile)
        finally:
            # Always release the lock, even if updating the link failed.
            if has_bb:
                bb.utils.unlockfile(lf)

        return run_result

    original_class.run = run

    return original_class
217
class TimeOut(BaseException):
    """Raised when a decorated test exceeds its time limit.

    Derives from BaseException, so ``except Exception`` handlers do not
    catch it.
    """
220
def timeout(seconds):
    """Return a decorator that limits a function's run time via SIGALRM.

    Args:
        seconds: alarm delay passed to ``signal.alarm``.

    When ``signal.alarm`` is unavailable the function is returned
    undecorated.  Otherwise each call installs a SIGALRM handler, arms the
    alarm, runs the function, and finally cancels the alarm and restores
    the previous handler.  The handler raises ``TimeOut`` unless the alarm
    interrupted the wrapper's own frame.
    """
    def decorator(fn):
        if not hasattr(signal, 'alarm'):
            return fn

        @wraps(fn)
        def wrapped_f(*args, **kw):
            wrapper_frame = sys._getframe()

            def on_alarm(signum, interrupted_frame):
                # Ignore an alarm that fires while the wrapper itself is
                # the frame being executed.
                if interrupted_frame is wrapper_frame:
                    return
                raise TimeOut('%s seconds' % seconds)

            previous = signal.signal(signal.SIGALRM, on_alarm)
            try:
                signal.alarm(seconds)
                return fn(*args, **kw)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, previous)

        return wrapped_f
    return decorator
241
__tag_prefix = "tag__"


def tag(*args, **kwargs):
    """Decorator that adds attributes to classes or functions
    for use with the Attribute (-a) plugin.

    Every positional name is stored as ``tag__<name> = True`` and every
    keyword as ``tag__<name> = <value>``; the decorated object itself is
    returned.
    """
    def wrap_ob(ob):
        # Positional tags first, then keyword tags, in the order given.
        assignments = [(label, True) for label in args]
        assignments.extend(kwargs.items())
        for label, value in assignments:
            setattr(ob, __tag_prefix + label, value)
        return ob
    return wrap_ob
254
def gettag(obj, key, default=None):
    """Return the value of tag *key* on *obj*, or *default* if it is unset.

    For a ``unittest.TestCase`` instance the tag is looked up on the
    current test method first and on the instance second.
    """
    attr_name = __tag_prefix + key
    if isinstance(obj, unittest.TestCase):
        method = getattr(obj, obj._testMethodName)
        instance_value = getattr(obj, attr_name, default)
        return getattr(method, attr_name, instance_value)
    return getattr(obj, attr_name, default)
262
def getAllTags(obj):
    """Return a dict of every tag set on *obj*, keyed by the bare tag name.

    For a ``unittest.TestCase`` instance the tags of the instance and of
    its current test method are merged, the method's values taking
    precedence.
    """
    def collect(target):
        found = {}
        for attr in dir(target):
            if attr.startswith(__tag_prefix):
                found[attr[len(__tag_prefix):]] = getattr(target, attr)
        return found

    if isinstance(obj, unittest.TestCase):
        method = getattr(obj, obj._testMethodName)
        merged = collect(obj)
        merged.update(collect(method))
        return merged
    return collect(obj)
Patrick Williamsd7e96312015-09-22 08:09:05 -0500273
def timeout_handler(seconds):
    """Return a method decorator that limits run time and restarts the target.

    Behaves like a SIGALRM-based timeout for methods: each call installs a
    SIGALRM handler, arms ``signal.alarm(seconds)``, runs the method, and
    finally cancels the alarm and restores the previous handler.  When the
    alarm fires outside the wrapper's own frame, the handler calls
    ``self.target.restart()`` and then raises ``TimeOut``.

    Args:
        seconds: alarm delay passed to ``signal.alarm``.

    When ``signal.alarm`` is unavailable the method is returned
    undecorated.
    """
    def decorator(fn):
        if not hasattr(signal, 'alarm'):
            return fn

        @wraps(fn)
        def wrapped_f(self, *args, **kw):
            current_frame = sys._getframe()

            def raiseTimeOut(signum, frame):
                # Ignore an alarm that fires while the wrapper itself is
                # the frame being executed.
                if frame is current_frame:
                    return
                try:
                    # NOTE(review): assumes the instance exposes
                    # target.restart() -- confirm against the callers.
                    self.target.restart()
                except Exception:
                    # Restarting is best effort; the timeout must be
                    # reported even when the restart itself fails.
                    pass
                raise TimeOut('%s seconds' % seconds)

            prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
            try:
                signal.alarm(seconds)
                return fn(self, *args, **kw)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, prev_handler)

        return wrapped_f
    return decorator