blob: 878028aa97e3820fdff5e6e9e57425e6b511b309 [file] [log] [blame]
Patrick Williamsc124f4f2015-09-15 14:41:29 -05001#!/usr/bin/env python
2# ex:ts=4:sw=4:sts=4:et
3# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4"""
5BitBake 'RunQueue' implementation
6
7Handles preparation and execution of a queue of tasks
8"""
9
10# Copyright (C) 2006-2007 Richard Purdie
11#
12# This program is free software; you can redistribute it and/or modify
13# it under the terms of the GNU General Public License version 2 as
14# published by the Free Software Foundation.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License along
22# with this program; if not, write to the Free Software Foundation, Inc.,
23# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25import copy
26import os
27import sys
28import signal
29import stat
30import fcntl
31import errno
32import logging
33import re
34import bb
35from bb import msg, data, event
36from bb import monitordisk
37import subprocess
38
39try:
40 import cPickle as pickle
41except ImportError:
42 import pickle
43
# "BitBake" is the parent logger the UI handlers attach to;
# "BitBake.RunQueue" scopes messages emitted by this module.
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")

# Matches a bare 32-character hex string (md5-sized), case-insensitively;
# the lookarounds reject runs embedded in longer alphanumeric text.
# NOTE(review): no use of this pattern is visible in this chunk — presumably
# used elsewhere in the file to spot hashes inside paths/strings; verify.
__find_md5__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{32}(?![a-z0-9])' )
48
class RunQueueStats:
    """
    Track counters for the tasks handled by the associated runqueue:
    completed, skipped, failed and currently-active counts, plus the
    expected total.
    """
    def __init__(self, total):
        # All counters start at zero; only the expected total is supplied.
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.total = total

    def copy(self):
        """Return a duplicate object carrying the same counter values."""
        duplicate = self.__class__(self.total)
        duplicate.__dict__.update(self.__dict__)
        return duplicate

    def taskFailed(self):
        """Record that one active task failed."""
        self.active -= 1
        self.failed += 1

    def taskCompleted(self, number = 1):
        """Record that *number* active tasks finished."""
        self.active -= number
        self.completed += number

    def taskSkipped(self, number = 1):
        """Record *number* skipped tasks; they also count as active
        until a matching taskCompleted() call."""
        self.active += number
        self.skipped += number

    def taskActive(self):
        """Record that one more task has started executing."""
        self.active += 1
79
# These values indicate the next step due to be run in the
# runQueue state machine.  The values themselves are ordered to
# follow the normal progression of a build.
runQueuePrepare = 2        # build the task graph from taskData
runQueueSceneInit = 3      # initialise setscene (cache/sstate) handling
runQueueSceneRun = 4       # execute the setscene tasks
runQueueRunInit = 5        # prepare for the real task execution
runQueueRunning = 6        # executing real tasks
runQueueFailed = 7         # something failed; stop scheduling new work
runQueueCleanUp = 8        # wait for outstanding tasks to finish
runQueueComplete = 9       # all done
90
class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.

    The base scheduler hands out buildable tasks in plain task-number
    order; subclasses override prio_map to impose a different priority.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runq_fnid)

        # Identity priority map: position == task id.
        self.prio_map = []
        self.prio_map.extend(range(self.numTasks))

        # Precompute each task's stamp file once, and seed the buildable
        # list from whatever the runqueue already marked buildable.
        self.buildable = []
        self.stamps = {}
        for taskid in xrange(self.numTasks):
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
            taskname = self.rqdata.runq_task[taskid]
            self.stamps[taskid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
            if self.rq.runq_buildable[taskid] == 1:
                self.buildable.append(taskid)

        # Lazily built inverse of prio_map (task id -> priority).
        self.rev_prio_map = None

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Drop tasks that have started running since we last looked.
        self.buildable = [x for x in self.buildable if not self.rq.runq_running[x] == 1]
        if not self.buildable:
            return None
        if len(self.buildable) == 1:
            taskid = self.buildable[0]
            stamp = self.stamps[taskid]
            # Skip a task whose stamp file is already being written by a
            # running task (two tasks must not share a stamp concurrently).
            if stamp not in self.rq.build_stamps.itervalues():
                return taskid

        if not self.rev_prio_map:
            # NOTE: relies on Python 2 range() returning a mutable list,
            # which is then filled by index below.
            self.rev_prio_map = range(self.numTasks)
            for taskid in xrange(self.numTasks):
                self.rev_prio_map[self.prio_map[taskid]] = taskid

        # Pick the buildable task with the best (lowest) priority whose
        # stamp file is not currently in use.
        best = None
        bestprio = None
        for taskid in self.buildable:
            prio = self.rev_prio_map[taskid]
            if bestprio is None or bestprio > prio:
                stamp = self.stamps[taskid]
                if stamp in self.rq.build_stamps.itervalues():
                    continue
                bestprio = prio
                best = taskid

        return best

    def next(self):
        """
        Return the id of the task we should build next
        """
        # Only hand out more work while we're below the parallelism limit;
        # otherwise fall through and (implicitly) return None.
        if self.rq.stats.active < self.rq.number_tasks:
            return self.next_buildable_task()

    def newbuilable(self, task):
        # Mark a task as newly buildable.  NOTE(review): the method name is
        # a historical typo ("newbuilable") kept as-is since external callers
        # use this exact spelling.
        self.buildable.append(task)
160
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        # Stable argsort of the weights: lowest weight first, ties broken by
        # task id.  This produces exactly the ordering of the old
        # sort/index/mark loop but in O(n log n) instead of O(n^2)
        # (the old code called list.index() once per task).
        weights = self.rqdata.runq_weight
        self.prio_map = sorted(range(len(weights)), key=weights.__getitem__)

        # Heaviest tasks should be scheduled first.
        self.prio_map.reverse()
184
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it's completed as quickly as possible. This works
    well where disk space is at a premium and classes like OE's rm_work are in
    force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.

        # Group the speed-sorted priority map by fnid in a single pass:
        # groups appear in the order their first task occurs, and tasks keep
        # their relative order within each group.  This is equivalent to the
        # old pop/index/del loop but O(n) instead of O(n^2).
        groups = {}
        group_order = []
        for entry in self.prio_map:
            fnid = self.rqdata.runq_fnid[entry]
            if fnid not in groups:
                groups[fnid] = []
                group_order.append(fnid)
            groups[fnid].append(entry)

        self.prio_map = []
        for fnid in group_order:
            self.prio_map.extend(groups[fnid])
216
217class RunQueueData:
218 """
219 BitBake Run Queue implementation
220 """
221 def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
222 self.cooker = cooker
223 self.dataCache = dataCache
224 self.taskData = taskData
225 self.targets = targets
226 self.rq = rq
227 self.warn_multi_bb = False
228
229 self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST", True) or ""
230 self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST", True) or "").split()
231
232 self.reset()
233
234 def reset(self):
235 self.runq_fnid = []
236 self.runq_task = []
237 self.runq_depends = []
238 self.runq_revdeps = []
239 self.runq_hash = []
240
241 def runq_depends_names(self, ids):
242 import re
243 ret = []
244 for id in self.runq_depends[ids]:
245 nam = os.path.basename(self.get_user_idstring(id))
246 nam = re.sub("_[^,]*,", ",", nam)
247 ret.extend([nam])
248 return ret
249
250 def get_task_name(self, task):
251 return self.runq_task[task]
252
253 def get_task_file(self, task):
254 return self.taskData.fn_index[self.runq_fnid[task]]
255
256 def get_task_hash(self, task):
257 return self.runq_hash[task]
258
259 def get_user_idstring(self, task, task_name_suffix = ""):
260 fn = self.taskData.fn_index[self.runq_fnid[task]]
261 taskname = self.runq_task[task] + task_name_suffix
262 return "%s, %s" % (fn, taskname)
263
264 def get_task_id(self, fnid, taskname):
265 for listid in xrange(len(self.runq_fnid)):
266 if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
267 return listid
268 return None
269
    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.

        Returns a list of message strings describing up to 10 distinct
        dependency loops found among *tasks*.
        """
        # NOTE(review): this local import is unused — the code below uses the
        # module-level copy.deepcopy.  Left in place to keep the code unchanged.
        from copy import deepcopy

        valid_chains = []      # loops already reported (deduplicated)
        explored_deps = {}     # taskid -> accumulated reverse deps seen from it
        msgs = []

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in xrange(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in xrange(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(taskid, prev_chain):
            # Depth-first walk of the reverse-dependency graph; a loop exists
            # when a reverse dependency re-appears in the current chain.
            prev_chain.append(taskid)
            total_deps = []
            total_deps.extend(self.runq_revdeps[taskid])
            for revdep in self.runq_revdeps[taskid]:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Aborted dependency loops search after 10 matches.\n")
                        return msgs
                    continue
                scan = False
                if revdep not in explored_deps:
                    # Never visited this node before.
                    scan = True
                elif revdep in explored_deps[revdep]:
                    # Node is part of a loop through itself; rescan so the
                    # loop is reported relative to the current chain.
                    scan = True
                else:
                    # Rescan if a previously explored subtree can reach any
                    # task already on the current chain (a potential loop).
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    # Each branch needs its own copy of the chain.
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[taskid] = total_deps

        for task in tasks:
            find_chains(task, [])

        return msgs
356
    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list finding tasks that are not
        possible to execute due to circular dependencies.

        *endpoints* is the list of task ids with no reverse dependencies;
        weights are propagated backwards from there.  Returns the per-task
        weight list; calls bb.msg.fatal() if unbuildable tasks remain.
        """

        numTasks = len(self.runq_fnid)
        weight = []
        deps_left = []    # per-task count of reverse deps not yet processed
        task_done = []

        for listid in xrange(numTasks):
            task_done.append(False)
            weight.append(1)
            deps_left.append(len(self.runq_revdeps[listid]))

        # Seed the endpoints with a fixed starting weight.
        for listid in endpoints:
            weight[listid] = 10
            task_done[listid] = True

        # Reverse-topological sweep: each pass pushes the weight of the
        # current frontier onto its dependencies; a dependency joins the next
        # frontier once all of its reverse dependencies have been processed.
        while True:
            next_points = []
            for listid in endpoints:
                for revdep in self.runq_depends[listid]:
                    weight[revdep] = weight[revdep] + weight[listid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if len(next_points) == 0:
                break

        # Circular dependency sanity check
        # Any task never reached by the sweep is part of (or behind) a cycle.
        problem_tasks = []
        for task in xrange(numTasks):
            if task_done[task] is False or deps_left[task] != 0:
                problem_tasks.append(task)
                logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
                logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])

        if problem_tasks:
            message = "Unbuildable tasks were found.\n"
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight
415
    def prepare(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.

        Returns the number of runnable tasks (0 when there is nothing to
        do).  Broadly: Step A builds the task list and dependency sets,
        Step B marks the tasks actually requested (and their dependencies)
        as active, Step C prunes inactive tasks, and Step D sanity-checks
        the graph, computes weights and task hashes.
        """

        runq_build = []
        recursivetasks = {}
        recursiveitasks = {}
        recursivetasksselfref = set()

        taskData = self.taskData

        if len(taskData.tasks_name) == 0:
            # Nothing to do
            return 0

        logger.info("Preparing RunQueue")

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every build and run
        # target ordered by priority. It also gives information on each of those
        # providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptast, recrdeptask, idepends).

        def add_build_dependencies(depids, tasknames, depends):
            # For each DEPENDS entry, add the named tasks of its chosen
            # provider to the dependency set.
            for depid in depids:
                # Won't be in build_targets if ASSUME_PROVIDED
                if depid not in taskData.build_targets:
                    continue
                depdata = taskData.build_targets[depid][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depdata, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        def add_runtime_dependencies(depids, tasknames, depends):
            # Same as above but for RDEPENDS/run targets.
            for depid in depids:
                if depid not in taskData.run_targets:
                    continue
                depdata = taskData.run_targets[depid][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depdata, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        def add_resolved_dependencies(depids, tasknames, depends):
            # Here depids are already fnids; no provider resolution needed.
            for depid in depids:
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depid, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        for task in xrange(len(taskData.tasks_name)):
            depends = set()
            fnid = taskData.tasks_fnid[task]
            fn = taskData.fn_index[fnid]
            task_deps = self.dataCache.task_deps[fn]

            #logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])

            if fnid not in taskData.failed_fnids:

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                depends = set(taskData.tasks_tdepends[task])

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
                    tasknames = task_deps['rdeptask'][taskData.tasks_name[task]].split()
                    add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                idepends = taskData.tasks_idepends[task]
                for (depid, idependtask) in idepends:
                    if depid in taskData.build_targets and not depid in taskData.failed_deps:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData.build_targets[depid][0]
                        if depdata is not None:
                            taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
                            if taskid is None:
                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
                            depends.add(taskid)
                irdepends = taskData.tasks_irdepends[task]
                for (depid, idependtask) in irdepends:
                    if depid in taskData.run_targets:
                        # Won't be in run_targets if ASSUME_PROVIDED
                        depdata = taskData.run_targets[depid][0]
                        if depdata is not None:
                            taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
                            if taskid is None:
                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
                            depends.add(taskid)

                # Resolve recursive 'recrdeptask' dependencies (Part A)
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                # We cover the recursive part of the dependencies below
                if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
                    tasknames = task_deps['recrdeptask'][taskData.tasks_name[task]].split()
                    recursivetasks[task] = tasknames
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)
                    add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
                    if taskData.tasks_name[task] in tasknames:
                        recursivetasksselfref.add(task)

                if 'recideptask' in task_deps and taskData.tasks_name[task] in task_deps['recideptask']:
                    recursiveitasks[task] = []
                    for t in task_deps['recideptask'][taskData.tasks_name[task]].split():
                        newdep = taskData.gettask_id_fromfnid(fnid, t)
                        recursiveitasks[task].append(newdep)

            # Append this task to the parallel runqueue arrays (even failed
            # recipes get an entry, just with an empty dependency set).
            self.runq_fnid.append(taskData.tasks_fnid[task])
            self.runq_task.append(taskData.tasks_name[task])
            self.runq_depends.append(depends)
            self.runq_revdeps.append(set())
            self.runq_hash.append("")

            runq_build.append(0)

        # Resolve recursive 'recrdeptask' dependencies (Part B)
        #
        # e.g. do_sometask[recrdeptask] = "do_someothertask"
        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
        # We need to do this separately since we need all of self.runq_depends to be complete before this is processed
        extradeps = {}
        for task in recursivetasks:
            extradeps[task] = set(self.runq_depends[task])
            tasknames = recursivetasks[task]
            seendeps = set()
            # NOTE(review): seenfnid appears unused below — presumably a leftover.
            seenfnid = []

            def generate_recdeps(t):
                # Recursively accumulate the named tasks across the whole
                # dependency tree of t into extradeps[task].
                newdeps = set()
                add_resolved_dependencies([taskData.tasks_fnid[t]], tasknames, newdeps)
                extradeps[task].update(newdeps)
                seendeps.add(t)
                newdeps.add(t)
                for i in newdeps:
                    for n in self.runq_depends[i]:
                        if n not in seendeps:
                            generate_recdeps(n)
            generate_recdeps(task)

            if task in recursiveitasks:
                for dep in recursiveitasks[task]:
                    generate_recdeps(dep)

        # Remove circular references so that do_a[recrdeptask] = "do_a do_b" can work
        for task in recursivetasks:
            extradeps[task].difference_update(recursivetasksselfref)

        for task in xrange(len(taskData.tasks_name)):
            # Add in extra dependencies
            if task in extradeps:
                self.runq_depends[task] = extradeps[task]
            # Remove all self references
            if task in self.runq_depends[task]:
                logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], self.runq_depends[task])
                self.runq_depends[task].remove(task)

        # Step B - Mark all active tasks
        #
        # Start with the tasks we were asked to run and mark all dependencies
        # as active too. If the task is to be 'forced', clear its stamp. Once
        # all active tasks are marked, prune the ones we don't need.

        logger.verbose("Marking Active Tasks")

        def mark_active(listid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """

            if runq_build[listid] == 1:
                return

            runq_build[listid] = 1

            depends = self.runq_depends[listid]
            for depend in depends:
                mark_active(depend, depth+1)

        self.target_pairs = []
        for target in self.targets:
            # target is a (providee name, taskname) pair from the command line.
            targetid = taskData.getbuild_id(target[0])

            if targetid not in taskData.build_targets:
                continue

            if targetid in taskData.failed_deps:
                continue

            fnid = taskData.build_targets[targetid][0]
            fn = taskData.fn_index[fnid]
            self.target_pairs.append((fn, target[1]))

            if fnid in taskData.failed_fnids:
                continue

            if target[1] not in taskData.tasks_lookup[fnid]:
                import difflib
                close_matches = difflib.get_close_matches(target[1], taskData.tasks_lookup[fnid], cutoff=0.7)
                if close_matches:
                    extra = ". Close matches:\n %s" % "\n ".join(close_matches)
                else:
                    extra = ""
                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s%s" % (target[1], target[0], extra))

            listid = taskData.tasks_lookup[fnid][target[1]]

            mark_active(listid, 1)

        # Step C - Prune all inactive tasks
        #
        # Once all active tasks are marked, prune the ones we don't need.

        # maps[old id] gives the task's new id after pruning, or -1 if pruned.
        maps = []
        delcount = 0
        for listid in xrange(len(self.runq_fnid)):
            if runq_build[listid-delcount] == 1:
                maps.append(listid-delcount)
            else:
                del self.runq_fnid[listid-delcount]
                del self.runq_task[listid-delcount]
                del self.runq_depends[listid-delcount]
                del runq_build[listid-delcount]
                del self.runq_revdeps[listid-delcount]
                del self.runq_hash[listid-delcount]
                delcount = delcount + 1
                maps.append(-1)

        #
        # Step D - Sanity checks and computation
        #

        # Check to make sure we still have tasks to run
        if len(self.runq_fnid) == 0:
            if not taskData.abort:
                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
            else:
                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")

        logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))

        # Remap the dependencies to account for the deleted tasks
        # Check we didn't delete a task we depend on
        for listid in xrange(len(self.runq_fnid)):
            newdeps = []
            origdeps = self.runq_depends[listid]
            for origdep in origdeps:
                if maps[origdep] == -1:
                    bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
                newdeps.append(maps[origdep])
            self.runq_depends[listid] = set(newdeps)

        logger.verbose("Assign Weightings")

        # Generate a list of reverse dependencies to ease future calculations
        for listid in xrange(len(self.runq_fnid)):
            for dep in self.runq_depends[listid]:
                self.runq_revdeps[dep].add(listid)

        # Identify tasks at the end of dependency chains
        # Error on circular dependency loops (length two)
        endpoints = []
        for listid in xrange(len(self.runq_fnid)):
            revdeps = self.runq_revdeps[listid]
            if len(revdeps) == 0:
                endpoints.append(listid)
            for dep in revdeps:
                if dep in self.runq_depends[listid]:
                    #self.dump_data(taskData)
                    bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))

        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))

        # Calculate task weights
        # Check of higher length circular dependencies
        self.runq_weight = self.calculate_task_weights(endpoints)

        # Sanity Check - Check for multiple tasks building the same provider
        prov_list = {}
        seen_fn = []
        for task in xrange(len(self.runq_fnid)):
            fn = taskData.fn_index[self.runq_fnid[task]]
            if fn in seen_fn:
                continue
            seen_fn.append(fn)
            for prov in self.dataCache.fn_provides[fn]:
                if prov not in prov_list:
                    prov_list[prov] = [fn]
                elif fn not in prov_list[prov]:
                    prov_list[prov].append(fn)
        for prov in prov_list:
            if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
                seen_pn = []
                # If two versions of the same PN are being built its fatal, we don't support it.
                for fn in prov_list[prov]:
                    pn = self.dataCache.pkg_fn[fn]
                    if pn not in seen_pn:
                        seen_pn.append(pn)
                    else:
                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
                msg = "Multiple .bb files are due to be built which each provide %s (%s)." % (prov, " ".join(prov_list[prov]))
                if self.warn_multi_bb:
                    logger.warn(msg)
                else:
                    msg += "\n This usually means one provides something the other doesn't and should."
                    logger.error(msg)

        # Create a whitelist usable by the stamp checks
        stampfnwhitelist = []
        for entry in self.stampwhitelist.split():
            entryid = self.taskData.getbuild_id(entry)
            if entryid not in self.taskData.build_targets:
                continue
            fnid = self.taskData.build_targets[entryid][0]
            fn = self.taskData.fn_index[fnid]
            stampfnwhitelist.append(fn)
        self.stampfnwhitelist = stampfnwhitelist

        # Iterate over the task list looking for tasks with a 'setscene' function
        self.runq_setscene = []
        if not self.cooker.configuration.nosetscene:
            for task in range(len(self.runq_fnid)):
                setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
                # NOTE(review): "if not setscene" would also skip a valid
                # setscene task with id 0 — presumably never hit in practice;
                # verify before relying on it.
                if not setscene:
                    continue
                self.runq_setscene.append(task)

        def invalidate_task(fn, taskname, error_nostamp):
            # Remove the signature/stamp for a task so it reruns; nostamp
            # tasks have nothing to invalidate.
            taskdep = self.dataCache.task_deps[fn]
            fnid = self.taskData.getfn_id(fn)
            if taskname not in taskData.tasks_lookup[fnid]:
                logger.warn("Task %s does not exist, invalidating this task will have no effect" % taskname)
            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
                if error_nostamp:
                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
                else:
                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
            else:
                logger.verbose("Invalidate task %s, %s", taskname, fn)
                bb.parse.siggen.invalidate_task(taskname, self.dataCache, fn)

        # Invalidate task if force mode active
        if self.cooker.configuration.force:
            for (fn, target) in self.target_pairs:
                invalidate_task(fn, target, False)

        # Invalidate task if invalidate mode active
        if self.cooker.configuration.invalidate_stamp:
            for (fn, target) in self.target_pairs:
                for st in self.cooker.configuration.invalidate_stamp.split(','):
                    if not st.startswith("do_"):
                        st = "do_%s" % st
                    invalidate_task(fn, st, True)

        # Create and print to the logs a virtual/xxxx -> PN (fn) table
        virtmap = taskData.get_providermap()
        virtpnmap = {}
        for v in virtmap:
            virtpnmap[v] = self.dataCache.pkg_fn[virtmap[v]]
            bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
        if hasattr(bb.parse.siggen, "tasks_resolved"):
            bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCache)

        # Iterate over the task list and call into the siggen code
        # Hashes must be computed in dependency order since each task hash
        # incorporates the hashes of its dependencies.
        dealtwith = set()
        todeal = set(range(len(self.runq_fnid)))
        while len(todeal) > 0:
            for task in todeal.copy():
                if len(self.runq_depends[task] - dealtwith) == 0:
                    dealtwith.add(task)
                    todeal.remove(task)
                    procdep = []
                    for dep in self.runq_depends[task]:
                        procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
                    self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)

        return len(self.runq_fnid)
823
824 def dump_data(self, taskQueue):
825 """
826 Dump some debug information on the internal data structures
827 """
828 logger.debug(3, "run_tasks:")
829 for task in xrange(len(self.rqdata.runq_task)):
830 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
831 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
832 self.rqdata.runq_task[task],
833 self.rqdata.runq_weight[task],
834 self.rqdata.runq_depends[task],
835 self.rqdata.runq_revdeps[task])
836
837 logger.debug(3, "sorted_tasks:")
838 for task1 in xrange(len(self.rqdata.runq_task)):
839 if task1 in self.prio_map:
840 task = self.prio_map[task1]
841 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
842 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
843 self.rqdata.runq_task[task],
844 self.rqdata.runq_weight[task],
845 self.rqdata.runq_depends[task],
846 self.rqdata.runq_revdeps[task])
847
848class RunQueue:
849 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
850
851 self.cooker = cooker
852 self.cfgData = cfgData
853 self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
854
855 self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY", True) or "perfile"
856 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION", True) or None
857 self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION", True) or None
858 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID", True) or None
859
860 self.state = runQueuePrepare
861
862 # For disk space monitor
863 self.dm = monitordisk.diskMonitor(cfgData)
864
865 self.rqexe = None
866 self.worker = None
867 self.workerpipe = None
868 self.fakeworker = None
869 self.fakeworkerpipe = None
870
    def _start_worker(self, fakeroot = False, rqexec = None):
        """
        Spawn a bitbake-worker subprocess (optionally under the fakeroot
        command) and send it the pickled cooker configuration and worker
        data over stdin.  Returns (worker Popen object, runQueuePipe
        wrapping the worker's stdout).
        """
        logger.debug(1, "Starting bitbake-worker")
        # The magic string is handed to the worker on its command line;
        # "decafbadbad" selects the profiling variant.
        magic = "decafbad"
        if self.cooker.configuration.profile:
            magic = "decafbadbad"
        if fakeroot:
            # Run the worker under FAKEROOTCMD with FAKEROOTBASEENV
            # ("KEY=value" pairs) merged over the current environment.
            fakerootcmd = self.cfgData.getVar("FAKEROOTCMD", True)
            fakerootenv = (self.cfgData.getVar("FAKEROOTBASEENV", True) or "").split()
            env = os.environ.copy()
            for key, value in (var.split('=') for var in fakerootenv):
                env[key] = value
            worker = subprocess.Popen([fakerootcmd, "bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
        else:
            worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        # Events come back asynchronously, so reads must not block.
        bb.utils.nonblockingfd(worker.stdout)
        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec)

        # Everything the worker needs to execute tasks stand-alone.
        workerdata = {
            "taskdeps" : self.rqdata.dataCache.task_deps,
            "fakerootenv" : self.rqdata.dataCache.fakerootenv,
            "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
            "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
            "sigdata" : bb.parse.siggen.get_taskdata(),
            "runq_hash" : self.rqdata.runq_hash,
            "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
            "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
            "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
            "prhost" : self.cooker.prhost,
            "buildname" : self.cfgData.getVar("BUILDNAME", True),
            "date" : self.cfgData.getVar("DATE", True),
            "time" : self.cfgData.getVar("TIME", True),
        }

        # Handshake: the worker parses these tagged pickled payloads from
        # its stdin before accepting tasks.
        worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
        worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
        worker.stdin.flush()

        return worker, workerpipe
910
    def _teardown_worker(self, worker, workerpipe):
        """
        Cleanly shut down a worker started by _start_worker().

        Sends the <quit> command, then keeps draining the worker's pipe
        until the process has actually exited so no events are lost.
        A no-op when worker is None.
        """
        if not worker:
            return
        logger.debug(1, "Teardown for bitbake-worker")
        try:
            worker.stdin.write("<quit></quit>")
            worker.stdin.flush()
        except IOError:
            # The worker may already have died; nothing to tell it then.
            pass
        while worker.returncode is None:
            # Keep consuming output so the worker cannot block on a full
            # pipe while we wait for it to exit.
            workerpipe.read()
            worker.poll()
        # Flush any remaining buffered data after exit.
        while workerpipe.read():
            continue
        workerpipe.close()
926
927 def start_worker(self):
928 if self.worker:
929 self.teardown_workers()
930 self.teardown = False
931 self.worker, self.workerpipe = self._start_worker()
932
933 def start_fakeworker(self, rqexec):
934 if not self.fakeworker:
935 self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)
936
937 def teardown_workers(self):
938 self.teardown = True
939 self._teardown_worker(self.worker, self.workerpipe)
940 self.worker = None
941 self.workerpipe = None
942 self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
943 self.fakeworker = None
944 self.fakeworkerpipe = None
945
946 def read_workers(self):
947 self.workerpipe.read()
948 if self.fakeworkerpipe:
949 self.fakeworkerpipe.read()
950
951 def active_fds(self):
952 fds = []
953 if self.workerpipe:
954 fds.append(self.workerpipe.input)
955 if self.fakeworkerpipe:
956 fds.append(self.fakeworkerpipe.input)
957 return fds
958
    def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
        """
        Check whether the stamp for a task is current, i.e. whether the
        task can be considered already done.

        task -- runqueue task id
        taskname -- task function name (defaults to the task's own name)
        recurse -- also validate the stamps of all dependencies
        cache -- optional dict memoising per-task results during recursion

        Returns True if the stamp is current, False otherwise.
        """
        def get_timestamp(f):
            # mtime of f, or None if the file is missing or unreadable.
            try:
                if not os.access(f, os.F_OK):
                    return None
                return os.stat(f)[stat.ST_MTIME]
            except:
                return None

        # "perfile" only compares stamps within the same recipe file;
        # any other policy compares across the full dependency tree.
        if self.stamppolicy == "perfile":
            fulldeptree = False
        else:
            fulldeptree = True
        stampwhitelist = []
        if self.stamppolicy == "whitelist":
            stampwhitelist = self.rqdata.stampfnwhitelist

        fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
        if taskname is None:
            taskname = self.rqdata.runq_task[task]

        stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)

        # If the stamp is missing, it's not current
        if not os.access(stampfile, os.F_OK):
            logger.debug(2, "Stampfile %s not available", stampfile)
            return False
        # If it's a 'nostamp' task, it's not current
        taskdep = self.rqdata.dataCache.task_deps[fn]
        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
            return False

        # Setscene tasks (other than do_setscene itself) only need their own
        # stamp to exist to be considered current.
        if taskname != "do_setscene" and taskname.endswith("_setscene"):
            return True

        if cache is None:
            cache = {}

        iscurrent = True
        t1 = get_timestamp(stampfile)
        for dep in self.rqdata.runq_depends[task]:
            if iscurrent:
                fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
                taskname2 = self.rqdata.runq_task[dep]
                stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
                stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
                t2 = get_timestamp(stampfile2)
                t3 = get_timestamp(stampfile3)
                # A newer setscene stamp supersedes the normal stamp.
                if t3 and t3 > t2:
                    continue
                if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
                    # NOTE: t2 may be None here; this relies on Python 2
                    # ordering where None sorts before all numbers, making
                    # "t1 < t2" False in that case.
                    if not t2:
                        logger.debug(2, 'Stampfile %s does not exist', stampfile2)
                        iscurrent = False
                    if t1 < t2:
                        logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
                        iscurrent = False
                if recurse and iscurrent:
                    if dep in cache:
                        iscurrent = cache[dep]
                        if not iscurrent:
                            logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
                    else:
                        iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
                        cache[dep] = iscurrent
        if recurse:
            cache[task] = iscurrent
        return iscurrent
1028
    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)

        This is one iteration of the runqueue state machine: each call
        advances self.state and returns False when fully complete, or a
        truthy retval (possibly a list of fds to poll) to be called again.
        """

        retval = True

        if self.state is runQueuePrepare:
            # Use a dummy executor until the real one exists.
            self.rqexe = RunQueueExecuteDummy(self)
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit

                # we are ready to run,  see if any UI client needs the dependency info
                if bb.cooker.CookerFeatures.SEND_DEPENDS_TREE in self.cooker.featureset:
                    depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
                    bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)

        if self.state is runQueueSceneInit:
            dump = self.cooker.configuration.dump_signatures
            if dump:
                # Signature-dump mode: no tasks actually execute.
                if 'printdiff' in dump:
                    invalidtasks = self.print_diffscenetasks()
                self.dump_signatures(dump)
                if 'printdiff' in dump:
                    self.write_diffscenetasks(invalidtasks)
                self.state = runQueueComplete
            else:
                self.start_worker()
                self.rqexe = RunQueueExecuteScenequeue(self)

        # Periodic disk usage monitoring while tasks are in flight.
        if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]:
            self.dm.check(self)

        if self.state is runQueueSceneRun:
            retval = self.rqexe.execute()

        if self.state is runQueueRunInit:
            logger.info("Executing RunQueue Tasks")
            self.rqexe = RunQueueExecuteTasks(self)
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            retval = self.rqexe.finish()

        if (self.state is runQueueComplete or self.state is runQueueFailed) and self.rqexe:
            self.teardown_workers()
            if self.rqexe.stats.failed:
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
            else:
                # Let's avoid the word "failed" if nothing actually did
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        if self.state is runQueueFailed:
            # Either abort, or fail the broken recipes and retry with any
            # alternative providers.
            if not self.rqdata.taskData.tryaltconfigs:
                raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
            for fnid in self.rqexe.failed_fnids:
                self.rqdata.taskData.fail_fnid(fnid)
            self.rqdata.reset()

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval
1101
1102 def execute_runqueue(self):
1103 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1104 try:
1105 return self._execute_runqueue()
1106 except bb.runqueue.TaskFailure:
1107 raise
1108 except SystemExit:
1109 raise
1110 except bb.BBHandledException:
1111 try:
1112 self.teardown_workers()
1113 except:
1114 pass
1115 self.state = runQueueComplete
1116 raise
1117 except:
1118 logger.error("An uncaught exception occured in runqueue, please see the failure below:")
1119 try:
1120 self.teardown_workers()
1121 except:
1122 pass
1123 self.state = runQueueComplete
1124 raise
1125
1126 def finish_runqueue(self, now = False):
1127 if not self.rqexe:
1128 self.state = runQueueComplete
1129 return
1130
1131 if now:
1132 self.rqexe.finish_now()
1133 else:
1134 self.rqexe.finish()
1135
1136 def dump_signatures(self, options):
1137 done = set()
1138 bb.note("Reparsing files to collect dependency data")
1139 for task in range(len(self.rqdata.runq_fnid)):
1140 if self.rqdata.runq_fnid[task] not in done:
1141 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1142 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
1143 done.add(self.rqdata.runq_fnid[task])
1144
1145 bb.parse.siggen.dump_sigs(self.rqdata.dataCache, options)
1146
1147 return
1148
    def print_diffscenetasks(self):
        """
        Report which tasks differ from any cached (setscene) version.

        Runs the configured hash validation function over all executable
        tasks, then prints the "frontier" tasks: invalid tasks none of
        whose dependencies are themselves invalid.

        Returns the set of those frontier task ids.
        """

        valid = []
        sq_hash = []
        sq_hashfn = []
        sq_fn = []
        sq_taskname = []
        sq_task = []
        noexec = []
        stamppresent = []
        valid_new = set()

        # Gather the parallel lists the hash validation function expects.
        for task in xrange(len(self.rqdata.runq_fnid)):
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task]
            taskdep = self.rqdata.dataCache.task_deps[fn]

            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                noexec.append(task)
                continue

            sq_fn.append(fn)
            sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
            sq_hash.append(self.rqdata.runq_hash[task])
            sq_taskname.append(taskname)
            sq_task.append(task)
        locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.expanded_data }
        try:
            call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=True)"
            valid = bb.utils.better_eval(call, locs)
        # Handle version with no siginfo parameter
        except TypeError:
            call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
            valid = bb.utils.better_eval(call, locs)
        for v in valid:
            valid_new.add(sq_task[v])

        # Tasks which are both setscene and noexec never care about dependencies
        # We therefore find tasks which are setscene and noexec and mark their
        # unique dependencies as valid.
        for task in noexec:
            if task not in self.rqdata.runq_setscene:
                continue
            for dep in self.rqdata.runq_depends[task]:
                hasnoexecparents = True
                for dep2 in self.rqdata.runq_revdeps[dep]:
                    if dep2 in self.rqdata.runq_setscene and dep2 in noexec:
                        continue
                    hasnoexecparents = False
                    break
                if hasnoexecparents:
                    valid_new.add(dep)

        # Everything executable that wasn't validated is invalid.
        invalidtasks = set()
        for task in xrange(len(self.rqdata.runq_fnid)):
            if task not in valid_new and task not in noexec:
                invalidtasks.add(task)

        # Breadth-first walk each invalid task's dependencies: if any
        # dependency is itself invalid, this task isn't part of the
        # frontier where the differences start.
        found = set()
        processed = set()
        for task in invalidtasks:
            toprocess = set([task])
            while toprocess:
                next = set()
                for t in toprocess:
                    for dep in self.rqdata.runq_depends[t]:
                        if dep in invalidtasks:
                            found.add(task)
                        if dep not in processed:
                            processed.add(dep)
                            next.add(dep)
                toprocess = next
                if task in found:
                    toprocess = set()

        tasklist = []
        for task in invalidtasks.difference(found):
            tasklist.append(self.rqdata.get_user_idstring(task))

        if tasklist:
            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))

        return invalidtasks.difference(found)
1232
    def write_diffscenetasks(self, invalidtasks):
        """
        For each invalid task, locate the closest previously written
        signature file and print a human-readable explanation of why the
        cached result could not be reused.
        """

        # Define recursion callback
        def recursecb(key, hash1, hash2):
            hashes = [hash1, hash2]
            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)

            recout = []
            if len(hashfiles) == 2:
                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
                recout.extend(list('  ' + l for l in out2))
            else:
                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))

            return recout


        for task in invalidtasks:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            pn = self.rqdata.dataCache.pkg_fn[fn]
            taskname = self.rqdata.runq_task[task]
            h = self.rqdata.runq_hash[task]
            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData)
            match = None
            # Find the siginfo file containing the current hash.
            for m in matches:
                if h in m:
                    match = m
            if match is None:
                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
            # Drop the current hash, keep the most recent other candidate.
            matches = {k : v for k, v in matches.iteritems() if h not in k}
            if matches:
                latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
                prevh = __find_md5__.search(latestmatch).group(0)
                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, closest matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1268
class RunQueueExecute:
    """
    Base class for runqueue executors.

    Tracks per-task buildable/running/complete state, stamp bookkeeping
    and failure accounting, and communicates with the worker processes
    owned by the parent RunQueue.
    """

    def __init__(self, rq):
        self.rq = rq
        self.cooker = rq.cooker
        self.cfgData = rq.cfgData
        self.rqdata = rq.rqdata

        # How many tasks may run in parallel and which scheduler to use.
        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS", True) or 1)
        self.scheduler = self.cfgData.getVar("BB_SCHEDULER", True) or "speed"

        # Per-task 0/1 state flags, indexed by runqueue task id.
        self.runq_buildable = []
        self.runq_running = []
        self.runq_complete = []

        # Stamp files for tasks currently in flight (dict keyed by task id,
        # plus a parallel flat list).
        self.build_stamps = {}
        self.build_stamps2 = []
        # fnids of recipes whose tasks failed.
        self.failed_fnids = []

        self.stampcache = {}

        # Route worker pipe events to this executor instance.
        rq.workerpipe.setrunqueueexec(self)
        if rq.fakeworkerpipe:
            rq.fakeworkerpipe.setrunqueueexec(self)

        if self.number_tasks <= 0:
             bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)

    def runqueue_process_waitpid(self, task, status):
        """
        Handle a task exit reported by a worker.

        task -- runqueue task id
        status -- exit status; non-zero means the task failed
        """
        # self.build_stamps[pid] may not exist when use shared work directory.
        if task in self.build_stamps:
            self.build_stamps2.remove(self.build_stamps[task])
            del self.build_stamps[task]

        if status != 0:
            self.task_fail(task, status)
        else:
            self.task_complete(task)
        return True

    def finish_now(self):
        """
        Tell the workers to stop immediately and mark the queue finished,
        entering the failed state if anything had already failed.
        """
        for worker in [self.rq.worker, self.rq.fakeworker]:
            if not worker:
                continue
            try:
                worker.stdin.write("<finishnow></finishnow>")
                worker.stdin.flush()
            except IOError:
                # worker must have died?
                pass

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return

        self.rq.state = runQueueComplete
        return

    def finish(self):
        """
        Graceful shutdown: wait for active tasks to drain, then move to the
        failed or complete state.  Returns the worker fds to poll while
        tasks are still active.
        """
        self.rq.state = runQueueCleanUp

        if self.stats.active > 0:
            bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
            self.rq.read_workers()
            return self.rq.active_fds()

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return True

        self.rq.state = runQueueComplete
        return True

    def check_dependencies(self, task, taskdeps, setscene = False):
        """
        Ask the metadata-supplied dependency validation function whether
        the given dependencies of a task are actually needed.

        Returns the function's verdict, or False if no validator is set.
        NOTE: mutates taskdeps by adding the task itself.
        """
        if not self.rq.depvalidate:
            return False

        taskdata = {}
        taskdeps.add(task)
        for dep in taskdeps:
            # In setscene mode the ids index into runq_setscene.
            if setscene:
                depid = self.rqdata.runq_setscene[dep]
            else:
                depid = dep
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[depid]]
            pn = self.rqdata.dataCache.pkg_fn[fn]
            taskname = self.rqdata.runq_task[depid]
            taskdata[dep] = [pn, taskname, fn]
        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.expanded_data }
        valid = bb.utils.better_eval(call, locs)
        return valid
1363
class RunQueueExecuteDummy(RunQueueExecute):
    """Placeholder executor used before the real runqueue is prepared."""

    def __init__(self, rq):
        # Deliberately avoid RunQueueExecute.__init__: no workers or
        # configuration are available at this stage.
        self.stats = RunQueueStats(0)
        self.rq = rq

    def finish(self):
        """Nothing is running, so completion is immediate."""
        self.rq.state = runQueueComplete
        return
1372
class RunQueueExecuteTasks(RunQueueExecute):
    """
    Executor for the main (non-setscene) phase of the runqueue.

    On construction it computes which tasks are already "covered" by
    successful setscene tasks, lets the metadata veto coverage via the
    setsceneverify hook, and instantiates the task scheduler.  execute()
    is then called repeatedly to feed runnable tasks to the workers.
    """
    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        self.stats = RunQueueStats(len(self.rqdata.runq_fnid))

        self.stampcache = {}

        initial_covered = self.rq.scenequeue_covered.copy()

        # Mark initial buildable tasks
        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            if len(self.rqdata.runq_depends[task]) == 0:
                self.runq_buildable.append(1)
            else:
                self.runq_buildable.append(0)
            if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
                self.rq.scenequeue_covered.add(task)

        # Propagate coverage to a fixed point: a task whose reverse
        # dependencies are all covered is itself covered.
        found = True
        while found:
            found = False
            for task in xrange(self.stats.total):
                if task in self.rq.scenequeue_covered:
                    continue
                logger.debug(1, 'Considering %s (%s): %s' % (task, self.rqdata.get_user_idstring(task), str(self.rqdata.runq_revdeps[task])))

                if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
                    found = True
                    self.rq.scenequeue_covered.add(task)

        logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))

        # Allow the metadata to elect for setscene tasks to run anyway
        covered_remove = set()
        if self.rq.setsceneverify:
            invalidtasks = []
            for task in xrange(len(self.rqdata.runq_task)):
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
                taskname = self.rqdata.runq_task[task]
                taskdep = self.rqdata.dataCache.task_deps[fn]

                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    continue
                if self.rq.check_stamp_task(task, taskname + "_setscene", cache=self.stampcache):
                    logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
                    continue
                if self.rq.check_stamp_task(task, taskname, recurse = True, cache=self.stampcache):
                    logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
                    continue
                invalidtasks.append(task)

            call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d, invalidtasks=invalidtasks)"
            call2 = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)"
            locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.expanded_data, "invalidtasks" : invalidtasks }
            # Backwards compatibility with older versions without invalidtasks
            try:
                covered_remove = bb.utils.better_eval(call, locs)
            except TypeError:
                covered_remove = bb.utils.better_eval(call2, locs)

        def removecoveredtask(task):
            # Uncover a task and delete its setscene stamp so it truly runs.
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task] + '_setscene'
            bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
            self.rq.scenequeue_covered.remove(task)

        toremove = covered_remove
        for task in toremove:
            logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
        # Uncovering a task also uncovers its covered dependencies, except
        # those that were already covered before this constructor ran.
        while toremove:
            covered_remove = []
            for task in toremove:
                removecoveredtask(task)
                for deptask in self.rqdata.runq_depends[task]:
                    if deptask not in self.rq.scenequeue_covered:
                        continue
                    if deptask in toremove or deptask in covered_remove or deptask in initial_covered:
                        continue
                    logger.debug(1, 'Task %s depends on task %s so not skipping' % (task, deptask))
                    covered_remove.append(deptask)
            toremove = covered_remove

        logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)

        event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)

        # Select the scheduler named by BB_SCHEDULER from those available.
        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
                break
        else:
            bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))

    def get_schedulers(self):
        """
        Return the set of available scheduler classes: every
        RunQueueScheduler subclass in this module's globals, plus any
        user classes named in BB_SCHEDULERS as module.ClassName imports.
        """
        schedulers = set(obj for obj in globals().values()
                             if type(obj) is type and
                                issubclass(obj, RunQueueScheduler))

        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS", True)
        if user_schedulers:
            for sched in user_schedulers.split():
                if not "." in sched:
                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
                    continue

                modname, name = sched.rsplit(".", 1)
                try:
                    module = __import__(modname, fromlist=(name,))
                except ImportError as exc:
                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
                    raise SystemExit(1)
                else:
                    schedulers.add(getattr(module, name))
        return schedulers

    def setbuildable(self, task):
        """Mark a task buildable and notify the scheduler."""
        self.runq_buildable[task] = 1
        # NOTE(review): "newbuilable" looks misspelled but presumably
        # matches the scheduler classes' hook name — do not rename locally.
        self.sched.newbuilable(task)

    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        self.runq_complete[task] = 1
        for revdep in self.rqdata.runq_revdeps[task]:
            if self.runq_running[revdep] == 1:
                continue
            if self.runq_buildable[revdep] == 1:
                continue
            alldeps = 1
            for dep in self.rqdata.runq_depends[revdep]:
                if self.runq_complete[dep] != 1:
                    alldeps = 0
            if alldeps == 1:
                self.setbuildable(revdep)
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
                taskname = self.rqdata.runq_task[revdep]
                logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)

    def task_complete(self, task):
        """Record a successful task and fire the completion event."""
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)

    def task_fail(self, task, exitcode):
        """
        Called when a task has failed
        Updates the state engine with the failure
        """
        self.stats.taskFailed()
        fnid = self.rqdata.runq_fnid[task]
        self.failed_fnids.append(fnid)
        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
        if self.rqdata.taskData.abort:
            self.rq.state = runQueueCleanUp

    def task_skip(self, task, reason):
        """Skip a task (covered or stamp-current), treating it as complete."""
        self.runq_running[task] = 1
        self.setbuildable(task)
        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
        self.task_completeoutright(task)
        self.stats.taskCompleted()
        self.stats.taskSkipped()

    def execute(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()

        One scheduling iteration: dispatch at most one new task to a
        worker, then either return True (call again), a list of fds to
        poll on, or fall through to the final sanity checks when done.
        """

        self.rq.read_workers()


        if self.stats.total == 0:
            # nothing to do
            self.rq.state = runQueueCleanUp

        task = self.sched.next()
        if task is not None:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task]

            if task in self.rq.scenequeue_covered:
                logger.debug(2, "Setscene covered task %s (%s)", task,
                                self.rqdata.get_user_idstring(task))
                self.task_skip(task, "covered")
                return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug(2, "Stamp current task %s (%s)", task,
                                self.rqdata.get_user_idstring(task))
                self.task_skip(task, "existing")
                return True

            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                # noexec tasks just need a stamp; no worker involved.
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running[task] = 1
                self.stats.taskActive()
                if not self.cooker.configuration.dry_run:
                    bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
                self.task_complete(task)
                return True
            else:
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

                taskdepdata = self.build_taskdepdata(task)

                taskdep = self.rqdata.dataCache.task_deps[fn]
                # fakeroot tasks go to the fakeroot worker, spawned on demand.
                if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                    if not self.rq.fakeworker:
                        try:
                            self.rq.start_fakeworker(self)
                        except OSError as exc:
                            logger.critical("Failed to spawn fakeroot worker to run %s:%s: %s" % (fn, taskname, str(exc)))
                            self.rq.state = runQueueFailed
                            return True
                    self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
                    self.rq.fakeworker.stdin.flush()
                else:
                    self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
                    self.rq.worker.stdin.flush()

                self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
                self.build_stamps2.append(self.build_stamps[task])
                self.runq_running[task] = 1
                self.stats.taskActive()
                if self.stats.active < self.number_tasks:
                    return True

        if self.stats.active > 0:
            self.rq.read_workers()
            return self.rq.active_fds()

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        for task in xrange(self.stats.total):
            if self.runq_buildable[task] == 0:
                logger.error("Task %s never buildable!", task)
            if self.runq_running[task] == 0:
                logger.error("Task %s never ran!", task)
            if self.runq_complete[task] == 0:
                logger.error("Task %s never completed!", task)
        self.rq.state = runQueueComplete

        return True

    def build_taskdepdata(self, task):
        """
        Build the dependency metadata sent to the worker alongside a task:
        a dict mapping task id -> [pn, taskname, fn, deps, provides] for
        the task and its transitive dependencies.
        """
        taskdepdata = {}
        # Operate on a copy: adding the task to the live runq_depends set
        # would corrupt the shared dependency data with a self-edge.
        frontier = self.rqdata.runq_depends[task].copy()
        frontier.add(task)
        while frontier:
            additional = []
            for revdep in frontier:
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
                pn = self.rqdata.dataCache.pkg_fn[fn]
                taskname = self.rqdata.runq_task[revdep]
                deps = self.rqdata.runq_depends[revdep]
                provides = self.rqdata.dataCache.fn_provides[fn]
                taskdepdata[revdep] = [pn, taskname, fn, deps, provides]
                for revdep2 in deps:
                    if revdep2 not in taskdepdata:
                        additional.append(revdep2)
            frontier = additional

        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
        return taskdepdata
1653
1654class RunQueueExecuteScenequeue(RunQueueExecute):
1655 def __init__(self, rq):
1656 RunQueueExecute.__init__(self, rq)
1657
1658 self.scenequeue_covered = set()
1659 self.scenequeue_notcovered = set()
1660 self.scenequeue_notneeded = set()
1661
1662 # If we don't have any setscene functions, skip this step
1663 if len(self.rqdata.runq_setscene) == 0:
1664 rq.scenequeue_covered = set()
1665 rq.state = runQueueRunInit
1666 return
1667
1668 self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1669
1670 sq_revdeps = []
1671 sq_revdeps_new = []
1672 sq_revdeps_squash = []
1673 self.sq_harddeps = {}
1674
1675 # We need to construct a dependency graph for the setscene functions. Intermediate
1676 # dependencies between the setscene tasks only complicate the code. This code
1677 # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1678 # only containing the setscene functions.
1679
1680 for task in xrange(self.stats.total):
1681 self.runq_running.append(0)
1682 self.runq_complete.append(0)
1683 self.runq_buildable.append(0)
1684
1685 # First process the chains up to the first setscene task.
1686 endpoints = {}
1687 for task in xrange(len(self.rqdata.runq_fnid)):
1688 sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1689 sq_revdeps_new.append(set())
1690 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1691 endpoints[task] = set()
1692
1693 # Secondly process the chains between setscene tasks.
1694 for task in self.rqdata.runq_setscene:
1695 for dep in self.rqdata.runq_depends[task]:
1696 if dep not in endpoints:
1697 endpoints[dep] = set()
1698 endpoints[dep].add(task)
1699
1700 def process_endpoints(endpoints):
1701 newendpoints = {}
1702 for point, task in endpoints.items():
1703 tasks = set()
1704 if task:
1705 tasks |= task
1706 if sq_revdeps_new[point]:
1707 tasks |= sq_revdeps_new[point]
1708 sq_revdeps_new[point] = set()
1709 if point in self.rqdata.runq_setscene:
1710 sq_revdeps_new[point] = tasks
1711 for dep in self.rqdata.runq_depends[point]:
1712 if point in sq_revdeps[dep]:
1713 sq_revdeps[dep].remove(point)
1714 if tasks:
1715 sq_revdeps_new[dep] |= tasks
1716 if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1717 newendpoints[dep] = task
1718 if len(newendpoints) != 0:
1719 process_endpoints(newendpoints)
1720
1721 process_endpoints(endpoints)
1722
1723 # Build a list of setscene tasks which are "unskippable"
1724 # These are direct endpoints referenced by the build
1725 endpoints2 = {}
1726 sq_revdeps2 = []
1727 sq_revdeps_new2 = []
1728 def process_endpoints2(endpoints):
1729 newendpoints = {}
1730 for point, task in endpoints.items():
1731 tasks = set([point])
1732 if task:
1733 tasks |= task
1734 if sq_revdeps_new2[point]:
1735 tasks |= sq_revdeps_new2[point]
1736 sq_revdeps_new2[point] = set()
1737 if point in self.rqdata.runq_setscene:
1738 sq_revdeps_new2[point] = tasks
1739 for dep in self.rqdata.runq_depends[point]:
1740 if point in sq_revdeps2[dep]:
1741 sq_revdeps2[dep].remove(point)
1742 if tasks:
1743 sq_revdeps_new2[dep] |= tasks
1744 if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1745 newendpoints[dep] = tasks
1746 if len(newendpoints) != 0:
1747 process_endpoints2(newendpoints)
1748 for task in xrange(len(self.rqdata.runq_fnid)):
1749 sq_revdeps2.append(copy.copy(self.rqdata.runq_revdeps[task]))
1750 sq_revdeps_new2.append(set())
1751 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1752 endpoints2[task] = set()
1753 process_endpoints2(endpoints2)
1754 self.unskippable = []
1755 for task in self.rqdata.runq_setscene:
1756 if sq_revdeps_new2[task]:
1757 self.unskippable.append(self.rqdata.runq_setscene.index(task))
1758
1759 for task in xrange(len(self.rqdata.runq_fnid)):
1760 if task in self.rqdata.runq_setscene:
1761 deps = set()
1762 for dep in sq_revdeps_new[task]:
1763 deps.add(self.rqdata.runq_setscene.index(dep))
1764 sq_revdeps_squash.append(deps)
1765 elif len(sq_revdeps_new[task]) != 0:
1766 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1767
1768 # Resolve setscene inter-task dependencies
1769 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
1770 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
1771 for task in self.rqdata.runq_setscene:
1772 realid = self.rqdata.taskData.gettask_id(self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]], self.rqdata.runq_task[task] + "_setscene", False)
1773 idepends = self.rqdata.taskData.tasks_idepends[realid]
1774 for (depid, idependtask) in idepends:
1775 if depid not in self.rqdata.taskData.build_targets:
1776 continue
1777
1778 depdata = self.rqdata.taskData.build_targets[depid][0]
1779 if depdata is None:
1780 continue
1781 dep = self.rqdata.taskData.fn_index[depdata]
1782 taskid = self.rqdata.get_task_id(self.rqdata.taskData.getfn_id(dep), idependtask.replace("_setscene", ""))
1783 if taskid is None:
1784 bb.msg.fatal("RunQueue", "Task %s_setscene depends upon non-existent task %s:%s" % (self.rqdata.get_user_idstring(task), dep, idependtask))
1785
1786 if not self.rqdata.runq_setscene.index(taskid) in self.sq_harddeps:
1787 self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)] = set()
1788 self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)].add(self.rqdata.runq_setscene.index(task))
1789
1790 sq_revdeps_squash[self.rqdata.runq_setscene.index(task)].add(self.rqdata.runq_setscene.index(taskid))
1791 # Have to zero this to avoid circular dependencies
1792 sq_revdeps_squash[self.rqdata.runq_setscene.index(taskid)] = set()
1793
1794 for task in self.sq_harddeps:
1795 for dep in self.sq_harddeps[task]:
1796 sq_revdeps_squash[dep].add(task)
1797
1798 #for task in xrange(len(sq_revdeps_squash)):
1799 # realtask = self.rqdata.runq_setscene[task]
1800 # bb.warn("Task %s: %s_setscene is %s " % (task, self.rqdata.get_user_idstring(realtask) , sq_revdeps_squash[task]))
1801
1802 self.sq_deps = []
1803 self.sq_revdeps = sq_revdeps_squash
1804 self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1805
1806 for task in xrange(len(self.sq_revdeps)):
1807 self.sq_deps.append(set())
1808 for task in xrange(len(self.sq_revdeps)):
1809 for dep in self.sq_revdeps[task]:
1810 self.sq_deps[dep].add(task)
1811
1812 for task in xrange(len(self.sq_revdeps)):
1813 if len(self.sq_revdeps[task]) == 0:
1814 self.runq_buildable[task] = 1
1815
1816 self.outrightfail = []
1817 if self.rq.hashvalidate:
1818 sq_hash = []
1819 sq_hashfn = []
1820 sq_fn = []
1821 sq_taskname = []
1822 sq_task = []
1823 noexec = []
1824 stamppresent = []
1825 for task in xrange(len(self.sq_revdeps)):
1826 realtask = self.rqdata.runq_setscene[task]
1827 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1828 taskname = self.rqdata.runq_task[realtask]
1829 taskdep = self.rqdata.dataCache.task_deps[fn]
1830
1831 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1832 noexec.append(task)
1833 self.task_skip(task)
1834 bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1835 continue
1836
1837 if self.rq.check_stamp_task(realtask, taskname + "_setscene", cache=self.stampcache):
1838 logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1839 stamppresent.append(task)
1840 self.task_skip(task)
1841 continue
1842
1843 if self.rq.check_stamp_task(realtask, taskname, recurse = True, cache=self.stampcache):
1844 logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1845 stamppresent.append(task)
1846 self.task_skip(task)
1847 continue
1848
1849 sq_fn.append(fn)
1850 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1851 sq_hash.append(self.rqdata.runq_hash[realtask])
1852 sq_taskname.append(taskname)
1853 sq_task.append(task)
1854 call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1855 locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.expanded_data }
1856 valid = bb.utils.better_eval(call, locs)
1857
1858 valid_new = stamppresent
1859 for v in valid:
1860 valid_new.append(sq_task[v])
1861
1862 for task in xrange(len(self.sq_revdeps)):
1863 if task not in valid_new and task not in noexec:
1864 realtask = self.rqdata.runq_setscene[task]
1865 logger.debug(2, 'No package found, so skipping setscene task %s',
1866 self.rqdata.get_user_idstring(realtask))
1867 self.outrightfail.append(task)
1868
1869 logger.info('Executing SetScene Tasks')
1870
1871 self.rq.state = runQueueSceneRun
1872
1873 def scenequeue_updatecounters(self, task, fail = False):
1874 for dep in self.sq_deps[task]:
1875 if fail and task in self.sq_harddeps and dep in self.sq_harddeps[task]:
1876 realtask = self.rqdata.runq_setscene[task]
1877 realdep = self.rqdata.runq_setscene[dep]
1878 logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (self.rqdata.get_user_idstring(realtask), self.rqdata.get_user_idstring(realdep)))
1879 self.scenequeue_updatecounters(dep, fail)
1880 continue
1881 if task not in self.sq_revdeps2[dep]:
1882 # May already have been removed by the fail case above
1883 continue
1884 self.sq_revdeps2[dep].remove(task)
1885 if len(self.sq_revdeps2[dep]) == 0:
1886 self.runq_buildable[dep] = 1
1887
1888 def task_completeoutright(self, task):
1889 """
1890 Mark a task as completed
1891 Look at the reverse dependencies and mark any task with
1892 completed dependencies as buildable
1893 """
1894
1895 index = self.rqdata.runq_setscene[task]
1896 logger.debug(1, 'Found task %s which could be accelerated',
1897 self.rqdata.get_user_idstring(index))
1898
1899 self.scenequeue_covered.add(task)
1900 self.scenequeue_updatecounters(task)
1901
1902 def task_complete(self, task):
1903 self.stats.taskCompleted()
1904 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1905 self.task_completeoutright(task)
1906
1907 def task_fail(self, task, result):
1908 self.stats.taskFailed()
1909 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
1910 self.scenequeue_notcovered.add(task)
1911 self.scenequeue_updatecounters(task, True)
1912
1913 def task_failoutright(self, task):
1914 self.runq_running[task] = 1
1915 self.runq_buildable[task] = 1
1916 self.stats.taskCompleted()
1917 self.stats.taskSkipped()
1918 index = self.rqdata.runq_setscene[task]
1919 self.scenequeue_notcovered.add(task)
1920 self.scenequeue_updatecounters(task, True)
1921
1922 def task_skip(self, task):
1923 self.runq_running[task] = 1
1924 self.runq_buildable[task] = 1
1925 self.task_completeoutright(task)
1926 self.stats.taskCompleted()
1927 self.stats.taskSkipped()
1928
    def execute(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue

        Called repeatedly by the runqueue state machine.  Each call either
        dispatches (or skips/outright-fails) at most one setscene task, or,
        once nothing remains active, converts the covered/notcovered sets
        into full taskgraph ids on the runqueue and advances the state to
        runQueueRunInit.  Returns self.rq.active_fds() while dispatched
        tasks are still running, otherwise True.
        """

        self.rq.read_workers()

        task = None
        if self.stats.active < self.number_tasks:
            # Find the next setscene to run
            for nexttask in xrange(self.stats.total):
                if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
                    if nexttask in self.unskippable:
                        logger.debug(2, "Setscene task %s is unskippable" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
                    # If everything this task waits on has already been covered
                    # and it was not an explicitly requested target, the task
                    # itself is not needed and can be skipped.
                    if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True):
                        realtask = self.rqdata.runq_setscene[nexttask]
                        fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
                        foundtarget = False
                        for target in self.rqdata.target_pairs:
                            if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
                                foundtarget = True
                                break
                        if not foundtarget:
                            logger.debug(2, "Skipping setscene for task %s" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
                            self.task_skip(nexttask)
                            self.scenequeue_notneeded.add(nexttask)
                            return True
                    if nexttask in self.outrightfail:
                        # Determined earlier that no usable result exists for
                        # this task, so fail it without running it
                        self.task_failoutright(nexttask)
                        return True
                    task = nexttask
                    break
        if task is not None:
            realtask = self.rqdata.runq_setscene[task]
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]

            taskname = self.rqdata.runq_task[realtask] + "_setscene"
            if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask], recurse = True, cache=self.stampcache):
                # The underlying real task already ran; the setscene variant
                # is pointless, so record the task as not covered
                logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
                             task, self.rqdata.get_user_idstring(realtask))
                self.task_failoutright(task)
                return True

            if self.cooker.configuration.force:
                # Forced builds must rerun explicit targets, so their
                # setscene variants are failed outright
                for target in self.rqdata.target_pairs:
                    if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
                        self.task_failoutright(task)
                        return True

            if self.rq.check_stamp_task(realtask, taskname, cache=self.stampcache):
                # The setscene stamp itself is current, nothing to do
                logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
                             task, self.rqdata.get_user_idstring(realtask))
                self.task_skip(task)
                return True

            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
            bb.event.fire(startevent, self.cfgData)

            # Dispatch to the fakeroot worker when the task requires it
            # (starting that worker on first use), otherwise to the normal
            # worker, using the pickled "<runtask>" framing on its stdin
            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
                if not self.rq.fakeworker:
                    self.rq.start_fakeworker(self)
                self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
                self.rq.fakeworker.stdin.flush()
            else:
                self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
                self.rq.worker.stdin.flush()

            self.runq_running[task] = 1
            self.stats.taskActive()
            if self.stats.active < self.number_tasks:
                return True

        if self.stats.active > 0:
            # Tasks are still executing; service their output and wait
            self.rq.read_workers()
            return self.rq.active_fds()

        #for task in xrange(self.stats.total):
        #    if self.runq_running[task] != 1:
        #        buildable = self.runq_buildable[task]
        #        revdeps = self.sq_revdeps[task]
        #        bb.warn("Found we didn't run %s %s %s %s" % (task, buildable, str(revdeps), self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task])))

        # Convert scenequeue_covered task numbers into full taskgraph ids
        oldcovered = self.scenequeue_covered
        self.rq.scenequeue_covered = set()
        for task in oldcovered:
            self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
        self.rq.scenequeue_notcovered = set()
        for task in self.scenequeue_notcovered:
            self.rq.scenequeue_notcovered.add(self.rqdata.runq_setscene[task])

        logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))

        self.rq.state = runQueueRunInit

        completeevent = sceneQueueComplete(self.stats, self.rq)
        bb.event.fire(completeevent, self.cfgData)

        return True
2029
2030 def runqueue_process_waitpid(self, task, status):
2031 task = self.rq.rqdata.runq_setscene.index(task)
2032
2033 RunQueueExecute.runqueue_process_waitpid(self, task, status)
2034
class TaskFailure(Exception):
    """Raised when a task in a runqueue fails."""
    def __init__(self, x):
        self.args = x
2041
2042
class runQueueExitWait(bb.event.Event):
    """
    Event fired while waiting for outstanding task processes to exit
    """

    def __init__(self, remain):
        # 'remain' is the number of tasks still active
        self.message = "Waiting for %s active tasks to finish" % remain
        self.remain = remain
        bb.event.Event.__init__(self)
2052
class runQueueEvent(bb.event.Event):
    """
    Base runQueue event class

    Captures the identifying details of a task (id, user-visible string,
    name, file and hash) plus a snapshot of the current stats.
    """
    def __init__(self, task, stats, rq):
        rqdata = rq.rqdata
        self.taskid = task
        self.taskstring = rqdata.get_user_idstring(task)
        self.taskname = rqdata.get_task_name(task)
        self.taskfile = rqdata.get_task_file(task)
        self.taskhash = rqdata.get_task_hash(task)
        # Copy so later stat updates don't mutate the fired event
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
2065
class sceneQueueEvent(runQueueEvent):
    """
    Base sceneQueue event class

    Rewrites the task details from runQueueEvent in terms of the
    underlying real task with a "_setscene" suffix.  The 'noexec'
    parameter is accepted for interface compatibility but unused here.
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        rqdata = rq.rqdata
        realtask = rqdata.runq_setscene[task]
        self.taskstring = rqdata.get_user_idstring(realtask, "_setscene")
        self.taskname = rqdata.get_task_name(realtask) + "_setscene"
        self.taskfile = rqdata.get_task_file(realtask)
        self.taskhash = rqdata.get_task_hash(realtask)
2077
class runQueueTaskStarted(runQueueEvent):
    """
    Event fired when a runqueue task starts; 'noexec' marks tasks
    with no work to execute.
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        self.noexec = noexec
2085
class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Event fired when a setscene task starts; 'noexec' marks tasks
    with no work to execute.
    """
    def __init__(self, task, stats, rq, noexec=False):
        sceneQueueEvent.__init__(self, task, stats, rq)
        self.noexec = noexec
2093
class runQueueTaskFailed(runQueueEvent):
    """
    Event fired when a runqueue task fails; carries the task's exit code.
    """
    def __init__(self, task, stats, exitcode, rq):
        runQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode
2101
class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Event fired when a setscene task fails; carries the task's exit code.
    """
    def __init__(self, task, stats, exitcode, rq):
        sceneQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode
2109
class sceneQueueComplete(sceneQueueEvent):
    """
    Event fired once every sceneQueue task has been processed.

    Note: deliberately bypasses sceneQueueEvent.__init__ since there is
    no single task to describe; only a stats snapshot is carried.
    """
    def __init__(self, stats, rq):
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
2117
class runQueueTaskCompleted(runQueueEvent):
    """
    Event fired when a runqueue task finishes successfully
    """
2122
class sceneQueueTaskCompleted(sceneQueueEvent):
    """
    Event fired when a setscene task finishes successfully
    """
2127
class runQueueTaskSkipped(runQueueEvent):
    """
    Event fired when a task is skipped; 'reason' records why.
    """
    def __init__(self, task, stats, rq, reason):
        runQueueEvent.__init__(self, task, stats, rq)
        self.reason = reason
2135
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server

    The worker writes framed, pickled messages ("<event>...</event>" and
    "<exitcode>...</exitcode>") which this class reads non-blockingly,
    reassembles and dispatches.
    """
    def __init__(self, pipein, pipeout, d, rq, rqexec):
        # We only read; close our copy of the write end if one was passed
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Raw data read from the worker but not yet parsed into messages
        self.queue = ""
        self.d = d
        self.rq = rq
        # The RunQueueExecute instance which receives task exit codes
        self.rqexec = rqexec

    def setrunqueueexec(self, rqexec):
        # Swap in a different RunQueueExecute to receive exit codes
        self.rqexec = rqexec

    def read(self):
        """
        Poll the worker processes and drain any complete messages from the
        pipe.  Returns True if any new data arrived on this call.
        """
        # Abort the runqueue if a worker died unexpectedly (i.e. when we
        # are not already tearing down)
        for w in [self.rq.worker, self.rq.fakeworker]:
            if not w:
                continue
            w.poll()
            if w.returncode is not None and not self.rq.teardown:
                name = None
                if self.rq.worker and w.pid == self.rq.worker.pid:
                    name = "Worker"
                elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid:
                    name = "Fakeroot"
                bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
                self.rq.finish_runqueue(True)

        start = len(self.queue)
        try:
            # Non-blocking read; EAGAIN just means no data available yet
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        # Parse complete frames out of the buffer; a trailing partial frame
        # stays queued until the next read() completes it
        while found and len(self.queue):
            found = False
            index = self.queue.find("</event>")
            while index != -1 and self.queue.startswith("<event>"):
                try:
                    # Strip the 7-character "<event>" prefix before unpickling
                    event = pickle.loads(self.queue[7:index])
                except ValueError as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
                bb.event.fire_from_worker(event, self.d)
                found = True
                # Advance past the 8-character "</event>" terminator
                self.queue = self.queue[index+8:]
                index = self.queue.find("</event>")
            index = self.queue.find("</exitcode>")
            while index != -1 and self.queue.startswith("<exitcode>"):
                try:
                    # Payload is a pickled (task, status) pair after the
                    # 10-character "<exitcode>" prefix
                    task, status = pickle.loads(self.queue[10:index])
                except ValueError as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index]))
                self.rqexec.runqueue_process_waitpid(task, status)
                found = True
                self.queue = self.queue[index+11:]
                index = self.queue.find("</exitcode>")
        return (end > start)

    def close(self):
        # Drain anything still pending before closing the read end
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()