#!/usr/bin/env python
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
"""
BitBake 'RunQueue' implementation

Handles preparation and execution of a queue of tasks
"""

# Copyright (C) 2006-2007  Richard Purdie
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25import copy
26import os
27import sys
28import signal
29import stat
30import fcntl
31import errno
32import logging
33import re
34import bb
35from bb import msg, data, event
36from bb import monitordisk
37import subprocess
38
39try:
40 import cPickle as pickle
41except ImportError:
42 import pickle
43
# Module loggers: "BitBake" is the project-wide parent logger and
# "BitBake.RunQueue" is the child used by everything in this module.
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")

# Matches a standalone 32-character hex string (an md5 sum) that is not
# embedded in a longer alphanumeric run. Not referenced within this chunk;
# presumably used by stamp/signature handling elsewhere in the file.
__find_md5__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{32}(?![a-z0-9])' )
48
class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total):
        # All counters start at zero; 'total' is the overall number of
        # tasks this queue is expected to handle.
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.total = total

    def copy(self):
        """Return a snapshot of the current counters as a new instance."""
        snapshot = self.__class__(self.total)
        snapshot.__dict__.update(self.__dict__)
        return snapshot

    def taskFailed(self):
        """Record one active task as failed."""
        self.active -= 1
        self.failed += 1

    def taskCompleted(self, number = 1):
        """Record 'number' active tasks as successfully completed."""
        self.active -= number
        self.completed += number

    def taskSkipped(self, number = 1):
        """Record 'number' tasks as skipped; they also count as active."""
        self.active += number
        self.skipped += number

    def taskActive(self):
        """Record that one more task has started executing."""
        self.active += 1
79
# These values indicate the next step due to be run in the
# runQueue state machine.
# runQueuePrepare is the initial state (set in RunQueue.__init__); the
# transition logic between the remaining states lives in RunQueue code
# outside this chunk.
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueSceneRun = 4
runQueueRunInit = 5
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
90
class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)

        runqueue: object providing runq_buildable, runq_running,
                  build_stamps, stats and number_tasks (the executor;
                  its class is not visible in this chunk)
        rqdata:   the RunQueueData holding the task lists
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runq_fnid)

        # Identity priority map: priority position == task id.
        self.prio_map = []
        self.prio_map.extend(range(self.numTasks))

        # Precompute each task's stamp file so clashes with running tasks
        # can be detected cheaply in next_buildable_task().
        self.buildable = []
        self.stamps = {}
        for taskid in xrange(self.numTasks):
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
            taskname = self.rqdata.runq_task[taskid]
            self.stamps[taskid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
            if self.rq.runq_buildable[taskid] == 1:
                self.buildable.append(taskid)

        # Inverse of prio_map; built lazily in next_buildable_task().
        self.rev_prio_map = None

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Drop candidates that have started running since the last call.
        self.buildable = [x for x in self.buildable if not self.rq.runq_running[x] == 1]
        if not self.buildable:
            return None
        if len(self.buildable) == 1:
            # Fast path: a single candidate is usable only if no running
            # task is currently producing the same stamp file.
            taskid = self.buildable[0]
            stamp = self.stamps[taskid]
            if stamp not in self.rq.build_stamps.itervalues():
                return taskid

        # Lazily build the inverse priority map: task id -> priority rank.
        if not self.rev_prio_map:
            self.rev_prio_map = range(self.numTasks)
            for taskid in xrange(self.numTasks):
                self.rev_prio_map[self.prio_map[taskid]] = taskid

        # Choose the buildable task with the best (lowest) priority rank,
        # skipping any whose stamp file clashes with a running task.
        best = None
        bestprio = None
        for taskid in self.buildable:
            prio = self.rev_prio_map[taskid]
            if bestprio is None or bestprio > prio:
                stamp = self.stamps[taskid]
                if stamp in self.rq.build_stamps.itervalues():
                    continue
                bestprio = prio
                best = taskid

        return best

    def next(self):
        """
        Return the id of the task we should build next
        """
        # Only hand out work while there is a free execution slot.
        if self.rq.stats.active < self.rq.number_tasks:
            return self.next_buildable_task()

    def newbuilable(self, task):
        # Register a task that has just become buildable.
        # NOTE(review): the name looks like a typo for "newbuildable", but
        # callers elsewhere in the codebase use this spelling, so it cannot
        # be renamed in isolation.
        self.buildable.append(task)
160
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.

        runqueue/rqdata: passed through to RunQueueScheduler.__init__.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        # Sort the task ids by their weight (stable, ascending). This
        # replaces the previous O(n^2) "repeatedly find the index of the
        # smallest remaining weight" loop (which also reused -1 as an
        # in-band 'consumed' sentinel) with a single O(n log n) sort.
        # Python's sort is stable, so equal weights keep the lower task id
        # first, exactly as the old first-occurrence scan did.
        weights = self.rqdata.runq_weight
        self.prio_map = sorted(range(len(weights)), key=weights.__getitem__)

        # Heaviest (most depended-upon) tasks should be scheduled first.
        self.prio_map.reverse()
184
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it's completed as quickly as possible. This works
    well where disk space is at a premium and classes like OE's rm_work are in
    force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.

        # Group the speed-ordered priority map by fnid (recipe file): the
        # first time a recipe is encountered, emit all of its tasks (in
        # their existing relative order) before moving on. Precomputing the
        # per-fnid task lists makes this O(n) instead of the previous
        # O(n^2) scan-and-delete over the map; the resulting order is
        # identical.
        tasks_by_fnid = {}
        for entry in self.prio_map:
            tasks_by_fnid.setdefault(self.rqdata.runq_fnid[entry], []).append(entry)

        new_map = []
        seen_fnids = set()
        for entry in self.prio_map:
            fnid = self.rqdata.runq_fnid[entry]
            if fnid not in seen_fnids:
                seen_fnids.add(fnid)
                new_map.extend(tasks_by_fnid[fnid])
        self.prio_map = new_map
216
class RunQueueData:
    """
    BitBake Run Queue implementation

    Holds the flattened task graph (parallel arrays indexed by runqueue
    task id): runq_fnid, runq_task, runq_depends, runq_revdeps, runq_hash,
    plus derived data (weights, setscene list, stamp whitelist) computed
    by prepare().
    """
    def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
        # rq is the owning RunQueue; taskData/dataCache come from the
        # cooker's parse results; targets is the list of (name, task)
        # pairs the user asked to build.
        self.cooker = cooker
        self.dataCache = dataCache
        self.taskData = taskData
        self.targets = targets
        self.rq = rq
        self.warn_multi_bb = False

        self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST", True) or ""
        self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST", True) or "").split()

        self.reset()

    def reset(self):
        """Clear the parallel per-task arrays back to an empty queue."""
        self.runq_fnid = []
        self.runq_task = []
        self.runq_depends = []
        self.runq_revdeps = []
        self.runq_hash = []

    def runq_depends_names(self, ids):
        """
        Return a list of human-readable names for the dependencies of
        task 'ids', with the version part squashed out of each name.
        """
        # NOTE(review): 're' is already imported at module level; this
        # local import is redundant but harmless.
        import re
        ret = []
        for id in self.runq_depends[ids]:
            nam = os.path.basename(self.get_user_idstring(id))
            # Strip "_<version...>," down to "," to drop version noise.
            nam = re.sub("_[^,]*,", ",", nam)
            ret.extend([nam])
        return ret

    def get_task_name(self, task):
        """Return the task name (e.g. "do_compile") for a runqueue id."""
        return self.runq_task[task]

    def get_task_file(self, task):
        """Return the recipe filename for a runqueue id."""
        return self.taskData.fn_index[self.runq_fnid[task]]

    def get_task_hash(self, task):
        """Return the signature hash computed in prepare() for a runqueue id."""
        return self.runq_hash[task]

    def get_user_idstring(self, task, task_name_suffix = ""):
        """Return a "filename, taskname" string for user-facing messages."""
        fn = self.taskData.fn_index[self.runq_fnid[task]]
        taskname = self.runq_task[task] + task_name_suffix
        return "%s, %s" % (fn, taskname)

    def get_task_id(self, fnid, taskname):
        """Linear search for the runqueue id matching (fnid, taskname); None if absent."""
        for listid in xrange(len(self.runq_fnid)):
            if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
                return listid
        return None

    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.
        """
        # NOTE(review): deepcopy imported here is unused; the code below
        # uses the module-level copy.deepcopy instead.
        from copy import deepcopy

        valid_chains = []
        explored_deps = {}
        msgs = []

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in xrange(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in xrange(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(taskid, prev_chain):
            # Depth-first walk of the reverse-dependency graph; a taskid
            # already on the current path means a cycle was found.
            prev_chain.append(taskid)
            total_deps = []
            total_deps.extend(self.runq_revdeps[taskid])
            for revdep in self.runq_revdeps[taskid]:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Aborted dependency loops search after 10 matches.\n")
                        return msgs
                    continue
                scan = False
                if revdep not in explored_deps:
                    # Never visited this node before: explore it.
                    scan = True
                elif revdep in explored_deps[revdep]:
                    # Node participates in a loop through itself: re-explore.
                    scan = True
                else:
                    # Re-explore if any node on the current path is reachable
                    # from revdep (a loop might close through it).
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[taskid] = total_deps

        for task in tasks:
            find_chains(task, [])

        return msgs

    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list finding tasks that are not
        possible to execute due to circular dependencies.
        """

        numTasks = len(self.runq_fnid)
        weight = []
        deps_left = []
        task_done = []

        for listid in xrange(numTasks):
            task_done.append(False)
            weight.append(1)
            deps_left.append(len(self.runq_revdeps[listid]))

        # Endpoints (no reverse deps) seed the propagation with weight 10.
        for listid in endpoints:
            weight[listid] = 10
            task_done[listid] = True

        # Propagate weights backwards through the dependency graph, one
        # "layer" at a time: a task becomes an endpoint once all of its
        # reverse dependencies have contributed their weight.
        while True:
            next_points = []
            for listid in endpoints:
                for revdep in self.runq_depends[listid]:
                    weight[revdep] = weight[revdep] + weight[listid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if len(next_points) == 0:
                break

        # Circular dependency sanity check: anything never marked done
        # was unreachable by the propagation above, i.e. part of a cycle.
        problem_tasks = []
        for task in xrange(numTasks):
            if task_done[task] is False or deps_left[task] != 0:
                problem_tasks.append(task)
                logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
                logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])

        if problem_tasks:
            message = "Unbuildable tasks were found.\n"
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight

    def prepare(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.

        Returns the number of runnable tasks (0 if there is nothing to do).
        Raises via bb.msg.fatal() on unresolvable dependency problems.
        """

        runq_build = []
        recursivetasks = {}
        recursiveitasks = {}
        recursivetasksselfref = set()

        taskData = self.taskData

        if len(taskData.tasks_name) == 0:
            # Nothing to do
            return 0

        logger.info("Preparing RunQueue")

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every build and run
        # target ordered by priority. It also gives information on each of those
        # providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptast, recrdeptask, idepends).

        def add_build_dependencies(depids, tasknames, depends):
            # For each DEPENDS provider, add its matching task ids to 'depends'.
            for depid in depids:
                # Won't be in build_targets if ASSUME_PROVIDED
                if depid not in taskData.build_targets:
                    continue
                depdata = taskData.build_targets[depid][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depdata, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        def add_runtime_dependencies(depids, tasknames, depends):
            # Same as above but resolved through the RDEPENDS provider map.
            for depid in depids:
                if depid not in taskData.run_targets:
                    continue
                depdata = taskData.run_targets[depid][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depdata, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        def add_resolved_dependencies(depids, tasknames, depends):
            # depids here are already fnids (no provider lookup needed).
            for depid in depids:
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depid, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        for task in xrange(len(taskData.tasks_name)):
            depends = set()
            fnid = taskData.tasks_fnid[task]
            fn = taskData.fn_index[fnid]
            task_deps = self.dataCache.task_deps[fn]

            #logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])

            if fnid not in taskData.failed_fnids:

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                depends = set(taskData.tasks_tdepends[task])

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
                    tasknames = task_deps['rdeptask'][taskData.tasks_name[task]].split()
                    add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                idepends = taskData.tasks_idepends[task]
                for (depid, idependtask) in idepends:
                    if depid in taskData.build_targets and not depid in taskData.failed_deps:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData.build_targets[depid][0]
                        if depdata is not None:
                            taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
                            if taskid is None:
                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
                            depends.add(taskid)
                irdepends = taskData.tasks_irdepends[task]
                for (depid, idependtask) in irdepends:
                    if depid in taskData.run_targets:
                        # Won't be in run_targets if ASSUME_PROVIDED
                        depdata = taskData.run_targets[depid][0]
                        if depdata is not None:
                            taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
                            if taskid is None:
                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
                            depends.add(taskid)

                # Resolve recursive 'recrdeptask' dependencies (Part A)
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                # We cover the recursive part of the dependencies below
                if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
                    tasknames = task_deps['recrdeptask'][taskData.tasks_name[task]].split()
                    recursivetasks[task] = tasknames
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)
                    add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
                    if taskData.tasks_name[task] in tasknames:
                        recursivetasksselfref.add(task)

                if 'recideptask' in task_deps and taskData.tasks_name[task] in task_deps['recideptask']:
                    recursiveitasks[task] = []
                    for t in task_deps['recideptask'][taskData.tasks_name[task]].split():
                        newdep = taskData.gettask_id_fromfnid(fnid, t)
                        recursiveitasks[task].append(newdep)

            # Append this task's entry to each parallel array (tasks from
            # failed fnids get an empty depends set but still take a slot
            # so taskData ids and runqueue ids stay aligned until pruning).
            self.runq_fnid.append(taskData.tasks_fnid[task])
            self.runq_task.append(taskData.tasks_name[task])
            self.runq_depends.append(depends)
            self.runq_revdeps.append(set())
            self.runq_hash.append("")

            runq_build.append(0)

        # Resolve recursive 'recrdeptask' dependencies (Part B)
        #
        # e.g. do_sometask[recrdeptask] = "do_someothertask"
        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
        # We need to do this separately since we need all of self.runq_depends to be complete before this is processed
        extradeps = {}
        for task in recursivetasks:
            extradeps[task] = set(self.runq_depends[task])
            tasknames = recursivetasks[task]
            seendeps = set()
            # NOTE(review): seenfnid appears unused below.
            seenfnid = []

            def generate_recdeps(t):
                # Recursively walk t's dependency closure, adding the
                # requested tasknames for every fnid encountered.
                newdeps = set()
                add_resolved_dependencies([taskData.tasks_fnid[t]], tasknames, newdeps)
                extradeps[task].update(newdeps)
                seendeps.add(t)
                newdeps.add(t)
                for i in newdeps:
                    for n in self.runq_depends[i]:
                        if n not in seendeps:
                            generate_recdeps(n)
            generate_recdeps(task)

            if task in recursiveitasks:
                for dep in recursiveitasks[task]:
                    generate_recdeps(dep)

        # Remove circular references so that do_a[recrdeptask] = "do_a do_b" can work
        for task in recursivetasks:
            extradeps[task].difference_update(recursivetasksselfref)

        for task in xrange(len(taskData.tasks_name)):
            # Add in extra dependencies
            if task in extradeps:
                self.runq_depends[task] = extradeps[task]
            # Remove all self references
            if task in self.runq_depends[task]:
                logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], self.runq_depends[task])
                self.runq_depends[task].remove(task)

        # Step B - Mark all active tasks
        #
        # Start with the tasks we were asked to run and mark all dependencies
        # as active too. If the task is to be 'forced', clear its stamp. Once
        # all active tasks are marked, prune the ones we don't need.

        logger.verbose("Marking Active Tasks")

        def mark_active(listid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """

            if runq_build[listid] == 1:
                return

            runq_build[listid] = 1

            depends = self.runq_depends[listid]
            for depend in depends:
                mark_active(depend, depth+1)

        self.target_pairs = []
        for target in self.targets:
            # target is a (providername, taskname) pair.
            targetid = taskData.getbuild_id(target[0])

            if targetid not in taskData.build_targets:
                continue

            if targetid in taskData.failed_deps:
                continue

            fnid = taskData.build_targets[targetid][0]
            fn = taskData.fn_index[fnid]
            self.target_pairs.append((fn, target[1]))

            if fnid in taskData.failed_fnids:
                continue

            if target[1] not in taskData.tasks_lookup[fnid]:
                # Unknown task name: offer fuzzy suggestions before dying.
                import difflib
                close_matches = difflib.get_close_matches(target[1], taskData.tasks_lookup[fnid], cutoff=0.7)
                if close_matches:
                    extra = ". Close matches:\n %s" % "\n ".join(close_matches)
                else:
                    extra = ""
                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s%s" % (target[1], target[0], extra))

            listid = taskData.tasks_lookup[fnid][target[1]]

            mark_active(listid, 1)

        # Step C - Prune all inactive tasks
        #
        # Once all active tasks are marked, prune the ones we don't need.

        # maps[old_id] gives the new (post-prune) id for each original
        # task id, or -1 if the task was deleted. 'delcount' offsets the
        # index because the lists shrink as entries are deleted in place.
        maps = []
        delcount = 0
        for listid in xrange(len(self.runq_fnid)):
            if runq_build[listid-delcount] == 1:
                maps.append(listid-delcount)
            else:
                del self.runq_fnid[listid-delcount]
                del self.runq_task[listid-delcount]
                del self.runq_depends[listid-delcount]
                del runq_build[listid-delcount]
                del self.runq_revdeps[listid-delcount]
                del self.runq_hash[listid-delcount]
                delcount = delcount + 1
                maps.append(-1)

        #
        # Step D - Sanity checks and computation
        #

        # Check to make sure we still have tasks to run
        if len(self.runq_fnid) == 0:
            if not taskData.abort:
                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
            else:
                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")

        logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))

        # Remap the dependencies to account for the deleted tasks
        # Check we didn't delete a task we depend on
        for listid in xrange(len(self.runq_fnid)):
            newdeps = []
            origdeps = self.runq_depends[listid]
            for origdep in origdeps:
                if maps[origdep] == -1:
                    bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
                newdeps.append(maps[origdep])
            self.runq_depends[listid] = set(newdeps)

        logger.verbose("Assign Weightings")

        # Generate a list of reverse dependencies to ease future calculations
        for listid in xrange(len(self.runq_fnid)):
            for dep in self.runq_depends[listid]:
                self.runq_revdeps[dep].add(listid)

        # Identify tasks at the end of dependency chains
        # Error on circular dependency loops (length two)
        endpoints = []
        for listid in xrange(len(self.runq_fnid)):
            revdeps = self.runq_revdeps[listid]
            if len(revdeps) == 0:
                endpoints.append(listid)
            for dep in revdeps:
                if dep in self.runq_depends[listid]:
                    #self.dump_data(taskData)
                    bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))

        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))

        # Calculate task weights
        # Check of higher length circular dependencies
        self.runq_weight = self.calculate_task_weights(endpoints)

        # Sanity Check - Check for multiple tasks building the same provider
        prov_list = {}
        seen_fn = []
        for task in xrange(len(self.runq_fnid)):
            fn = taskData.fn_index[self.runq_fnid[task]]
            if fn in seen_fn:
                continue
            seen_fn.append(fn)
            for prov in self.dataCache.fn_provides[fn]:
                if prov not in prov_list:
                    prov_list[prov] = [fn]
                elif fn not in prov_list[prov]:
                    prov_list[prov].append(fn)
        for prov in prov_list:
            if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
                seen_pn = []
                # If two versions of the same PN are being built its fatal, we don't support it.
                for fn in prov_list[prov]:
                    pn = self.dataCache.pkg_fn[fn]
                    if pn not in seen_pn:
                        seen_pn.append(pn)
                    else:
                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
                msg = "Multiple .bb files are due to be built which each provide %s (%s)." % (prov, " ".join(prov_list[prov]))
                if self.warn_multi_bb:
                    logger.warn(msg)
                else:
                    msg += "\n This usually means one provides something the other doesn't and should."
                    logger.error(msg)

        # Create a whitelist usable by the stamp checks
        stampfnwhitelist = []
        for entry in self.stampwhitelist.split():
            entryid = self.taskData.getbuild_id(entry)
            if entryid not in self.taskData.build_targets:
                continue
            fnid = self.taskData.build_targets[entryid][0]
            fn = self.taskData.fn_index[fnid]
            stampfnwhitelist.append(fn)
        self.stampfnwhitelist = stampfnwhitelist

        # Iterate over the task list looking for tasks with a 'setscene' function
        self.runq_setscene = []
        if not self.cooker.configuration.nosetscene:
            for task in range(len(self.runq_fnid)):
                setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
                if not setscene:
                    continue
                self.runq_setscene.append(task)

        def invalidate_task(fn, taskname, error_nostamp):
            # Remove the stamp for a (recipe, task) pair so it re-runs;
            # 'nostamp' tasks have nothing to invalidate.
            taskdep = self.dataCache.task_deps[fn]
            fnid = self.taskData.getfn_id(fn)
            if taskname not in taskData.tasks_lookup[fnid]:
                logger.warn("Task %s does not exist, invalidating this task will have no effect" % taskname)
            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
                if error_nostamp:
                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
                else:
                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
            else:
                logger.verbose("Invalidate task %s, %s", taskname, fn)
                bb.parse.siggen.invalidate_task(taskname, self.dataCache, fn)

        # Invalidate task if force mode active
        if self.cooker.configuration.force:
            for (fn, target) in self.target_pairs:
                invalidate_task(fn, target, False)

        # Invalidate task if invalidate mode active
        if self.cooker.configuration.invalidate_stamp:
            for (fn, target) in self.target_pairs:
                for st in self.cooker.configuration.invalidate_stamp.split(','):
                    if not st.startswith("do_"):
                        st = "do_%s" % st
                    invalidate_task(fn, st, True)

        # Iterate over the task list and call into the siggen code
        # (dependency order: a task's hash is computed only once the
        # hashes of all its dependencies are available).
        dealtwith = set()
        todeal = set(range(len(self.runq_fnid)))
        while len(todeal) > 0:
            for task in todeal.copy():
                if len(self.runq_depends[task] - dealtwith) == 0:
                    dealtwith.add(task)
                    todeal.remove(task)
                    procdep = []
                    for dep in self.runq_depends[task]:
                        procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
                    self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)

        return len(self.runq_fnid)

    def dump_data(self, taskQueue):
        """
        Dump some debug information on the internal data structures

        NOTE(review): this method references self.rqdata and self.prio_map,
        neither of which RunQueueData defines — it looks like it belongs on
        an executor/scheduler object and would raise AttributeError if
        called on a RunQueueData instance. Left untouched; verify callers
        before relying on it.
        """
        logger.debug(3, "run_tasks:")
        for task in xrange(len(self.rqdata.runq_task)):
            logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
                         taskQueue.fn_index[self.rqdata.runq_fnid[task]],
                         self.rqdata.runq_task[task],
                         self.rqdata.runq_weight[task],
                         self.rqdata.runq_depends[task],
                         self.rqdata.runq_revdeps[task])

        logger.debug(3, "sorted_tasks:")
        for task1 in xrange(len(self.rqdata.runq_task)):
            if task1 in self.prio_map:
                task = self.prio_map[task1]
                logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
                             taskQueue.fn_index[self.rqdata.runq_fnid[task]],
                             self.rqdata.runq_task[task],
                             self.rqdata.runq_weight[task],
                             self.rqdata.runq_depends[task],
                             self.rqdata.runq_revdeps[task])
838
839class RunQueue:
    def __init__(self, cooker, cfgData, dataCache, taskData, targets):

        self.cooker = cooker
        self.cfgData = cfgData
        # Build the task-graph data for this run (populated by prepare()).
        self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)

        # Policy hooks from configuration; each falls back when unset.
        self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY", True) or "perfile"
        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION", True) or None
        self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION", True) or None
        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID", True) or None

        # Initial state of the runqueue state machine.
        self.state = runQueuePrepare

        # For disk space monitor
        self.dm = monitordisk.diskMonitor(cfgData)

        # Worker process handles and their pipes; created on demand by
        # start_worker()/start_fakeworker().
        self.rqexe = None
        self.worker = None
        self.workerpipe = None
        self.fakeworker = None
        self.fakeworkerpipe = None
861
    def _start_worker(self, fakeroot = False, rqexec = None):
        """
        Spawn a bitbake-worker subprocess and hand it the cooker
        configuration plus per-run worker data over its stdin.

        fakeroot: when True, launch the worker through FAKEROOTCMD with the
                  FAKEROOTBASEENV environment applied.
        rqexec:   executor passed through to the runQueuePipe reader.

        Returns a (Popen, runQueuePipe) pair.
        """
        logger.debug(1, "Starting bitbake-worker")
        # 'magic' selects the worker's start marker; the profiling variant
        # is used when the cooker was started with profiling enabled.
        magic = "decafbad"
        if self.cooker.configuration.profile:
            magic = "decafbadbad"
        if fakeroot:
            fakerootcmd = self.cfgData.getVar("FAKEROOTCMD", True)
            fakerootenv = (self.cfgData.getVar("FAKEROOTBASEENV", True) or "").split()
            env = os.environ.copy()
            # FAKEROOTBASEENV holds KEY=VALUE pairs to overlay on the env.
            for key, value in (var.split('=') for var in fakerootenv):
                env[key] = value
            worker = subprocess.Popen([fakerootcmd, "bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
        else:
            worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        # Non-blocking stdout so the event loop can poll the pipe.
        bb.utils.nonblockingfd(worker.stdout)
        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec)

        workerdata = {
            "taskdeps" : self.rqdata.dataCache.task_deps,
            "fakerootenv" : self.rqdata.dataCache.fakerootenv,
            "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
            "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
            "sigdata" : bb.parse.siggen.get_taskdata(),
            "runq_hash" : self.rqdata.runq_hash,
            "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
            "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
            "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
            "prhost" : self.cooker.prhost,
            "buildname" : self.cfgData.getVar("BUILDNAME", True),
            "date" : self.cfgData.getVar("DATE", True),
            "time" : self.cfgData.getVar("TIME", True),
        }

        # Initial handshake: pickled payloads framed by XML-ish tags the
        # worker parses off its stdin.
        worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
        worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
        worker.stdin.flush()

        return worker, workerpipe
901
    def _teardown_worker(self, worker, workerpipe):
        """
        Ask a worker process to quit, drain its pipe until it exits, then
        close the pipe. A no-op when 'worker' is None/falsy.
        """
        if not worker:
            return
        logger.debug(1, "Teardown for bitbake-worker")
        try:
            worker.stdin.write("<quit></quit>")
            worker.stdin.flush()
        except IOError:
            # Worker may already have exited and closed its stdin.
            pass
        # Keep servicing the pipe until the process has actually exited...
        while worker.returncode is None:
            workerpipe.read()
            worker.poll()
        # ...then drain anything still buffered before closing.
        while workerpipe.read():
            continue
        workerpipe.close()
917
    def start_worker(self):
        """(Re)start the main bitbake-worker, recycling any existing one first."""
        if self.worker:
            self.teardown_workers()
        self.teardown = False
        self.worker, self.workerpipe = self._start_worker()
923
924 def start_fakeworker(self, rqexec):
925 if not self.fakeworker:
926 self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)
927
928 def teardown_workers(self):
929 self.teardown = True
930 self._teardown_worker(self.worker, self.workerpipe)
931 self.worker = None
932 self.workerpipe = None
933 self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
934 self.fakeworker = None
935 self.fakeworkerpipe = None
936
937 def read_workers(self):
938 self.workerpipe.read()
939 if self.fakeworkerpipe:
940 self.fakeworkerpipe.read()
941
942 def active_fds(self):
943 fds = []
944 if self.workerpipe:
945 fds.append(self.workerpipe.input)
946 if self.fakeworkerpipe:
947 fds.append(self.fakeworkerpipe.input)
948 return fds
949
    def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
        """
        Return True if the stamp for a task is current, i.e. the task can
        be skipped; False otherwise.

        task: runqueue task index
        taskname: task name to check (defaults to the task's own name)
        recurse: also validate the stamps of all dependencies, recursively
        cache: optional dict of task index -> previously computed result,
               shared across recursive calls to avoid re-checking
        """
        def get_timestamp(f):
            # mtime of f, or None if the file is missing or unreadable
            try:
                if not os.access(f, os.F_OK):
                    return None
                return os.stat(f)[stat.ST_MTIME]
            except:
                return None

        # "perfile" limits comparisons to stamps from the same recipe;
        # anything else compares against the full dependency tree,
        # optionally filtered through a whitelist of recipe filenames.
        if self.stamppolicy == "perfile":
            fulldeptree = False
        else:
            fulldeptree = True
            stampwhitelist = []
            if self.stamppolicy == "whitelist":
                stampwhitelist = self.rqdata.stampfnwhitelist

        fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
        if taskname is None:
            taskname = self.rqdata.runq_task[task]

        stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)

        # If the stamp is missing, it's not current
        if not os.access(stampfile, os.F_OK):
            logger.debug(2, "Stampfile %s not available", stampfile)
            return False
        # If it's a 'nostamp' task, it's not current
        taskdep = self.rqdata.dataCache.task_deps[fn]
        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
            return False

        # Setscene tasks only need their own stamp to exist
        if taskname != "do_setscene" and taskname.endswith("_setscene"):
            return True

        if cache is None:
            cache = {}

        # Compare this stamp's mtime against each dependency's stamp;
        # any newer (or missing) dependency stamp invalidates this one.
        # NOTE: t2/t3 may be None here; the comparisons below rely on
        # Python 2 ordering semantics (None sorts before any number).
        iscurrent = True
        t1 = get_timestamp(stampfile)
        for dep in self.rqdata.runq_depends[task]:
            if iscurrent:
                fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
                taskname2 = self.rqdata.runq_task[dep]
                stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
                stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
                t2 = get_timestamp(stampfile2)
                t3 = get_timestamp(stampfile3)
                # A newer setscene stamp satisfies the dependency
                if t3 and t3 > t2:
                    continue
                if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
                    if not t2:
                        logger.debug(2, 'Stampfile %s does not exist', stampfile2)
                        iscurrent = False
                    if t1 < t2:
                        logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
                        iscurrent = False
                    if recurse and iscurrent:
                        if dep in cache:
                            iscurrent = cache[dep]
                            if not iscurrent:
                                logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
                        else:
                            iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
                            cache[dep] = iscurrent
        if recurse:
            cache[task] = iscurrent
        return iscurrent
1019
    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)

        This is one iteration of the runqueue state machine: each call
        advances self.state and returns True (call again), False (done),
        or a list of fds to poll (from the executor) before the next call.
        """

        retval = True

        # State: prepare the runqueue data; nothing to do if it's empty
        if self.state is runQueuePrepare:
            self.rqexe = RunQueueExecuteDummy(self)
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit

                # we are ready to run, see if any UI client needs the dependency info
                if bb.cooker.CookerFeatures.SEND_DEPENDS_TREE in self.cooker.featureset:
                    depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
                    bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)

        # State: either dump signatures and stop, or start the setscene executor
        if self.state is runQueueSceneInit:
            dump = self.cooker.configuration.dump_signatures
            if dump:
                if 'printdiff' in dump:
                    invalidtasks = self.print_diffscenetasks()
                self.dump_signatures(dump)
                if 'printdiff' in dump:
                    self.write_diffscenetasks(invalidtasks)
                self.state = runQueueComplete
            else:
                self.start_worker()
                self.rqexe = RunQueueExecuteScenequeue(self)

        # While tasks are executing, keep the disk space monitor informed
        if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]:
            self.dm.check(self)

        if self.state is runQueueSceneRun:
            retval = self.rqexe.execute()

        # State: setscene finished; switch to the real task executor
        if self.state is runQueueRunInit:
            logger.info("Executing RunQueue Tasks")
            self.rqexe = RunQueueExecuteTasks(self)
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            retval = self.rqexe.finish()

        # On completion or failure, stop the workers and print a summary
        if (self.state is runQueueComplete or self.state is runQueueFailed) and self.rqexe:
            self.teardown_workers()
            if self.rqexe.stats.failed:
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
            else:
                # Let's avoid the word "failed" if nothing actually did
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        # On failure, either raise, or retry with alternate providers if allowed
        if self.state is runQueueFailed:
            if not self.rqdata.taskData.tryaltconfigs:
                raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
            for fnid in self.rqexe.failed_fnids:
                self.rqdata.taskData.fail_fnid(fnid)
            self.rqdata.reset()

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval
1092
    def execute_runqueue(self):
        """
        Public wrapper around _execute_runqueue().

        Ensures any unexpected exception tears down the worker processes
        and marks the queue complete (so callers don't loop forever)
        before the exception propagates. TaskFailure and SystemExit pass
        through untouched; BBHandledException is assumed to already have
        been reported, so it is re-raised without the extra error log.
        """
        # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
        try:
            return self._execute_runqueue()
        except bb.runqueue.TaskFailure:
            raise
        except SystemExit:
            raise
        except bb.BBHandledException:
            # Teardown is best-effort; never mask the original exception
            try:
                self.teardown_workers()
            except:
                pass
            self.state = runQueueComplete
            raise
        except:
            logger.error("An uncaught exception occured in runqueue, please see the failure below:")
            # Teardown is best-effort; never mask the original exception
            try:
                self.teardown_workers()
            except:
                pass
            self.state = runQueueComplete
            raise
1116
1117 def finish_runqueue(self, now = False):
1118 if not self.rqexe:
1119 self.state = runQueueComplete
1120 return
1121
1122 if now:
1123 self.rqexe.finish_now()
1124 else:
1125 self.rqexe.finish()
1126
1127 def dump_signatures(self, options):
1128 done = set()
1129 bb.note("Reparsing files to collect dependency data")
1130 for task in range(len(self.rqdata.runq_fnid)):
1131 if self.rqdata.runq_fnid[task] not in done:
1132 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1133 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
1134 done.add(self.rqdata.runq_fnid[task])
1135
1136 bb.parse.siggen.dump_sigs(self.rqdata.dataCache, options)
1137
1138 return
1139
    def print_diffscenetasks(self):
        """
        Report which tasks differ from anything available in the setscene
        (sstate) cache, printing the "root" tasks where the differences
        begin, and return the full set of invalid tasks minus those that
        are only invalid because a dependency is.
        """

        valid = []
        sq_hash = []
        sq_hashfn = []
        sq_fn = []
        sq_taskname = []
        sq_task = []
        noexec = []
        stamppresent = []
        valid_new = set()

        # Build parallel lists describing every executable task, then ask
        # the metadata's hash validation function which ones have valid
        # (cached) signatures.
        for task in xrange(len(self.rqdata.runq_fnid)):
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task]
            taskdep = self.rqdata.dataCache.task_deps[fn]

            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                noexec.append(task)
                continue

            sq_fn.append(fn)
            sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
            sq_hash.append(self.rqdata.runq_hash[task])
            sq_taskname.append(taskname)
            sq_task.append(task)
        locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.expanded_data }
        try:
            call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=True)"
            valid = bb.utils.better_eval(call, locs)
        # Handle version with no siginfo parameter
        except TypeError:
            call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
            valid = bb.utils.better_eval(call, locs)
        for v in valid:
            valid_new.add(sq_task[v])

        # Tasks which are both setscene and noexec never care about dependencies
        # We therefore find tasks which are setscene and noexec and mark their
        # unique dependencies as valid.
        for task in noexec:
            if task not in self.rqdata.runq_setscene:
                continue
            for dep in self.rqdata.runq_depends[task]:
                hasnoexecparents = True
                for dep2 in self.rqdata.runq_revdeps[dep]:
                    if dep2 in self.rqdata.runq_setscene and dep2 in noexec:
                        continue
                    hasnoexecparents = False
                    break
                if hasnoexecparents:
                    valid_new.add(dep)

        # Anything neither validated nor noexec is invalid
        invalidtasks = set()
        for task in xrange(len(self.rqdata.runq_fnid)):
            if task not in valid_new and task not in noexec:
                invalidtasks.add(task)

        # Walk each invalid task's dependency tree; if any dependency is
        # itself invalid, the task is only transitively invalid ("found")
        # and shouldn't be reported as a difference root.
        found = set()
        processed = set()
        for task in invalidtasks:
            toprocess = set([task])
            while toprocess:
                next = set()
                for t in toprocess:
                    for dep in self.rqdata.runq_depends[t]:
                        if dep in invalidtasks:
                            found.add(task)
                        if dep not in processed:
                            processed.add(dep)
                            next.add(dep)
                toprocess = next
                if task in found:
                    toprocess = set()

        tasklist = []
        for task in invalidtasks.difference(found):
            tasklist.append(self.rqdata.get_user_idstring(task))

        if tasklist:
            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))

        return invalidtasks.difference(found)
1223
    def write_diffscenetasks(self, invalidtasks):
        """
        For each invalid task, locate its written-out siginfo file and the
        most recent differing one, and print a human-readable explanation
        of why the cached result could not be reused.

        invalidtasks: iterable of runqueue task indices (as returned by
        print_diffscenetasks).
        """

        # Define recursion callback
        def recursecb(key, hash1, hash2):
            hashes = [hash1, hash2]
            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)

            recout = []
            if len(hashfiles) == 2:
                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
                recout.extend(list('  ' + l for l in out2))
            else:
                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))

            return recout


        for task in invalidtasks:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            pn = self.rqdata.dataCache.pkg_fn[fn]
            taskname = self.rqdata.runq_task[task]
            h = self.rqdata.runq_hash[task]
            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData)
            # The siginfo file whose name contains our hash is "ours";
            # it must exist since we wrote it out during this run.
            match = None
            for m in matches:
                if h in m:
                    match = m
            if match is None:
                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
            # Compare against the newest siginfo with a different hash
            matches = {k : v for k, v in matches.iteritems() if h not in k}
            if matches:
                latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
                prevh = __find_md5__.search(latestmatch).group(0)
                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, closest matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1259
class RunQueueExecute:
    """
    Base class for the runqueue executors. Holds the per-execution
    bookkeeping (buildable/running/complete state per task, stamp files
    for in-flight tasks, failed recipe ids) and the shared worker-pipe
    plumbing used by both the setscene and the normal task executors.
    """

    def __init__(self, rq):
        self.rq = rq
        self.cooker = rq.cooker
        self.cfgData = rq.cfgData
        self.rqdata = rq.rqdata

        # BB_NUMBER_THREADS is the max number of concurrently active tasks
        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS", True) or 1)
        self.scheduler = self.cfgData.getVar("BB_SCHEDULER", True) or "speed"

        # Per-task state flags, indexed by runqueue task id (0/1 values)
        self.runq_buildable = []
        self.runq_running = []
        self.runq_complete = []

        # Stamp files of currently running tasks: by task id and as a list
        self.build_stamps = {}
        self.build_stamps2 = []
        self.failed_fnids = []

        self.stampcache = {}

        # Route worker pipe events to this executor instance
        rq.workerpipe.setrunqueueexec(self)
        if rq.fakeworkerpipe:
            rq.fakeworkerpipe.setrunqueueexec(self)

        if self.number_tasks <= 0:
             bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)

    def runqueue_process_waitpid(self, task, status):
        """Handle a task exit reported by a worker; status is the exit code."""

        # self.build_stamps[task] may not exist when using a shared work directory.
        if task in self.build_stamps:
            self.build_stamps2.remove(self.build_stamps[task])
            del self.build_stamps[task]

        if status != 0:
            self.task_fail(task, status)
        else:
            self.task_complete(task)
        return True

    def finish_now(self):
        """Tell the workers to stop immediately and flag failure/completion."""

        for worker in [self.rq.worker, self.rq.fakeworker]:
            if not worker:
                continue
            try:
                worker.stdin.write("<finishnow></finishnow>")
                worker.stdin.flush()
            except IOError:
                # worker must have died?
                pass

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return

        self.rq.state = runQueueComplete
        return

    def finish(self):
        """
        Graceful shutdown: wait for active tasks to drain, then move to
        Failed or Complete. Returns fds to poll while tasks remain active.
        """
        self.rq.state = runQueueCleanUp

        if self.stats.active > 0:
            bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
            self.rq.read_workers()
            return self.rq.active_fds()

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return True

        self.rq.state = runQueueComplete
        return True

    def check_dependencies(self, task, taskdeps, setscene = False):
        """
        Ask the metadata's BB_SETSCENE_DEPVALID function whether the given
        dependencies of task are actually needed. Returns the function's
        verdict, or False if no validation function is configured.

        NOTE(review): this adds task into the caller's taskdeps set in
        place - callers appear to rely on passing a throwaway set.
        """
        if not self.rq.depvalidate:
            return False

        taskdata = {}
        taskdeps.add(task)
        for dep in taskdeps:
            # In setscene mode, indices refer into the setscene task list
            if setscene:
                depid = self.rqdata.runq_setscene[dep]
            else:
                depid = dep
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[depid]]
            pn = self.rqdata.dataCache.pkg_fn[fn]
            taskname = self.rqdata.runq_task[depid]
            taskdata[dep] = [pn, taskname, fn]
        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.expanded_data }
        valid = bb.utils.better_eval(call, locs)
        return valid
1354
class RunQueueExecuteDummy(RunQueueExecute):
    """
    Placeholder executor installed while the runqueue is being prepared.

    Deliberately does not call RunQueueExecute.__init__ - nothing will
    ever run, so only the runqueue reference and empty stats are needed.
    """
    def __init__(self, rq):
        self.rq = rq
        self.stats = RunQueueStats(total=0)

    def finish(self):
        """With no tasks, finishing is always immediate."""
        self.rq.state = runQueueComplete
        return
1363
class RunQueueExecuteTasks(RunQueueExecute):
    """
    Executor for the main (non-setscene) task run.

    On construction it computes which tasks are already covered by the
    setscene run (and therefore skippable), lets the metadata veto some
    of that coverage, and selects a scheduler. execute() is then called
    repeatedly by the runqueue state machine to hand tasks to workers.
    """
    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        self.stats = RunQueueStats(len(self.rqdata.runq_fnid))

        self.stampcache = {}

        initial_covered = self.rq.scenequeue_covered.copy()

        # Mark initial buildable tasks
        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            if len(self.rqdata.runq_depends[task]) == 0:
                self.runq_buildable.append(1)
            else:
                self.runq_buildable.append(0)
            if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
                self.rq.scenequeue_covered.add(task)

        # Propagate coverage: a task whose reverse dependencies are all
        # covered is itself covered. Iterate until a fixed point.
        found = True
        while found:
            found = False
            for task in xrange(self.stats.total):
                if task in self.rq.scenequeue_covered:
                    continue
                logger.debug(1, 'Considering %s (%s): %s' % (task, self.rqdata.get_user_idstring(task), str(self.rqdata.runq_revdeps[task])))

                if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
                    found = True
                    self.rq.scenequeue_covered.add(task)

        logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))

        # Allow the metadata to elect for setscene tasks to run anyway
        covered_remove = set()
        if self.rq.setsceneverify:
            invalidtasks = []
            for task in xrange(len(self.rqdata.runq_task)):
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
                taskname = self.rqdata.runq_task[task]
                taskdep = self.rqdata.dataCache.task_deps[fn]

                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    continue
                if self.rq.check_stamp_task(task, taskname + "_setscene", cache=self.stampcache):
                    logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
                    continue
                if self.rq.check_stamp_task(task, taskname, recurse = True, cache=self.stampcache):
                    logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
                    continue
                invalidtasks.append(task)

            call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d, invalidtasks=invalidtasks)"
            call2 = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)"
            locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.expanded_data, "invalidtasks" : invalidtasks }
            # Backwards compatibility with older versions without invalidtasks
            try:
                covered_remove = bb.utils.better_eval(call, locs)
            except TypeError:
                covered_remove = bb.utils.better_eval(call2, locs)

        def removecoveredtask(task):
            # Un-cover a task: delete its setscene stamp so it reruns
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task] + '_setscene'
            bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
            self.rq.scenequeue_covered.remove(task)

        # Removing a covered task also uncovers its covered dependencies
        # (except those covered before our propagation above) - repeat
        # until no more removals cascade.
        toremove = covered_remove
        for task in toremove:
            logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
        while toremove:
            covered_remove = []
            for task in toremove:
                removecoveredtask(task)
                for deptask in self.rqdata.runq_depends[task]:
                    if deptask not in self.rq.scenequeue_covered:
                        continue
                    if deptask in toremove or deptask in covered_remove or deptask in initial_covered:
                        continue
                    logger.debug(1, 'Task %s depends on task %s so not skipping' % (task, deptask))
                    covered_remove.append(deptask)
            toremove = covered_remove

        logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)

        event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)

        # Pick the scheduler named by BB_SCHEDULER from those available
        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
                break
        else:
            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))

    def get_schedulers(self):
        """Return all RunQueueScheduler subclasses, including any extra
        classes named (as module.Class) in BB_SCHEDULERS."""
        schedulers = set(obj for obj in globals().values()
                             if type(obj) is type and
                                issubclass(obj, RunQueueScheduler))

        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS", True)
        if user_schedulers:
            for sched in user_schedulers.split():
                if not "." in sched:
                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
                    continue

                modname, name = sched.rsplit(".", 1)
                try:
                    module = __import__(modname, fromlist=(name,))
                except ImportError as exc:
                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
                    raise SystemExit(1)
                else:
                    schedulers.add(getattr(module, name))
        return schedulers

    def setbuildable(self, task):
        """Flag a task as buildable and notify the scheduler."""
        self.runq_buildable[task] = 1
        # [sic] "newbuilable" matches the scheduler's method spelling
        self.sched.newbuilable(task)

    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        self.runq_complete[task] = 1
        for revdep in self.rqdata.runq_revdeps[task]:
            if self.runq_running[revdep] == 1:
                continue
            if self.runq_buildable[revdep] == 1:
                continue
            alldeps = 1
            for dep in self.rqdata.runq_depends[revdep]:
                if self.runq_complete[dep] != 1:
                    alldeps = 0
            if alldeps == 1:
                self.setbuildable(revdep)
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
                taskname = self.rqdata.runq_task[revdep]
                logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)

    def task_complete(self, task):
        """Record a successful task and unblock its reverse dependencies."""
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)

    def task_fail(self, task, exitcode):
        """
        Called when a task has failed
        Updates the state engine with the failure
        """
        self.stats.taskFailed()
        fnid = self.rqdata.runq_fnid[task]
        self.failed_fnids.append(fnid)
        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
        if self.rqdata.taskData.abort:
            self.rq.state = runQueueCleanUp

    def task_skip(self, task, reason):
        """Mark a task as skipped (reason is e.g. "covered" or "existing")."""
        self.runq_running[task] = 1
        self.setbuildable(task)
        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
        self.task_completeoutright(task)
        self.stats.taskCompleted()
        self.stats.taskSkipped()

    def execute(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        """

        self.rq.read_workers()


        if self.stats.total == 0:
            # nothing to do
            self.rq.state = runQueueCleanUp

        task = self.sched.next()
        if task is not None:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task]

            # Skip tasks satisfied by the setscene run
            if task in self.rq.scenequeue_covered:
                logger.debug(2, "Setscene covered task %s (%s)", task,
                                self.rqdata.get_user_idstring(task))
                self.task_skip(task, "covered")
                return True

            # Skip tasks whose stamp is already current
            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug(2, "Stamp current task %s (%s)", task,
                                self.rqdata.get_user_idstring(task))
                self.task_skip(task, "existing")
                return True

            # noexec tasks just get a stamp written, no worker involvement
            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running[task] = 1
                self.stats.taskActive()
                if not self.cooker.configuration.dry_run:
                    bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
                self.task_complete(task)
                return True
            else:
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

            taskdepdata = self.build_taskdepdata(task)

            # Dispatch to the fakeroot worker when the task requires it
            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                if not self.rq.fakeworker:
                    try:
                        self.rq.start_fakeworker(self)
                    except OSError as exc:
                        logger.critical("Failed to spawn fakeroot worker to run %s:%s: %s" % (fn, taskname, str(exc)))
                        self.rq.state = runQueueFailed
                        return True
                self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
                self.rq.fakeworker.stdin.flush()
            else:
                self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
                self.rq.worker.stdin.flush()

            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
            self.build_stamps2.append(self.build_stamps[task])
            self.runq_running[task] = 1
            self.stats.taskActive()
            # Room for more concurrent tasks: ask to be called again now
            if self.stats.active < self.number_tasks:
                return True

        # At capacity (or nothing schedulable): wait on the worker fds
        if self.stats.active > 0:
            self.rq.read_workers()
            return self.rq.active_fds()

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        for task in xrange(self.stats.total):
            if self.runq_buildable[task] == 0:
                logger.error("Task %s never buildable!", task)
            if self.runq_running[task] == 0:
                logger.error("Task %s never ran!", task)
            if self.runq_complete[task] == 0:
                logger.error("Task %s never completed!", task)
        self.rq.state = runQueueComplete

        return True

    def build_taskdepdata(self, task):
        """
        Build the dependency data handed to the worker for a task: a dict
        of taskid -> [pn, taskname, fn, deps, provides] covering the task
        and its transitive dependencies.

        NOTE(review): "next.add(task)" mutates rqdata.runq_depends[task]
        in place, permanently adding the task to its own dependency set -
        looks unintended; confirm before relying on runq_depends contents
        afterwards.
        """
        taskdepdata = {}
        next = self.rqdata.runq_depends[task]
        next.add(task)
        while next:
            additional = []
            for revdep in next:
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
                pn = self.rqdata.dataCache.pkg_fn[fn]
                taskname = self.rqdata.runq_task[revdep]
                deps = self.rqdata.runq_depends[revdep]
                provides = self.rqdata.dataCache.fn_provides[fn]
                taskdepdata[revdep] = [pn, taskname, fn, deps, provides]
                for revdep2 in deps:
                    if revdep2 not in taskdepdata:
                        additional.append(revdep2)
            next = additional

        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
        return taskdepdata
1644
1645class RunQueueExecuteScenequeue(RunQueueExecute):
1646 def __init__(self, rq):
1647 RunQueueExecute.__init__(self, rq)
1648
1649 self.scenequeue_covered = set()
1650 self.scenequeue_notcovered = set()
1651 self.scenequeue_notneeded = set()
1652
1653 # If we don't have any setscene functions, skip this step
1654 if len(self.rqdata.runq_setscene) == 0:
1655 rq.scenequeue_covered = set()
1656 rq.state = runQueueRunInit
1657 return
1658
1659 self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1660
1661 sq_revdeps = []
1662 sq_revdeps_new = []
1663 sq_revdeps_squash = []
1664 self.sq_harddeps = {}
1665
1666 # We need to construct a dependency graph for the setscene functions. Intermediate
1667 # dependencies between the setscene tasks only complicate the code. This code
1668 # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1669 # only containing the setscene functions.
1670
1671 for task in xrange(self.stats.total):
1672 self.runq_running.append(0)
1673 self.runq_complete.append(0)
1674 self.runq_buildable.append(0)
1675
1676 # First process the chains up to the first setscene task.
1677 endpoints = {}
1678 for task in xrange(len(self.rqdata.runq_fnid)):
1679 sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1680 sq_revdeps_new.append(set())
1681 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1682 endpoints[task] = set()
1683
1684 # Secondly process the chains between setscene tasks.
1685 for task in self.rqdata.runq_setscene:
1686 for dep in self.rqdata.runq_depends[task]:
1687 if dep not in endpoints:
1688 endpoints[dep] = set()
1689 endpoints[dep].add(task)
1690
1691 def process_endpoints(endpoints):
1692 newendpoints = {}
1693 for point, task in endpoints.items():
1694 tasks = set()
1695 if task:
1696 tasks |= task
1697 if sq_revdeps_new[point]:
1698 tasks |= sq_revdeps_new[point]
1699 sq_revdeps_new[point] = set()
1700 if point in self.rqdata.runq_setscene:
1701 sq_revdeps_new[point] = tasks
1702 for dep in self.rqdata.runq_depends[point]:
1703 if point in sq_revdeps[dep]:
1704 sq_revdeps[dep].remove(point)
1705 if tasks:
1706 sq_revdeps_new[dep] |= tasks
1707 if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1708 newendpoints[dep] = task
1709 if len(newendpoints) != 0:
1710 process_endpoints(newendpoints)
1711
1712 process_endpoints(endpoints)
1713
1714 # Build a list of setscene tasks which are "unskippable"
1715 # These are direct endpoints referenced by the build
1716 endpoints2 = {}
1717 sq_revdeps2 = []
1718 sq_revdeps_new2 = []
1719 def process_endpoints2(endpoints):
1720 newendpoints = {}
1721 for point, task in endpoints.items():
1722 tasks = set([point])
1723 if task:
1724 tasks |= task
1725 if sq_revdeps_new2[point]:
1726 tasks |= sq_revdeps_new2[point]
1727 sq_revdeps_new2[point] = set()
1728 if point in self.rqdata.runq_setscene:
1729 sq_revdeps_new2[point] = tasks
1730 for dep in self.rqdata.runq_depends[point]:
1731 if point in sq_revdeps2[dep]:
1732 sq_revdeps2[dep].remove(point)
1733 if tasks:
1734 sq_revdeps_new2[dep] |= tasks
1735 if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1736 newendpoints[dep] = tasks
1737 if len(newendpoints) != 0:
1738 process_endpoints2(newendpoints)
1739 for task in xrange(len(self.rqdata.runq_fnid)):
1740 sq_revdeps2.append(copy.copy(self.rqdata.runq_revdeps[task]))
1741 sq_revdeps_new2.append(set())
1742 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1743 endpoints2[task] = set()
1744 process_endpoints2(endpoints2)
1745 self.unskippable = []
1746 for task in self.rqdata.runq_setscene:
1747 if sq_revdeps_new2[task]:
1748 self.unskippable.append(self.rqdata.runq_setscene.index(task))
1749
1750 for task in xrange(len(self.rqdata.runq_fnid)):
1751 if task in self.rqdata.runq_setscene:
1752 deps = set()
1753 for dep in sq_revdeps_new[task]:
1754 deps.add(self.rqdata.runq_setscene.index(dep))
1755 sq_revdeps_squash.append(deps)
1756 elif len(sq_revdeps_new[task]) != 0:
1757 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1758
1759 # Resolve setscene inter-task dependencies
1760 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
1761 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
1762 for task in self.rqdata.runq_setscene:
1763 realid = self.rqdata.taskData.gettask_id(self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]], self.rqdata.runq_task[task] + "_setscene", False)
1764 idepends = self.rqdata.taskData.tasks_idepends[realid]
1765 for (depid, idependtask) in idepends:
1766 if depid not in self.rqdata.taskData.build_targets:
1767 continue
1768
1769 depdata = self.rqdata.taskData.build_targets[depid][0]
1770 if depdata is None:
1771 continue
1772 dep = self.rqdata.taskData.fn_index[depdata]
1773 taskid = self.rqdata.get_task_id(self.rqdata.taskData.getfn_id(dep), idependtask.replace("_setscene", ""))
1774 if taskid is None:
1775 bb.msg.fatal("RunQueue", "Task %s_setscene depends upon non-existent task %s:%s" % (self.rqdata.get_user_idstring(task), dep, idependtask))
1776
1777 if not self.rqdata.runq_setscene.index(taskid) in self.sq_harddeps:
1778 self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)] = set()
1779 self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)].add(self.rqdata.runq_setscene.index(task))
1780
1781 sq_revdeps_squash[self.rqdata.runq_setscene.index(task)].add(self.rqdata.runq_setscene.index(taskid))
1782 # Have to zero this to avoid circular dependencies
1783 sq_revdeps_squash[self.rqdata.runq_setscene.index(taskid)] = set()
1784
1785 for task in self.sq_harddeps:
1786 for dep in self.sq_harddeps[task]:
1787 sq_revdeps_squash[dep].add(task)
1788
1789 #for task in xrange(len(sq_revdeps_squash)):
1790 # realtask = self.rqdata.runq_setscene[task]
1791 # bb.warn("Task %s: %s_setscene is %s " % (task, self.rqdata.get_user_idstring(realtask) , sq_revdeps_squash[task]))
1792
1793 self.sq_deps = []
1794 self.sq_revdeps = sq_revdeps_squash
1795 self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1796
1797 for task in xrange(len(self.sq_revdeps)):
1798 self.sq_deps.append(set())
1799 for task in xrange(len(self.sq_revdeps)):
1800 for dep in self.sq_revdeps[task]:
1801 self.sq_deps[dep].add(task)
1802
1803 for task in xrange(len(self.sq_revdeps)):
1804 if len(self.sq_revdeps[task]) == 0:
1805 self.runq_buildable[task] = 1
1806
1807 self.outrightfail = []
1808 if self.rq.hashvalidate:
1809 sq_hash = []
1810 sq_hashfn = []
1811 sq_fn = []
1812 sq_taskname = []
1813 sq_task = []
1814 noexec = []
1815 stamppresent = []
1816 for task in xrange(len(self.sq_revdeps)):
1817 realtask = self.rqdata.runq_setscene[task]
1818 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1819 taskname = self.rqdata.runq_task[realtask]
1820 taskdep = self.rqdata.dataCache.task_deps[fn]
1821
1822 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1823 noexec.append(task)
1824 self.task_skip(task)
1825 bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1826 continue
1827
1828 if self.rq.check_stamp_task(realtask, taskname + "_setscene", cache=self.stampcache):
1829 logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1830 stamppresent.append(task)
1831 self.task_skip(task)
1832 continue
1833
1834 if self.rq.check_stamp_task(realtask, taskname, recurse = True, cache=self.stampcache):
1835 logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1836 stamppresent.append(task)
1837 self.task_skip(task)
1838 continue
1839
1840 sq_fn.append(fn)
1841 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1842 sq_hash.append(self.rqdata.runq_hash[realtask])
1843 sq_taskname.append(taskname)
1844 sq_task.append(task)
1845 call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1846 locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.expanded_data }
1847 valid = bb.utils.better_eval(call, locs)
1848
1849 valid_new = stamppresent
1850 for v in valid:
1851 valid_new.append(sq_task[v])
1852
1853 for task in xrange(len(self.sq_revdeps)):
1854 if task not in valid_new and task not in noexec:
1855 realtask = self.rqdata.runq_setscene[task]
1856 logger.debug(2, 'No package found, so skipping setscene task %s',
1857 self.rqdata.get_user_idstring(realtask))
1858 self.outrightfail.append(task)
1859
1860 logger.info('Executing SetScene Tasks')
1861
1862 self.rq.state = runQueueSceneRun
1863
1864 def scenequeue_updatecounters(self, task, fail = False):
1865 for dep in self.sq_deps[task]:
1866 if fail and task in self.sq_harddeps and dep in self.sq_harddeps[task]:
1867 realtask = self.rqdata.runq_setscene[task]
1868 realdep = self.rqdata.runq_setscene[dep]
1869 logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (self.rqdata.get_user_idstring(realtask), self.rqdata.get_user_idstring(realdep)))
1870 self.scenequeue_updatecounters(dep, fail)
1871 continue
1872 if task not in self.sq_revdeps2[dep]:
1873 # May already have been removed by the fail case above
1874 continue
1875 self.sq_revdeps2[dep].remove(task)
1876 if len(self.sq_revdeps2[dep]) == 0:
1877 self.runq_buildable[dep] = 1
1878
1879 def task_completeoutright(self, task):
1880 """
1881 Mark a task as completed
1882 Look at the reverse dependencies and mark any task with
1883 completed dependencies as buildable
1884 """
1885
1886 index = self.rqdata.runq_setscene[task]
1887 logger.debug(1, 'Found task %s which could be accelerated',
1888 self.rqdata.get_user_idstring(index))
1889
1890 self.scenequeue_covered.add(task)
1891 self.scenequeue_updatecounters(task)
1892
1893 def task_complete(self, task):
1894 self.stats.taskCompleted()
1895 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1896 self.task_completeoutright(task)
1897
1898 def task_fail(self, task, result):
1899 self.stats.taskFailed()
1900 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
1901 self.scenequeue_notcovered.add(task)
1902 self.scenequeue_updatecounters(task, True)
1903
1904 def task_failoutright(self, task):
1905 self.runq_running[task] = 1
1906 self.runq_buildable[task] = 1
1907 self.stats.taskCompleted()
1908 self.stats.taskSkipped()
1909 index = self.rqdata.runq_setscene[task]
1910 self.scenequeue_notcovered.add(task)
1911 self.scenequeue_updatecounters(task, True)
1912
1913 def task_skip(self, task):
1914 self.runq_running[task] = 1
1915 self.runq_buildable[task] = 1
1916 self.task_completeoutright(task)
1917 self.stats.taskCompleted()
1918 self.stats.taskSkipped()
1919
    def execute(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue

        Called repeatedly by the runqueue state machine while in the
        runQueueSceneRun state. Each call either dispatches (or skips/fails)
        at most one setscene task and returns True, waits on active workers,
        or - once everything has run - converts the covered/notcovered sets
        to full-taskgraph ids and advances the state to runQueueRunInit.
        """

        # Drain any pending worker output/exit codes first.
        self.rq.read_workers()

        task = None
        if self.stats.active < self.number_tasks:
            # Find the next setscene to run
            for nexttask in xrange(self.stats.total):
                if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
                    if nexttask in self.unskippable:
                        logger.debug(2, "Setscene task %s is unskippable" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
                    # A skippable task whose reverse dependencies are all
                    # already covered may itself be unnecessary - unless it
                    # is an explicit build target, skip it.
                    if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True):
                        realtask = self.rqdata.runq_setscene[nexttask]
                        fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
                        foundtarget = False
                        for target in self.rqdata.target_pairs:
                            if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
                                foundtarget = True
                                break
                        if not foundtarget:
                            logger.debug(2, "Skipping setscene for task %s" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
                            self.task_skip(nexttask)
                            self.scenequeue_notneeded.add(nexttask)
                            return True
                    # Tasks whose artefact validation failed earlier are
                    # failed outright without being dispatched.
                    if nexttask in self.outrightfail:
                        self.task_failoutright(nexttask)
                        return True
                    task = nexttask
                    break
        if task is not None:
            realtask = self.rqdata.runq_setscene[task]
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]

            taskname = self.rqdata.runq_task[realtask] + "_setscene"
            # If the real (non-setscene) task's stamp is already valid there
            # is no point running the setscene variant.
            if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask], recurse = True, cache=self.stampcache):
                logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
                        task, self.rqdata.get_user_idstring(realtask))
                self.task_failoutright(task)
                return True

            # Under --force, explicit targets must always rebuild, so their
            # setscene variants are failed to force the real task to run.
            if self.cooker.configuration.force:
                for target in self.rqdata.target_pairs:
                    if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
                        self.task_failoutright(task)
                        return True

            if self.rq.check_stamp_task(realtask, taskname, cache=self.stampcache):
                logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
                        task, self.rqdata.get_user_idstring(realtask))
                self.task_skip(task)
                return True

            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
            bb.event.fire(startevent, self.cfgData)

            # Dispatch to the fakeroot worker when the task demands it,
            # starting that worker lazily on first use.
            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
                if not self.rq.fakeworker:
                    self.rq.start_fakeworker(self)
                self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
                self.rq.fakeworker.stdin.flush()
            else:
                self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
                self.rq.worker.stdin.flush()

            self.runq_running[task] = 1
            self.stats.taskActive()
            if self.stats.active < self.number_tasks:
                return True

        # Nothing dispatchable right now; wait for running workers.
        if self.stats.active > 0:
            self.rq.read_workers()
            return self.rq.active_fds()

        #for task in xrange(self.stats.total):
        #    if self.runq_running[task] != 1:
        #        buildable = self.runq_buildable[task]
        #        revdeps = self.sq_revdeps[task]
        #        bb.warn("Found we didn't run %s %s %s %s" % (task, buildable, str(revdeps), self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task])))

        # Convert scenequeue_covered task numbers into full taskgraph ids
        oldcovered = self.scenequeue_covered
        self.rq.scenequeue_covered = set()
        for task in oldcovered:
            self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
        self.rq.scenequeue_notcovered = set()
        for task in self.scenequeue_notcovered:
            self.rq.scenequeue_notcovered.add(self.rqdata.runq_setscene[task])

        logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))

        self.rq.state = runQueueRunInit

        completeevent = sceneQueueComplete(self.stats, self.rq)
        bb.event.fire(completeevent, self.cfgData)

        return True
2020
2021 def runqueue_process_waitpid(self, task, status):
2022 task = self.rq.rqdata.runq_setscene.index(task)
2023
2024 RunQueueExecute.runqueue_process_waitpid(self, task, status)
2025
class TaskFailure(Exception):
    """
    Raised when a task in a runqueue fails.
    """
    def __init__(self, x):
        # Store the payload directly as the exception args, preserving the
        # historical calling convention of this exception.
        self.args = x
2032
2033
class runQueueExitWait(bb.event.Event):
    """
    Fired while the runqueue is waiting for outstanding task processes
    to exit; 'remain' is the number of tasks still active.
    """

    def __init__(self, remain):
        bb.event.Event.__init__(self)
        self.message = "Waiting for %s active tasks to finish" % remain
        self.remain = remain
2043
class runQueueEvent(bb.event.Event):
    """
    Base class for runqueue events; captures the task's identity (id,
    user-visible string, name, file, hash) and a snapshot of the stats.
    """
    def __init__(self, task, stats, rq):
        rqdata = rq.rqdata
        self.taskid = task
        self.taskstring = rqdata.get_user_idstring(task)
        self.taskname = rqdata.get_task_name(task)
        self.taskfile = rqdata.get_task_file(task)
        self.taskhash = rqdata.get_task_hash(task)
        # Copy so later stat updates don't mutate the event in flight.
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
2056
class sceneQueueEvent(runQueueEvent):
    """
    Base class for sceneQueue events; rewrites the identity fields set by
    runQueueEvent to describe the underlying real task's _setscene variant.
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        rqdata = rq.rqdata
        realtask = rqdata.runq_setscene[task]
        self.taskstring = rqdata.get_user_idstring(realtask, "_setscene")
        self.taskname = rqdata.get_task_name(realtask) + "_setscene"
        self.taskfile = rqdata.get_task_file(realtask)
        self.taskhash = rqdata.get_task_hash(realtask)
2068
class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying that a task has started; 'noexec' marks tasks that
    will not actually execute a body.
    """
    def __init__(self, task, stats, rq, noexec=False):
        self.noexec = noexec
        runQueueEvent.__init__(self, task, stats, rq)
2076
class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Event notifying that a setscene task has started; 'noexec' marks tasks
    that will not actually execute a body.
    """
    def __init__(self, task, stats, rq, noexec=False):
        self.noexec = noexec
        sceneQueueEvent.__init__(self, task, stats, rq)
2084
class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying that a task failed; carries the worker exit code.
    """
    def __init__(self, task, stats, exitcode, rq):
        self.exitcode = exitcode
        runQueueEvent.__init__(self, task, stats, rq)
2092
class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Event notifying that a setscene task failed; carries the worker
    exit code.
    """
    def __init__(self, task, stats, exitcode, rq):
        self.exitcode = exitcode
        sceneQueueEvent.__init__(self, task, stats, rq)
2100
class sceneQueueComplete(sceneQueueEvent):
    """
    Fired once all sceneQueue tasks have finished.
    """
    def __init__(self, stats, rq):
        # Deliberately bypasses sceneQueueEvent.__init__: there is no single
        # task associated with queue completion, only the final stats.
        bb.event.Event.__init__(self)
        self.stats = stats.copy()
2108
class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying that a task completed successfully; inherits all
    behaviour from runQueueEvent.
    """
2113
class sceneQueueTaskCompleted(sceneQueueEvent):
    """
    Event notifying that a setscene task completed successfully; inherits
    all behaviour from sceneQueueEvent.
    """
2118
class runQueueTaskSkipped(runQueueEvent):
    """
    Event notifying that a task was skipped; 'reason' explains why.
    """
    def __init__(self, task, stats, rq, reason):
        self.reason = reason
        runQueueEvent.__init__(self, task, stats, rq)
2126
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server

    The worker writes pickled payloads framed as <event>...</event> and
    <exitcode>...</exitcode> onto the pipe; read() demultiplexes them,
    firing events and routing task exit codes to the run queue executor.
    """
    def __init__(self, pipein, pipeout, d, rq, rqexec):
        # Keep the read end; the write end belongs to the worker process.
        self.input = pipein
        if pipeout:
            pipeout.close()
        # Non-blocking so read() can poll without stalling the server loop.
        bb.utils.nonblockingfd(self.input)
        # Accumulates partial data until a complete framed message arrives.
        self.queue = ""
        self.d = d
        self.rq = rq
        self.rqexec = rqexec

    def setrunqueueexec(self, rqexec):
        """Replace the executor that receives task exit notifications."""
        self.rqexec = rqexec

    def read(self):
        """
        Poll the worker processes and drain the pipe, dispatching any
        complete <event>/<exitcode> messages. Returns True if new data
        was consumed from the pipe.
        """
        # Detect a worker that died unexpectedly (outside of teardown) and
        # shut the runqueue down rather than hanging forever.
        for w in [self.rq.worker, self.rq.fakeworker]:
            if not w:
                continue
            w.poll()
            if w.returncode is not None and not self.rq.teardown:
                name = None
                if self.rq.worker and w.pid == self.rq.worker.pid:
                    name = "Worker"
                elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid:
                    name = "Fakeroot"
                bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
                self.rq.finish_runqueue(True)

        start = len(self.queue)
        try:
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as e:
            # EAGAIN just means no data ready on the non-blocking fd.
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        # Keep extracting messages until neither frame type yields one.
        while found and len(self.queue):
            found = False
            index = self.queue.find("</event>")
            while index != -1 and self.queue.startswith("<event>"):
                try:
                    # Payload sits between the 7-char "<event>" tag and the
                    # closing tag at 'index'.
                    event = pickle.loads(self.queue[7:index])
                except ValueError as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
                bb.event.fire_from_worker(event, self.d)
                found = True
                # Skip past the 8-char "</event>" closing tag.
                self.queue = self.queue[index+8:]
                index = self.queue.find("</event>")
            index = self.queue.find("</exitcode>")
            while index != -1 and self.queue.startswith("<exitcode>"):
                try:
                    # Payload sits between the 10-char "<exitcode>" tag and
                    # the closing tag at 'index'.
                    task, status = pickle.loads(self.queue[10:index])
                except ValueError as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index]))
                self.rqexec.runqueue_process_waitpid(task, status)
                found = True
                # Skip past the 11-char "</exitcode>" closing tag.
                self.queue = self.queue[index+11:]
                index = self.queue.find("</exitcode>")
        return (end > start)

    def close(self):
        """Drain any remaining messages, warn on a partial one, and close."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()