Patrick Williams | d8c66bc | 2016-06-20 12:57:21 -0500 | [diff] [blame^] | 1 | """Helper module for GPG signing""" |
| 2 | import os |
| 3 | |
| 4 | import bb |
| 5 | import oe.utils |
| 6 | |
class LocalSigner(object):
    """Class for handling local (on the build host) signing"""

    def __init__(self, d):
        """Look up the tools and settings used for signing.

        The gpg binary comes from the GPG_BIN datastore variable, falling
        back to a search for ``gpg`` on PATH. The optional gpg home
        directory comes from GPG_PATH. ``rpm`` is searched for on PATH.
        """
        self.gpg_bin = d.getVar('GPG_BIN', True) or \
            bb.utils.which(os.getenv('PATH'), 'gpg')
        self.gpg_path = d.getVar('GPG_PATH', True)
        self.rpm_bin = bb.utils.which(os.getenv('PATH'), "rpm")

    @staticmethod
    def _to_text(data):
        """Return data as a native string, decoding it when it is bytes."""
        # On Python 2 bytes is str, so the value is passed through untouched.
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode('utf-8', 'replace')
        return data

    @staticmethod
    def _needs_loopback(version):
        """Tell whether a dotted gpg version string is 2.1 or newer."""
        parts = version.split('.')
        try:
            major, minor = int(parts[0]), int(parts[1])
        except (IndexError, ValueError):
            raise bb.build.FuncFailed("Could not parse gpg version '%s'" % version)
        # Compare as a tuple so that e.g. 3.0 counts as newer than 2.1
        return (major, minor) >= (2, 1)

    def export_pubkey(self, output_file, keyid, armor=True):
        """Export GPG public key to a file

        Writes the public key ``keyid`` to ``output_file``, ASCII-armored
        when ``armor`` is true. Raises bb.build.FuncFailed if the gpg
        command returns a non-zero status.
        """
        cmd = '%s --batch --yes --export -o %s ' % \
            (self.gpg_bin, output_file)
        if self.gpg_path:
            cmd += "--homedir %s " % self.gpg_path
        if armor:
            cmd += "--armor "
        cmd += keyid
        status, output = oe.utils.getstatusoutput(cmd)
        if status:
            raise bb.build.FuncFailed('Failed to export gpg public key (%s): %s' %
                                      (keyid, output))

    def sign_rpms(self, files, keyid, passphrase):
        """Sign RPM files

        ``files`` is an iterable of paths that are appended, space
        separated, to a single ``rpm --addsign`` command. Raises
        bb.build.FuncFailed if the command returns a non-zero status.
        """
        # NOTE(review): the passphrase and the file names are interpolated
        # into the command string as-is (no quoting or escaping is applied).
        cmd = self.rpm_bin + " --addsign --define '_gpg_name %s' " % keyid
        cmd += "--define '_gpg_passphrase %s' " % passphrase
        if self.gpg_bin:
            cmd += "--define '%%__gpg %s' " % self.gpg_bin
        if self.gpg_path:
            cmd += "--define '_gpg_path %s' " % self.gpg_path
        cmd += ' '.join(files)

        status, output = oe.utils.getstatusoutput(cmd)
        if status:
            raise bb.build.FuncFailed("Failed to sign RPM packages: %s" % output)

    def detach_sign(self, input_file, keyid, passphrase_file, passphrase=None, armor=True):
        """Create a detached signature of a file

        The passphrase is fed to gpg on its standard input. It is taken
        either from the first line of ``passphrase_file`` or from
        ``passphrase``; giving both is an error.

        Raises bb.build.FuncFailed if gpg exits with a non-zero code or its
        version cannot be determined, and Exception if both passphrase
        sources are given or an IO/OS error occurs while signing.
        """
        import subprocess

        if passphrase_file and passphrase:
            raise Exception("You should use either passphrase_file or passphrase, not both")

        cmd = [self.gpg_bin, '--detach-sign', '--batch', '--no-tty', '--yes',
               '--passphrase-fd', '0', '-u', keyid]

        if self.gpg_path:
            cmd += ['--homedir', self.gpg_path]
        if armor:
            cmd += ['--armor']

        # gpg >= 2.1 supports password pipes only through the loopback interface
        # gpg < 2.1 errors out if given unknown parameters
        if self._needs_loopback(self.get_gpg_version()):
            cmd += ['--pinentry-mode', 'loopback']

        cmd += [input_file]

        try:
            if passphrase_file:
                with open(passphrase_file) as fobj:
                    passphrase = fobj.readline()

            # The stdin pipe is opened in binary mode, so hand it bytes
            if passphrase is not None and not isinstance(passphrase, bytes):
                passphrase = passphrase.encode('utf-8')

            job = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
            (_, stderr) = job.communicate(passphrase)

            if job.returncode:
                raise bb.build.FuncFailed("GPG exited with code %d: %s" %
                                          (job.returncode, self._to_text(stderr)))

        except IOError as e:
            bb.error("IO error (%s): %s" % (e.errno, e.strerror))
            raise Exception("Failed to sign '%s'" % input_file)

        except OSError as e:
            bb.error("OS error (%s): %s" % (e.errno, e.strerror))
            raise Exception("Failed to sign '%s'" % input_file)

    def get_gpg_version(self):
        """Return the gpg version

        The version is the third whitespace-separated token printed by
        ``gpg --version``, returned as a native string. Raises
        bb.build.FuncFailed if the command exits with a non-zero status.
        """
        import subprocess
        try:
            output = subprocess.check_output((self.gpg_bin, "--version"))
        except subprocess.CalledProcessError as e:
            raise bb.build.FuncFailed("Could not get gpg version: %s" % e)
        # check_output yields bytes on Python 3; callers split on a str '.'
        return self._to_text(output.split()[2])

    def verify(self, sig_file):
        """Verify signature

        Returns True when ``gpg --verify`` returns a zero status for
        ``sig_file`` and False otherwise.
        """
        cmd = self.gpg_bin + " --verify "
        if self.gpg_path:
            cmd += "--homedir %s " % self.gpg_path
        cmd += sig_file
        status, _ = oe.utils.getstatusoutput(cmd)
        return not status
| 107 | |
| 108 | |
def get_signer(d, backend):
    """Return a signer instance for the named backend.

    Only the 'local' backend is recognised, for which a LocalSigner built
    from the datastore ``d`` is returned. Any other name is reported through
    bb.fatal().
    """
    if backend != 'local':
        bb.fatal("Unsupported signing backend '%s'" % backend)
        return None
    # The build-host signer is the one supported implementation
    return LocalSigner(d)
| 116 | |