Squashed 'import-layers/yocto-poky/' changes from dc8508f6099..67491b0c104
Yocto 2.2.2 (Morty)
Change-Id: Id9a452e28940d9f166957de243d9cb1d8818704e
git-subtree-dir: import-layers/yocto-poky
git-subtree-split: 67491b0c104101bb9f366d697edd23c895be4302
Signed-off-by: Brad Bishop <bradleyb@fuzziesquirrel.com>
diff --git a/import-layers/yocto-poky/bitbake/bin/bitbake-worker b/import-layers/yocto-poky/bitbake/bin/bitbake-worker
index 500f2ad..db3c4b1 100755
--- a/import-layers/yocto-poky/bitbake/bin/bitbake-worker
+++ b/import-layers/yocto-poky/bitbake/bin/bitbake-worker
@@ -11,7 +11,10 @@
import errno
import signal
import pickle
+import traceback
+import queue
from multiprocessing import Lock
+from threading import Thread
if sys.getfilesystemencoding() != "utf-8":
sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")
@@ -63,7 +66,7 @@
consolelog.setFormatter(conlogformat)
logger.addHandler(consolelog)
-worker_queue = b""
+worker_queue = queue.Queue()
def worker_fire(event, d):
data = b"<event>" + pickle.dumps(event) + b"</event>"
@@ -72,21 +75,38 @@
def worker_fire_prepickled(event):
global worker_queue
- worker_queue = worker_queue + event
- worker_flush()
+ worker_queue.put(event)
-def worker_flush():
- global worker_queue, worker_pipe
+#
+# We can end up with write contention with the cooker: it can be trying to send commands
+# while we are trying to send event data back. Therefore, use a separate thread for writing
+# data back to the cooker.
+#
+worker_thread_exit = False
- if not worker_queue:
- return
+def worker_flush(worker_queue):
+ worker_queue_int = b""
+ global worker_pipe, worker_thread_exit
- try:
- written = os.write(worker_pipe, worker_queue)
- worker_queue = worker_queue[written:]
- except (IOError, OSError) as e:
- if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
- raise
+ while True:
+ try:
+ worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
+ except queue.Empty:
+ pass
+ while (worker_queue_int or not worker_queue.empty()):
+ try:
+ if not worker_queue.empty():
+ worker_queue_int = worker_queue_int + worker_queue.get()
+ written = os.write(worker_pipe, worker_queue_int)
+ worker_queue_int = worker_queue_int[written:]
+ except (IOError, OSError) as e:
+ if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
+ raise
+ if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
+ return
+
+worker_thread = Thread(target=worker_flush, args=(worker_queue,))
+worker_thread.start()
def worker_child_fire(event, d):
global worker_pipe
@@ -234,9 +254,9 @@
if quieterrors:
the_data.setVarFlag(taskname, "quieterrors", "1")
- except Exception as exc:
+ except Exception:
if not quieterrors:
- logger.critical(str(exc))
+ logger.critical(traceback.format_exc())
os._exit(1)
try:
if cfg.dry_run:
@@ -352,7 +372,6 @@
self.build_pipes[pipe].read()
if len(self.build_pids):
self.process_waitpid()
- worker_flush()
def handle_item(self, item, func):
@@ -457,8 +476,10 @@
import traceback
sys.stderr.write(traceback.format_exc())
sys.stderr.write(str(e))
-while len(worker_queue):
- worker_flush()
+
+worker_thread_exit = True
+worker_thread.join()
+
workerlog_write("exitting")
sys.exit(0)