| #!/usr/bin/env python3 |
| # bblock |
| # lock/unlock task to latest signature |
| # |
| # Copyright (c) 2023 BayLibre, SAS |
| # Author: Julien Stephan <jstephan@baylibre.com> |
| # |
| # SPDX-License-Identifier: GPL-2.0-only |
| # |
| |
| import os |
| import sys |
| import logging |
| |
# Make the "lib" directory located next to this script importable. The
# imports that follow are deliberately placed after this path setup.
scripts_path = os.path.dirname(os.path.realpath(__file__))
lib_path = scripts_path + "/lib"
sys.path = sys.path + [lib_path]

# NOTE(review): scriptpath is presumably provided by lib_path above -- confirm.
import scriptpath

# NOTE(review): judging by its name, this call makes the BitBake "bb" package
# importable, which is why the bb imports only come after it -- confirm.
scriptpath.add_bitbake_lib_path()

import bb.tinfoil
import bb.msg

import argparse_oe

# The logger is named after the script as it was invoked (basename of argv[0]).
myname = os.path.basename(sys.argv[0])
logger = bb.msg.logger_create(myname)
| |
| |
def getTaskSignatures(tinfoil, pn, tasks):
    """Run the ``getTaskSignatures`` command and return the reported signatures.

    The command is issued through ``tinfoil`` and events are then processed
    until the command completes: log records are forwarded to the module
    logger and the payload of the signature result event is remembered.

    Args:
        tinfoil: Tinfoil instance used to run the command and wait for events.
        pn: Recipe name(s), passed unchanged to the command (``main()``
            passes a one-element list).
        tasks: Task names, passed unchanged to the command.

    Returns:
        The ``sig`` attribute of the last ``GetTaskSignatureResult`` event
        received. ``main()`` iterates it and indexes each item as
        ``(pn, taskname, signature)``.

    Raises:
        SystemExit: With status 2 when the command returns nothing, when it
            fails, or when it completes without reporting any signature.
    """
    tinfoil.set_event_mask(
        [
            "bb.event.GetTaskSignatureResult",
            "logging.LogRecord",
            "bb.command.CommandCompleted",
            "bb.command.CommandFailed",
        ]
    )

    if not tinfoil.run_command("getTaskSignatures", pn, tasks):
        logger.error("No result returned from getTaskSignatures command")
        sys.exit(2)

    # Stays None if the command completes before any signature result event
    # is seen; previously that case ended in an UnboundLocalError.
    sig = None
    while True:
        event = tinfoil.wait_event(1)
        if not event:
            # Nothing arrived within the timeout; keep waiting.
            continue
        if isinstance(event, bb.command.CommandCompleted):
            break
        if isinstance(event, bb.command.CommandFailed):
            logger.error(str(event))
            sys.exit(2)
        if isinstance(event, bb.event.GetTaskSignatureResult):
            sig = event.sig
        elif isinstance(event, logging.LogRecord):
            logger.handle(event)

    if sig is None:
        logger.error("getTaskSignatures completed without returning any signature")
        sys.exit(2)
    return sig
| |
| |
def parseRecipe(tinfoil, recipe):
    """Parse ``recipe`` through ``tinfoil`` and return the resulting datastore.

    All recipes are parsed first (``tinfoil.parse_recipes()``), then the
    requested one is parsed individually.

    Args:
        tinfoil: Tinfoil instance used for parsing.
        recipe: Name of the recipe to parse.

    Returns:
        Whatever ``tinfoil.parse_recipe(recipe)`` returns.

    Raises:
        SystemExit: With status 1 (after logging an error) if parsing raises
            any ``Exception``.
    """
    try:
        tinfoil.parse_recipes()
        recipe_data = tinfoil.parse_recipe(recipe)
    except Exception:
        failure = "Failed to get recipe info for: %s" % recipe
        logger.error(failure)
        sys.exit(1)
    else:
        return recipe_data
| |
| |
def bblockDump(lockfile):
    """Print the content of the lock file to standard output.

    Each line is printed with its leading and trailing whitespace stripped.

    Args:
        lockfile: Path of the file to dump.

    Returns:
        0 on success, 1 if an I/O error occurred while opening, reading or
        printing the file.
    """
    status = 0
    try:
        with open(lockfile, "r") as handle:
            for raw_line in handle:
                print(raw_line.strip())
    except OSError:  # IOError is an alias of OSError in Python 3
        status = 1
    return status
| |
| |
def bblockReset(lockfile, pns, package_archs, tasks):
    """Unlock recipes by removing their entries from the lock file.

    Args:
        lockfile: Path of the lock file.
        pns: Recipe names to unlock. When empty, the whole lock file is
            deleted (a missing file is not an error).
        package_archs: Whitespace separated architecture names, or None. A
            line is only a candidate for removal when it mentions one of
            the recipes *and* one of these names.
        tasks: Optional task names. When given, a candidate line is removed
            only if it also mentions one of them; otherwise every candidate
            line is removed.

    The filtered content is written to ``<lockfile>.tmp`` which then
    replaces the lock file.
    """
    if not pns:
        logger.info("Unlocking all recipes")
        try:
            os.remove(lockfile)
        except FileNotFoundError:
            # Nothing is locked; deliberately not an error.
            pass
        return

    logger.info("Unlocking {pns}".format(pns=pns))
    try:
        infile = open(lockfile, "r")
    except FileNotFoundError:
        # Mirror the "unlock all" path: no lock file means nothing to unlock.
        logger.info("No lock file found, nothing to unlock")
        return

    # Split once instead of once per line; tolerate an unset variable.
    archs = (package_archs or "").split()
    tmp_lockfile = lockfile + ".tmp"
    with infile, open(tmp_lockfile, "w") as outfile:
        for line in infile:
            # NOTE(review): matching is plain substring search, so a recipe
            # or task name that is a substring of another one also matches
            # that other entry -- confirm whether this is intended.
            candidate = any(pn in line for pn in pns) and any(
                arch in line for arch in archs
            )
            if candidate and (not tasks or any(task in line for task in tasks)):
                continue  # drop this entry: it is being unlocked
            outfile.write(line)

    # Replace in one step so there is never a moment without a lock file.
    os.replace(tmp_lockfile, lockfile)
| |
| |
def main():
    """Command-line entry point: dump, reset or extend the lock file.

    The lock file is ``<TOPDIR>/conf/bblock.conf``. Depending on the options:

    * ``--dump``: print the lock file and return the status of the dump.
    * ``--reset``: unlock the given recipes (or all of them) and return 0.
    * otherwise: append a ``SIGGEN_LOCKEDSIGS_<PACKAGE_ARCH>`` entry for each
      task signature of each given recipe, unless the entry is already part
      of the recipe's current value of that variable. A header is written
      first when the lock file is empty.

    Returns:
        The process exit status: 0 on success, 1 when ``--dump`` could not
        read the lock file.
    """
    parser = argparse_oe.ArgumentParser(description="Lock and unlock a recipe")
    parser.add_argument("pn", nargs="*", help="Space separated list of recipe to lock")
    parser.add_argument(
        "-t",
        "--tasks",
        help="Comma separated list of tasks",
        # Accept task names with or without the "do_" prefix.
        type=lambda s: [
            task if task.startswith("do_") else "do_" + task for task in s.split(",")
        ],
    )
    parser.add_argument(
        "-r",
        "--reset",
        action="store_true",
        help="Unlock pn recipes, or all recipes if pn is empty",
    )
    parser.add_argument(
        "-d",
        "--dump",
        action="store_true",
        help="Dump generated bblock.conf file",
    )

    # Unrecognised arguments are tolerated and ignored.
    global_args, _ = parser.parse_known_args()

    with bb.tinfoil.Tinfoil() as tinfoil:
        tinfoil.prepare(config_only=True)

        package_archs = tinfoil.config_data.getVar("PACKAGE_ARCHS")
        builddir = tinfoil.config_data.getVar("TOPDIR")
        lockfile = "{builddir}/conf/bblock.conf".format(builddir=builddir)

        if global_args.dump:
            # Propagate the dump status instead of always reporting success.
            status = bblockDump(lockfile)
            if status:
                logger.error("Unable to read lock file: %s" % lockfile)
            return status

        if global_args.reset:
            bblockReset(lockfile, global_args.pn, package_archs, global_args.tasks)
            return 0

        with open(lockfile, "a") as lock_fh:
            chunks = []
            # Position 0 right after opening for append means the file is
            # empty (or was just created): start it with the header.
            if lock_fh.tell() == 0:
                chunks.append("# Generated by bblock\n")
                chunks.append('SIGGEN_LOCKEDSIGS_TASKSIG_CHECK = "info"\n')
                chunks.append('SIGGEN_LOCKEDSIGS_TYPES += "${PACKAGE_ARCHS}"\n')
                chunks.append("\n")

            for pn in global_args.pn:
                d = parseRecipe(tinfoil, pn)
                package_arch = d.getVar("PACKAGE_ARCH")
                already_locked = d.getVar(
                    "SIGGEN_LOCKEDSIGS_{package_arch}".format(package_arch=package_arch)
                )
                sigs = getTaskSignatures(tinfoil, [pn], global_args.tasks)
                for sig in sigs:
                    new_entry = "{pn}:{taskname}:{sig}".format(
                        pn=sig[0], taskname=sig[1], sig=sig[2]
                    )
                    # Skip entries already present in the variable's value.
                    if not already_locked or new_entry not in already_locked:
                        chunks.append(
                            'SIGGEN_LOCKEDSIGS_{package_arch} += "{new_entry}"\n'.format(
                                package_arch=package_arch, new_entry=new_entry
                            )
                        )
            lock_fh.write("".join(chunks))
    return 0
| |
| |
if __name__ == "__main__":
    # Assume failure; overwritten with main()'s status when it returns.
    ret = 1
    try:
        ret = main()
    except Exception:
        # Show the traceback for unexpected errors and exit with status 1.
        import traceback

        traceback.print_exc()
    sys.exit(ret)