| # ex:ts=4:sw=4:sts=4:et |
| # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- |
| # |
| # BitBake Tests for the Event implementation (event.py) |
| # |
| # Copyright (C) 2017 Intel Corporation |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License version 2 as |
| # published by the Free Software Foundation. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License along |
| # with this program; if not, write to the Free Software Foundation, Inc., |
| # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| # |
| |
| import unittest |
| import bb |
| import logging |
| import bb.compat |
| import bb.event |
| import importlib |
| import threading |
| import time |
| import pickle |
| from unittest.mock import Mock |
| from unittest.mock import call |
| |
| |
| class EventQueueStub(): |
| """ Class used as specification for UI event handler queue stub objects """ |
| def __init__(self): |
| return |
| |
| def send(self, event): |
| return |
| |
| |
| class PickleEventQueueStub(): |
| """ Class used as specification for UI event handler queue stub objects |
| with sendpickle method """ |
| def __init__(self): |
| return |
| |
| def sendpickle(self, pickled_event): |
| return |
| |
| |
| class UIClientStub(): |
| """ Class used as specification for UI event handler stub objects """ |
| def __init__(self): |
| self.event = None |
| |
| |
| class EventHandlingTest(unittest.TestCase): |
| """ Event handling test class """ |
| _threadlock_test_calls = [] |
| |
| def setUp(self): |
| self._test_process = Mock() |
| ui_client1 = UIClientStub() |
| ui_client2 = UIClientStub() |
| self._test_ui1 = Mock(wraps=ui_client1) |
| self._test_ui2 = Mock(wraps=ui_client2) |
| importlib.reload(bb.event) |
| |
| def _create_test_handlers(self): |
| """ Method used to create a test handler ordered dictionary """ |
| test_handlers = bb.compat.OrderedDict() |
| test_handlers["handler1"] = self._test_process.handler1 |
| test_handlers["handler2"] = self._test_process.handler2 |
| return test_handlers |
| |
| def test_class_handlers(self): |
| """ Test set_class_handlers and get_class_handlers methods """ |
| test_handlers = self._create_test_handlers() |
| bb.event.set_class_handlers(test_handlers) |
| self.assertEqual(test_handlers, |
| bb.event.get_class_handlers()) |
| |
| def test_handlers(self): |
| """ Test set_handlers and get_handlers """ |
| test_handlers = self._create_test_handlers() |
| bb.event.set_handlers(test_handlers) |
| self.assertEqual(test_handlers, |
| bb.event.get_handlers()) |
| |
| def test_clean_class_handlers(self): |
| """ Test clean_class_handlers method """ |
| cleanDict = bb.compat.OrderedDict() |
| self.assertEqual(cleanDict, |
| bb.event.clean_class_handlers()) |
| |
| def test_register(self): |
| """ Test register method for class handlers """ |
| result = bb.event.register("handler", self._test_process.handler) |
| self.assertEqual(result, bb.event.Registered) |
| handlers_dict = bb.event.get_class_handlers() |
| self.assertIn("handler", handlers_dict) |
| |
| def test_already_registered(self): |
| """ Test detection of an already registed class handler """ |
| bb.event.register("handler", self._test_process.handler) |
| handlers_dict = bb.event.get_class_handlers() |
| self.assertIn("handler", handlers_dict) |
| result = bb.event.register("handler", self._test_process.handler) |
| self.assertEqual(result, bb.event.AlreadyRegistered) |
| |
| def test_register_from_string(self): |
| """ Test register method receiving code in string """ |
| result = bb.event.register("string_handler", " return True") |
| self.assertEqual(result, bb.event.Registered) |
| handlers_dict = bb.event.get_class_handlers() |
| self.assertIn("string_handler", handlers_dict) |
| |
| def test_register_with_mask(self): |
| """ Test register method with event masking """ |
| mask = ["bb.event.OperationStarted", |
| "bb.event.OperationCompleted"] |
| result = bb.event.register("event_handler", |
| self._test_process.event_handler, |
| mask) |
| self.assertEqual(result, bb.event.Registered) |
| handlers_dict = bb.event.get_class_handlers() |
| self.assertIn("event_handler", handlers_dict) |
| |
| def test_remove(self): |
| """ Test remove method for class handlers """ |
| test_handlers = self._create_test_handlers() |
| bb.event.set_class_handlers(test_handlers) |
| count = len(test_handlers) |
| bb.event.remove("handler1", None) |
| test_handlers = bb.event.get_class_handlers() |
| self.assertEqual(len(test_handlers), count - 1) |
| with self.assertRaises(KeyError): |
| bb.event.remove("handler1", None) |
| |
| def test_execute_handler(self): |
| """ Test execute_handler method for class handlers """ |
| mask = ["bb.event.OperationProgress"] |
| result = bb.event.register("event_handler", |
| self._test_process.event_handler, |
| mask) |
| self.assertEqual(result, bb.event.Registered) |
| event = bb.event.OperationProgress(current=10, total=100) |
| bb.event.execute_handler("event_handler", |
| self._test_process.event_handler, |
| event, |
| None) |
| self._test_process.event_handler.assert_called_once_with(event) |
| |
| def test_fire_class_handlers(self): |
| """ Test fire_class_handlers method """ |
| mask = ["bb.event.OperationStarted"] |
| result = bb.event.register("event_handler1", |
| self._test_process.event_handler1, |
| mask) |
| self.assertEqual(result, bb.event.Registered) |
| result = bb.event.register("event_handler2", |
| self._test_process.event_handler2, |
| "*") |
| self.assertEqual(result, bb.event.Registered) |
| event1 = bb.event.OperationStarted() |
| event2 = bb.event.OperationCompleted(total=123) |
| bb.event.fire_class_handlers(event1, None) |
| bb.event.fire_class_handlers(event2, None) |
| bb.event.fire_class_handlers(event2, None) |
| expected_event_handler1 = [call(event1)] |
| expected_event_handler2 = [call(event1), |
| call(event2), |
| call(event2)] |
| self.assertEqual(self._test_process.event_handler1.call_args_list, |
| expected_event_handler1) |
| self.assertEqual(self._test_process.event_handler2.call_args_list, |
| expected_event_handler2) |
| |
| def test_change_handler_event_mapping(self): |
| """ Test changing the event mapping for class handlers """ |
| event1 = bb.event.OperationStarted() |
| event2 = bb.event.OperationCompleted(total=123) |
| |
| # register handler for all events |
| result = bb.event.register("event_handler1", |
| self._test_process.event_handler1, |
| "*") |
| self.assertEqual(result, bb.event.Registered) |
| bb.event.fire_class_handlers(event1, None) |
| bb.event.fire_class_handlers(event2, None) |
| expected = [call(event1), call(event2)] |
| self.assertEqual(self._test_process.event_handler1.call_args_list, |
| expected) |
| |
| # unregister handler and register it only for OperationStarted |
| result = bb.event.remove("event_handler1", |
| self._test_process.event_handler1) |
| mask = ["bb.event.OperationStarted"] |
| result = bb.event.register("event_handler1", |
| self._test_process.event_handler1, |
| mask) |
| self.assertEqual(result, bb.event.Registered) |
| bb.event.fire_class_handlers(event1, None) |
| bb.event.fire_class_handlers(event2, None) |
| expected = [call(event1), call(event2), call(event1)] |
| self.assertEqual(self._test_process.event_handler1.call_args_list, |
| expected) |
| |
| # unregister handler and register it only for OperationCompleted |
| result = bb.event.remove("event_handler1", |
| self._test_process.event_handler1) |
| mask = ["bb.event.OperationCompleted"] |
| result = bb.event.register("event_handler1", |
| self._test_process.event_handler1, |
| mask) |
| self.assertEqual(result, bb.event.Registered) |
| bb.event.fire_class_handlers(event1, None) |
| bb.event.fire_class_handlers(event2, None) |
| expected = [call(event1), call(event2), call(event1), call(event2)] |
| self.assertEqual(self._test_process.event_handler1.call_args_list, |
| expected) |
| |
| def test_register_UIHhandler(self): |
| """ Test register_UIHhandler method """ |
| result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) |
| self.assertEqual(result, 1) |
| |
| def test_UIHhandler_already_registered(self): |
| """ Test registering an UIHhandler already existing """ |
| result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) |
| self.assertEqual(result, 1) |
| result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) |
| self.assertEqual(result, 2) |
| |
| def test_unregister_UIHhandler(self): |
| """ Test unregister_UIHhandler method """ |
| result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) |
| self.assertEqual(result, 1) |
| result = bb.event.unregister_UIHhandler(1) |
| self.assertIs(result, None) |
| |
| def test_fire_ui_handlers(self): |
| """ Test fire_ui_handlers method """ |
| self._test_ui1.event = Mock(spec_set=EventQueueStub) |
| result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) |
| self.assertEqual(result, 1) |
| self._test_ui2.event = Mock(spec_set=PickleEventQueueStub) |
| result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) |
| self.assertEqual(result, 2) |
| event1 = bb.event.OperationStarted() |
| bb.event.fire_ui_handlers(event1, None) |
| expected = [call(event1)] |
| self.assertEqual(self._test_ui1.event.send.call_args_list, |
| expected) |
| expected = [call(pickle.dumps(event1))] |
| self.assertEqual(self._test_ui2.event.sendpickle.call_args_list, |
| expected) |
| |
| def test_fire(self): |
| """ Test fire method used to trigger class and ui event handlers """ |
| mask = ["bb.event.ConfigParsed"] |
| result = bb.event.register("event_handler1", |
| self._test_process.event_handler1, |
| mask) |
| |
| self._test_ui1.event = Mock(spec_set=EventQueueStub) |
| result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) |
| self.assertEqual(result, 1) |
| |
| event1 = bb.event.ConfigParsed() |
| bb.event.fire(event1, None) |
| expected = [call(event1)] |
| self.assertEqual(self._test_process.event_handler1.call_args_list, |
| expected) |
| self.assertEqual(self._test_ui1.event.send.call_args_list, |
| expected) |
| |
| def test_fire_from_worker(self): |
| """ Test fire_from_worker method """ |
| self._test_ui1.event = Mock(spec_set=EventQueueStub) |
| result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) |
| self.assertEqual(result, 1) |
| event1 = bb.event.ConfigParsed() |
| bb.event.fire_from_worker(event1, None) |
| expected = [call(event1)] |
| self.assertEqual(self._test_ui1.event.send.call_args_list, |
| expected) |
| |
| def test_print_ui_queue(self): |
| """ Test print_ui_queue method """ |
| event1 = bb.event.OperationStarted() |
| event2 = bb.event.OperationCompleted(total=123) |
| bb.event.fire(event1, None) |
| bb.event.fire(event2, None) |
| logger = logging.getLogger("BitBake") |
| logger.addHandler(bb.event.LogHandler()) |
| logger.info("Test info LogRecord") |
| logger.warning("Test warning LogRecord") |
| with self.assertLogs("BitBake", level="INFO") as cm: |
| bb.event.print_ui_queue() |
| self.assertEqual(cm.output, |
| ["INFO:BitBake:Test info LogRecord", |
| "WARNING:BitBake:Test warning LogRecord"]) |
| |
| def _set_threadlock_test_mockups(self): |
| """ Create UI event handler mockups used in enable and disable |
| threadlock tests """ |
| def ui1_event_send(event): |
| if type(event) is bb.event.ConfigParsed: |
| self._threadlock_test_calls.append("w1_ui1") |
| if type(event) is bb.event.OperationStarted: |
| self._threadlock_test_calls.append("w2_ui1") |
| time.sleep(2) |
| |
| def ui2_event_send(event): |
| if type(event) is bb.event.ConfigParsed: |
| self._threadlock_test_calls.append("w1_ui2") |
| if type(event) is bb.event.OperationStarted: |
| self._threadlock_test_calls.append("w2_ui2") |
| time.sleep(2) |
| |
| self._threadlock_test_calls = [] |
| self._test_ui1.event = EventQueueStub() |
| self._test_ui1.event.send = ui1_event_send |
| result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) |
| self.assertEqual(result, 1) |
| self._test_ui2.event = EventQueueStub() |
| self._test_ui2.event.send = ui2_event_send |
| result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) |
| self.assertEqual(result, 2) |
| |
| def _set_and_run_threadlock_test_workers(self): |
| """ Create and run the workers used to trigger events in enable and |
| disable threadlock tests """ |
| worker1 = threading.Thread(target=self._thread_lock_test_worker1) |
| worker2 = threading.Thread(target=self._thread_lock_test_worker2) |
| worker1.start() |
| time.sleep(1) |
| worker2.start() |
| worker1.join() |
| worker2.join() |
| |
| def _thread_lock_test_worker1(self): |
| """ First worker used to fire the ConfigParsed event for enable and |
| disable threadlocks tests """ |
| bb.event.fire(bb.event.ConfigParsed(), None) |
| |
| def _thread_lock_test_worker2(self): |
| """ Second worker used to fire the OperationStarted event for enable |
| and disable threadlocks tests """ |
| bb.event.fire(bb.event.OperationStarted(), None) |
| |
| def test_enable_threadlock(self): |
| """ Test enable_threadlock method """ |
| self._set_threadlock_test_mockups() |
| bb.event.enable_threadlock() |
| self._set_and_run_threadlock_test_workers() |
| # Calls to UI handlers should be in order as all the registered |
| # handlers for the event coming from the first worker should be |
| # called before processing the event from the second worker. |
| self.assertEqual(self._threadlock_test_calls, |
| ["w1_ui1", "w1_ui2", "w2_ui1", "w2_ui2"]) |
| |
| def test_disable_threadlock(self): |
| """ Test disable_threadlock method """ |
| self._set_threadlock_test_mockups() |
| bb.event.disable_threadlock() |
| self._set_and_run_threadlock_test_workers() |
| # Calls to UI handlers should be intertwined together. Thanks to the |
| # delay in the registered handlers for the event coming from the first |
| # worker, the event coming from the second worker starts being |
| # processed before finishing handling the first worker event. |
| self.assertEqual(self._threadlock_test_calls, |
| ["w1_ui1", "w2_ui1", "w1_ui2", "w2_ui2"]) |