Squashed 'import-layers/yocto-poky/' changes from dc8508f6099..67491b0c104

Yocto 2.2.2 (Morty)

Change-Id: Id9a452e28940d9f166957de243d9cb1d8818704e
git-subtree-dir: import-layers/yocto-poky
git-subtree-split: 67491b0c104101bb9f366d697edd23c895be4302
Signed-off-by: Brad Bishop <bradleyb@fuzziesquirrel.com>
diff --git a/import-layers/yocto-poky/bitbake/bin/bitbake-worker b/import-layers/yocto-poky/bitbake/bin/bitbake-worker
index 500f2ad..db3c4b1 100755
--- a/import-layers/yocto-poky/bitbake/bin/bitbake-worker
+++ b/import-layers/yocto-poky/bitbake/bin/bitbake-worker
@@ -11,7 +11,10 @@
 import errno
 import signal
 import pickle
+import traceback
+import queue
 from multiprocessing import Lock
+from threading import Thread
 
 if sys.getfilesystemencoding() != "utf-8":
     sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")
@@ -63,7 +66,7 @@
     consolelog.setFormatter(conlogformat)
     logger.addHandler(consolelog)
 
-worker_queue = b""
+worker_queue = queue.Queue()
 
 def worker_fire(event, d):
     data = b"<event>" + pickle.dumps(event) + b"</event>"
@@ -72,21 +75,38 @@
 def worker_fire_prepickled(event):
     global worker_queue
 
-    worker_queue = worker_queue + event
-    worker_flush()
+    worker_queue.put(event)
 
-def worker_flush():
-    global worker_queue, worker_pipe
+#
+# We can end up with write contention with the cooker, it can be trying to send commands
+# and we can be trying to send event data back. Therefore use a separate thread for writing 
+# back data to cooker.
+#
+worker_thread_exit = False
 
-    if not worker_queue:
-        return
+def worker_flush(worker_queue):
+    worker_queue_int = b""
+    global worker_pipe, worker_thread_exit
 
-    try:
-        written = os.write(worker_pipe, worker_queue)
-        worker_queue = worker_queue[written:]
-    except (IOError, OSError) as e:
-        if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
-            raise
+    while True:
+        try:
+            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
+        except queue.Empty:
+            pass
+        while (worker_queue_int or not worker_queue.empty()):
+            try:
+                if not worker_queue.empty():
+                    worker_queue_int = worker_queue_int + worker_queue.get()
+                written = os.write(worker_pipe, worker_queue_int)
+                worker_queue_int = worker_queue_int[written:]
+            except (IOError, OSError) as e:
+                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
+                    raise
+        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
+            return
+
+worker_thread = Thread(target=worker_flush, args=(worker_queue,))
+worker_thread.start()
 
 def worker_child_fire(event, d):
     global worker_pipe
@@ -234,9 +254,9 @@
                 if quieterrors:
                     the_data.setVarFlag(taskname, "quieterrors", "1")
 
-            except Exception as exc:
+            except Exception:
                 if not quieterrors:
-                    logger.critical(str(exc))
+                    logger.critical(traceback.format_exc())
                 os._exit(1)
             try:
                 if cfg.dry_run:
@@ -352,7 +372,6 @@
                 self.build_pipes[pipe].read()
             if len(self.build_pids):
                 self.process_waitpid()
-            worker_flush()
 
 
     def handle_item(self, item, func):
@@ -457,8 +476,10 @@
         import traceback
         sys.stderr.write(traceback.format_exc())
         sys.stderr.write(str(e))
-while len(worker_queue):
-    worker_flush()
+
+worker_thread_exit = True
+worker_thread.join()
+
 workerlog_write("exitting")
 sys.exit(0)