blob: 0d79223a290184f15636970475c61fef1df10ef9 [file] [log] [blame]
Patrick Williamsc124f4f2015-09-15 14:41:29 -05001# Copyright (C) 2013 Intel Corporation
2#
3# Released under the MIT license (see COPYING.MIT)
4
# Some custom decorators that can be used by unittests.
# The most useful is skipUnlessPassed, which can be used for
# creating dependencies between two test methods.
8
9import os
10import logging
11import sys
12import unittest
13import threading
14import signal
15from functools import wraps
16
17#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame
18class getResults(object):
19 def __init__(self):
20 #dynamically determine the unittest.case frame and use it to get the name of the test method
21 ident = threading.current_thread().ident
22 upperf = sys._current_frames()[ident]
23 while (upperf.f_globals['__name__'] != 'unittest.case'):
24 upperf = upperf.f_back
25
26 def handleList(items):
27 ret = []
28 # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception())
29 for i in items:
30 s = i[0].id()
31 #Handle the _ErrorHolder objects from skipModule failures
32 if "setUpModule (" in s:
33 ret.append(s.replace("setUpModule (", "").replace(")",""))
34 else:
35 ret.append(s)
Patrick Williamsf1e5d692016-03-30 15:21:19 -050036 # Append also the test without the full path
37 testname = s.split('.')[-1]
38 if testname:
39 ret.append(testname)
Patrick Williamsc124f4f2015-09-15 14:41:29 -050040 return ret
41 self.faillist = handleList(upperf.f_locals['result'].failures)
42 self.errorlist = handleList(upperf.f_locals['result'].errors)
43 self.skiplist = handleList(upperf.f_locals['result'].skipped)
44
45 def getFailList(self):
46 return self.faillist
47
48 def getErrorList(self):
49 return self.errorlist
50
51 def getSkipList(self):
52 return self.skiplist
53
class skipIfFailure(object):
    """Decorator that skips a test when another test has failed or errored.

    Args:
        testcase: id (full or short) of the test this one depends on.

    The decorated test raises ``unittest.SkipTest`` when ``testcase`` is
    present in the failure list or in the error list reported by
    ``getResults()``; otherwise the original function is called and its
    return value passed through.
    """

    def __init__(self, testcase):
        self.testcase = testcase

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            res = getResults()
            # Test each list on its own.  ``x in (fails or errors)`` would
            # only consult the error list when the failure list is empty.
            if (self.testcase in res.getFailList()
                    or self.testcase in res.getErrorList()):
                raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
            return f(*args, **kwargs)
        # Keep the original name so the wrapper is reported under it.
        wrapped_f.__name__ = f.__name__
        return wrapped_f
67
class skipIfSkipped(object):
    """Decorator that skips a test when the test it depends on was skipped.

    Args:
        testcase: id (full or short) of the test this one depends on.
    """

    def __init__(self, testcase):
        self.testcase = testcase

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            skipped = getResults().getSkipList()
            # Dependency was not skipped: run the real test.
            if self.testcase not in skipped:
                return f(*args, **kwargs)
            raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)

        # Report the wrapper under the name of the wrapped function.
        wrapped_f.__name__ = f.__name__
        return wrapped_f
81
class skipUnlessPassed(object):
    """Decorator that runs a test only if the test it depends on passed.

    Args:
        testcase: id (full or short) of the test this one depends on.

    The decorated test raises ``unittest.SkipTest`` when ``testcase`` shows
    up in the skip, failure or error list reported by ``getResults()``.
    The dependency is also exposed on the wrapper as ``_depends_on``.
    """

    def __init__(self, testcase):
        self.testcase = testcase

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            res = getResults()
            dependency_missed = (
                self.testcase in res.getSkipList()
                or self.testcase in res.getFailList()
                or self.testcase in res.getErrorList()
            )
            if not dependency_missed:
                return f(*args, **kwargs)
            raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)

        wrapped_f._depends_on = self.testcase
        wrapped_f.__name__ = f.__name__
        return wrapped_f
98
class testcase(object):
    """Decorator that attaches a test case identifier to a test function.

    Args:
        test_case: value stored on the wrapper as its ``test_case`` attribute.
    """

    def __init__(self, test_case):
        self.test_case = test_case

    def __call__(self, func):
        # Thin pass-through wrapper; it exists only to carry the attributes.
        def wrapped_f(*args, **kwargs):
            return func(*args, **kwargs)

        wrapped_f.__name__ = func.__name__
        wrapped_f.test_case = self.test_case
        return wrapped_f
110
class NoParsingFilter(logging.Filter):
    """Logging filter that lets through only records logged at level 100."""

    def filter(self, record):
        keep = (record.levelno == 100)
        return keep
114
def LogResults(original_class):
    """Class decorator that records the outcome of every test in a log file.

    The ``run`` method of ``original_class`` is replaced by a wrapper that
    calls the original ``run`` and then writes one ``RESULTS`` level line
    (PASSED, FAILED, ERROR or SKIPPED) for the test, plus the recorded
    message for errors and failures.

    The log is written to ``results-<caller>.<timestamp>.log`` in the current
    working directory, where ``<caller>`` is the basename of ``sys.argv[0]``;
    both names are computed once, when the decorator is applied.  After each
    test, ``results-<caller>.log`` is re-created as a symlink to that file.

    Args:
        original_class: class providing the ``run`` method to wrap.

    Returns:
        ``original_class`` itself, with ``run`` replaced.
    """
    orig_method = original_class.run

    from time import strftime, gmtime
    caller = os.path.basename(sys.argv[0])
    timestamp = strftime('%Y%m%d%H%M%S', gmtime())
    logfile = os.path.join(os.getcwd(), 'results-' + caller + '.' + timestamp + '.log')
    linkfile = os.path.join(os.getcwd(), 'results-' + caller + '.log')

    # rewrite the run method of unittest.TestCase to add testcase logging
    def run(self, result, *args, **kws):
        # Keep whatever the original run() returns so callers still get it.
        ret = orig_method(self, result, *args, **kws)
        passed = True
        testMethod = getattr(self, self._testMethodName)
        # if the test case is decorated then use its number, else use its name
        test_case = getattr(testMethod, 'test_case', self._testMethodName)

        # "module.Class" of the running test.  Taken from the instance's class
        # rather than the Python 2 only ``im_class`` attribute of the method.
        class_name = str(self.__class__).split("'")[1]

        # create custom logging level for filtering.
        custom_log_level = 100
        logging.addLevelName(custom_log_level, 'RESULTS')

        def results(self, message, *args, **kws):
            if self.isEnabledFor(custom_log_level):
                self.log(custom_log_level, message, *args, **kws)
        logging.Logger.results = results

        logging.basicConfig(filename=logfile,
                            filemode='w',
                            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                            datefmt='%H:%M:%S',
                            level=custom_log_level)
        for handler in logging.root.handlers:
            # run() executes once per test: attach the filter only the first
            # time so the handlers do not pile up identical filters.
            if not any(isinstance(flt, NoParsingFilter) for flt in handler.filters):
                handler.addFilter(NoParsingFilter())
        local_log = logging.getLogger(caller)

        def is_this_test(test):
            # str(test) is split on spaces: the first part is compared with
            # the method name, the second must contain the class name.
            parts = str(test).split(' ')
            return (self._testMethodName == parts[0]) and (class_name in parts[1])

        # check status of tests and record it
        for (name, msg) in result.errors:
            if is_this_test(name):
                local_log.results("Testcase "+str(test_case)+": ERROR")
                local_log.results("Testcase "+str(test_case)+":\n"+msg)
                passed = False
        for (name, msg) in result.failures:
            if is_this_test(name):
                local_log.results("Testcase "+str(test_case)+": FAILED")
                local_log.results("Testcase "+str(test_case)+":\n"+msg)
                passed = False
        for (name, msg) in result.skipped:
            if is_this_test(name):
                local_log.results("Testcase "+str(test_case)+": SKIPPED")
                passed = False
        if passed:
            local_log.results("Testcase "+str(test_case)+": PASSED")

        # Create symlink to the current log.  lexists() also reports a
        # dangling symlink, which exists() would miss and symlink() would
        # then refuse to overwrite.
        if os.path.lexists(linkfile):
            os.remove(linkfile)
        os.symlink(logfile, linkfile)

        return ret

    original_class.run = run

    return original_class
182
class TimeOut(BaseException):
    """Raised by the timeout decorators when the time limit is exceeded.

    It derives from BaseException, so ``except Exception`` does not catch it.
    """
185
def timeout(seconds):
    """Return a decorator that limits a function's run time using SIGALRM.

    Args:
        seconds: value passed to ``signal.alarm`` before the function runs.

    The wrapper installs a SIGALRM handler that raises ``TimeOut``, arms the
    alarm, calls the function, and finally cancels the alarm and restores
    the previous handler.  When the ``signal`` module has no ``alarm``, the
    function is returned undecorated.
    """
    def decorator(fn):
        # No alarm support here: hand the function back untouched.
        if not hasattr(signal, 'alarm'):
            return fn

        @wraps(fn)
        def wrapped_f(*args, **kw):
            own_frame = sys._getframe()

            def on_alarm(signum, frame):
                # Do nothing if the alarm fires while the wrapper itself is
                # the interrupted frame.
                if frame is not own_frame:
                    raise TimeOut('%s seconds' % seconds)

            previous = signal.signal(signal.SIGALRM, on_alarm)
            try:
                signal.alarm(seconds)
                return fn(*args, **kw)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, previous)

        return wrapped_f
    return decorator
206
__tag_prefix = "tag__"


def tag(*args, **kwargs):
    """Decorator that adds attributes to classes or functions
    for use with the Attribute (-a) plugin.

    Each positional name is stored as attribute ``tag__<name>`` with the
    value ``True``; each keyword argument is stored as ``tag__<name>`` with
    the given value.  The decorated object itself is returned.
    """
    def wrap_ob(ob):
        for name in args:
            setattr(ob, __tag_prefix + name, True)
        # items() works on both Python 2 and 3; iteritems() is Python 2 only.
        for name, value in kwargs.items():
            setattr(ob, __tag_prefix + name, value)
        return ob
    return wrap_ob
219
def gettag(obj, key, default=None):
    """Return the value of tag ``key`` on ``obj``, or ``default`` if unset.

    For a ``unittest.TestCase`` instance the tag is looked up on the test
    method first and on the instance second; for any other object it is
    looked up on the object itself.
    """
    attr = __tag_prefix + key
    if isinstance(obj, unittest.TestCase):
        tc_method = getattr(obj, obj._testMethodName)
        return getattr(tc_method, attr, getattr(obj, attr, default))
    return getattr(obj, attr, default)
227
def getAllTags(obj):
    """Return a dict of all tags set on ``obj``, keyed by name without prefix.

    For a ``unittest.TestCase`` instance the tags of the instance are merged
    with those of its test method, the method's values taking precedence.
    """
    def collect(o):
        # Gather every attribute carrying the tag prefix, stripping the prefix.
        found = {}
        for attr in dir(o):
            if attr.startswith(__tag_prefix):
                found[attr[len(__tag_prefix):]] = getattr(o, attr)
        return found

    if isinstance(obj, unittest.TestCase):
        tc_method = getattr(obj, obj._testMethodName)
        tags = collect(obj)
        tags.update(collect(tc_method))
        return tags
    return collect(obj)
Patrick Williamsd7e96312015-09-22 08:09:05 -0500238
def timeout_handler(seconds):
    """Return a method decorator that enforces a time limit using SIGALRM
    and restarts the test target when the limit is hit.

    Args:
        seconds: value passed to ``signal.alarm`` before the method runs.

    The wrapper installs a SIGALRM handler, arms the alarm, calls the method,
    and finally cancels the alarm and restores the previous handler.  When
    the alarm fires, the handler calls ``self.target.restart()`` and then
    raises ``TimeOut``.  When the ``signal`` module has no ``alarm``, the
    method is returned undecorated.
    """
    def decorator(fn):
        if hasattr(signal, 'alarm'):
            @wraps(fn)
            def wrapped_f(self, *args, **kw):
                current_frame = sys._getframe()

                def raiseTimeOut(signum, frame):
                    # Do nothing if the alarm fires while the wrapper itself
                    # is the interrupted frame.
                    if frame is not current_frame:
                        try:
                            self.target.restart()
                        except Exception as err:
                            # Best effort: a failing restart must not hide
                            # the timeout, but leave a trace of the failure.
                            # KeyboardInterrupt/SystemExit are not caught
                            # here and propagate as themselves.
                            logging.getLogger(__name__).warning(
                                "target restart after timeout failed: %s", err)
                        raise TimeOut('%s seconds' % seconds)

                prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
                try:
                    signal.alarm(seconds)
                    return fn(self, *args, **kw)
                finally:
                    signal.alarm(0)
                    signal.signal(signal.SIGALRM, prev_handler)
            return wrapped_f
        else:
            return fn
    return decorator