#
# BitBake Tests for the Event implementation (event.py)
#
# Copyright (C) 2017 Intel Corporation
#
# SPDX-License-Identifier: GPL-2.0-only
#

import collections
import importlib
import logging
import pickle
import threading
import time
import unittest
import tempfile
from unittest.mock import Mock
from unittest.mock import call

import bb
import bb.event
from bb.msg import BBLogFormatter


class EventQueueStubBase(object):
    """ Common base for the event queue stubs: keeps a printable record
        of every event it is asked to store """
    def __init__(self):
        # One string per stored event, in arrival order.
        self.event_calls = []

    def _store_event_data_string(self, event):
        """ Append a string describing the event to event_calls: the
            formatted "LEVEL: message" text for logging.LogRecord
            instances, the result of bb.event.getName() otherwise """
        if not isinstance(event, logging.LogRecord):
            self.event_calls.append(bb.event.getName(event))
            return
        record_formatter = BBLogFormatter("%(levelname)s: %(message)s")
        self.event_calls.append(record_formatter.format(event))


class EventQueueStub(EventQueueStubBase):
    """ Specification class for UI event handler queue stubs that expose
        a send method """
    def __init__(self):
        super().__init__()

    def send(self, event):
        """ Record the given event through the base class helper """
        super()._store_event_data_string(event)


class PickleEventQueueStub(EventQueueStubBase):
    """ Specification class for UI event handler queue stubs that expose
        a sendpickle method """
    def __init__(self):
        super().__init__()

    def sendpickle(self, pickled_event):
        """ Unpickle the given event and record it through the base class
            helper """
        super()._store_event_data_string(pickle.loads(pickled_event))


class UIClientStub(object):
    """ Specification class for UI event handler stubs; it only carries
        an event attribute """
    def __init__(self):
        # Starts unset; the tests in this file assign their own queue.
        self.event = None


class EventHandlingTest(unittest.TestCase):
    """ Event handling test class """


    def setUp(self):
        """ Create the process mock and the two UI client mocks, then
            reload the bb.event module """
        self._test_process = Mock()
        self._test_ui1 = Mock(wraps=UIClientStub())
        self._test_ui2 = Mock(wraps=UIClientStub())
        # Re-execute bb.event so module state does not carry over
        # from one test to the next.
        importlib.reload(bb.event)

    def _create_test_handlers(self):
        """ Method used to create a test handler ordered dictionary """
        test_handlers = collections.OrderedDict()
        test_handlers["handler1"] = self._test_process.handler1
        test_handlers["handler2"] = self._test_process.handler2
        return test_handlers

    def test_class_handlers(self):
        """ Check that get_class_handlers returns what set_class_handlers
            was given """
        expected = self._create_test_handlers()
        bb.event.set_class_handlers(expected)
        actual = bb.event.get_class_handlers()
        self.assertEqual(expected, actual)

    def test_handlers(self):
        """ Check that get_handlers returns what set_handlers was given """
        expected = self._create_test_handlers()
        bb.event.set_handlers(expected)
        actual = bb.event.get_handlers()
        self.assertEqual(expected, actual)

    def test_clean_class_handlers(self):
        """ Check that clean_class_handlers returns an empty ordered
            dictionary """
        cleaned = bb.event.clean_class_handlers()
        self.assertEqual(collections.OrderedDict(), cleaned)

    def test_register(self):
        """ Register a class handler and check that it is listed among
            the class handlers """
        status = bb.event.register("handler", self._test_process.handler)
        self.assertEqual(status, bb.event.Registered)
        self.assertIn("handler", bb.event.get_class_handlers())

    def test_already_registered(self):
        """ Check that registering an already registered class handler
            returns AlreadyRegistered """
        handler = self._test_process.handler
        bb.event.register("handler", handler)
        self.assertIn("handler", bb.event.get_class_handlers())
        status = bb.event.register("handler", handler)
        self.assertEqual(status, bb.event.AlreadyRegistered)

    def test_register_from_string(self):
        """ Register a class handler given as a string of code and check
            that it is listed among the class handlers """
        handler_code = "    return True"
        status = bb.event.register("string_handler", handler_code)
        self.assertEqual(status, bb.event.Registered)
        self.assertIn("string_handler", bb.event.get_class_handlers())

    def test_register_with_mask(self):
        """ Register a class handler together with an event mask and
            check that it is listed among the class handlers """
        event_mask = ["bb.event.OperationStarted",
                      "bb.event.OperationCompleted"]
        handler = self._test_process.event_handler
        status = bb.event.register("event_handler", handler, event_mask)
        self.assertEqual(status, bb.event.Registered)
        self.assertIn("event_handler", bb.event.get_class_handlers())

    def test_remove(self):
        """ Remove a class handler, check the handler count dropped by
            one and that removing it again raises KeyError """
        initial_handlers = self._create_test_handlers()
        bb.event.set_class_handlers(initial_handlers)
        # Computed before the removal, as the stored dictionary may be
        # the very object passed in.
        expected_count = len(initial_handlers) - 1
        bb.event.remove("handler1", None)
        remaining_handlers = bb.event.get_class_handlers()
        self.assertEqual(len(remaining_handlers), expected_count)
        with self.assertRaises(KeyError):
            bb.event.remove("handler1", None)

    def test_execute_handler(self):
        """ Check that execute_handler calls the given class handler once
            with the event and the data argument """
        handler = self._test_process.event_handler
        status = bb.event.register("event_handler", handler,
                                   ["bb.event.OperationProgress"])
        self.assertEqual(status, bb.event.Registered)
        progress = bb.event.OperationProgress(current=10, total=100)
        bb.event.execute_handler("event_handler", handler, progress, None)
        handler.assert_called_once_with(progress, None)

    def test_fire_class_handlers(self):
        """ Check that fire_class_handlers only calls the handlers whose
            mask matches the fired event """
        masked_handler = self._test_process.event_handler1
        catch_all_handler = self._test_process.event_handler2

        status = bb.event.register("event_handler1", masked_handler,
                                   ["bb.event.OperationStarted"])
        self.assertEqual(status, bb.event.Registered)
        status = bb.event.register("event_handler2", catch_all_handler, "*")
        self.assertEqual(status, bb.event.Registered)

        started = bb.event.OperationStarted()
        completed = bb.event.OperationCompleted(total=123)
        for fired in (started, completed, completed):
            bb.event.fire_class_handlers(fired, None)

        # The masked handler only sees OperationStarted; the "*" handler
        # sees all three fired events.
        self.assertEqual(masked_handler.call_args_list,
                         [call(started, None)])
        self.assertEqual(catch_all_handler.call_args_list,
                         [call(started, None),
                          call(completed, None),
                          call(completed, None)])

    def test_class_handler_filters(self):
        """ Install an event filter that only accepts OperationStarted for
            event_handler2 and check which class handlers get called """
        masked_handler = self._test_process.event_handler1
        catch_all_handler = self._test_process.event_handler2

        status = bb.event.register("event_handler1", masked_handler,
                                   ["bb.event.OperationStarted"])
        self.assertEqual(status, bb.event.Registered)
        status = bb.event.register("event_handler2", catch_all_handler, "*")
        self.assertEqual(status, bb.event.Registered)

        def only_started_for_handler2(name, handler, event, d):
            return (name == 'event_handler2' and
                    bb.event.getName(event) == "OperationStarted")

        bb.event.set_eventfilter(only_started_for_handler2)

        started = bb.event.OperationStarted()
        completed = bb.event.OperationCompleted(total=123)
        for fired in (started, completed, completed):
            bb.event.fire_class_handlers(fired, None)

        # event_handler1 is never called; event_handler2 is called for
        # the OperationStarted event only.
        self.assertEqual(masked_handler.call_args_list, [])
        self.assertEqual(catch_all_handler.call_args_list,
                         [call(started, None)])

    def test_change_handler_event_mapping(self):
        """ Re-register the same class handler with different masks and
            check that the calls it receives follow the current mask """
        handler = self._test_process.event_handler1
        started = bb.event.OperationStarted()
        completed = bb.event.OperationCompleted(total=123)

        def fire_both_events():
            bb.event.fire_class_handlers(started, None)
            bb.event.fire_class_handlers(completed, None)

        # Step 1: the handler listens to every event.
        status = bb.event.register("event_handler1", handler, "*")
        self.assertEqual(status, bb.event.Registered)
        fire_both_events()
        expected_calls = [call(started, None), call(completed, None)]
        self.assertEqual(handler.call_args_list, expected_calls)

        # Step 2: drop the handler and register it again, this time for
        # OperationStarted only.
        bb.event.remove("event_handler1", handler)
        status = bb.event.register("event_handler1", handler,
                                   ["bb.event.OperationStarted"])
        self.assertEqual(status, bb.event.Registered)
        fire_both_events()
        expected_calls = expected_calls + [call(started, None)]
        self.assertEqual(handler.call_args_list, expected_calls)

        # Step 3: drop the handler and register it again, this time for
        # OperationCompleted only.
        bb.event.remove("event_handler1", handler)
        status = bb.event.register("event_handler1", handler,
                                   ["bb.event.OperationCompleted"])
        self.assertEqual(status, bb.event.Registered)
        fire_both_events()
        expected_calls = expected_calls + [call(completed, None)]
        self.assertEqual(handler.call_args_list, expected_calls)

    def test_register_UIHhandler(self):
        """ Check that registering the first UI handler returns 1 """
        handler_num = bb.event.register_UIHhandler(self._test_ui1,
                                                   mainui=True)
        self.assertEqual(handler_num, 1)

    def test_UIHhandler_already_registered(self):
        """ Register the same UI handler twice and check that the
            returned values are 1 and then 2 """
        for expected_num in (1, 2):
            handler_num = bb.event.register_UIHhandler(self._test_ui1,
                                                       mainui=True)
            self.assertEqual(handler_num, expected_num)

    def test_unregister_UIHhandler(self):
        """ Check that unregister_UIHhandler returns None for a
            registered UI handler """
        handler_num = bb.event.register_UIHhandler(self._test_ui1,
                                                   mainui=True)
        self.assertEqual(handler_num, 1)
        unregister_result = bb.event.unregister_UIHhandler(1)
        self.assertIs(unregister_result, None)

    def test_fire_ui_handlers(self):
        """ Check that fire_ui_handlers passes the event to send on one
            UI handler and its pickled form to sendpickle on the other """
        plain_ui, pickle_ui = self._test_ui1, self._test_ui2

        plain_ui.event = Mock(spec_set=EventQueueStub)
        handler_num = bb.event.register_UIHhandler(plain_ui, mainui=True)
        self.assertEqual(handler_num, 1)

        pickle_ui.event = Mock(spec_set=PickleEventQueueStub)
        handler_num = bb.event.register_UIHhandler(pickle_ui, mainui=True)
        self.assertEqual(handler_num, 2)

        started = bb.event.OperationStarted()
        bb.event.fire_ui_handlers(started, None)

        self.assertEqual(plain_ui.event.send.call_args_list,
                         [call(started)])
        self.assertEqual(pickle_ui.event.sendpickle.call_args_list,
                         [call(pickle.dumps(started))])

    def test_ui_handler_mask_filter(self):
        """ Give both UI handlers an OperationStarted-only mask and check
            that only that event is delivered to them """
        event_mask = ["bb.event.OperationStarted"]
        domains = {}
        plain_ui, pickle_ui = self._test_ui1, self._test_ui2

        plain_ui.event = Mock(spec_set=EventQueueStub)
        handler_num = bb.event.register_UIHhandler(plain_ui, mainui=True)
        bb.event.set_UIHmask(handler_num, logging.INFO, domains, event_mask)

        pickle_ui.event = Mock(spec_set=PickleEventQueueStub)
        handler_num = bb.event.register_UIHhandler(pickle_ui, mainui=True)
        bb.event.set_UIHmask(handler_num, logging.INFO, domains, event_mask)

        started = bb.event.OperationStarted()
        completed = bb.event.OperationCompleted(total=1)
        for fired in (started, completed):
            bb.event.fire_ui_handlers(fired, None)

        # OperationCompleted is expected to be filtered out by the mask.
        self.assertEqual(plain_ui.event.send.call_args_list,
                         [call(started)])
        self.assertEqual(pickle_ui.event.sendpickle.call_args_list,
                         [call(pickle.dumps(started))])

    def test_ui_handler_log_filter(self):
        """ Test log filters for UI handlers

            Both UI handlers are registered with a "*" event mask, a base
            log level of logging.ERROR and a per-domain level of
            logging.WARNING for "BitBake.Foo".  One event and four log
            records are then emitted; the stubs are expected to have
            stored the event plus the two records that pass the levels. """
        mask = ["*"]
        debug_domains = {'BitBake.Foo': logging.WARNING}

        self._test_ui1.event = EventQueueStub()
        result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
        bb.event.set_UIHmask(result, logging.ERROR, debug_domains, mask)
        self._test_ui2.event = PickleEventQueueStub()
        result = bb.event.register_UIHhandler(self._test_ui2, mainui=True)
        bb.event.set_UIHmask(result, logging.ERROR, debug_domains, mask)

        event1 = bb.event.OperationStarted()
        bb.event.fire_ui_handlers(event1, None)   # All events match

        event_log_handler = bb.event.LogHandler()
        logger = logging.getLogger("BitBake")
        logger.addHandler(event_log_handler)
        try:
            logger1 = logging.getLogger("BitBake.Foo")
            logger1.warning("Test warning LogRecord1") # Matches debug_domains level
            logger1.info("Test info LogRecord")        # Filtered out
            logger2 = logging.getLogger("BitBake.Bar")
            logger2.error("Test error LogRecord")      # Matches filter base level
            logger2.warning("Test warning LogRecord2") # Filtered out
        finally:
            # The "BitBake" logger is process-wide: always detach the
            # handler so an exception raised while logging cannot leak
            # it into the tests that run afterwards.
            logger.removeHandler(event_log_handler)

        expected = ['OperationStarted',
                    'WARNING: Test warning LogRecord1',
                    'ERROR: Test error LogRecord']
        self.assertEqual(self._test_ui1.event.event_calls, expected)
        self.assertEqual(self._test_ui2.event.event_calls, expected)

    def test_fire(self):
        """ Test fire method used to trigger class and ui event handlers

            A class handler masked on ConfigParsed and a UI handler are
            registered; a single fired ConfigParsed event is expected to
            be passed once to each of them. """
        mask = ["bb.event.ConfigParsed"]
        result = bb.event.register("event_handler1",
                                   self._test_process.event_handler1,
                                   mask)
        # Check the registration status before "result" is reused
        # below; it used to be overwritten without being checked.
        self.assertEqual(result, bb.event.Registered)

        self._test_ui1.event = Mock(spec_set=EventQueueStub)
        result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
        self.assertEqual(result, 1)

        event1 = bb.event.ConfigParsed()
        bb.event.fire(event1, None)
        # The class handler receives the event and the data argument.
        expected = [call(event1, None)]
        self.assertEqual(self._test_process.event_handler1.call_args_list,
                         expected)
        # The UI handler queue receives the event alone.
        expected = [call(event1)]
        self.assertEqual(self._test_ui1.event.send.call_args_list,
                         expected)

    def test_fire_from_worker(self):
        """ Check that fire_from_worker passes the event to the send
            method of the registered UI handler queue """
        ui = self._test_ui1
        ui.event = Mock(spec_set=EventQueueStub)
        handler_num = bb.event.register_UIHhandler(ui, mainui=True)
        self.assertEqual(handler_num, 1)
        parsed = bb.event.ConfigParsed()
        bb.event.fire_from_worker(parsed, None)
        self.assertEqual(ui.event.send.call_args_list, [call(parsed)])

    def test_worker_fire(self):
        """ Replace bb.event.worker_fire with a mock and check that fire
            calls it once with the event and the data argument """
        bb.event.worker_fire = Mock()
        fired = bb.event.Event()
        bb.event.fire(fired, None)
        recorded_calls = bb.event.worker_fire.call_args_list
        self.assertEqual(recorded_calls, [call(fired, None)])

    def test_print_ui_queue(self):
        """ Test print_ui_queue method

            Two events are fired and two log records are emitted through
            a bb.event.LogHandler while no UI handler is registered by
            this test.  Calling print_ui_queue is then expected to log
            both records on the "BitBake" logger. """
        event1 = bb.event.OperationStarted()
        event2 = bb.event.OperationCompleted(total=123)
        bb.event.fire(event1, None)
        bb.event.fire(event2, None)
        event_log_handler = bb.event.LogHandler()
        logger = logging.getLogger("BitBake")
        logger.addHandler(event_log_handler)
        try:
            logger.info("Test info LogRecord")
            logger.warning("Test warning LogRecord")
            with self.assertLogs("BitBake", level="INFO") as cm:
                bb.event.print_ui_queue()
        finally:
            # assertLogs raises when nothing was logged; detach the
            # handler in every case so it cannot leak into the tests
            # that run afterwards.
            logger.removeHandler(event_log_handler)
        self.assertEqual(cm.output,
                         ["INFO:BitBake:Test info LogRecord",
                          "WARNING:BitBake:Test warning LogRecord"])

    def _set_threadlock_test_mockups(self):
        """ Register two UI handlers whose send functions log which event
            reached which handler and then sleep for two seconds """
        def make_send(config_parsed_label, operation_started_label):
            # Build a send function that appends the label matching the
            # exact type of the event it receives.
            def send(event):
                if type(event) is bb.event.ConfigParsed:
                    self._threadlock_test_calls.append(config_parsed_label)
                if type(event) is bb.event.OperationStarted:
                    self._threadlock_test_calls.append(
                        operation_started_label)
                time.sleep(2)
            return send

        self._threadlock_test_calls = []
        ui_setups = ((self._test_ui1, make_send("w1_ui1", "w2_ui1"), 1),
                     (self._test_ui2, make_send("w1_ui2", "w2_ui2"), 2))
        for ui, send_function, expected_num in ui_setups:
            ui.event = EventQueueStub()
            ui.event.send = send_function
            handler_num = bb.event.register_UIHhandler(ui, mainui=True)
            self.assertEqual(handler_num, expected_num)

    def _set_and_run_threadlock_test_workers(self):
        """ Start the two worker threads one second apart and wait for
            both of them to finish """
        first_worker = threading.Thread(
            target=self._thread_lock_test_worker1)
        second_worker = threading.Thread(
            target=self._thread_lock_test_worker2)
        first_worker.start()
        # Presumably lets the first worker's event be in flight before
        # the second worker fires its own.
        time.sleep(1)
        second_worker.start()
        for worker in (first_worker, second_worker):
            worker.join()

    def _thread_lock_test_worker1(self):
        """ Body of the first worker thread: fire a ConfigParsed event """
        config_parsed = bb.event.ConfigParsed()
        bb.event.fire(config_parsed, None)

    def _thread_lock_test_worker2(self):
        """ Body of the second worker thread: fire an OperationStarted
            event """
        operation_started = bb.event.OperationStarted()
        bb.event.fire(operation_started, None)

    def test_event_threadlock(self):
        """ Check that events fired from two threads reach the UI
            handlers one event at a time """
        self._set_threadlock_test_mockups()
        self._set_and_run_threadlock_test_workers()
        # Every UI handler must be called for the first worker's event
        # before any of them is called for the second worker's event,
        # hence all "w1" entries come before the "w2" ones.
        expected_order = ["w1_ui1", "w1_ui2", "w2_ui1", "w2_ui2"]
        self.assertEqual(self._threadlock_test_calls, expected_order)

class EventClassesTest(unittest.TestCase):
    """ Event classes test class """

    _worker_pid = 54321

    def setUp(self):
        """ Set bb.event.worker_pid to the pid used by these tests, keep
            the result of bb.data.init() in self.d and install the result
            of bb.siggen.init() as bb.parse.siggen """
        bb.event.worker_pid = EventClassesTest._worker_pid
        datastore = bb.data.init()
        self.d = datastore
        bb.parse.siggen = bb.siggen.init(datastore)

    def test_Event(self):
        """ Check that a plain Event carries the worker pid """
        base_event = bb.event.Event()
        self.assertEqual(base_event.pid, EventClassesTest._worker_pid)

    def test_HeartbeatEvent(self):
        """ Check the time and pid attributes of a HeartbeatEvent """
        # Named "timestamp" so the local does not hide the time module.
        timestamp = 10
        heartbeat = bb.event.HeartbeatEvent(timestamp)
        self.assertEqual(heartbeat.time, timestamp)
        self.assertEqual(heartbeat.pid, EventClassesTest._worker_pid)

    def test_OperationStarted(self):
        """ Check the msg and pid attributes of an OperationStarted
            event """
        message = "Foo Bar"
        started = bb.event.OperationStarted(message)
        self.assertEqual(started.msg, message)
        self.assertEqual(started.pid, EventClassesTest._worker_pid)

    def test_OperationCompleted(self):
        """ Check the msg, total and pid attributes of an
            OperationCompleted event """
        message, total_count = "Foo Bar", 123
        completed = bb.event.OperationCompleted(total_count, message)
        self.assertEqual(completed.msg, message)
        self.assertEqual(completed.total, total_count)
        self.assertEqual(completed.pid, EventClassesTest._worker_pid)

    def test_OperationProgress(self):
        """ Check that the msg of an OperationProgress event is the given
            message followed by ": current/total" """
        message = "Foo Bar"
        current_count, total_count = 111, 123
        progress = bb.event.OperationProgress(current_count, total_count,
                                              message)
        progress_suffix = ": %s/%s" % (current_count, total_count)
        self.assertEqual(progress.msg, message + progress_suffix)
        self.assertEqual(progress.pid, EventClassesTest._worker_pid)

    def test_ConfigParsed(self):
        """ Check that a ConfigParsed event carries the worker pid """
        config_parsed = bb.event.ConfigParsed()
        self.assertEqual(config_parsed.pid, EventClassesTest._worker_pid)

    def test_MultiConfigParsed(self):
        """ Check the mcdata and pid attributes of a MultiConfigParsed
            event """
        multiconfig_data = {"foobar": "Foo Bar"}
        parsed = bb.event.MultiConfigParsed(multiconfig_data)
        self.assertEqual(parsed.mcdata, multiconfig_data)
        self.assertEqual(parsed.pid, EventClassesTest._worker_pid)

    def test_RecipeEvent(self):
        """ Check that RecipeEvent exposes the given callable as fn and
            carries the worker pid """
        def double(a):
            return 2 * a

        recipe_event = bb.event.RecipeEvent(double)
        self.assertEqual(recipe_event.fn(1), double(1))
        self.assertEqual(recipe_event.pid, EventClassesTest._worker_pid)

    def test_RecipePreFinalise(self):
        """ Check that RecipePreFinalise exposes the given callable as fn
            and carries the worker pid """
        def double(a):
            return 2 * a

        pre_finalise = bb.event.RecipePreFinalise(double)
        self.assertEqual(pre_finalise.fn(1), double(1))
        self.assertEqual(pre_finalise.pid, EventClassesTest._worker_pid)

    def test_RecipeTaskPreProcess(self):
        """ Check the fn, tasklist and pid attributes of a
            RecipeTaskPreProcess event """
        def double(a):
            return 2 * a

        tasks = [("foobar", double)]
        pre_process = bb.event.RecipeTaskPreProcess(double, tasks)
        self.assertEqual(pre_process.fn(1), double(1))
        self.assertEqual(pre_process.tasklist, tasks)
        self.assertEqual(pre_process.pid, EventClassesTest._worker_pid)

    def test_RecipeParsed(self):
        """ Check that RecipeParsed exposes the given callable as fn and
            carries the worker pid """
        def double(a):
            return 2 * a

        recipe_parsed = bb.event.RecipeParsed(double)
        self.assertEqual(recipe_parsed.fn(1), double(1))
        self.assertEqual(recipe_parsed.pid, EventClassesTest._worker_pid)

    def test_BuildBase(self):
        """ Check the name, pkgs and failures of a BuildBase event, both
            as constructed and after reassigning name and pkgs """
        build_name, packages, failure_count = "foo", ["bar"], 123
        build = bb.event.BuildBase(build_name, packages, failure_count)
        self.assertEqual(build.name, build_name)
        self.assertEqual(build.pkgs, packages)
        self.assertEqual(build.getFailures(), failure_count)

        # Reassign name and pkgs; the failure count must be unaffected.
        new_name, new_packages = "bar", ["foo"]
        build.name = new_name
        build.pkgs = new_packages
        self.assertEqual(build.name, new_name)
        self.assertEqual(build.pkgs, new_packages)
        self.assertEqual(build.getFailures(), failure_count)
        self.assertEqual(build.pid, EventClassesTest._worker_pid)

    def test_BuildInit(self):
        """ Check the default name, pkgs and failures of a BuildInit
            event, then the values after reassigning name and pkgs """
        build_init = bb.event.BuildInit()
        self.assertEqual(build_init.name, None)
        self.assertEqual(build_init.pkgs, [])
        self.assertEqual(build_init.getFailures(), 0)

        # Reassign name and pkgs; the failure count must stay at zero.
        new_name, new_packages = "bar", ["foo"]
        build_init.name = new_name
        build_init.pkgs = new_packages
        self.assertEqual(build_init.name, new_name)
        self.assertEqual(build_init.pkgs, new_packages)
        self.assertEqual(build_init.getFailures(), 0)
        self.assertEqual(build_init.pid, EventClassesTest._worker_pid)

    def test_BuildStarted(self):
        """ Check the name, pkgs, failures and msg of a BuildStarted
            event, both as constructed and after reassigning name, pkgs
            and msg """
        build_name, packages, failure_count = "foo", ["bar"], 123
        started = bb.event.BuildStarted(build_name, packages, failure_count)
        self.assertEqual(started.name, build_name)
        self.assertEqual(started.pkgs, packages)
        self.assertEqual(started.getFailures(), failure_count)
        self.assertEqual(started.msg, "Building Started")

        # Reassign name, pkgs and msg; the failure count must be
        # unaffected.
        new_name, new_packages, new_message = "bar", ["foo"], "foobar"
        started.name = new_name
        started.pkgs = new_packages
        started.msg = new_message
        self.assertEqual(started.name, new_name)
        self.assertEqual(started.pkgs, new_packages)
        self.assertEqual(started.getFailures(), failure_count)
        self.assertEqual(started.msg, new_message)
        self.assertEqual(started.pid, EventClassesTest._worker_pid)

    def test_BuildCompleted(self):
        """ Check a BuildCompleted event built with failures and an
            interrupted flag, then one built with the default values """
        total_count = 1000
        build_name, packages = "foo", ["bar"]
        failure_count, interrupted_flag = 123, 1

        failed_build = bb.event.BuildCompleted(total_count, build_name,
                                               packages, failure_count,
                                               interrupted_flag)
        self.assertEqual(failed_build.name, build_name)
        self.assertEqual(failed_build.pkgs, packages)
        self.assertEqual(failed_build.getFailures(), failure_count)
        self.assertEqual(failed_build.msg, "Building Failed")

        # Without failures and interrupted arguments the build is
        # reported as succeeded with zero failures.
        clean_build = bb.event.BuildCompleted(total_count, build_name,
                                              packages)
        self.assertEqual(clean_build.name, build_name)
        self.assertEqual(clean_build.pkgs, packages)
        self.assertEqual(clean_build.getFailures(), 0)
        self.assertEqual(clean_build.msg, "Building Succeeded")
        self.assertEqual(clean_build.pid, EventClassesTest._worker_pid)

    def test_DiskFull(self):
        """ Check that a DiskFull event carries the worker pid """
        device = "/dev/foo"
        # Named "fs_type" so the local does not hide the type builtin.
        fs_type = "ext4"
        free_space = "104M"
        mount_point = "/"
        disk_full = bb.event.DiskFull(device, fs_type, free_space,
                                      mount_point)
        self.assertEqual(disk_full.pid, EventClassesTest._worker_pid)

    def test_MonitorDiskEvent(self):
        """ Check that a MonitorDiskEvent exposes the values of the
            DiskUsageSample it was built from """
        available, free, total = 10000000, 90000000, 1000000000
        sample = bb.event.DiskUsageSample(available, free, total)
        monitor_event = bb.event.MonitorDiskEvent(sample)
        self.assertEqual(monitor_event.disk_usage.available_bytes, available)
        self.assertEqual(monitor_event.disk_usage.free_bytes, free)
        self.assertEqual(monitor_event.disk_usage.total_bytes, total)
        self.assertEqual(monitor_event.pid, EventClassesTest._worker_pid)

    def test_NoProvider(self):
        """ Check the accessors and the string form of NoProvider events
            built with the item only, with close matches and with
            reasons """
        missing_item = "foobar"

        # Item only: not a runtime dependency.
        build_time = bb.event.NoProvider(missing_item)
        self.assertEqual(build_time.getItem(), missing_item)
        self.assertEqual(build_time.isRuntime(), False)
        self.assertEqual(str(build_time), "Nothing PROVIDES 'foobar'")

        # Runtime dependency without reasons: close matches are listed.
        is_runtime = True
        dependee_names = ["foo", "bar"]
        with_matches = bb.event.NoProvider(missing_item, is_runtime,
                                           dependee_names, None,
                                           ["foibar", "footbar"])
        self.assertEqual(with_matches.isRuntime(), True)
        expected_text = ("Nothing RPROVIDES 'foobar' (but foo, bar RDEPENDS"
                         " on or otherwise requires it). Close matches:\n"
                         "  foibar\n"
                         "  footbar")
        self.assertEqual(str(with_matches), expected_text)

        # Runtime dependency with reasons: the reasons are listed.
        with_reasons = bb.event.NoProvider(
            missing_item, is_runtime, dependee_names,
            ["Item does not exist on database"],
            ["foibar", "footbar"])
        expected_text = ("Nothing RPROVIDES 'foobar' (but foo, bar RDEPENDS"
                         " on or otherwise requires it)\n"
                         "Item does not exist on database")
        self.assertEqual(str(with_reasons), expected_text)
        self.assertEqual(with_reasons.pid, EventClassesTest._worker_pid)

    def test_MultipleProviders(self):
        """ Check the accessors and the string form of MultipleProviders
            events, for a build time and for a runtime item """
        wanted_item = "foobar"
        providers = ["foobarv1", "foobars"]

        build_time = bb.event.MultipleProviders(wanted_item, providers)
        self.assertEqual(build_time.isRuntime(), False)
        self.assertEqual(build_time.getItem(), wanted_item)
        self.assertEqual(build_time.getCandidates(), providers)
        expected_text = ("Multiple providers are available for foobar (foobarv1,"
                         " foobars)\n"
                         "Consider defining a PREFERRED_PROVIDER entry to match "
                         "foobar")
        self.assertEqual(str(build_time), expected_text)

        is_runtime = True
        run_time = bb.event.MultipleProviders(wanted_item, providers,
                                              is_runtime)
        self.assertEqual(run_time.isRuntime(), is_runtime)
        expected_text = ("Multiple providers are available for runtime foobar "
                         "(foobarv1, foobars)\n"
                         "Consider defining a PREFERRED_RPROVIDER entry to match "
                         "foobar")
        self.assertEqual(str(run_time), expected_text)
        self.assertEqual(run_time.pid, EventClassesTest._worker_pid)

    def test_ParseStarted(self):
        """ Check the msg, total and pid attributes of a ParseStarted
            event """
        total_count = 123
        parse_started = bb.event.ParseStarted(total_count)
        self.assertEqual(parse_started.msg, "Recipe parsing Started")
        self.assertEqual(parse_started.total, total_count)
        self.assertEqual(parse_started.pid, EventClassesTest._worker_pid)

    def test_ParseCompleted(self):
        """ Check the msg, the counters and the pid of a ParseCompleted
            event; sofar is expected to be cached plus parsed """
        cached, parsed, skipped = 10, 13, 7
        virtuals, masked, errors = 2, 1, 0
        total = 23
        parse_completed = bb.event.ParseCompleted(cached, parsed, skipped,
                                                  masked, virtuals, errors,
                                                  total)
        self.assertEqual(parse_completed.msg, "Recipe parsing Completed")

        # The counters are compared through their string forms.
        counter_names = ("cached", "parsed", "skipped", "virtuals",
                         "masked", "errors", "sofar", "total")
        expected_counters = [cached, parsed, skipped, virtuals, masked,
                             errors, cached + parsed, total]
        actual_counters = [getattr(parse_completed, counter)
                           for counter in counter_names]
        self.assertEqual(str(actual_counters), str(expected_counters))
        self.assertEqual(parse_completed.pid, EventClassesTest._worker_pid)

    def test_ParseProgress(self):
        """ Check that the msg of a ParseProgress event is "Recipe
            parsing" followed by ": current/total" """
        current_count, total_count = 10, 100
        parse_progress = bb.event.ParseProgress(current_count, total_count)
        progress_suffix = ": %s/%s" % (current_count, total_count)
        self.assertEqual(parse_progress.msg,
                         "Recipe parsing" + progress_suffix)
        self.assertEqual(parse_progress.pid, EventClassesTest._worker_pid)

    def test_CacheLoadStarted(self):
        """ Check the msg, total and pid attributes of a CacheLoadStarted
            event """
        total_count = 123
        load_started = bb.event.CacheLoadStarted(total_count)
        self.assertEqual(load_started.msg, "Loading cache Started")
        self.assertEqual(load_started.total, total_count)
        self.assertEqual(load_started.pid, EventClassesTest._worker_pid)

    def test_CacheLoadProgress(self):
        """ Check that the msg of a CacheLoadProgress event is "Loading
            cache" followed by ": current/total" """
        current_count, total_count = 10, 100
        load_progress = bb.event.CacheLoadProgress(current_count,
                                                   total_count)
        progress_suffix = ": %s/%s" % (current_count, total_count)
        self.assertEqual(load_progress.msg,
                         "Loading cache" + progress_suffix)
        self.assertEqual(load_progress.pid, EventClassesTest._worker_pid)

    def test_CacheLoadCompleted(self):
        """ Check the msg, total, num_entries and pid attributes of a
            CacheLoadCompleted event """
        total_count, entry_count = 23, 12
        load_completed = bb.event.CacheLoadCompleted(total_count,
                                                     entry_count)
        self.assertEqual(load_completed.msg, "Loading cache Completed")
        # The values are compared through their string forms.
        actual_values = [load_completed.total, load_completed.num_entries]
        expected_values = [total_count, entry_count]
        self.assertEqual(str(actual_values), str(expected_values))
        self.assertEqual(load_completed.pid, EventClassesTest._worker_pid)

    def test_TreeDataPreparationStarted(self):
        """ Check the message and pid of TreeDataPreparationStarted """
        started = bb.event.TreeDataPreparationStarted()
        wanted_msg = "Preparing tree data Started"
        self.assertEqual(started.msg, wanted_msg)
        self.assertEqual(started.pid, EventClassesTest._worker_pid)

    def test_TreeDataPreparationProgress(self):
        """ Check the message and pid of TreeDataPreparationProgress """
        done, out_of = 10, 100
        event = bb.event.TreeDataPreparationProgress(done, out_of)
        wanted_msg = "Preparing tree data" + ": %s/%s" % (done, out_of)
        self.assertEqual(event.msg, wanted_msg)
        self.assertEqual(event.pid, EventClassesTest._worker_pid)

    def test_TreeDataPreparationCompleted(self):
        """ Check message, total and pid of TreeDataPreparationCompleted """
        item_total = 23
        finished = bb.event.TreeDataPreparationCompleted(item_total)
        self.assertEqual(finished.msg, "Preparing tree data Completed")
        self.assertEqual(finished.total, item_total)
        self.assertEqual(finished.pid, EventClassesTest._worker_pid)

    def test_DepTreeGenerated(self):
        """ Check that a DepTreeGenerated event carries the worker pid """
        # The dependency graph content is irrelevant here, so a Mock is used.
        generated = bb.event.DepTreeGenerated(Mock())
        self.assertEqual(generated.pid, EventClassesTest._worker_pid)

    def test_TargetsTreeGenerated(self):
        """ Check that a TargetsTreeGenerated event carries the worker pid """
        # The model content is irrelevant here, so a Mock is used.
        generated = bb.event.TargetsTreeGenerated(Mock())
        self.assertEqual(generated.pid, EventClassesTest._worker_pid)

    def test_ReachableStamps(self):
        """ Check that ReachableStamps keeps its stamps and the worker pid """
        stamp_list = [Mock() for _ in range(2)]
        reachable = bb.event.ReachableStamps(stamp_list)
        self.assertEqual(reachable.stamps, stamp_list)
        self.assertEqual(reachable.pid, EventClassesTest._worker_pid)

    def test_FilesMatchingFound(self):
        """ Check that a FilesMatchingFound event carries the worker pid """
        found = bb.event.FilesMatchingFound("foo.*bar", ["foobar"])
        self.assertEqual(found.pid, EventClassesTest._worker_pid)

    def test_ConfigFilesFound(self):
        """ Check that a ConfigFilesFound event carries the worker pid """
        found = bb.event.ConfigFilesFound("FOO_BAR", ["foo", "bar"])
        self.assertEqual(found.pid, EventClassesTest._worker_pid)

    def test_ConfigFilePathFound(self):
        """ Check that a ConfigFilePathFound event carries the worker pid """
        found = bb.event.ConfigFilePathFound("/foo/bar")
        self.assertEqual(found.pid, EventClassesTest._worker_pid)

    def test_message_classes(self):
        """ Check that every Msg* event class carries the worker pid """
        text = "foobar foo bar"
        message_classes = (
            bb.event.MsgBase,
            bb.event.MsgDebug,
            bb.event.MsgNote,
            bb.event.MsgWarn,
            bb.event.MsgError,
            bb.event.MsgFatal,
            bb.event.MsgPlain,
        )
        # Each class is built from the same text and checked in turn.
        for message_cls in message_classes:
            event = message_cls(text)
            self.assertEqual(event.pid, EventClassesTest._worker_pid)

    def test_LogExecTTY(self):
        """ Check that LogExecTTY stores its arguments and the worker pid """
        ctor_args = ("foo bar", "foo.sh", 10, 3)
        attr_names = ("msg", "prog", "sleep_delay", "retries")
        event = bb.event.LogExecTTY(*ctor_args)
        # Each positional argument is expected back under the matching name.
        for attr, wanted in zip(attr_names, ctor_args):
            self.assertEqual(getattr(event, attr), wanted)
        self.assertEqual(event.pid, EventClassesTest._worker_pid)

    def _throw_zero_division_exception(self):
        """ Helper that always raises ZeroDivisionError """
        _ = 1 / 0

    def _worker_handler(self, event, d):
        """ Remember the event it is called with; d is ignored

        test_LogHandler installs this as bb.event.worker_fire and reads the
        event back from self._returned_event.
        """
        self._returned_event = event

    def test_LogHandler(self):
        """ Test LogHandler class

        Logs a ZeroDivisionError through a bb.event.LogHandler while
        bb.event.worker_fire points at self._worker_handler, then checks that
        the event handed to that hook survives a pickle round trip and that
        its taskpid matches EventClassesTest._worker_pid.
        """
        logger = logging.getLogger("TestEventClasses")
        logger.propagate = False
        handler = bb.event.LogHandler(logging.INFO)
        logger.addHandler(handler)
        # Detach the handler afterwards so repeated runs do not stack
        # handlers on the same named logger.
        self.addCleanup(logger.removeHandler, handler)
        # Restore the module-level hook afterwards so this test does not
        # leak its handler into tests that run later.
        self.addCleanup(setattr, bb.event, "worker_fire",
                        getattr(bb.event, "worker_fire", None))
        bb.event.worker_fire = self._worker_handler
        try:
            self._throw_zero_division_exception()
        except ZeroDivisionError as ex:
            logger.exception(ex)
        event = self._returned_event
        try:
            pickle.loads(pickle.dumps(event))
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not reported as a serialization failure.
            self.fail('Logged event is not serializable')
        self.assertEqual(event.taskpid, EventClassesTest._worker_pid)

    def test_MetadataEvent(self):
        """ Check that MetadataEvent keeps its type and the worker pid """
        kind = "footype"
        payload = {"foo": "bar"}
        metadata = bb.event.MetadataEvent(kind, payload)
        self.assertEqual(metadata.type, kind)
        self.assertEqual(metadata.pid, EventClassesTest._worker_pid)

    def test_ProcessStarted(self):
        """ Check processname, total and pid of a ProcessStarted event """
        name, work_total = "foo", 9783128974
        started = bb.event.ProcessStarted(name, work_total)
        self.assertEqual(started.processname, name)
        self.assertEqual(started.total, work_total)
        self.assertEqual(started.pid, EventClassesTest._worker_pid)

    def test_ProcessProgress(self):
        """ Check processname, progress and pid of a ProcessProgress event """
        name, done = "foo", 243224
        update = bb.event.ProcessProgress(name, done)
        self.assertEqual(update.processname, name)
        self.assertEqual(update.progress, done)
        self.assertEqual(update.pid, EventClassesTest._worker_pid)

    def test_ProcessFinished(self):
        """ Test ProcessFinished class

        Builds the event from a process name only and checks that the name
        and the worker pid are stored on it.
        """
        processname = "foo"
        event = bb.event.ProcessFinished(processname)
        self.assertEqual(event.processname, processname)
        self.assertEqual(event.pid, EventClassesTest._worker_pid)

    def test_SanityCheck(self):
        """ Check SanityCheck with default and explicit generateevents """
        # Without an argument, generateevents is expected to be True.
        default_check = bb.event.SanityCheck()
        self.assertEqual(default_check.generateevents, True)
        self.assertEqual(default_check.pid, EventClassesTest._worker_pid)
        # An explicit value is expected to be stored as given.
        wanted_flag = False
        explicit_check = bb.event.SanityCheck(wanted_flag)
        self.assertEqual(explicit_check.generateevents, wanted_flag)
        self.assertEqual(explicit_check.pid, EventClassesTest._worker_pid)

    def test_SanityCheckPassed(self):
        """ Check that a SanityCheckPassed event carries the worker pid """
        passed = bb.event.SanityCheckPassed()
        self.assertEqual(passed.pid, EventClassesTest._worker_pid)

    def test_SanityCheckFailed(self):
        """ Check SanityCheckFailed with and without the network_error flag """
        text = "The sanity test failed."
        # First with the message alone, then with network_error set as well.
        for ctor_args in ((text,), (text, True)):
            failed = bb.event.SanityCheckFailed(*ctor_args)
            self.assertEqual(failed.pid, EventClassesTest._worker_pid)

    def test_network_event_classes(self):
        """ Check that the NetworkTest* events carry the worker pid """
        # (class, positional arguments) pairs, built and checked in order.
        cases = (
            (bb.event.NetworkTest, ()),
            (bb.event.NetworkTest, (False,)),
            (bb.event.NetworkTestPassed, ()),
            (bb.event.NetworkTestFailed, ()),
        )
        for event_cls, ctor_args in cases:
            event = event_cls(*ctor_args)
            self.assertEqual(event.pid, EventClassesTest._worker_pid)

    def test_FindSigInfoResult(self):
        """ Check that FindSigInfoResult keeps its result and the worker pid """
        payload = [Mock()]
        found = bb.event.FindSigInfoResult(payload)
        self.assertEqual(found.result, payload)
        self.assertEqual(found.pid, EventClassesTest._worker_pid)

    def test_lineno_in_eventhandler(self):
        # The error lineno is 5, not 4 since the first line is '\n'
        error_line = """
# Comment line1
# Comment line2
python test_lineno_in_eventhandler() {
    This is an error line
}
addhandler test_lineno_in_eventhandler
test_lineno_in_eventhandler[eventmask] = "bb.event.ConfigParsed"
"""

        with self.assertLogs() as logs:
            f = tempfile.NamedTemporaryFile(suffix = '.bb')
            f.write(bytes(error_line, "utf-8"))
            f.flush()
            d = bb.parse.handle(f.name, self.d)['']

        output = "".join(logs.output)
        self.assertTrue(" line 5\n" in output)
