#!/usr/bin/env python3

import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
import traceback
import queue
from multiprocessing import Lock
from threading import Thread

# Refuse to start unless the filesystem encoding is UTF-8; as the message
# explains, this cannot be changed once the interpreter is running.
if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

# Users shouldn't be running this code directly. Exactly one argument is
# expected and it must start with the magic string "decafbad".
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# An argument starting with "decafbadbad" additionally enables profiling.
# cProfile is preferred, with the pure Python profile module as the fallback.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Set O_SYNC on stdout to avoid log truncation in the event
# of a disorderly exit as well as to provide timely
# updates to log files for use with tail.
# Any failure here is deliberately ignored (best effort only).
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC 
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass

logger = logging.getLogger("BitBake")

# Data for the cooker is written to the stdout file descriptor.
# bb.utils.nonblockingfd() presumably switches it to non-blocking mode.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time.
# The lock stays None in the worker itself; forked task children replace it.
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

# Debugging aid, disabled by default: change the condition to enable it
if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Serialised messages waiting to be written back to the cooker
worker_queue = queue.Queue()

def worker_fire(event, d):
    """
    Pickle an event, wrap it in <event>...</event> tags and queue it for
    delivery to the cooker. The d argument is accepted but not used.
    """
    worker_fire_prepickled(b"".join((b"<event>", pickle.dumps(event), b"</event>")))

def worker_fire_prepickled(event):
    """
    Queue an already serialised (and tagged) message for delivery to the
    cooker.
    """
    # worker_queue is only read here, so no global declaration is needed
    worker_queue.put(event)

#
# The cooker can be trying to send us commands at the same time as we are
# trying to send event data back to it, which can lead to write contention.
# Therefore a separate thread is used for writing data back to the cooker.
#
worker_thread_exit = False

def worker_flush(worker_queue):
    """
    Body of the writer thread: move messages from worker_queue to worker_pipe.

    Queued messages are appended to a local byte buffer which is written to
    the pipe with os.write(); whatever a write did not accept stays in the
    buffer for the next attempt. EAGAIN and EPIPE errors from the write are
    ignored, any other error is re-raised. The function returns once
    worker_thread_exit is set and both the queue and the buffer are empty.
    """
    pending = b""

    while True:
        # Wait up to a second for new data so the exit flag is checked regularly
        try:
            pending += worker_queue.get(True, 1)
        except queue.Empty:
            pass

        while pending or not worker_queue.empty():
            try:
                # Give the pipe up to a second to become writable; the result
                # of select() itself is not examined
                select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    pending += worker_queue.get()
                sent = os.write(worker_pipe, pending)
                pending = pending[sent:]
            except (IOError, OSError) as e:
                if e.errno not in (errno.EAGAIN, errno.EPIPE):
                    raise

        if worker_thread_exit and worker_queue.empty() and not pending:
            return

# Start the writer thread, which runs worker_flush() on worker_queue
worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()

def worker_child_fire(event, d):
    """
    Pickle an event, wrap it in <event>...</event> tags and write it directly
    to worker_pipe while holding worker_pipe_lock.

    This is the event hook installed in forked task children, where
    worker_pipe is a file object and worker_pipe_lock a Lock. The d argument
    is accepted but not used.

    On an IOError from the write, sigterm_handler() is called and the error is
    then re-raised.
    """
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        # The context manager guarantees the lock is released even if the
        # write fails
        with worker_pipe_lock:
            # An unbuffered file object may accept only part of the data, so
            # keep writing until the whole message has been sent
            while data:
                written = worker_pipe.write(data)
                data = data[written:]
    except IOError:
        sigterm_handler(None, None)
        raise

# Install worker_fire() as the bb.event hook so that, in the worker process
# itself, events are queued for the writer thread (presumably bb.event calls
# this hook whenever an event is fired -- confirm against bb.event)
bb.event.worker_fire = worker_fire

# Optional debug log of the commands handled by the worker; disabled unless
# the open() line below is uncommented
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """
    Write msg to the debug command log and flush it straight away.
    Does nothing when no log file is configured.
    """
    if not lf:
        return
    lf.write(msg)
    lf.flush()

def sigterm_handler(signum, frame):
    """
    Signal handler which sends SIGTERM to the calling process' whole process
    group and then exits.

    signum and frame are the standard signal handler arguments; neither is
    used (the function is also called directly with None for both).
    """
    # Restore the default SIGTERM action first: process group 0 is the
    # caller's own group, so the killpg() below signals this process as well
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()

def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
    """
    Fork a child process which prepares the datastore and environment for a
    task and then runs it through bb.build.exec_task().

    cfg is the cooker configuration (dry_run, limited_deps and profile are
    read from it), databuilder provides the per-multiconfig datastores and
    workerdata is the dictionary received from the cooker (the "taskdeps",
    "fakerootenv", "fakerootnoenv", "fakerootdirs", "buildname", "date",
    "time" and "sigdata" entries are read here). fn and taskname identify the
    task, taskhash and unihash are stored in the task's datastore, appends is
    passed on to loadDataFull(), taskdepdata is stored as BB_TASKDEPDATA and
    extraconfigdata is a dict of extra variables to set. If quieterrors is
    set, setup failures are not logged and the task is flagged accordingly.
    The task is not executed if cfg.dry_run or dry_run_exec is set.
    The data and task arguments are accepted but not used.

    In the parent, returns a (pid, pipein, pipeout) tuple where pipein and
    pipeout are file objects for the read and write ends of the pipe the child
    sends its events through. The child never returns from this function, it
    leaves through os._exit(). If the fork itself fails, the error is logged
    and the process exits with status 1.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    dry_run = cfg.dry_run or dry_run_exec

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    use_fakeroot = 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run

    envvars = (workerdata["fakerootenv" if use_fakeroot else "fakerootnoenv"][fn] or "").split()
    # Split on the first '=' only, since the value may itself contain '='
    for key, value in (var.split('=', 1) for var in envvars):
        # Only the first backup of a key holds the real original value
        envbackup.setdefault(key, os.environ.get(key))
        os.environ[key] = value
        fakeenv[key] = value

    if use_fakeroot:
        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that events can include it
            bb.event.worker_pid = os.getpid()
            # From here on events are written straight to our own pipe
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None so that an explicit umask of 0 is honoured
            if umask is not None:
                os.umask(umask)

            try:
                bb_cache = bb.cache.NoCache(databuilder)
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # Re-apply the (fake)root variables on top of the task's
                # environment and mark them as exported in the datastore
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Default to a failure status so that ret is always bound in the
            # finally clause, even if runcall() raises
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: put back the environment which was changed for the child
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Reading end of the pipe between a forked task and the worker.

    Data read from the pipe is buffered in self.queue; every complete
    <event>...</event> message found is passed on to worker_fire_prepickled().
    """
    def __init__(self, pipein, pipeout):
        """
        pipein is the file object to read from. pipeout, if given, is closed
        straight away.
        """
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = b""

    def read(self):
        """
        Read whatever is available from the pipe and forward complete events.

        Returns True if any new data was read, False otherwise. An EAGAIN
        error from the read is treated as "no data", other errors are raised.
        """
        size_before = len(self.queue)
        try:
            chunk = self.input.read(102400)
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise
        else:
            self.queue += chunk or b""
        got_data = len(self.queue) > size_before

        # Forward everything up to and including each end-of-event tag
        terminator = b"</event>"
        while True:
            pos = self.queue.find(terminator)
            if pos == -1:
                break
            cut = pos + len(terminator)
            worker_fire_prepickled(self.queue[:cut])
            self.queue = self.queue[cut:]
        return got_data

    def close(self):
        """
        Drain the pipe, warn about any incomplete message left over and close
        the input.
        """
        while self.read():
            pass
        if self.queue:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

# Set by BitbakeWorker.handle_quit() to mark an exit which was requested by
# the cooker rather than caused by a failure
normalexit = False

class BitbakeWorker(object):
    """
    The worker's command loop.

    Commands arrive on the input pipe as <name>payload</name> messages and are
    dispatched to the matching handle_* method. Tasks are forked off on
    request; their event pipes and exit codes are collected and passed back
    through worker_fire_prepickled().

    NOTE: payloads are unpickled with pickle.loads(). This is only acceptable
    because the data is expected to come from the bitbake cooker which started
    this process; it must never be fed untrusted input.
    """
    def __init__(self, din):
        """
        din is the file object commands are read from. SIGTERM and SIGHUP
        handlers are installed and the process name is set.
        """
        self.input = din
        bb.utils.nonblockingfd(self.input)
        # Bytes received from the cooker which have not been handled yet
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.workerdata = None
        self.extraconfigdata = None
        # Both keyed by the pid of the forked task
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """
        Shut down: terminate any remaining tasks, then kill this process with
        SIGTERM using the default signal action. Used as the SIGTERM/SIGHUP
        handler and also called directly on EOF from the cooker.
        """
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """
        Main loop: read commands from the cooker, dispatch them, forward
        events from running tasks and reap tasks which have exited. Only left
        through an exception (including SystemExit) or a signal.
        """
        handlers = (
            (b"cookerconfig", self.handle_cookercfg),
            (b"extraconfigdata", self.handle_extraconfigdata),
            (b"workerdata", self.handle_workerdata),
            (b"runtask", self.handle_runtask),
            (b"finishnow", self.handle_finishnow),
            (b"ping", self.handle_ping),
            (b"quit", self.handle_quit),
        )
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [], [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    # A non-blocking read may return None when no data is
                    # available after all; just try again on the next pass
                    if r is not None:
                        if len(r) == 0:
                            # EOF on pipe, server must have terminated
                            self.sigterm_exception(signal.SIGTERM, None)
                        self.queue = self.queue + r
                except (OSError, IOError):
                    pass

            # Keep dispatching until a full pass over the handlers consumes
            # nothing, so that interleaved commands are all handled now, in
            # the order they arrived
            while self.queue:
                remaining = len(self.queue)
                for item, func in handlers:
                    self.handle_item(item, func)
                if len(self.queue) == remaining:
                    break

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """
        Call func with the payload of every complete <item>...</item> message
        at the start of the queue and remove those messages from the queue.

        Processing stops as soon as the queue no longer starts with the
        opening tag or the message at the front is still incomplete.
        """
        opening = b"<" + item + b">"
        closing = b"</" + item + b">"
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                # Incomplete message, wait for more data
                return
            func(self.queue[len(opening):index])
            self.queue = self.queue[index + len(closing):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Unpickle and store the extra configuration data passed to tasks."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """
        Unpickle and store the worker data, apply its logging settings and
        set PRSERV_HOST in every multiconfig datastore.
        """
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping by logging a warning message."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Flag the exit as a normal one and leave through sys.exit(0)."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """
        Unpickle a task description, fork the task off and remember its pid
        and event pipe.
        """
        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection.
        Otherwise collect one process exit code, close its event pipe, queue
        an <exitcode> message for the cooker and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        # close() drains any remaining events before closing the pipe
        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """
        Send SIGTERM to the process group of every remaining task, waiting for
        a child to exit after each one, then read any pending data from the
        task pipes. Failures to signal or wait are ignored (best effort).
        """
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    # A negative pid addresses the task's whole process group
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    pass
        for pipe in self.build_pipes.values():
            pipe.read()

# Run the command loop on stdin, optionally under the profiler
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            # Always save the profile data, however serve() was left
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # BaseException also covers the SystemExit raised by handle_quit(); only
    # report the exception when the exit was not a requested one
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

# Ask the writer thread to finish; it returns once all queued data has been
# written out
worker_thread_exit = True
worker_thread.join()

workerlog_write("exitting")
# NOTE(review): the exit status is 0 even when serve() failed with an
# exception above -- confirm this is what the cooker expects
sys.exit(0)
