# blob: 0aa40a9df3a86cd2154e2610ca976301eb379a55 [file] [log] [blame]
import sys
import dbus
import argparse
from dbus.mainloop.glib import DBusGMainLoop
import gobject
import json
import os
import signal
import time
from glob import glob
from os.path import join
from subprocess import Popen
import obmc_system_config
# Recipes selectable on the command line, keyed by recipe name.
#
# A dict entry describes a single D-Bus operation (see run_one_command()):
#   bus_name / object_name / interface_name  locate the D-Bus object.
#   property  the property to read, or to write when 'value' is present.
#             Without 'property', every property of the interface is printed.
#   value     the value written to 'property'.
#   monitor   unit name compared against the "JobRemoved" signal while
#             waiting for the transition (see run_set_property()).
#
# A list entry names other recipes to run in order (see run_all_commands()).
#
# NOTE(review): every 'monitor' value is an empty string in this copy of the
# file; presumably each should name the unit whose job completion signals the
# end of the transition -- confirm against the upstream source.
descriptors = {
    'power': {
        'bus_name': 'org.openbmc.control.Power',
        'object_name': '/org/openbmc/control/power0',
        'interface_name': 'org.openbmc.control.Power',
    },
    'chassison': {
        'bus_name': 'xyz.openbmc_project.State.Chassis',
        'object_name': '/xyz/openbmc_project/state/chassis0',
        'interface_name': 'xyz.openbmc_project.State.Chassis',
        'property': 'RequestedPowerTransition',
        'value': 'xyz.openbmc_project.State.Chassis.Transition.On',
        'monitor': '',
    },
    'chassisoff': {
        'bus_name': 'xyz.openbmc_project.State.Chassis',
        'object_name': '/xyz/openbmc_project/state/chassis0',
        'interface_name': 'xyz.openbmc_project.State.Chassis',
        'property': 'RequestedPowerTransition',
        'value': 'xyz.openbmc_project.State.Chassis.Transition.Off',
        'monitor': '',
    },
    'poweron': {
        'bus_name': 'xyz.openbmc_project.State.Host',
        'object_name': '/xyz/openbmc_project/state/host0',
        'interface_name': 'xyz.openbmc_project.State.Host',
        'property': 'RequestedHostTransition',
        'value': 'xyz.openbmc_project.State.Host.Transition.On',
        'monitor': '',
    },
    'poweroff': {
        'bus_name': 'xyz.openbmc_project.State.Host',
        'object_name': '/xyz/openbmc_project/state/host0',
        'interface_name': 'xyz.openbmc_project.State.Host',
        'property': 'RequestedHostTransition',
        'value': 'xyz.openbmc_project.State.Host.Transition.Off',
        'monitor': '',
    },
    'bmcstate': {
        'bus_name': 'xyz.openbmc_project.State.BMC',
        'object_name': '/xyz/openbmc_project/state/bmc0',
        'interface_name': 'xyz.openbmc_project.State.BMC',
        'property': 'CurrentBMCState',
    },
    'chassisstate': {
        'bus_name': 'xyz.openbmc_project.State.Chassis',
        'object_name': '/xyz/openbmc_project/state/chassis0',
        'interface_name': 'xyz.openbmc_project.State.Chassis',
        'property': 'CurrentPowerState',
    },
    'hoststate': {
        'bus_name': 'xyz.openbmc_project.State.Host',
        'object_name': '/xyz/openbmc_project/state/host0',
        'interface_name': 'xyz.openbmc_project.State.Host',
        'property': 'CurrentHostState',
    },
    'bootprogress': {
        'bus_name': 'xyz.openbmc_project.State.Host',
        'object_name': '/xyz/openbmc_project/state/host0',
        'interface_name': 'xyz.openbmc_project.State.Boot.Progress',
        'property': 'BootProgress',
    },
    'state': ['bmcstate', 'chassisstate', 'hoststate'],
    'status': ['bmcstate', 'chassisstate', 'hoststate'],
}

# JSON file describing the system's GPIOs; read by can_chassiskill() and
# run_chassiskill().
GPIO_DEFS_FILE = '/etc/default/obmc/gpio/gpio_defs.json'
def find_gpio_base(path="/sys/class/gpio/"):
    """Return the base GPIO number of the chip labelled "1e780000.gpio".

    Every ``gpiochip*`` directory under *path* is inspected: the first line
    of its ``label`` file is compared against the expected label and, on a
    match, the integer stored in its ``base`` file is returned.

    Raises:
        IOError: with errno ENOENT when no chip carries the expected label
            (the same error the original produced by opening a non-existent
            ``gpiochip`` file), or when a ``label``/``base`` file cannot be
            read.
    """
    import errno

    pattern = "gpiochip*"
    for gc in glob(join(path, pattern)):
        with open(join(gc, "label")) as f:
            label = f.readline().strip()

        if label == "1e780000.gpio":
            with open(join(gc, "base")) as f:
                return int(f.readline().strip())

    # Raise the file-not-found error explicitly rather than by opening a path
    # that is expected to be missing: that trick leaked a file handle and
    # silently returned None if the path ever existed.
    raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                  join(path, "gpiochip"))
# Base GPIO number, resolved once when the module is loaded; convertGpio()
# adds per-pin offsets to it. Because this runs at import time, loading the
# module raises if find_gpio_base() does not find a matching chip.
GPIO_BASE = find_gpio_base()
def convertGpio(name):
    """Translate a pin name such as ``"A0"`` or ``"AA2"`` to a GPIO number.

    The digits in *name* form the pin offset and the letters form the port.
    Each port covers 8 pins: the last port letter gives the port index
    (A -> 0, B -> 1, ...), and a port made of more than one letter adds 26 to
    that index. The result is ``port_index * 8 + GPIO_BASE + offset``.

    Raises:
        ValueError: if *name* contains no digit or no letter.
    """
    digits = ''.join(ch for ch in name if ch.isdigit())
    port = [ch for ch in name.upper() if ch.isalpha()]

    # Reject malformed names up front; previously a name without letters
    # failed with an unrelated IndexError and one without digits with an
    # opaque int('') error.
    if not digits or not port:
        raise ValueError("Malformed GPIO name: {!r}".format(name))

    offset = int(digits)
    port_index = ord(port[-1]) - ord('A')
    if len(port) > 1:
        port_index += 26

    return port_index * 8 + GPIO_BASE + offset
def run_set_property(dbus_bus, dbus_iface, descriptor, args):
    """Write descriptor['value'] to a D-Bus property, optionally waiting.

    Args:
        dbus_bus: bus connection used to subscribe to "JobRemoved" signals.
        dbus_iface: properties interface whose ``Set`` method is called.
        descriptor: recipe dict providing 'interface_name', 'property',
            'value' and, optionally, 'monitor'.
        args: parsed command line; ``wait``, ``verbose`` and ``wait_tune``
            are read.

    Returns:
        True when the property was set without waiting. When waiting, True
        only if the "JobRemoved" signal for the monitored unit reported the
        state 'done'.
    """
    iface = descriptor['interface_name']
    prop = descriptor['property']

    # Nothing to wait for: either the recipe names no unit to monitor or the
    # caller did not ask to block. No main loop is needed on this path.
    if 'monitor' not in descriptor or not args.wait:
        dbus_iface.Set(iface, prop, descriptor['value'])
        return True

    mainloop = gobject.MainLoop()

    def property_listener(job, path, unit, state):
        # NOTE(review): the body of this guard and the mainloop.quit() call
        # are missing from this copy of the file; ignoring unrelated units
        # and stopping the loop on the monitored one is the reconstruction
        # implied by the surrounding code -- confirm against upstream.
        if descriptor['monitor'] != unit:
            return
        property_listener.success = (state == 'done')
        mainloop.quit()

    property_listener.success = True

    journal_pid = None
    sig_match = None
    try:
        if args.verbose:
            # Stream the journal while the transition is in progress.
            journal_pid = Popen(["/bin/journalctl", "-f", "--no-pager"]).pid

        sig_match = dbus_bus.add_signal_receiver(property_listener,
                                                 "JobRemoved")
        dbus_iface.Set(iface, prop, descriptor['value'])

        # NOTE(review): running the loop and sleeping for args.wait_tune are
        # reconstructed (the corresponding lines are absent here) -- confirm.
        mainloop.run()

        if journal_pid is not None:
            # wait some time for the journal output
            time.sleep(args.wait_tune)
    finally:
        # Always undo the subscription and stop journalctl, even when Set()
        # or the main loop raises, so nothing is leaked.
        if sig_match is not None:
            sig_match.remove()
        if journal_pid is not None:
            os.kill(journal_pid, signal.SIGTERM)

    return property_listener.success
def get_dbus_obj(dbus_bus, bus, obj, args):
    """Return the proxy for object *obj* on *bus*, optionally waiting for it.

    Without ``args.wait`` the lookup is attempted once and any error
    propagates. With ``args.wait``, a failed lookup blocks on the main loop
    until a "JobRemoved" signal for the expected unit arrives, then the
    lookup is retried once.

    Args:
        dbus_bus: bus connection providing ``get_object`` and
            ``add_signal_receiver``.
        bus: D-Bus bus (service) name.
        obj: D-Bus object path.
        args: parsed command line; ``wait`` and ``verbose`` are read.
    """
    if not args.wait:
        return dbus_bus.get_object(bus, obj)

    mainloop = gobject.MainLoop()

    def property_listener(job, path, unit, state):
        # NOTE(review): the unit name compared here is an empty string in
        # this copy of the file and the body of the `if` is missing;
        # presumably it named the unit to wait for and quit the loop --
        # confirm against upstream. As written, the wait only ends on a
        # signal whose unit is ''.
        if '' == unit:
            mainloop.quit()

    sig_match = dbus_bus.add_signal_receiver(property_listener, "JobRemoved")
    try:
        return dbus_bus.get_object(bus, obj)
    except dbus.exceptions.DBusException:
        journal_pid = None
        if args.verbose:
            journal_pid = Popen(["/bin/journalctl", "-f", "--no-pager"]).pid
        try:
            # NOTE(review): reconstructed -- block until the listener quits
            # the loop.
            mainloop.run()
        finally:
            if journal_pid is not None:
                os.kill(journal_pid, signal.SIGTERM)
    finally:
        # The subscription is only needed while waiting.
        sig_match.remove()

    return dbus_bus.get_object(bus, obj)
def run_one_command(dbus_bus, descriptor, args):
    """Execute the single D-Bus operation described by *descriptor*.

    * 'property' and 'value' present: the property is set through
      run_set_property() and its result is returned.
    * only 'property' present: the property is read and printed.
    * neither present: every property of the interface is printed.

    Returns:
        The result of run_set_property() for a write, otherwise True.
    """
    bus = descriptor['bus_name']
    obj = descriptor['object_name']
    iface = descriptor['interface_name']
    dbus_obj = get_dbus_obj(dbus_bus, bus, obj, args)

    # All three modes go through the standard properties interface.
    dbus_iface = dbus.Interface(dbus_obj, "org.freedesktop.DBus.Properties")

    if 'property' not in descriptor:
        props = dbus_iface.GetAll(iface)
        for p in props:
            print("{} = {}".format(p, str(props[p])))
        return True

    if 'value' in descriptor:
        return run_set_property(dbus_bus, dbus_iface, descriptor, args)

    prop = descriptor['property']
    dbus_prop = dbus_iface.Get(iface, prop)
    print('{:<20}: {}'.format(prop, str(dbus_prop)))
    return True
def run_all_commands(dbus_bus, recipe, args):
    """Run *recipe*: either one descriptor dict or a list of recipe names.

    A dict is executed directly. A list is resolved name by name through
    ``descriptors`` and executed in order, stopping at the first command that
    fails.

    Returns:
        For a dict, the result of run_one_command(). For a list, True when
        every command succeeded (including the empty list), False otherwise.
    """
    if isinstance(recipe, dict):
        return run_one_command(dbus_bus, recipe, args)

    assert isinstance(recipe, list)
    for command in recipe:
        descriptor = descriptors[command]
        if not run_one_command(dbus_bus, descriptor, args):
            print("Failed to execute command: {}".format(descriptor))
            return False

    return True
def gpio_set_value(gpio_name, active_low, asserted):
    """Drive the named GPIO through its sysfs ``value`` file.

    Args:
        gpio_name: pin name understood by convertGpio().
        active_low: True when the line is asserted by driving it low.
        asserted: logical state to apply.

    Returns:
        True once the value has been written.

    Raises:
        IOError: if the sysfs value file cannot be opened or written.
    """
    gpio_id = convertGpio(gpio_name)
    gpio_value_path = "/sys/class/gpio/gpio{}/value".format(gpio_id)

    # The polarity inversion is done here in software: the physical level is
    # the logical state flipped when the line is active-low. The original
    # comment said this should change once the kernel is configured to handle
    # the inversion; the issue reference it pointed to is missing from this
    # copy of the file.
    level = bool(asserted) != bool(active_low)

    with open(gpio_value_path, 'w') as gpio:
        gpio.write(str(int(level)))

    return True
def gpio_deassert(gpio_name, active_low, args):
    """De-assert the named GPIO, reporting access failures on stderr.

    In verbose mode the IOError is deliberately left uncaught so the
    uncaught-exception handler prints a full traceback (see the comment in
    main() about Python 2 exception handling).

    Returns:
        True on success; False if the GPIO could not be accessed (non-verbose
        mode only).
    """
    # Deal with silly python2 exception handling as outlined in main
    if args.verbose:
        return gpio_set_value(gpio_name, active_low, False)

    try:
        return gpio_set_value(gpio_name, active_low, False)
    except IOError as e:
        # Format the exception itself: e.message is empty for an IOError
        # built from (errno, strerror), so the cause was previously lost.
        sys.stderr.write(
            "Failed to access GPIO {}: {}\n".format(gpio_name, e))
        return False
def run_chassiskill(args):
    """De-assert every power-up output GPIO listed in GPIO_DEFS_FILE.

    The GPIOs come from ``gpio_configs.power_config.power_up_outs``; each is
    matched by name against ``gpio_definitions`` to find its pin. A GPIO with
    a false ``polarity`` is treated as active-low.

    Returns:
        True if every GPIO with a definition was de-asserted, False as soon
        as one cannot be accessed.
    """
    # We shouldn't be able to invoke run_chassiskill() unless it's been
    # explicitly added as a valid command to argparse in main()
    assert can_chassiskill()

    with open(GPIO_DEFS_FILE, 'r') as json_input:
        data = json.load(json_input)

    gpios = data['gpio_configs']['power_config']['power_up_outs']
    defs = data['gpio_definitions']

    for gpio in gpios:
        # A list (not filter()) so emptiness and indexing behave the same on
        # Python 2 and Python 3.
        definition = [g for g in defs if g['name'] == gpio['name']]
        if not definition:
            sys.stderr.write(
                "Missing definition for GPIO {}\n".format(gpio['name']))
            # NOTE(review): skipping the GPIO is a reconstruction; without it
            # the code falls through to definition[0] and raises IndexError.
            # Confirm whether upstream skips or aborts here.
            continue

        pin = str(definition[0]['pin'])
        active_low = not gpio['polarity']

        if not gpio_deassert(pin, active_low, args):
            return False

    return True
def can_chassiskill():
    """Report whether GPIO_DEFS_FILE lists at least one power-up output GPIO.

    Returns:
        True if ``gpio_configs.power_config.power_up_outs`` exists and is
        non-empty; False if it is empty or the file is missing, unreadable,
        not valid JSON, or lacks that structure.
    """
    # NOTE(review): the try/except is reconstructed -- this copy of the file
    # only shows an otherwise unreachable trailing `return False`. This is a
    # capability probe run on every invocation, so any failure to read the
    # configuration is treated as "not available" rather than crashing.
    try:
        with open(GPIO_DEFS_FILE, 'r') as json_input:
            data = json.load(json_input)
        gpios = data['gpio_configs']['power_config']['power_up_outs']
        return len(gpios) > 0
    except (IOError, KeyError, TypeError, ValueError):
        return False
def main():
    """Parse the command line and run the selected recipe.

    Returns:
        A truthy value on success and a falsy one on failure; the script
        entry point maps this onto the process exit status.
    """
    # Conditionally add the `chassiskill` command based on whether the
    # required GPIO configuration is present in the system description.
    if can_chassiskill():
        descriptors['chassiskill'] = None

    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', action='store_true',
                        help="Verbose output")
    parser.add_argument('--wait', '-w', action='store_true',
                        help='Block until the state transition succeeds or '
                             'fails')
    # Seconds to wait for journal output to complete after receiving the DBus
    # signal.
    # NOTE(review): the end of this call is missing from this copy of the
    # file; the help text is commented out there, so the option is presumably
    # meant to stay hidden -- confirm. `const` keeps a bare `-t` from yielding
    # None, which time.sleep() would reject.
    parser.add_argument('--wait-tune', '-t', nargs='?', default=8, const=8,
                        type=float, help=argparse.SUPPRESS)
    parser.add_argument('recipe', choices=sorted(descriptors.keys()))
    args = parser.parse_args()

    # This is a special case: directly pull the power, don't do any D-Bus
    # related stuff
    if args.recipe == "chassiskill":
        return run_chassiskill(args)

    # Signal receivers (used by --wait) require the bus to be attached to a
    # main loop, so install the GLib one before connecting.
    # NOTE(review): this call is absent from this copy of the file although
    # DBusGMainLoop is imported -- confirm it matches upstream.
    DBusGMainLoop(set_as_default=True)
    dbus_bus = dbus.SystemBus()

    # The only way to get a sensible backtrace with python 2 without stupid
    # hoops is to let the uncaught exception handler do the work for you.
    # Catching and binding an exception appears to overwrite the stack trace at
    # the point of bind.
    #
    # So, if we're in verbose mode, don't try to catch the DBus exception. That
    # way we can understand where it originated.
    if args.verbose:
        return run_all_commands(dbus_bus, descriptors[args.recipe], args)

    # Otherwise, we don't care about the traceback. Just catch it and print the
    # error message.
    try:
        return run_all_commands(dbus_bus, descriptors[args.recipe], args)
    except dbus.exceptions.DBusException as e:
        sys.stderr.write(
            "DBus error occurred: {}\n".format(e.get_dbus_message()))
        # Explicit failure result instead of an implicit None.
        return False
if __name__ == "__main__":
    # main() returns a truthy value on success; translate that into the
    # conventional exit status (0 = success, 1 = failure).
    sys.exit(0 if main() else 1)