1#!/usr/bin/env python
2# ex:ts=4:sw=4:sts=4:et
3# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4"""
5BitBake 'RunQueue' implementation
6
7Handles preparation and execution of a queue of tasks
8"""
9
10# Copyright (C) 2006-2007 Richard Purdie
11#
12# This program is free software; you can redistribute it and/or modify
13# it under the terms of the GNU General Public License version 2 as
14# published by the Free Software Foundation.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License along
22# with this program; if not, write to the Free Software Foundation, Inc.,
23# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25import copy
26import os
27import sys
28import signal
29import stat
30import fcntl
31import errno
32import logging
33import re
34import bb
35from bb import msg, data, event
36from bb import monitordisk
37import subprocess
38
39try:
40 import cPickle as pickle
41except ImportError:
42 import pickle
43
44bblogger = logging.getLogger("BitBake")
45logger = logging.getLogger("BitBake.RunQueue")
46
47__find_md5__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{32}(?![a-z0-9])' )
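# Matches a bare 32-character hex (md5-style) hash that is not part of a longer
# alphanumeric run; used later to extract the hash from a siginfo filename when
# reporting signature differences.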
48
49class RunQueueStats:
50 """
51 Holds statistics on the tasks handled by the associated runQueue
52 """
53 def __init__(self, total):
54 self.completed = 0
55 self.skipped = 0
56 self.failed = 0
57 self.active = 0
58 self.total = total
59
60 def copy(self):
61 obj = self.__class__(self.total)
62 obj.__dict__.update(self.__dict__)
63 return obj
64
65 def taskFailed(self):
66 self.active = self.active - 1
67 self.failed = self.failed + 1
68
69 def taskCompleted(self, number = 1):
70 self.active = self.active - number
71 self.completed = self.completed + number
72
73 def taskSkipped(self, number = 1):
74 self.active = self.active + number
75 self.skipped = self.skipped + number
76
77 def taskActive(self):
78 self.active = self.active + 1
79
80# These values indicate the next step due to be run in the
81# runQueue state machine
82runQueuePrepare = 2
83runQueueSceneInit = 3
84runQueueSceneRun = 4
85runQueueRunInit = 5
86runQueueRunning = 6
87runQueueFailed = 7
88runQueueCleanUp = 8
89runQueueComplete = 9
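# Rough sketch of the usual progression, as driven by _execute_runqueue() below:
#   runQueuePrepare -> runQueueSceneInit -> runQueueSceneRun -> runQueueRunInit
#     -> runQueueRunning -> runQueueCleanUp -> runQueueComplete
# with runQueueFailed entered when tasks fail and the build cannot continue.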
90
91class RunQueueScheduler(object):
92 """
93 Control the order tasks are scheduled in.
94 """
95 name = "basic"
96
97 def __init__(self, runqueue, rqdata):
98 """
99 The default scheduler just returns the first buildable task (the
100 priority map is sorted by task number)
101 """
102 self.rq = runqueue
103 self.rqdata = rqdata
104 self.numTasks = len(self.rqdata.runq_fnid)
105
106 self.prio_map = []
107 self.prio_map.extend(range(self.numTasks))
108
109 self.buildable = []
110 self.stamps = {}
111 for taskid in xrange(self.numTasks):
112 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
113 taskname = self.rqdata.runq_task[taskid]
114 self.stamps[taskid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
115 if self.rq.runq_buildable[taskid] == 1:
116 self.buildable.append(taskid)
117
118 self.rev_prio_map = None
119
120 def next_buildable_task(self):
121 """
122 Return the id of the first task we find that is buildable
123 """
124 self.buildable = [x for x in self.buildable if not self.rq.runq_running[x] == 1]
125 if not self.buildable:
126 return None
127 if len(self.buildable) == 1:
128 taskid = self.buildable[0]
129 stamp = self.stamps[taskid]
130 if stamp not in self.rq.build_stamps.itervalues():
131 return taskid
132
133 if not self.rev_prio_map:
134 self.rev_prio_map = range(self.numTasks)
135 for taskid in xrange(self.numTasks):
136 self.rev_prio_map[self.prio_map[taskid]] = taskid
137
138 best = None
139 bestprio = None
140 for taskid in self.buildable:
141 prio = self.rev_prio_map[taskid]
142 if bestprio is None or bestprio > prio:
143 stamp = self.stamps[taskid]
144 if stamp in self.rq.build_stamps.itervalues():
145 continue
146 bestprio = prio
147 best = taskid
148
149 return best
150
151 def next(self):
152 """
153 Return the id of the task we should build next
154 """
155 if self.rq.stats.active < self.rq.number_tasks:
156 return self.next_buildable_task()
157
158 def newbuilable(self, task):
159 self.buildable.append(task)
160
161class RunQueueSchedulerSpeed(RunQueueScheduler):
162 """
163 A scheduler optimised for speed. The priority map is sorted by task weight;
164 heavier weighted tasks (tasks needed by the most other tasks) are run first.
165 """
166 name = "speed"
167
168 def __init__(self, runqueue, rqdata):
169 """
170 The priority map is sorted by task weight.
171 """
172 RunQueueScheduler.__init__(self, runqueue, rqdata)
173
174 sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
175 copyweight = copy.deepcopy(self.rqdata.runq_weight)
176 self.prio_map = []
177
178 for weight in sortweight:
179 idx = copyweight.index(weight)
180 self.prio_map.append(idx)
181 copyweight[idx] = -1
182
183 self.prio_map.reverse()
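        # Illustration with hypothetical weights: runq_weight = [1, 5, 3] yields
        # prio_map = [1, 2, 0], i.e. the heaviest task (index 1) is offered first.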
184
185class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
186 """
187 A scheduler optimised to complete .bb files as quickly as possible. The
188 priority map is sorted by task weight, but then reordered so once a given
189 .bb file starts to build, it's completed as quickly as possible. This works
190 well where disk space is at a premium and classes like OE's rm_work are in
191 force.
192 """
193 name = "completion"
194
195 def __init__(self, runqueue, rqdata):
196 RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
197
198 #FIXME - whilst this groups all fnids together it does not reorder the
199 #fnid groups optimally.
200
201 basemap = copy.deepcopy(self.prio_map)
202 self.prio_map = []
203 while (len(basemap) > 0):
204 entry = basemap.pop(0)
205 self.prio_map.append(entry)
206 fnid = self.rqdata.runq_fnid[entry]
207 todel = []
208 for entry in basemap:
209 entry_fnid = self.rqdata.runq_fnid[entry]
210 if entry_fnid == fnid:
211 todel.append(basemap.index(entry))
212 self.prio_map.append(entry)
213 todel.reverse()
214 for idx in todel:
215 del basemap[idx]
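        # Illustration with hypothetical task ids: if prio_map were [3, 1, 4, 2]
        # and tasks 3 and 4 came from the same .bb file (same fnid), the result
        # is [3, 4, 1, 2], so that recipe is finished before 1 and 2 start.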
216
217class RunQueueData:
218 """
219 BitBake Run Queue implementation
220 """
221 def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
222 self.cooker = cooker
223 self.dataCache = dataCache
224 self.taskData = taskData
225 self.targets = targets
226 self.rq = rq
227 self.warn_multi_bb = False
228
229 self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST", True) or ""
230 self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST", True) or "").split()
231
232 self.reset()
233
234 def reset(self):
235 self.runq_fnid = []
236 self.runq_task = []
237 self.runq_depends = []
238 self.runq_revdeps = []
239 self.runq_hash = []
240
241 def runq_depends_names(self, ids):
242 import re
243 ret = []
244 for id in self.runq_depends[ids]:
245 nam = os.path.basename(self.get_user_idstring(id))
246 nam = re.sub("_[^,]*,", ",", nam)
247 ret.extend([nam])
248 return ret
249
250 def get_task_name(self, task):
251 return self.runq_task[task]
252
253 def get_task_file(self, task):
254 return self.taskData.fn_index[self.runq_fnid[task]]
255
256 def get_task_hash(self, task):
257 return self.runq_hash[task]
258
259 def get_user_idstring(self, task, task_name_suffix = ""):
260 fn = self.taskData.fn_index[self.runq_fnid[task]]
261 taskname = self.runq_task[task] + task_name_suffix
262 return "%s, %s" % (fn, taskname)
263
264 def get_short_user_idstring(self, task, task_name_suffix = ""):
265 fn = self.taskData.fn_index[self.runq_fnid[task]]
266 pn = self.dataCache.pkg_fn[fn]
267 taskname = self.runq_task[task] + task_name_suffix
268 return "%s:%s" % (pn, taskname)
269
270
271 def get_task_id(self, fnid, taskname):
272 for listid in xrange(len(self.runq_fnid)):
273 if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
274 return listid
275 return None
276
277 def circular_depchains_handler(self, tasks):
278 """
279 Some tasks aren't buildable, likely due to circular dependency issues.
280 Identify the circular dependencies and print them in a user readable format.
281 """
282 from copy import deepcopy
283
284 valid_chains = []
285 explored_deps = {}
286 msgs = []
287
288 def chain_reorder(chain):
289 """
290 Reorder a dependency chain so the lowest task id is first
291 """
292 lowest = 0
293 new_chain = []
294 for entry in xrange(len(chain)):
295 if chain[entry] < chain[lowest]:
296 lowest = entry
297 new_chain.extend(chain[lowest:])
298 new_chain.extend(chain[:lowest])
299 return new_chain
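            # e.g. a chain of task ids [5, 2, 7] is rotated to [2, 7, 5], giving
            # every loop a single canonical form for duplicate detection.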
300
301 def chain_compare_equal(chain1, chain2):
302 """
303 Compare two dependency chains and see if they're the same
304 """
305 if len(chain1) != len(chain2):
306 return False
307 for index in xrange(len(chain1)):
308 if chain1[index] != chain2[index]:
309 return False
310 return True
311
312 def chain_array_contains(chain, chain_array):
313 """
314 Return True if chain_array contains chain
315 """
316 for ch in chain_array:
317 if chain_compare_equal(ch, chain):
318 return True
319 return False
320
321 def find_chains(taskid, prev_chain):
322 prev_chain.append(taskid)
323 total_deps = []
324 total_deps.extend(self.runq_revdeps[taskid])
325 for revdep in self.runq_revdeps[taskid]:
326 if revdep in prev_chain:
327 idx = prev_chain.index(revdep)
328 # To prevent duplicates, reorder the chain to start with the lowest taskid
329 # and search through an array of those we've already printed
330 chain = prev_chain[idx:]
331 new_chain = chain_reorder(chain)
332 if not chain_array_contains(new_chain, valid_chains):
333 valid_chains.append(new_chain)
334 msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
335 for dep in new_chain:
336 msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
337 msgs.append("\n")
338 if len(valid_chains) > 10:
339 msgs.append("Aborted dependency loops search after 10 matches.\n")
340 return msgs
341 continue
342 scan = False
343 if revdep not in explored_deps:
344 scan = True
345 elif revdep in explored_deps[revdep]:
346 scan = True
347 else:
348 for dep in prev_chain:
349 if dep in explored_deps[revdep]:
350 scan = True
351 if scan:
352 find_chains(revdep, copy.deepcopy(prev_chain))
353 for dep in explored_deps[revdep]:
354 if dep not in total_deps:
355 total_deps.append(dep)
356
357 explored_deps[taskid] = total_deps
358
359 for task in tasks:
360 find_chains(task, [])
361
362 return msgs
363
364 def calculate_task_weights(self, endpoints):
365 """
366 Calculate a number representing the "weight" of each task. Heavier weighted tasks
367 have more dependencies and hence should be executed sooner for maximum speed.
368
369 This function also sanity checks the task list, finding tasks that are not
370 possible to execute due to circular dependencies.
371 """
372
373 numTasks = len(self.runq_fnid)
374 weight = []
375 deps_left = []
376 task_done = []
377
378 for listid in xrange(numTasks):
379 task_done.append(False)
380 weight.append(1)
381 deps_left.append(len(self.runq_revdeps[listid]))
382
383 for listid in endpoints:
384 weight[listid] = 10
385 task_done[listid] = True
386
387 while True:
388 next_points = []
389 for listid in endpoints:
390 for revdep in self.runq_depends[listid]:
391 weight[revdep] = weight[revdep] + weight[listid]
392 deps_left[revdep] = deps_left[revdep] - 1
393 if deps_left[revdep] == 0:
394 next_points.append(revdep)
395 task_done[revdep] = True
396 endpoints = next_points
397 if len(next_points) == 0:
398 break
399
400 # Circular dependency sanity check
401 problem_tasks = []
402 for task in xrange(numTasks):
403 if task_done[task] is False or deps_left[task] != 0:
404 problem_tasks.append(task)
405 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
406 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
407
408 if problem_tasks:
409 message = "Unbuildable tasks were found.\n"
410 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
411 message = message + "Identifying dependency loops (this may take a short while)...\n"
412 logger.error(message)
413
414 msgs = self.circular_depchains_handler(problem_tasks)
415
416 message = "\n"
417 for msg in msgs:
418 message = message + msg
419 bb.msg.fatal("RunQueue", message)
420
421 return weight
422
423 def prepare(self):
424 """
425 Turn a set of taskData into a RunQueue and compute data needed
426 to optimise the execution order.
427 """
428
429 runq_build = []
430 recursivetasks = {}
431 recursiveitasks = {}
432 recursivetasksselfref = set()
433
434 taskData = self.taskData
435
436 if len(taskData.tasks_name) == 0:
437 # Nothing to do
438 return 0
439
440 logger.info("Preparing RunQueue")
441
442 # Step A - Work out a list of tasks to run
443 #
444 # Taskdata gives us a list of possible providers for every build and run
445 # target ordered by priority. It also gives information on each of those
446 # providers.
447 #
448 # To create the actual list of tasks to execute we fix the list of
449 # providers and then resolve the dependencies into task IDs. This
450 # process is repeated for each type of dependency (tdepends, deptask,
451 # rdeptask, recrdeptask, idepends).
452
453 def add_build_dependencies(depids, tasknames, depends):
454 for depid in depids:
455 # Won't be in build_targets if ASSUME_PROVIDED
456 if depid not in taskData.build_targets:
457 continue
458 depdata = taskData.build_targets[depid][0]
459 if depdata is None:
460 continue
461 for taskname in tasknames:
462 taskid = taskData.gettask_id_fromfnid(depdata, taskname)
463 if taskid is not None:
464 depends.add(taskid)
465
466 def add_runtime_dependencies(depids, tasknames, depends):
467 for depid in depids:
468 if depid not in taskData.run_targets:
469 continue
470 depdata = taskData.run_targets[depid][0]
471 if depdata is None:
472 continue
473 for taskname in tasknames:
474 taskid = taskData.gettask_id_fromfnid(depdata, taskname)
475 if taskid is not None:
476 depends.add(taskid)
477
478 def add_resolved_dependencies(depids, tasknames, depends):
479 for depid in depids:
480 for taskname in tasknames:
481 taskid = taskData.gettask_id_fromfnid(depid, taskname)
482 if taskid is not None:
483 depends.add(taskid)
484
485 for task in xrange(len(taskData.tasks_name)):
486 depends = set()
487 fnid = taskData.tasks_fnid[task]
488 fn = taskData.fn_index[fnid]
489 task_deps = self.dataCache.task_deps[fn]
490
491 #logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
492
493 if fnid not in taskData.failed_fnids:
494
495 # Resolve task internal dependencies
496 #
497 # e.g. addtask before X after Y
498 depends = set(taskData.tasks_tdepends[task])
499
500 # Resolve 'deptask' dependencies
501 #
502 # e.g. do_sometask[deptask] = "do_someothertask"
503 # (makes sure sometask runs after someothertask of all DEPENDS)
504 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
505 tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
506 add_build_dependencies(taskData.depids[fnid], tasknames, depends)
507
508 # Resolve 'rdeptask' dependencies
509 #
510 # e.g. do_sometask[rdeptask] = "do_someothertask"
511 # (makes sure sometask runs after someothertask of all RDEPENDS)
512 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
513 tasknames = task_deps['rdeptask'][taskData.tasks_name[task]].split()
514 add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
515
516 # Resolve inter-task dependencies
517 #
518 # e.g. do_sometask[depends] = "targetname:do_someothertask"
519 # (makes sure sometask runs after targetname's someothertask)
520 idepends = taskData.tasks_idepends[task]
521 for (depid, idependtask) in idepends:
522 if depid in taskData.build_targets and not depid in taskData.failed_deps:
523 # Won't be in build_targets if ASSUME_PROVIDED
524 depdata = taskData.build_targets[depid][0]
525 if depdata is not None:
526 taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
527 if taskid is None:
528 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
529 depends.add(taskid)
530 irdepends = taskData.tasks_irdepends[task]
531 for (depid, idependtask) in irdepends:
532 if depid in taskData.run_targets:
533 # Won't be in run_targets if ASSUME_PROVIDED
534 depdata = taskData.run_targets[depid][0]
535 if depdata is not None:
536 taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
537 if taskid is None:
538 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
539 depends.add(taskid)
540
541 # Resolve recursive 'recrdeptask' dependencies (Part A)
542 #
543 # e.g. do_sometask[recrdeptask] = "do_someothertask"
544 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
545 # We cover the recursive part of the dependencies below
546 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
547 tasknames = task_deps['recrdeptask'][taskData.tasks_name[task]].split()
548 recursivetasks[task] = tasknames
549 add_build_dependencies(taskData.depids[fnid], tasknames, depends)
550 add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
551 if taskData.tasks_name[task] in tasknames:
552 recursivetasksselfref.add(task)
553
554 if 'recideptask' in task_deps and taskData.tasks_name[task] in task_deps['recideptask']:
555 recursiveitasks[task] = []
556 for t in task_deps['recideptask'][taskData.tasks_name[task]].split():
557 newdep = taskData.gettask_id_fromfnid(fnid, t)
558 recursiveitasks[task].append(newdep)
559
560 self.runq_fnid.append(taskData.tasks_fnid[task])
561 self.runq_task.append(taskData.tasks_name[task])
562 self.runq_depends.append(depends)
563 self.runq_revdeps.append(set())
564 self.runq_hash.append("")
565
566 runq_build.append(0)
567
568 # Resolve recursive 'recrdeptask' dependencies (Part B)
569 #
570 # e.g. do_sometask[recrdeptask] = "do_someothertask"
571 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
572 # We need to do this separately since we need all of self.runq_depends to be complete before this is processed
573 extradeps = {}
574 for task in recursivetasks:
575 extradeps[task] = set(self.runq_depends[task])
576 tasknames = recursivetasks[task]
577 seendeps = set()
578 seenfnid = []
579
580 def generate_recdeps(t):
581 newdeps = set()
582 add_resolved_dependencies([taskData.tasks_fnid[t]], tasknames, newdeps)
583 extradeps[task].update(newdeps)
584 seendeps.add(t)
585 newdeps.add(t)
586 for i in newdeps:
587 for n in self.runq_depends[i]:
588 if n not in seendeps:
589 generate_recdeps(n)
590 generate_recdeps(task)
591
592 if task in recursiveitasks:
593 for dep in recursiveitasks[task]:
594 generate_recdeps(dep)
595
596 # Remove circular references so that do_a[recrdeptask] = "do_a do_b" can work
597 for task in recursivetasks:
598 extradeps[task].difference_update(recursivetasksselfref)
599
600 for task in xrange(len(taskData.tasks_name)):
601 # Add in extra dependencies
602 if task in extradeps:
603 self.runq_depends[task] = extradeps[task]
604 # Remove all self references
605 if task in self.runq_depends[task]:
606 logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], self.runq_depends[task])
607 self.runq_depends[task].remove(task)
608
609 # Step B - Mark all active tasks
610 #
611 # Start with the tasks we were asked to run and mark all dependencies
612 # as active too. If the task is to be 'forced', clear its stamp. Once
613 # all active tasks are marked, prune the ones we don't need.
614
615 logger.verbose("Marking Active Tasks")
616
617 def mark_active(listid, depth):
618 """
619 Mark an item as active along with its depends
620 (calls itself recursively)
621 """
622
623 if runq_build[listid] == 1:
624 return
625
626 runq_build[listid] = 1
627
628 depends = self.runq_depends[listid]
629 for depend in depends:
630 mark_active(depend, depth+1)
631
632 self.target_pairs = []
633 for target in self.targets:
634 targetid = taskData.getbuild_id(target[0])
635
636 if targetid not in taskData.build_targets:
637 continue
638
639 if targetid in taskData.failed_deps:
640 continue
641
642 fnid = taskData.build_targets[targetid][0]
643 fn = taskData.fn_index[fnid]
644 task = target[1]
645 parents = False
646 if task.endswith('-'):
647 parents = True
648 task = task[:-1]
649
650 self.target_pairs.append((fn, task))
651
652 if fnid in taskData.failed_fnids:
653 continue
654
655 if task not in taskData.tasks_lookup[fnid]:
656 import difflib
657 close_matches = difflib.get_close_matches(task, taskData.tasks_lookup[fnid], cutoff=0.7)
658 if close_matches:
659 extra = ". Close matches:\n %s" % "\n ".join(close_matches)
660 else:
661 extra = ""
662 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s%s" % (task, target[0], extra))
663
664 # For tasks called "XXXX-", only run their dependencies
665 listid = taskData.tasks_lookup[fnid][task]
666 if parents:
667 for i in self.runq_depends[listid]:
668 mark_active(i, 1)
669 else:
670 mark_active(listid, 1)
671
672 # Step C - Prune all inactive tasks
673 #
674 # Once all active tasks are marked, prune the ones we don't need.
675
676 maps = []
677 delcount = 0
678 for listid in xrange(len(self.runq_fnid)):
679 if runq_build[listid-delcount] == 1:
680 maps.append(listid-delcount)
681 else:
682 del self.runq_fnid[listid-delcount]
683 del self.runq_task[listid-delcount]
684 del self.runq_depends[listid-delcount]
685 del runq_build[listid-delcount]
686 del self.runq_revdeps[listid-delcount]
687 del self.runq_hash[listid-delcount]
688 delcount = delcount + 1
689 maps.append(-1)
690
691 #
692 # Step D - Sanity checks and computation
693 #
694
695 # Check to make sure we still have tasks to run
696 if len(self.runq_fnid) == 0:
697 if not taskData.abort:
698 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
699 else:
700 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
701
702 logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
703
704 # Remap the dependencies to account for the deleted tasks
705 # Check we didn't delete a task we depend on
706 for listid in xrange(len(self.runq_fnid)):
707 newdeps = []
708 origdeps = self.runq_depends[listid]
709 for origdep in origdeps:
710 if maps[origdep] == -1:
711 bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
712 newdeps.append(maps[origdep])
713 self.runq_depends[listid] = set(newdeps)
714
715 logger.verbose("Assign Weightings")
716
717 # Generate a list of reverse dependencies to ease future calculations
718 for listid in xrange(len(self.runq_fnid)):
719 for dep in self.runq_depends[listid]:
720 self.runq_revdeps[dep].add(listid)
721
722 # Identify tasks at the end of dependency chains
723 # Error on circular dependency loops (length two)
724 endpoints = []
725 for listid in xrange(len(self.runq_fnid)):
726 revdeps = self.runq_revdeps[listid]
727 if len(revdeps) == 0:
728 endpoints.append(listid)
729 for dep in revdeps:
730 if dep in self.runq_depends[listid]:
731 #self.dump_data(taskData)
732 bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
733
734 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
735
736 # Calculate task weights
737 # Check for longer circular dependencies
738 self.runq_weight = self.calculate_task_weights(endpoints)
739
740 # Sanity Check - Check for multiple tasks building the same provider
741 prov_list = {}
742 seen_fn = []
743 for task in xrange(len(self.runq_fnid)):
744 fn = taskData.fn_index[self.runq_fnid[task]]
745 if fn in seen_fn:
746 continue
747 seen_fn.append(fn)
748 for prov in self.dataCache.fn_provides[fn]:
749 if prov not in prov_list:
750 prov_list[prov] = [fn]
751 elif fn not in prov_list[prov]:
752 prov_list[prov].append(fn)
753 for prov in prov_list:
754 if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
755 seen_pn = []
756 # If two versions of the same PN are being built it's fatal; we don't support it.
757 for fn in prov_list[prov]:
758 pn = self.dataCache.pkg_fn[fn]
759 if pn not in seen_pn:
760 seen_pn.append(pn)
761 else:
762 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
763 msg = "Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))
764 #
765 # Construct a list of things which uniquely depend on each provider
766 # since this may help the user figure out which dependency is triggering this warning
767 #
768 msg += "\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from."
769 deplist = {}
770 commondeps = None
771 for provfn in prov_list[prov]:
772 deps = set()
773 for task, fnid in enumerate(self.runq_fnid):
774 fn = taskData.fn_index[fnid]
775 if fn != provfn:
776 continue
777 for dep in self.runq_revdeps[task]:
778 fn = taskData.fn_index[self.runq_fnid[dep]]
779 if fn == provfn:
780 continue
781 deps.add(self.get_short_user_idstring(dep))
782 if not commondeps:
783 commondeps = set(deps)
784 else:
785 commondeps &= deps
786 deplist[provfn] = deps
787 for provfn in deplist:
788 msg += "\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps))
789 #
790 # Construct a list of provides and runtime providers for each recipe
791 # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
792 #
793 msg += "\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful."
794 provide_results = {}
795 rprovide_results = {}
796 commonprovs = None
797 commonrprovs = None
798 for provfn in prov_list[prov]:
799 provides = set(self.dataCache.fn_provides[provfn])
800 rprovides = set()
801 for rprovide in self.dataCache.rproviders:
802 if provfn in self.dataCache.rproviders[rprovide]:
803 rprovides.add(rprovide)
804 for package in self.dataCache.packages:
805 if provfn in self.dataCache.packages[package]:
806 rprovides.add(package)
807 for package in self.dataCache.packages_dynamic:
808 if provfn in self.dataCache.packages_dynamic[package]:
809 rprovides.add(package)
810 if not commonprovs:
811 commonprovs = set(provides)
812 else:
813 commonprovs &= provides
814 provide_results[provfn] = provides
815 if not commonrprovs:
816 commonrprovs = set(rprovides)
817 else:
818 commonrprovs &= rprovides
819 rprovide_results[provfn] = rprovides
820 #msg += "\nCommon provides:\n %s" % ("\n ".join(commonprovs))
821 #msg += "\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs))
822 for provfn in prov_list[prov]:
823 msg += "\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs))
824 msg += "\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs))
825
826 if self.warn_multi_bb:
827 logger.warn(msg)
828 else:
829 logger.error(msg)
830
831 # Create a whitelist usable by the stamp checks
832 stampfnwhitelist = []
833 for entry in self.stampwhitelist.split():
834 entryid = self.taskData.getbuild_id(entry)
835 if entryid not in self.taskData.build_targets:
836 continue
837 fnid = self.taskData.build_targets[entryid][0]
838 fn = self.taskData.fn_index[fnid]
839 stampfnwhitelist.append(fn)
840 self.stampfnwhitelist = stampfnwhitelist
841
842 # Iterate over the task list looking for tasks with a 'setscene' function
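        # A setscene variant is simply the task name with "_setscene" appended,
        # e.g. a hypothetical do_sometask would be accompanied by a
        # do_sometask_setscene task where one exists.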
843 self.runq_setscene = []
844 if not self.cooker.configuration.nosetscene:
845 for task in range(len(self.runq_fnid)):
846 setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
847 if not setscene:
848 continue
849 self.runq_setscene.append(task)
850
851 def invalidate_task(fn, taskname, error_nostamp):
852 taskdep = self.dataCache.task_deps[fn]
853 fnid = self.taskData.getfn_id(fn)
854 if taskname not in taskData.tasks_lookup[fnid]:
855 logger.warn("Task %s does not exist, invalidating this task will have no effect" % taskname)
856 if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
857 if error_nostamp:
858 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
859 else:
860 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
861 else:
862 logger.verbose("Invalidate task %s, %s", taskname, fn)
863 bb.parse.siggen.invalidate_task(taskname, self.dataCache, fn)
864
865 # Invalidate task if force mode active
866 if self.cooker.configuration.force:
867 for (fn, target) in self.target_pairs:
868 invalidate_task(fn, target, False)
869
870 # Invalidate task if invalidate mode active
871 if self.cooker.configuration.invalidate_stamp:
872 for (fn, target) in self.target_pairs:
873 for st in self.cooker.configuration.invalidate_stamp.split(','):
874 if not st.startswith("do_"):
875 st = "do_%s" % st
876 invalidate_task(fn, st, True)
877
878 # Create and print to the logs a virtual/xxxx -> PN (fn) table
879 virtmap = taskData.get_providermap(prefix="virtual/")
880 virtpnmap = {}
881 for v in virtmap:
882 virtpnmap[v] = self.dataCache.pkg_fn[virtmap[v]]
883 bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
884 if hasattr(bb.parse.siggen, "tasks_resolved"):
885 bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCache)
886
887 # Iterate over the task list and call into the siggen code
888 dealtwith = set()
889 todeal = set(range(len(self.runq_fnid)))
890 while len(todeal) > 0:
891 for task in todeal.copy():
892 if len(self.runq_depends[task] - dealtwith) == 0:
893 dealtwith.add(task)
894 todeal.remove(task)
895 procdep = []
896 for dep in self.runq_depends[task]:
897 procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
898 self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
899
900 bb.parse.siggen.writeout_file_checksum_cache()
901 return len(self.runq_fnid)
902
903 def dump_data(self, taskQueue):
904 """
905 Dump some debug information on the internal data structures
906 """
907 logger.debug(3, "run_tasks:")
908 for task in xrange(len(self.rqdata.runq_task)):
909 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
910 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
911 self.rqdata.runq_task[task],
912 self.rqdata.runq_weight[task],
913 self.rqdata.runq_depends[task],
914 self.rqdata.runq_revdeps[task])
915
916 logger.debug(3, "sorted_tasks:")
917 for task1 in xrange(len(self.rqdata.runq_task)):
918 if task1 in self.prio_map:
919 task = self.prio_map[task1]
920 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
921 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
922 self.rqdata.runq_task[task],
923 self.rqdata.runq_weight[task],
924 self.rqdata.runq_depends[task],
925 self.rqdata.runq_revdeps[task])
926
927class RunQueue:
928 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
929
930 self.cooker = cooker
931 self.cfgData = cfgData
932 self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
933
934 self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY", True) or "perfile"
935 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION", True) or None
936 self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION", True) or None
937 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID", True) or None
938
939 self.state = runQueuePrepare
940
941 # For disk space monitor
942 self.dm = monitordisk.diskMonitor(cfgData)
943
944 self.rqexe = None
945 self.worker = None
946 self.workerpipe = None
947 self.fakeworker = None
948 self.fakeworkerpipe = None
949
950 def _start_worker(self, fakeroot = False, rqexec = None):
951 logger.debug(1, "Starting bitbake-worker")
952 magic = "decafbad"
953 if self.cooker.configuration.profile:
954 magic = "decafbadbad"
955 if fakeroot:
956 magic = magic + "beef"
957 fakerootcmd = self.cfgData.getVar("FAKEROOTCMD", True)
958 fakerootenv = (self.cfgData.getVar("FAKEROOTBASEENV", True) or "").split()
959 env = os.environ.copy()
960 for key, value in (var.split('=') for var in fakerootenv):
961 env[key] = value
962 worker = subprocess.Popen([fakerootcmd, "bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
963 else:
964 worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
965 bb.utils.nonblockingfd(worker.stdout)
966 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec)
967
968 workerdata = {
969 "taskdeps" : self.rqdata.dataCache.task_deps,
970 "fakerootenv" : self.rqdata.dataCache.fakerootenv,
971 "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
972 "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
973 "sigdata" : bb.parse.siggen.get_taskdata(),
974 "runq_hash" : self.rqdata.runq_hash,
975 "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
976 "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
977 "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
978 "logdefaultdomain" : bb.msg.loggerDefaultDomains,
979 "prhost" : self.cooker.prhost,
980 "buildname" : self.cfgData.getVar("BUILDNAME", True),
981 "date" : self.cfgData.getVar("DATE", True),
982 "time" : self.cfgData.getVar("TIME", True),
983 }
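        # The worker is driven over a simple tagged pickle stream on its stdin:
        # <cookerconfig>...</cookerconfig> and <workerdata>...</workerdata> are
        # sent once at startup, <runtask>...</runtask> per dispatched task, and
        # <quit></quit> or <finishnow></finishnow> at teardown.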
984
985 worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
986 worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
987 worker.stdin.flush()
988
989 return worker, workerpipe
990
991 def _teardown_worker(self, worker, workerpipe):
992 if not worker:
993 return
994 logger.debug(1, "Teardown for bitbake-worker")
995 try:
996 worker.stdin.write("<quit></quit>")
997 worker.stdin.flush()
998 except IOError:
999 pass
1000 while worker.returncode is None:
1001 workerpipe.read()
1002 worker.poll()
1003 while workerpipe.read():
1004 continue
1005 workerpipe.close()
1006
1007 def start_worker(self):
1008 if self.worker:
1009 self.teardown_workers()
1010 self.teardown = False
1011 self.worker, self.workerpipe = self._start_worker()
1012
1013 def start_fakeworker(self, rqexec):
1014 if not self.fakeworker:
1015 self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)
1016
1017 def teardown_workers(self):
1018 self.teardown = True
1019 self._teardown_worker(self.worker, self.workerpipe)
1020 self.worker = None
1021 self.workerpipe = None
1022 self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
1023 self.fakeworker = None
1024 self.fakeworkerpipe = None
1025
1026 def read_workers(self):
1027 self.workerpipe.read()
1028 if self.fakeworkerpipe:
1029 self.fakeworkerpipe.read()
1030
1031 def active_fds(self):
1032 fds = []
1033 if self.workerpipe:
1034 fds.append(self.workerpipe.input)
1035 if self.fakeworkerpipe:
1036 fds.append(self.fakeworkerpipe.input)
1037 return fds
1038
1039 def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
1040 def get_timestamp(f):
1041 try:
1042 if not os.access(f, os.F_OK):
1043 return None
1044 return os.stat(f)[stat.ST_MTIME]
1045 except:
1046 return None
1047
1048 if self.stamppolicy == "perfile":
1049 fulldeptree = False
1050 else:
1051 fulldeptree = True
1052 stampwhitelist = []
1053 if self.stamppolicy == "whitelist":
1054 stampwhitelist = self.rqdata.stampfnwhitelist
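        # i.e. BB_STAMP_POLICY "perfile" only compares stamps within one recipe,
        # "whitelist" compares across the dependency tree but exempts recipes in
        # BB_STAMP_WHITELIST, and any other value compares across the full tree.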
1055
1056 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1057 if taskname is None:
1058 taskname = self.rqdata.runq_task[task]
1059
1060 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1061
1062 # If the stamp is missing, it's not current
1063 if not os.access(stampfile, os.F_OK):
1064 logger.debug(2, "Stampfile %s not available", stampfile)
1065 return False
1066 # If it's a 'nostamp' task, it's not current
1067 taskdep = self.rqdata.dataCache.task_deps[fn]
1068 if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
1069 logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
1070 return False
1071
1072 if taskname != "do_setscene" and taskname.endswith("_setscene"):
1073 return True
1074
1075 if cache is None:
1076 cache = {}
1077
1078 iscurrent = True
1079 t1 = get_timestamp(stampfile)
1080 for dep in self.rqdata.runq_depends[task]:
1081 if iscurrent:
1082 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
1083 taskname2 = self.rqdata.runq_task[dep]
1084 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
1085 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
1086 t2 = get_timestamp(stampfile2)
1087 t3 = get_timestamp(stampfile3)
1088 if t3 and t3 > t2:
1089 continue
1090 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
1091 if not t2:
1092 logger.debug(2, 'Stampfile %s does not exist', stampfile2)
1093 iscurrent = False
1094 if t1 < t2:
1095 logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
1096 iscurrent = False
1097 if recurse and iscurrent:
1098 if dep in cache:
1099 iscurrent = cache[dep]
1100 if not iscurrent:
1101 logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
1102 else:
1103 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
1104 cache[dep] = iscurrent
1105 if recurse:
1106 cache[task] = iscurrent
1107 return iscurrent
1108
1109 def _execute_runqueue(self):
1110 """
1111 Run the tasks in a queue prepared by rqdata.prepare()
1112 Upon failure, optionally try to recover the build using any alternate providers
1113 (if the abort on failure configuration option isn't set)
1114 """
1115
1116 retval = True
1117
1118 if self.state is runQueuePrepare:
1119 self.rqexe = RunQueueExecuteDummy(self)
1120 if self.rqdata.prepare() == 0:
1121 self.state = runQueueComplete
1122 else:
1123 self.state = runQueueSceneInit
1124
1125 # we are ready to run, see if any UI client needs the dependency info
1126 if bb.cooker.CookerFeatures.SEND_DEPENDS_TREE in self.cooker.featureset:
1127 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
1128 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)
1129
1130 if self.state is runQueueSceneInit:
1131 dump = self.cooker.configuration.dump_signatures
1132 if dump:
1133 if 'printdiff' in dump:
1134 invalidtasks = self.print_diffscenetasks()
1135 self.dump_signatures(dump)
1136 if 'printdiff' in dump:
1137 self.write_diffscenetasks(invalidtasks)
1138 self.state = runQueueComplete
1139 else:
1140 self.start_worker()
1141 self.rqexe = RunQueueExecuteScenequeue(self)
1142
1143 if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]:
1144 self.dm.check(self)
1145
1146 if self.state is runQueueSceneRun:
1147 retval = self.rqexe.execute()
1148
1149 if self.state is runQueueRunInit:
1150 if self.cooker.configuration.setsceneonly:
1151 self.state = runQueueComplete
1152 else:
1153 logger.info("Executing RunQueue Tasks")
1154 self.rqexe = RunQueueExecuteTasks(self)
1155 self.state = runQueueRunning
1156
1157 if self.state is runQueueRunning:
1158 retval = self.rqexe.execute()
1159
1160 if self.state is runQueueCleanUp:
1161 retval = self.rqexe.finish()
1162
1163 if (self.state is runQueueComplete or self.state is runQueueFailed) and self.rqexe:
1164 self.teardown_workers()
1165 if self.rqexe.stats.failed:
1166 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
1167 else:
1168 # Let's avoid the word "failed" if nothing actually did
1169 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)
1170
1171 if self.state is runQueueFailed:
1172 if not self.rqdata.taskData.tryaltconfigs:
1173 raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
1174 for fnid in self.rqexe.failed_fnids:
1175 self.rqdata.taskData.fail_fnid(fnid)
1176 self.rqdata.reset()
1177
1178 if self.state is runQueueComplete:
1179 # All done
1180 return False
1181
1182 # Loop
1183 return retval
1184
1185 def execute_runqueue(self):
1186 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1187 try:
1188 return self._execute_runqueue()
1189 except bb.runqueue.TaskFailure:
1190 raise
1191 except SystemExit:
1192 raise
1193 except bb.BBHandledException:
1194 try:
1195 self.teardown_workers()
1196 except:
1197 pass
1198 self.state = runQueueComplete
1199 raise
1200 except:
1201 logger.error("An uncaught exception occured in runqueue, please see the failure below:")
1202 try:
1203 self.teardown_workers()
1204 except:
1205 pass
1206 self.state = runQueueComplete
1207 raise
1208
1209 def finish_runqueue(self, now = False):
1210 if not self.rqexe:
1211 self.state = runQueueComplete
1212 return
1213
1214 if now:
1215 self.rqexe.finish_now()
1216 else:
1217 self.rqexe.finish()
1218
1219 def dump_signatures(self, options):
1220 done = set()
1221 bb.note("Reparsing files to collect dependency data")
1222 for task in range(len(self.rqdata.runq_fnid)):
1223 if self.rqdata.runq_fnid[task] not in done:
1224 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1225 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
1226 done.add(self.rqdata.runq_fnid[task])
1227
1228 bb.parse.siggen.dump_sigs(self.rqdata.dataCache, options)
1229
1230 return
1231
1232 def print_diffscenetasks(self):
1233
1234 valid = []
1235 sq_hash = []
1236 sq_hashfn = []
1237 sq_fn = []
1238 sq_taskname = []
1239 sq_task = []
1240 noexec = []
1241 stamppresent = []
1242 valid_new = set()
1243
1244 for task in xrange(len(self.rqdata.runq_fnid)):
1245 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1246 taskname = self.rqdata.runq_task[task]
1247 taskdep = self.rqdata.dataCache.task_deps[fn]
1248
1249 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1250 noexec.append(task)
1251 continue
1252
1253 sq_fn.append(fn)
1254 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1255 sq_hash.append(self.rqdata.runq_hash[task])
1256 sq_taskname.append(taskname)
1257 sq_task.append(task)
1258 locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.expanded_data }
1259 try:
1260 call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=True)"
1261 valid = bb.utils.better_eval(call, locs)
1262 # Handle version with no siginfo parameter
1263 except TypeError:
1264 call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1265 valid = bb.utils.better_eval(call, locs)
1266 for v in valid:
1267 valid_new.add(sq_task[v])
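        # The BB_HASHCHECK_FUNCTION callback receives the parallel sq_fn/sq_task/
        # sq_hash/sq_hashfn lists plus the datastore and returns the indices of
        # the entries it can cover; a sketch with made-up names:
        #   def my_hashcheck(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
        #       return [i for i, h in enumerate(sq_hash) if have_object_for(h)]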
1268
1269 # Tasks which are both setscene and noexec never care about dependencies
1270 # We therefore find tasks which are setscene and noexec and mark their
1271 # unique dependencies as valid.
1272 for task in noexec:
1273 if task not in self.rqdata.runq_setscene:
1274 continue
1275 for dep in self.rqdata.runq_depends[task]:
1276 hasnoexecparents = True
1277 for dep2 in self.rqdata.runq_revdeps[dep]:
1278 if dep2 in self.rqdata.runq_setscene and dep2 in noexec:
1279 continue
1280 hasnoexecparents = False
1281 break
1282 if hasnoexecparents:
1283 valid_new.add(dep)
1284
1285 invalidtasks = set()
1286 for task in xrange(len(self.rqdata.runq_fnid)):
1287 if task not in valid_new and task not in noexec:
1288 invalidtasks.add(task)
1289
1290 found = set()
1291 processed = set()
1292 for task in invalidtasks:
1293 toprocess = set([task])
1294 while toprocess:
1295 next = set()
1296 for t in toprocess:
1297 for dep in self.rqdata.runq_depends[t]:
1298 if dep in invalidtasks:
1299 found.add(task)
1300 if dep not in processed:
1301 processed.add(dep)
1302 next.add(dep)
1303 toprocess = next
1304 if task in found:
1305 toprocess = set()
1306
1307 tasklist = []
1308 for task in invalidtasks.difference(found):
1309 tasklist.append(self.rqdata.get_user_idstring(task))
1310
1311 if tasklist:
1312 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))
1313
1314 return invalidtasks.difference(found)
1315
1316 def write_diffscenetasks(self, invalidtasks):
1317
1318 # Define recursion callback
1319 def recursecb(key, hash1, hash2):
1320 hashes = [hash1, hash2]
1321 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
1322
1323 recout = []
1324 if len(hashfiles) == 2:
1325 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
1326 recout.extend(list(' ' + l for l in out2))
1327 else:
1328 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))
1329
1330 return recout
1331
1332
1333 for task in invalidtasks:
1334 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1335 pn = self.rqdata.dataCache.pkg_fn[fn]
1336 taskname = self.rqdata.runq_task[task]
1337 h = self.rqdata.runq_hash[task]
1338 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData)
1339 match = None
1340 for m in matches:
1341 if h in m:
1342 match = m
1343 if match is None:
1344 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
1345 matches = {k : v for k, v in matches.iteritems() if h not in k}
1346 if matches:
1347 latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
1348 prevh = __find_md5__.search(latestmatch).group(0)
1349 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
1350 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output))
1351
1352class RunQueueExecute:
1353
1354 def __init__(self, rq):
1355 self.rq = rq
1356 self.cooker = rq.cooker
1357 self.cfgData = rq.cfgData
1358 self.rqdata = rq.rqdata
1359
1360 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS", True) or 1)
1361 self.scheduler = self.cfgData.getVar("BB_SCHEDULER", True) or "speed"
1362
1363 self.runq_buildable = []
1364 self.runq_running = []
1365 self.runq_complete = []
1366
1367 self.build_stamps = {}
1368 self.build_stamps2 = []
1369 self.failed_fnids = []
1370
1371 self.stampcache = {}
1372
1373 rq.workerpipe.setrunqueueexec(self)
1374 if rq.fakeworkerpipe:
1375 rq.fakeworkerpipe.setrunqueueexec(self)
1376
1377 if self.number_tasks <= 0:
1378 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
1379
1380 def runqueue_process_waitpid(self, task, status):
1381
1382 # self.build_stamps[task] may not exist when using a shared work directory.
1383 if task in self.build_stamps:
1384 self.build_stamps2.remove(self.build_stamps[task])
1385 del self.build_stamps[task]
1386
1387 if status != 0:
1388 self.task_fail(task, status)
1389 else:
1390 self.task_complete(task)
1391 return True
1392
1393 def finish_now(self):
1394
1395 for worker in [self.rq.worker, self.rq.fakeworker]:
1396 if not worker:
1397 continue
1398 try:
1399 worker.stdin.write("<finishnow></finishnow>")
1400 worker.stdin.flush()
1401 except IOError:
1402 # worker must have died?
1403 pass
1404
1405 if len(self.failed_fnids) != 0:
1406 self.rq.state = runQueueFailed
1407 return
1408
1409 self.rq.state = runQueueComplete
1410 return
1411
1412 def finish(self):
1413 self.rq.state = runQueueCleanUp
1414
1415 if self.stats.active > 0:
1416 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1417 self.rq.read_workers()
1418 return self.rq.active_fds()
1419
1420 if len(self.failed_fnids) != 0:
1421 self.rq.state = runQueueFailed
1422 return True
1423
1424 self.rq.state = runQueueComplete
1425 return True
1426
1427 def check_dependencies(self, task, taskdeps, setscene = False):
1428 if not self.rq.depvalidate:
1429 return False
1430
1431 taskdata = {}
1432 taskdeps.add(task)
1433 for dep in taskdeps:
1434 if setscene:
1435 depid = self.rqdata.runq_setscene[dep]
1436 else:
1437 depid = dep
1438 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[depid]]
1439 pn = self.rqdata.dataCache.pkg_fn[fn]
1440 taskname = self.rqdata.runq_task[depid]
1441 taskdata[dep] = [pn, taskname, fn]
1442 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
1443 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.expanded_data }
1444 valid = bb.utils.better_eval(call, locs)
1445 return valid
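        # The BB_SETSCENE_DEPVALID callback is invoked as
        # callback(task, taskdata, notneeded, d), where taskdata maps each dep to
        # [pn, taskname, fn]; its return value is passed straight back as the
        # validity verdict.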
1446
1447class RunQueueExecuteDummy(RunQueueExecute):
1448 def __init__(self, rq):
1449 self.rq = rq
1450 self.stats = RunQueueStats(0)
1451
1452 def finish(self):
1453 self.rq.state = runQueueComplete
1454 return
1455
1456class RunQueueExecuteTasks(RunQueueExecute):
1457 def __init__(self, rq):
1458 RunQueueExecute.__init__(self, rq)
1459
1460 self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
1461
1462 self.stampcache = {}
1463
1464 initial_covered = self.rq.scenequeue_covered.copy()
1465
1466 # Mark initial buildable tasks
1467 for task in xrange(self.stats.total):
1468 self.runq_running.append(0)
1469 self.runq_complete.append(0)
1470 if len(self.rqdata.runq_depends[task]) == 0:
1471 self.runq_buildable.append(1)
1472 else:
1473 self.runq_buildable.append(0)
1474 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1475 self.rq.scenequeue_covered.add(task)
1476
1477 found = True
1478 while found:
1479 found = False
1480 for task in xrange(self.stats.total):
1481 if task in self.rq.scenequeue_covered:
1482 continue
1483 logger.debug(1, 'Considering %s (%s): %s' % (task, self.rqdata.get_user_idstring(task), str(self.rqdata.runq_revdeps[task])))
1484
1485 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
1486 found = True
1487 self.rq.scenequeue_covered.add(task)
1488
1489 logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))
1490
1491 # Allow the metadata to elect for setscene tasks to run anyway
1492 covered_remove = set()
1493 if self.rq.setsceneverify:
1494 invalidtasks = []
1495 for task in xrange(len(self.rqdata.runq_task)):
1496 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1497 taskname = self.rqdata.runq_task[task]
1498 taskdep = self.rqdata.dataCache.task_deps[fn]
1499
1500 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1501 continue
1502 if self.rq.check_stamp_task(task, taskname + "_setscene", cache=self.stampcache):
1503 logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
1504 continue
1505 if self.rq.check_stamp_task(task, taskname, recurse = True, cache=self.stampcache):
1506 logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
1507 continue
1508 invalidtasks.append(task)
1509
1510 call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d, invalidtasks=invalidtasks)"
1511 call2 = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)"
1512 locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.expanded_data, "invalidtasks" : invalidtasks }
1513 # Backwards compatibility with older versions without invalidtasks
1514 try:
1515 covered_remove = bb.utils.better_eval(call, locs)
1516 except TypeError:
1517 covered_remove = bb.utils.better_eval(call2, locs)
1518
1519 def removecoveredtask(task):
1520 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1521 taskname = self.rqdata.runq_task[task] + '_setscene'
1522 bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
1523 self.rq.scenequeue_covered.remove(task)
1524
1525 toremove = covered_remove
1526 for task in toremove:
1527 logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
1528 while toremove:
1529 covered_remove = []
1530 for task in toremove:
1531 removecoveredtask(task)
1532 for deptask in self.rqdata.runq_depends[task]:
1533 if deptask not in self.rq.scenequeue_covered:
1534 continue
1535 if deptask in toremove or deptask in covered_remove or deptask in initial_covered:
1536 continue
1537 logger.debug(1, 'Task %s depends on task %s so not skipping' % (task, deptask))
1538 covered_remove.append(deptask)
1539 toremove = covered_remove
1540
1541 logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)
1542
1543 event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
1544
1545 schedulers = self.get_schedulers()
1546 for scheduler in schedulers:
1547 if self.scheduler == scheduler.name:
1548 self.sched = scheduler(self, self.rqdata)
1549 logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
1550 break
1551 else:
1552 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
1553 (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1554
1555 def get_schedulers(self):
1556 schedulers = set(obj for obj in globals().values()
1557 if type(obj) is type and
1558 issubclass(obj, RunQueueScheduler))
1559
1560 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS", True)
1561 if user_schedulers:
1562 for sched in user_schedulers.split():
1563 if not "." in sched:
1564 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1565 continue
1566
1567 modname, name = sched.rsplit(".", 1)
1568 try:
1569 module = __import__(modname, fromlist=(name,))
1570 except ImportError as exc:
1571 logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1572 raise SystemExit(1)
1573 else:
1574 schedulers.add(getattr(module, name))
1575 return schedulers
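    # Sketch of wiring in an external scheduler (module and class names made up):
    # BB_SCHEDULERS = "mymod.MyScheduler" makes the class importable here, and
    # BB_SCHEDULER set to that class's 'name' attribute selects it in __init__.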
1576
1577 def setbuildable(self, task):
1578 self.runq_buildable[task] = 1
1579 self.sched.newbuilable(task)
1580
1581 def task_completeoutright(self, task):
1582 """
1583 Mark a task as completed
1584 Look at the reverse dependencies and mark any task with
1585 completed dependencies as buildable
1586 """
1587 self.runq_complete[task] = 1
1588 for revdep in self.rqdata.runq_revdeps[task]:
1589 if self.runq_running[revdep] == 1:
1590 continue
1591 if self.runq_buildable[revdep] == 1:
1592 continue
1593 alldeps = 1
1594 for dep in self.rqdata.runq_depends[revdep]:
1595 if self.runq_complete[dep] != 1:
1596 alldeps = 0
1597 if alldeps == 1:
1598 self.setbuildable(revdep)
1599 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1600 taskname = self.rqdata.runq_task[revdep]
1601 logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
1602
1603 def task_complete(self, task):
1604 self.stats.taskCompleted()
1605 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1606 self.task_completeoutright(task)
1607
1608 def task_fail(self, task, exitcode):
1609 """
1610 Called when a task has failed
1611 Updates the state engine with the failure
1612 """
1613 self.stats.taskFailed()
1614 fnid = self.rqdata.runq_fnid[task]
1615 self.failed_fnids.append(fnid)
1616 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
1617 if self.rqdata.taskData.abort:
1618 self.rq.state = runQueueCleanUp
1619
1620 def task_skip(self, task, reason):
1621 self.runq_running[task] = 1
1622 self.setbuildable(task)
1623 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
1624 self.task_completeoutright(task)
1625 self.stats.taskCompleted()
1626 self.stats.taskSkipped()
1627
1628 def execute(self):
1629 """
1630 Run the tasks in a queue prepared by rqdata.prepare()
1631 """
1632
1633 self.rq.read_workers()
1634
1635
1636 if self.stats.total == 0:
1637 # nothing to do
1638 self.rq.state = runQueueCleanUp
1639
1640 task = self.sched.next()
1641 if task is not None:
1642 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1643 taskname = self.rqdata.runq_task[task]
1644
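            # A task already covered by the setscene run, or whose stamp is still
            # current, is skipped here rather than handed to a worker.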
1645 if task in self.rq.scenequeue_covered:
1646 logger.debug(2, "Setscene covered task %s (%s)", task,
1647 self.rqdata.get_user_idstring(task))
1648 self.task_skip(task, "covered")
1649 return True
1650
1651 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
1652 logger.debug(2, "Stamp current task %s (%s)", task,
1653 self.rqdata.get_user_idstring(task))
1654 self.task_skip(task, "existing")
1655 return True
1656
1657 taskdep = self.rqdata.dataCache.task_deps[fn]
1658 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1659 startevent = runQueueTaskStarted(task, self.stats, self.rq,
1660 noexec=True)
1661 bb.event.fire(startevent, self.cfgData)
1662 self.runq_running[task] = 1
1663 self.stats.taskActive()
1664 if not self.cooker.configuration.dry_run:
1665 bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
1666 self.task_complete(task)
1667 return True
1668 else:
1669 startevent = runQueueTaskStarted(task, self.stats, self.rq)
1670 bb.event.fire(startevent, self.cfgData)
1671
1672 taskdepdata = self.build_taskdepdata(task)
1673
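            # The task is handed to a worker process as a framed, pickled message:
            #   "<runtask>" + pickle.dumps((fn, task, taskname, False,
            #                               bbappend files for fn, taskdepdata)) + "</runtask>"
            # The boolean is False here and True for the setscene attempts dispatched
            # by the scenequeue executor below; fakeroot tasks go to the fakeworker,
            # everything else to the normal worker.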
1674 taskdep = self.rqdata.dataCache.task_deps[fn]
1675 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
1676 if not self.rq.fakeworker:
1677 try:
1678 self.rq.start_fakeworker(self)
1679 except OSError as exc:
1680 logger.critical("Failed to spawn fakeroot worker to run %s:%s: %s" % (fn, taskname, str(exc)))
1681 self.rq.state = runQueueFailed
1682 return True
1683 self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
1684 self.rq.fakeworker.stdin.flush()
1685 else:
1686 self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
1687 self.rq.worker.stdin.flush()
1688
1689 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1690 self.build_stamps2.append(self.build_stamps[task])
1691 self.runq_running[task] = 1
1692 self.stats.taskActive()
1693 if self.stats.active < self.number_tasks:
1694 return True
1695
1696 if self.stats.active > 0:
1697 self.rq.read_workers()
1698 return self.rq.active_fds()
1699
1700 if len(self.failed_fnids) != 0:
1701 self.rq.state = runQueueFailed
1702 return True
1703
1704 # Sanity Checks
1705 for task in xrange(self.stats.total):
1706 if self.runq_buildable[task] == 0:
1707 logger.error("Task %s never buildable!", task)
1708 if self.runq_running[task] == 0:
1709 logger.error("Task %s never ran!", task)
1710 if self.runq_complete[task] == 0:
1711 logger.error("Task %s never completed!", task)
1712 self.rq.state = runQueueComplete
1713
1714 return True
1715
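    # build_taskdepdata() computes the full dependency closure of a task and, for
    # each task id in it, records [pn, taskname, fn, deps, provides]. The resulting
    # dict is sent to the worker with the runtask message so executing tasks can
    # inspect their dependency tree (typically exposed to task code as
    # BB_TASKDEPDATA).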
1716 def build_taskdepdata(self, task):
1717 taskdepdata = {}
1718 next = self.rqdata.runq_depends[task]
1719 next.add(task)
1720 while next:
1721 additional = []
1722 for revdep in next:
1723 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1724 pn = self.rqdata.dataCache.pkg_fn[fn]
1725 taskname = self.rqdata.runq_task[revdep]
1726 deps = self.rqdata.runq_depends[revdep]
1727 provides = self.rqdata.dataCache.fn_provides[fn]
1728 taskdepdata[revdep] = [pn, taskname, fn, deps, provides]
1729 for revdep2 in deps:
1730 if revdep2 not in taskdepdata:
1731 additional.append(revdep2)
1732 next = additional
1733
1734 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
1735 return taskdepdata
1736
1737class RunQueueExecuteScenequeue(RunQueueExecute):
1738 def __init__(self, rq):
1739 RunQueueExecute.__init__(self, rq)
1740
1741 self.scenequeue_covered = set()
1742 self.scenequeue_notcovered = set()
1743 self.scenequeue_notneeded = set()
1744
1745 # If we don't have any setscene functions, skip this step
1746 if len(self.rqdata.runq_setscene) == 0:
1747 rq.scenequeue_covered = set()
1748 rq.state = runQueueRunInit
1749 return
1750
1751 self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1752
1753 sq_revdeps = []
1754 sq_revdeps_new = []
1755 sq_revdeps_squash = []
1756 self.sq_harddeps = {}
1757
1758 # We need to construct a dependency graph for the setscene functions. Intermediate
1759 # dependencies between the setscene tasks only complicate the code. This code
1760 # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1761 # only containing the setscene functions.
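        # endpoints maps a task id to the set of setscene tasks whose dependency
        # chains pass through it; process_endpoints() below pushes those sets
        # backwards through runq_depends until they reach setscene tasks, leaving
        # sq_revdeps_new with setscene-only reverse dependencies.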
1762
1763 for task in xrange(self.stats.total):
1764 self.runq_running.append(0)
1765 self.runq_complete.append(0)
1766 self.runq_buildable.append(0)
1767
1768 # First process the chains up to the first setscene task.
1769 endpoints = {}
1770 for task in xrange(len(self.rqdata.runq_fnid)):
1771 sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1772 sq_revdeps_new.append(set())
1773 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1774 endpoints[task] = set()
1775
1776 # Secondly process the chains between setscene tasks.
1777 for task in self.rqdata.runq_setscene:
1778 for dep in self.rqdata.runq_depends[task]:
1779 if dep not in endpoints:
1780 endpoints[dep] = set()
1781 endpoints[dep].add(task)
1782
1783 def process_endpoints(endpoints):
1784 newendpoints = {}
1785 for point, task in endpoints.items():
1786 tasks = set()
1787 if task:
1788 tasks |= task
1789 if sq_revdeps_new[point]:
1790 tasks |= sq_revdeps_new[point]
1791 sq_revdeps_new[point] = set()
1792 if point in self.rqdata.runq_setscene:
1793 sq_revdeps_new[point] = tasks
1794                        tasks = set()
1795                    for dep in self.rqdata.runq_depends[point]:
1796 if point in sq_revdeps[dep]:
1797 sq_revdeps[dep].remove(point)
1798 if tasks:
1799 sq_revdeps_new[dep] |= tasks
1800 if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1801 newendpoints[dep] = task
1802 if len(newendpoints) != 0:
1803 process_endpoints(newendpoints)
1804
1805 process_endpoints(endpoints)
1806
1807 # Build a list of setscene tasks which are "unskippable"
1808 # These are direct endpoints referenced by the build
1809 endpoints2 = {}
1810 sq_revdeps2 = []
1811 sq_revdeps_new2 = []
1812 def process_endpoints2(endpoints):
1813 newendpoints = {}
1814 for point, task in endpoints.items():
1815 tasks = set([point])
1816 if task:
1817 tasks |= task
1818 if sq_revdeps_new2[point]:
1819 tasks |= sq_revdeps_new2[point]
1820 sq_revdeps_new2[point] = set()
1821 if point in self.rqdata.runq_setscene:
1822 sq_revdeps_new2[point] = tasks
1823 for dep in self.rqdata.runq_depends[point]:
1824 if point in sq_revdeps2[dep]:
1825 sq_revdeps2[dep].remove(point)
1826 if tasks:
1827 sq_revdeps_new2[dep] |= tasks
1828 if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1829 newendpoints[dep] = tasks
1830 if len(newendpoints) != 0:
1831 process_endpoints2(newendpoints)
1832 for task in xrange(len(self.rqdata.runq_fnid)):
1833 sq_revdeps2.append(copy.copy(self.rqdata.runq_revdeps[task]))
1834 sq_revdeps_new2.append(set())
1835 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1836 endpoints2[task] = set()
1837 process_endpoints2(endpoints2)
1838 self.unskippable = []
1839 for task in self.rqdata.runq_setscene:
1840 if sq_revdeps_new2[task]:
1841 self.unskippable.append(self.rqdata.runq_setscene.index(task))
1842
1843 for task in xrange(len(self.rqdata.runq_fnid)):
1844 if task in self.rqdata.runq_setscene:
1845 deps = set()
1846 for dep in sq_revdeps_new[task]:
1847 deps.add(self.rqdata.runq_setscene.index(dep))
1848 sq_revdeps_squash.append(deps)
1849 elif len(sq_revdeps_new[task]) != 0:
1850 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1851
1852 # Resolve setscene inter-task dependencies
1853 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
1854 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
1855 for task in self.rqdata.runq_setscene:
1856 realid = self.rqdata.taskData.gettask_id(self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]], self.rqdata.runq_task[task] + "_setscene", False)
1857 idepends = self.rqdata.taskData.tasks_idepends[realid]
1858 for (depid, idependtask) in idepends:
1859 if depid not in self.rqdata.taskData.build_targets:
1860 continue
1861
1862 depdata = self.rqdata.taskData.build_targets[depid][0]
1863 if depdata is None:
1864 continue
1865 dep = self.rqdata.taskData.fn_index[depdata]
1866 taskid = self.rqdata.get_task_id(self.rqdata.taskData.getfn_id(dep), idependtask.replace("_setscene", ""))
1867 if taskid is None:
1868 bb.msg.fatal("RunQueue", "Task %s_setscene depends upon non-existent task %s:%s" % (self.rqdata.get_user_idstring(task), dep, idependtask))
1869
1870                if self.rqdata.runq_setscene.index(taskid) not in self.sq_harddeps:
1871 self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)] = set()
1872 self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)].add(self.rqdata.runq_setscene.index(task))
1873
1874 sq_revdeps_squash[self.rqdata.runq_setscene.index(task)].add(self.rqdata.runq_setscene.index(taskid))
1875 # Have to zero this to avoid circular dependencies
1876 sq_revdeps_squash[self.rqdata.runq_setscene.index(taskid)] = set()
1877
1878 for task in self.sq_harddeps:
1879 for dep in self.sq_harddeps[task]:
1880 sq_revdeps_squash[dep].add(task)
1881
1882 #for task in xrange(len(sq_revdeps_squash)):
1883 # realtask = self.rqdata.runq_setscene[task]
1884 # bb.warn("Task %s: %s_setscene is %s " % (task, self.rqdata.get_user_idstring(realtask) , sq_revdeps_squash[task]))
1885
1886 self.sq_deps = []
1887 self.sq_revdeps = sq_revdeps_squash
1888 self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1889
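        # In the squashed graph, sq_revdeps[i] holds the setscene tasks that must be
        # processed before setscene task i (the scenequeue works through the graph in
        # roughly the reverse of normal execution order, so a task can be skipped
        # once its consumers are covered). sq_deps is the inverse map built below,
        # and sq_revdeps2 is a working copy drained by scenequeue_updatecounters()
        # as tasks complete.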
1890 for task in xrange(len(self.sq_revdeps)):
1891 self.sq_deps.append(set())
1892 for task in xrange(len(self.sq_revdeps)):
1893 for dep in self.sq_revdeps[task]:
1894 self.sq_deps[dep].add(task)
1895
1896 for task in xrange(len(self.sq_revdeps)):
1897 if len(self.sq_revdeps[task]) == 0:
1898 self.runq_buildable[task] = 1
1899
1900 self.outrightfail = []
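        # If a hash validation function is configured (self.rq.hashvalidate, normally
        # set from BB_HASHCHECK_FUNCTION), it is called with parallel lists of
        # filenames, task names, hashes and hash filenames and returns the indices of
        # the tasks whose setscene artefacts appear to be available. Tasks that are
        # noexec or already stamped are filtered out first; anything not reported
        # valid goes into outrightfail and is failed immediately rather than
        # attempted.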
1901 if self.rq.hashvalidate:
1902 sq_hash = []
1903 sq_hashfn = []
1904 sq_fn = []
1905 sq_taskname = []
1906 sq_task = []
1907 noexec = []
1908 stamppresent = []
1909 for task in xrange(len(self.sq_revdeps)):
1910 realtask = self.rqdata.runq_setscene[task]
1911 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1912 taskname = self.rqdata.runq_task[realtask]
1913 taskdep = self.rqdata.dataCache.task_deps[fn]
1914
1915 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1916 noexec.append(task)
1917 self.task_skip(task)
1918 bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1919 continue
1920
1921 if self.rq.check_stamp_task(realtask, taskname + "_setscene", cache=self.stampcache):
1922 logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1923 stamppresent.append(task)
1924 self.task_skip(task)
1925 continue
1926
1927 if self.rq.check_stamp_task(realtask, taskname, recurse = True, cache=self.stampcache):
1928 logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1929 stamppresent.append(task)
1930 self.task_skip(task)
1931 continue
1932
1933 sq_fn.append(fn)
1934 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1935 sq_hash.append(self.rqdata.runq_hash[realtask])
1936 sq_taskname.append(taskname)
1937 sq_task.append(task)
1938 call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1939 locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.expanded_data }
1940 valid = bb.utils.better_eval(call, locs)
1941
1942 valid_new = stamppresent
1943 for v in valid:
1944 valid_new.append(sq_task[v])
1945
1946 for task in xrange(len(self.sq_revdeps)):
1947 if task not in valid_new and task not in noexec:
1948 realtask = self.rqdata.runq_setscene[task]
1949 logger.debug(2, 'No package found, so skipping setscene task %s',
1950 self.rqdata.get_user_idstring(realtask))
1951 self.outrightfail.append(task)
1952
1953 logger.info('Executing SetScene Tasks')
1954
1955 self.rq.state = runQueueSceneRun
1956
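    # scenequeue_updatecounters() runs when a setscene task completes or fails: it
    # removes the task from the outstanding-dependency sets (sq_revdeps2) of the
    # tasks waiting on it, marking any left with no outstanding dependencies as
    # buildable. On failure, anything that hard-depends on the failed task
    # (sq_harddeps) is never made buildable and the update recurses through it.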
1957 def scenequeue_updatecounters(self, task, fail = False):
1958 for dep in self.sq_deps[task]:
1959 if fail and task in self.sq_harddeps and dep in self.sq_harddeps[task]:
1960 realtask = self.rqdata.runq_setscene[task]
1961 realdep = self.rqdata.runq_setscene[dep]
1962 logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (self.rqdata.get_user_idstring(realtask), self.rqdata.get_user_idstring(realdep)))
1963 self.scenequeue_updatecounters(dep, fail)
1964 continue
1965 if task not in self.sq_revdeps2[dep]:
1966 # May already have been removed by the fail case above
1967 continue
1968 self.sq_revdeps2[dep].remove(task)
1969 if len(self.sq_revdeps2[dep]) == 0:
1970 self.runq_buildable[dep] = 1
1971
1972 def task_completeoutright(self, task):
1973 """
1974        Mark a setscene task as covered, meaning its existing output can be reused
1975        Update the counters of the tasks that were waiting on it so that any
1976        with no remaining dependencies become buildable
1977 """
1978
1979 index = self.rqdata.runq_setscene[task]
1980 logger.debug(1, 'Found task %s which could be accelerated',
1981 self.rqdata.get_user_idstring(index))
1982
1983 self.scenequeue_covered.add(task)
1984 self.scenequeue_updatecounters(task)
1985
1986 def task_complete(self, task):
1987 self.stats.taskCompleted()
1988 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1989 self.task_completeoutright(task)
1990
1991 def task_fail(self, task, result):
1992 self.stats.taskFailed()
1993 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
1994 self.scenequeue_notcovered.add(task)
1995 self.scenequeue_updatecounters(task, True)
1996
1997 def task_failoutright(self, task):
1998 self.runq_running[task] = 1
1999 self.runq_buildable[task] = 1
2000 self.stats.taskCompleted()
2001 self.stats.taskSkipped()
2002 index = self.rqdata.runq_setscene[task]
2003 self.scenequeue_notcovered.add(task)
2004 self.scenequeue_updatecounters(task, True)
2005
2006 def task_skip(self, task):
2007 self.runq_running[task] = 1
2008 self.runq_buildable[task] = 1
2009 self.task_completeoutright(task)
2010 self.stats.taskCompleted()
2011 self.stats.taskSkipped()
2012
2013 def execute(self):
2014 """
2015        Run the setscene tasks in the queue prepared by the scenequeue setup in __init__
2016 """
2017
2018 self.rq.read_workers()
2019
2020 task = None
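        # Pick the next buildable setscene task. A task that is not unskippable,
        # is not an explicit build target and whose sq_revdeps are all already
        # covered is skipped as not needed; tasks listed in outrightfail are
        # failed straight away instead of being attempted.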
2021 if self.stats.active < self.number_tasks:
2022 # Find the next setscene to run
2023 for nexttask in xrange(self.stats.total):
2024 if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
2025 if nexttask in self.unskippable:
2026 logger.debug(2, "Setscene task %s is unskippable" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
2027 if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True):
2028 realtask = self.rqdata.runq_setscene[nexttask]
2029 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
2030 foundtarget = False
2031 for target in self.rqdata.target_pairs:
2032 if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
2033 foundtarget = True
2034 break
2035 if not foundtarget:
2036 logger.debug(2, "Skipping setscene for task %s" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
2037 self.task_skip(nexttask)
2038 self.scenequeue_notneeded.add(nexttask)
2039 return True
2040 if nexttask in self.outrightfail:
2041 self.task_failoutright(nexttask)
2042 return True
2043 task = nexttask
2044 break
2045 if task is not None:
2046 realtask = self.rqdata.runq_setscene[task]
2047 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
2048
2049 taskname = self.rqdata.runq_task[realtask] + "_setscene"
2050 if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask], recurse = True, cache=self.stampcache):
2051 logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
2052 task, self.rqdata.get_user_idstring(realtask))
2053 self.task_failoutright(task)
2054 return True
2055
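            # With --force, explicitly requested targets must actually re-run, so
            # their setscene variants are deliberately failed here.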
2056 if self.cooker.configuration.force:
2057 for target in self.rqdata.target_pairs:
2058 if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
2059 self.task_failoutright(task)
2060 return True
2061
2062 if self.rq.check_stamp_task(realtask, taskname, cache=self.stampcache):
2063 logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
2064 task, self.rqdata.get_user_idstring(realtask))
2065 self.task_skip(task)
2066 return True
2067
2068 startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
2069 bb.event.fire(startevent, self.cfgData)
2070
2071 taskdep = self.rqdata.dataCache.task_deps[fn]
2072 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
2073 if not self.rq.fakeworker:
2074 self.rq.start_fakeworker(self)
2075 self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
2076 self.rq.fakeworker.stdin.flush()
2077 else:
2078 self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
2079 self.rq.worker.stdin.flush()
2080
2081 self.runq_running[task] = 1
2082 self.stats.taskActive()
2083 if self.stats.active < self.number_tasks:
2084 return True
2085
2086 if self.stats.active > 0:
2087 self.rq.read_workers()
2088 return self.rq.active_fds()
2089
2090 #for task in xrange(self.stats.total):
2091 # if self.runq_running[task] != 1:
2092 # buildable = self.runq_buildable[task]
2093 # revdeps = self.sq_revdeps[task]
2094 # bb.warn("Found we didn't run %s %s %s %s" % (task, buildable, str(revdeps), self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task])))
2095
2096 # Convert scenequeue_covered task numbers into full taskgraph ids
2097 oldcovered = self.scenequeue_covered
2098 self.rq.scenequeue_covered = set()
2099 for task in oldcovered:
2100 self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
2101
2102 logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
2103
2104 self.rq.state = runQueueRunInit
2105
2106 completeevent = sceneQueueComplete(self.stats, self.rq)
2107 bb.event.fire(completeevent, self.cfgData)
2108
2109 return True
2110
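    # The worker reports exit codes against real runqueue task ids; translate them
    # back to this queue's setscene indices before the generic handler updates the
    # stats and dependency state.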
2111 def runqueue_process_waitpid(self, task, status):
2112 task = self.rq.rqdata.runq_setscene.index(task)
2113
2114 RunQueueExecute.runqueue_process_waitpid(self, task, status)
2115
2116class TaskFailure(Exception):
2117 """
2118 Exception raised when a task in a runqueue fails
2119 """
2120 def __init__(self, x):
2121 self.args = x
2122
2123
2124class runQueueExitWait(bb.event.Event):
2125 """
2126 Event when waiting for task processes to exit
2127 """
2128
2129 def __init__(self, remain):
2130 self.remain = remain
2131 self.message = "Waiting for %s active tasks to finish" % remain
2132 bb.event.Event.__init__(self)
2133
2134class runQueueEvent(bb.event.Event):
2135 """
2136 Base runQueue event class
2137 """
2138 def __init__(self, task, stats, rq):
2139 self.taskid = task
2140 self.taskstring = rq.rqdata.get_user_idstring(task)
2141 self.taskname = rq.rqdata.get_task_name(task)
2142 self.taskfile = rq.rqdata.get_task_file(task)
2143 self.taskhash = rq.rqdata.get_task_hash(task)
2144 self.stats = stats.copy()
2145 bb.event.Event.__init__(self)
2146
2147class sceneQueueEvent(runQueueEvent):
2148 """
2149 Base sceneQueue event class
2150 """
2151 def __init__(self, task, stats, rq, noexec=False):
2152 runQueueEvent.__init__(self, task, stats, rq)
2153 realtask = rq.rqdata.runq_setscene[task]
2154 self.taskstring = rq.rqdata.get_user_idstring(realtask, "_setscene")
2155 self.taskname = rq.rqdata.get_task_name(realtask) + "_setscene"
2156 self.taskfile = rq.rqdata.get_task_file(realtask)
2157 self.taskhash = rq.rqdata.get_task_hash(realtask)
2158
2159class runQueueTaskStarted(runQueueEvent):
2160 """
2161 Event notifying a task was started
2162 """
2163 def __init__(self, task, stats, rq, noexec=False):
2164 runQueueEvent.__init__(self, task, stats, rq)
2165 self.noexec = noexec
2166
2167class sceneQueueTaskStarted(sceneQueueEvent):
2168 """
2169 Event notifying a setscene task was started
2170 """
2171 def __init__(self, task, stats, rq, noexec=False):
2172 sceneQueueEvent.__init__(self, task, stats, rq)
2173 self.noexec = noexec
2174
2175class runQueueTaskFailed(runQueueEvent):
2176 """
2177 Event notifying a task failed
2178 """
2179 def __init__(self, task, stats, exitcode, rq):
2180 runQueueEvent.__init__(self, task, stats, rq)
2181 self.exitcode = exitcode
2182
2183class sceneQueueTaskFailed(sceneQueueEvent):
2184 """
2185 Event notifying a setscene task failed
2186 """
2187 def __init__(self, task, stats, exitcode, rq):
2188 sceneQueueEvent.__init__(self, task, stats, rq)
2189 self.exitcode = exitcode
2190
2191class sceneQueueComplete(sceneQueueEvent):
2192 """
2193 Event when all the sceneQueue tasks are complete
2194 """
2195 def __init__(self, stats, rq):
2196 self.stats = stats.copy()
2197 bb.event.Event.__init__(self)
2198
2199class runQueueTaskCompleted(runQueueEvent):
2200 """
2201 Event notifying a task completed
2202 """
2203
2204class sceneQueueTaskCompleted(sceneQueueEvent):
2205 """
2206 Event notifying a setscene task completed
2207 """
2208
2209class runQueueTaskSkipped(runQueueEvent):
2210 """
2211 Event notifying a task was skipped
2212 """
2213 def __init__(self, task, stats, rq, reason):
2214 runQueueEvent.__init__(self, task, stats, rq)
2215 self.reason = reason
2216
2217class runQueuePipe():
2218 """
2219 Abstraction for a pipe between a worker thread and the server
2220 """
2221 def __init__(self, pipein, pipeout, d, rq, rqexec):
2222 self.input = pipein
2223 if pipeout:
2224 pipeout.close()
2225 bb.utils.nonblockingfd(self.input)
2226 self.queue = ""
2227 self.d = d
2228 self.rq = rq
2229 self.rqexec = rqexec
2230
2231 def setrunqueueexec(self, rqexec):
2232 self.rqexec = rqexec
2233
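    # read() drains the non-blocking pipe from the worker in 100KB chunks and splits
    # the stream into framed, pickled messages:
    #   <event>...</event>       -> re-fired in the server as a BitBake event
    #   <exitcode>...</exitcode> -> (task, status) handed to runqueue_process_waitpid
    # It also notices a worker process exiting unexpectedly and shuts the runqueue
    # down. The return value indicates whether any new data arrived.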
2234 def read(self):
2235 for w in [self.rq.worker, self.rq.fakeworker]:
2236 if not w:
2237 continue
2238 w.poll()
2239 if w.returncode is not None and not self.rq.teardown:
2240 name = None
2241 if self.rq.worker and w.pid == self.rq.worker.pid:
2242 name = "Worker"
2243 elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid:
2244 name = "Fakeroot"
2245 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
2246 self.rq.finish_runqueue(True)
2247
2248 start = len(self.queue)
2249 try:
2250 self.queue = self.queue + self.input.read(102400)
2251 except (OSError, IOError) as e:
2252 if e.errno != errno.EAGAIN:
2253 raise
2254 end = len(self.queue)
2255 found = True
2256 while found and len(self.queue):
2257 found = False
2258 index = self.queue.find("</event>")
2259 while index != -1 and self.queue.startswith("<event>"):
2260 try:
2261 event = pickle.loads(self.queue[7:index])
2262 except ValueError as e:
2263                    bb.msg.fatal("RunQueue", "failed to load pickle '%s': '%s'" % (e, self.queue[7:index]))
2264 bb.event.fire_from_worker(event, self.d)
2265 found = True
2266 self.queue = self.queue[index+8:]
2267 index = self.queue.find("</event>")
2268 index = self.queue.find("</exitcode>")
2269 while index != -1 and self.queue.startswith("<exitcode>"):
2270 try:
2271 task, status = pickle.loads(self.queue[10:index])
2272 except ValueError as e:
2273                    bb.msg.fatal("RunQueue", "failed to load pickle '%s': '%s'" % (e, self.queue[10:index]))
2274 self.rqexec.runqueue_process_waitpid(task, status)
2275 found = True
2276 self.queue = self.queue[index+11:]
2277 index = self.queue.find("</exitcode>")
2278 return (end > start)
2279
2280 def close(self):
2281 while self.read():
2282 continue
2283 if len(self.queue) > 0:
2284 print("Warning, worker left partial message: %s" % self.queue)
2285 self.input.close()