| #!/bin/env python |
| |
| import argparse |
| import requests |
| import tarfile |
| import time |
| import json |
| |
| import urllib3 |
| #import _sysconfigdata |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
| |
| |
| class BMC: |
| def __init__(self, server): |
| self.url = "https://{0}/".format(server) |
| self.session = requests.Session() |
| self.login() |
| |
| def login(self): |
| r = self.session.post(self.url + 'login', |
| json={'data': ['root', '0penBmc']}, |
| verify=False) |
| j = r.json() |
| if j['status'] != 'ok': |
| raise Exception("Failed to login: \n" + r.text) |
| |
| def list_sfw(self): |
| r = self.session.get(self.url + 'xyz/openbmc_project/software/', |
| verify=False) |
| j = r.json() |
| if j['status'] != 'ok': |
| raise Exception("Failed to query software: \n" + r.text) |
| |
| events = j['data'] |
| |
| return events |
| |
| def get_image(self, image): |
| r = self.session.get(self.url + image, verify=False) |
| |
| j = r.json() |
| if j['status'] != 'ok': |
| raise Exception("Failed to get image " + image + ": \n" + r.text) |
| |
| return j['data'] |
| |
| def upload_image(self, image): |
| |
| data = open(image,'rb').read() |
| r = self.session.post(self.url + "/upload/image", |
| data=data, |
| headers={'Content-Type': 'application/octet-stream'}, |
| verify=False) |
| j = r.json() |
| if j['status'] != 'ok': |
| raise Exception("Failed to get event " + image + ": \n" + r.text) |
| |
| return j['data'] |
| |
| def activate_image(self, image_id): |
| r = self.session.put(self.url + "/xyz/openbmc_project/software/" + image_id + "/attr/RequestedActivation", |
| json={'data': 'xyz.openbmc_project.Software.Activation.RequestedActivations.Active'}, |
| verify=False) |
| |
| j = r.json() |
| if j['status'] != 'ok': |
| raise Exception("Failed to activate image " + image_id + ": \n" + r.text) |
| |
| return j['data'] |
| |
| def update_auto(self, image, reboot): |
| image_version = self.__get_image_version(image) |
| self.upload_image(image) |
| image_id = self.__wait_for_image_id(image_version) |
| self.activate_image(image_id) |
| self.__wait_for_activation(image_id) |
| if reboot: |
| self.reboot() |
| |
| def reboot(self): |
| r = self.session.post( |
| self.url + '/org/openbmc/control/bmc0/action/warmReset', |
| headers={'Content-Type': 'application/json'}, |
| data='{"data":[]}', |
| verify=False) |
| |
| j = r.json() |
| if j['status'] != 'ok': |
| raise Exception("Failed to reboot BMC:\n" + r.text) |
| |
| def __get_image_version(self, image_file_path): |
| # Open up the manfest file. |
| image_tar = tarfile.open(name=image_file_path, mode='r') |
| manifest = image_tar.extractfile('MANIFEST') |
| |
| # Find version line. |
| for line in manifest: |
| if line.startswith('version='): |
| manifest.close() |
| image_tar.close() |
| return line.split('version=', 1)[1].strip() |
| |
| # If we didn't find the version line, print an error and return false. |
| manifest.close() |
| image_tar.close() |
| raise Exception("Could not find version line in image manifest") |
| |
| def __wait_for_image_id(self, image_version): |
| # Try 8 times, once every 15 seconds. |
| for attempt in range(8): |
| software = self.list_sfw() |
| # Look for our the image with the given version in software |
| for path in software: |
| image = self.get_image(path) |
| if 'Version' in image and image_version == image['Version']: |
| return path.split('/')[-1] |
| time.sleep(15) |
| return False |
| |
| def __wait_for_activation(self, image_id): |
| # Keep waiting until the image is active or activation fails. |
| active = False |
| while not active: |
| image = self.get_image("/xyz/openbmc_project/software/" + image_id) |
| if 'xyz.openbmc_project.Software.Activation.Activations.Active' \ |
| == image['Activation']: |
| print 'Activation Progress: 100%' |
| active = True |
| elif 'xyz.openbmc_project.Software.Activation.Activations.Activating' \ |
| == image['Activation']: |
| print 'Activation Progress: ' + str(image['Progress']) + '%' |
| else: |
| raise Exception("Image activation failed. The BMC has set " \ |
| + "the'Activation' property to " + image['Activation']) |
| time.sleep(15) |
| |
| def do_list_sfw(args): |
| s = BMC(server=args.server) |
| for e in s.list_sfw(): |
| info = s.get_image(e) |
| print(e) |
| print json.dumps(info, indent=4) |
| |
| def do_view_image(args): |
| s = BMC(server=args.server) |
| print json.dumps(s.get_image(args.image), indent=4) |
| |
| def do_upload_image(args): |
| s = BMC(server=args.server) |
| s.upload_image(args.image) |
| |
| def do_activate_image(args): |
| s = BMC(server=args.server) |
| s.activate_image(args.image_id) |
| |
| def do_update_auto(args): |
| s = BMC(server=args.server) |
| s.update_auto(args.image, args.reboot) |
| |
| parser = argparse.ArgumentParser() |
| parser.add_argument('--server', help='hostname or IP of BMC', type=str, |
| required=True) |
| |
| subparsers = parser.add_subparsers() |
| list_events = subparsers.add_parser('list', help='List all software images on BMC') |
| list_events.set_defaults(func=do_list_sfw) |
| |
| image_view = subparsers.add_parser('view', help='View info of input image') |
| image_view.add_argument('image', help='The image to analyze') |
| image_view.set_defaults(func=do_view_image) |
| |
| image_upload = subparsers.add_parser('upload', help='Upload input image') |
| image_upload.add_argument('image', help='The image to upload') |
| image_upload.set_defaults(func=do_upload_image) |
| |
| image_activate = subparsers.add_parser('activate', help='Activate input image id') |
| image_activate.add_argument('image_id', help='The image id to activate') |
| image_activate.set_defaults(func=do_activate_image) |
| |
| image_update_auto = subparsers.add_parser('update_auto', help='Upload and ' |
| + 'activate an image, and then reboot the BMC to apply the update ' |
| + 'if necessary') |
| image_update_auto.add_argument('image', help='The image to update to') |
| image_update_auto.add_argument( |
| '--reboot', |
| action='store_true', |
| default=False, |
| help='Set if the BMC should reboot after the update') |
| image_update_auto.set_defaults(func=do_update_auto) |
| |
| args = parser.parse_args() |
| |
| if 'func' in args: |
| args.func(args) |
| else: |
| parser.print_help() |