blob: 555027730b34c1ef0d88a09cb00a5c45a7085e73 [file] [log] [blame]
#!/usr/bin/env python3
r"""
This module is the python counterpart to obmc_boot_test.
"""
import glob
import importlib.util
import os
import random
import re
import signal
import time
try:
import cPickle as pickle
except ImportError:
import pickle
import socket
import gen_arg as ga
import gen_cmd as gc
import gen_misc as gm
import gen_plug_in_utils as gpu
import gen_print as gp
import gen_robot_keyword as grk
import gen_robot_plug_in as grpi
import gen_valid as gv
import logging_utils as log
import pel_utils as pel
import state as st
import var_stack as vs
from boot_data import *
from robot.libraries.BuiltIn import BuiltIn
from robot.utils import DotDict
base_path = (
os.path.dirname(
os.path.dirname(importlib.util.find_spec("gen_robot_print").origin)
)
+ os.sep
)
sys.path.append(base_path + "extended/")
import run_keyword as rk # NOQA
# Setting master_pid correctly influences the behavior of plug-ins like
# DB_Logging
program_pid = os.getpid()
master_pid = os.environ.get("AUTOBOOT_MASTER_PID", program_pid)
pgm_name = re.sub("\\.py$", "", os.path.basename(__file__))
# Set up boot data structures.
os_host = BuiltIn().get_variable_value("${OS_HOST}", default="")
boot_lists = read_boot_lists()
# The maximum number of entries that can be in the boot_history global variable.
max_boot_history = 10
boot_history = []
state = st.return_state_constant("default_state")
cp_setup_called = 0
next_boot = ""
base_tool_dir_path = (
os.path.normpath(os.environ.get("AUTOBOOT_BASE_TOOL_DIR_PATH", "/tmp"))
+ os.sep
)
ffdc_dir_path = os.path.normpath(os.environ.get("FFDC_DIR_PATH", "")) + os.sep
boot_success = 0
status_dir_path = os.environ.get(
"STATUS_DIR_PATH", ""
) or BuiltIn().get_variable_value("${STATUS_DIR_PATH}", default="")
if status_dir_path != "":
status_dir_path = os.path.normpath(status_dir_path) + os.sep
# For plugin expecting env gen_call_robot.py
os.environ["STATUS_DIR_PATH"] = status_dir_path
redfish_support_trans_state = int(
os.environ.get("REDFISH_SUPPORT_TRANS_STATE", 0)
) or int(
BuiltIn().get_variable_value("${REDFISH_SUPPORT_TRANS_STATE}", default=0)
)
redfish_supported = BuiltIn().get_variable_value(
"${REDFISH_SUPPORTED}", default=False
)
redfish_rest_supported = BuiltIn().get_variable_value(
"${REDFISH_REST_SUPPORTED}", default=False
)
redfish_delete_sessions = int(
BuiltIn().get_variable_value("${REDFISH_DELETE_SESSIONS}", default=1)
)
if redfish_supported:
redfish = BuiltIn().get_library_instance("redfish")
default_power_on = "Redfish Power On"
default_power_off = "Redfish Power Off"
if not redfish_support_trans_state:
delete_errlogs_cmd = "Delete Error Logs ${quiet}=${1}"
delete_bmcdump_cmd = "Delete All BMC Dump"
default_set_power_policy = "Set BMC Power Policy ALWAYS_POWER_OFF"
else:
delete_errlogs_cmd = "Redfish Purge Event Log"
delete_bmcdump_cmd = "Redfish Delete All BMC Dumps"
delete_sysdump_cmd = "Redfish Delete All System Dumps"
default_set_power_policy = (
"Redfish Set Power Restore Policy AlwaysOff"
)
else:
default_power_on = "REST Power On"
default_power_off = "REST Power Off"
delete_errlogs_cmd = "Delete Error Logs ${quiet}=${1}"
delete_bmcdump_cmd = "Delete All BMC Dump"
default_set_power_policy = "Set BMC Power Policy ALWAYS_POWER_OFF"
boot_count = 0
LOG_LEVEL = BuiltIn().get_variable_value("${LOG_LEVEL}")
AUTOBOOT_FFDC_PREFIX = os.environ.get("AUTOBOOT_FFDC_PREFIX", "")
ffdc_prefix = AUTOBOOT_FFDC_PREFIX
boot_start_time = ""
boot_end_time = ""
save_stack = vs.var_stack("save_stack")
main_func_parm_list = ["boot_stack", "stack_mode", "quiet"]
def dump_ffdc_rc():
r"""
Return the constant dump ffdc test return code value.
When a plug-in call point program returns this value, it indicates that
this program should collect FFDC.
"""
return 0x00000200
def stop_test_rc():
r"""
Return the constant stop test return code value.
When a plug-in call point program returns this value, it indicates that
this program should stop running.
"""
return 0x00000200
def process_host(host, host_var_name=""):
r"""
Process a host by getting the associated host name and IP address and
setting them in global variables.
If the caller does not pass the host_var_name, this function will try to
figure out the name of the variable used by the caller for the host parm.
Callers are advised to explicitly specify the host_var_name when calling
with an exec command. In such cases, the get_arg_name cannot figure out
the host variable name.
This function will then create similar global variable names by
removing "_host" and appending "_host_name" or "_ip" to the host variable
name.
Example:
If a call is made like this:
process_host(openbmc_host)
Global variables openbmc_host_name and openbmc_ip will be set.
Description of argument(s):
host A host name or IP. The name of the variable used should
have a suffix of "_host".
host_var_name The name of the variable being used as the host parm.
"""
if host_var_name == "":
host_var_name = gp.get_arg_name(0, 1, stack_frame_ix=2)
host_name_var_name = re.sub("host", "host_name", host_var_name)
ip_var_name = re.sub("host", "ip", host_var_name)
cmd_buf = (
"global "
+ host_name_var_name
+ ", "
+ ip_var_name
+ " ; "
+ host_name_var_name
+ ", "
+ ip_var_name
+ " = gm.get_host_name_ip('"
+ host
+ "')"
)
exec(cmd_buf)
def process_pgm_parms():
r"""
Process the program parameters by assigning them all to corresponding
globals. Also, set some global values that depend on program parameters.
"""
# Program parameter processing.
# Assign all program parms to python variables which are global to this
# module.
global parm_list
parm_list = BuiltIn().get_variable_value("${parm_list}")
# The following subset of parms should be processed as integers.
int_list = [
"max_num_tests",
"boot_pass",
"boot_fail",
"ffdc_only",
"boot_fail_threshold",
"delete_errlogs",
"call_post_stack_plug",
"do_pre_boot_plug_in_setup",
"quiet",
"test_mode",
"debug",
]
for parm in parm_list:
if parm in int_list:
sub_cmd = (
'int(BuiltIn().get_variable_value("${' + parm + '}", "0"))'
)
else:
sub_cmd = 'BuiltIn().get_variable_value("${' + parm + '}")'
cmd_buf = "global " + parm + " ; " + parm + " = " + sub_cmd
gp.dpissuing(cmd_buf)
exec(cmd_buf)
if re.match(r".*_host$", parm):
cmd_buf = "process_host(" + parm + ", '" + parm + "')"
exec(cmd_buf)
if re.match(r".*_password$", parm):
# Register the value of any parm whose name ends in _password.
# This will cause the print functions to replace passwords with
# asterisks in the output.
cmd_buf = "gp.register_passwords(" + parm + ")"
exec(cmd_buf)
global ffdc_dir_path_style
global boot_list
global boot_stack
global boot_results_file_path
global boot_results
global boot_history
global ffdc_list_file_path
global ffdc_report_list_path
global ffdc_summary_list_path
global boot_table
global valid_boot_types
if ffdc_dir_path_style == "":
ffdc_dir_path_style = int(os.environ.get("FFDC_DIR_PATH_STYLE", "0"))
# Convert these program parms to lists for easier processing..
boot_list = list(filter(None, boot_list.split(":")))
boot_stack = list(filter(None, boot_stack.split(":")))
boot_table = create_boot_table(boot_table_path, os_host=os_host)
valid_boot_types = create_valid_boot_list(boot_table)
cleanup_boot_results_file()
boot_results_file_path = create_boot_results_file_path(
pgm_name, openbmc_nickname, master_pid
)
if os.path.isfile(boot_results_file_path):
# We've been called before in this run so we'll load the saved
# boot_results and boot_history objects.
boot_results, boot_history = pickle.load(
open(boot_results_file_path, "rb")
)
else:
boot_results = boot_results(boot_table, boot_pass, boot_fail)
ffdc_list_file_path = (
base_tool_dir_path + openbmc_nickname + "/FFDC_FILE_LIST"
)
ffdc_report_list_path = (
base_tool_dir_path + openbmc_nickname + "/FFDC_REPORT_FILE_LIST"
)
ffdc_summary_list_path = (
base_tool_dir_path + openbmc_nickname + "/FFDC_SUMMARY_FILE_LIST"
)
def initial_plug_in_setup():
r"""
Initialize all plug-in environment variables which do not change for the
duration of the program.
"""
global LOG_LEVEL
BuiltIn().set_log_level("NONE")
BuiltIn().set_global_variable("${master_pid}", master_pid)
BuiltIn().set_global_variable("${FFDC_DIR_PATH}", ffdc_dir_path)
BuiltIn().set_global_variable("${STATUS_DIR_PATH}", status_dir_path)
BuiltIn().set_global_variable("${BASE_TOOL_DIR_PATH}", base_tool_dir_path)
BuiltIn().set_global_variable(
"${FFDC_LIST_FILE_PATH}", ffdc_list_file_path
)
BuiltIn().set_global_variable(
"${FFDC_REPORT_LIST_PATH}", ffdc_report_list_path
)
BuiltIn().set_global_variable(
"${FFDC_SUMMARY_LIST_PATH}", ffdc_summary_list_path
)
BuiltIn().set_global_variable(
"${FFDC_DIR_PATH_STYLE}", ffdc_dir_path_style
)
BuiltIn().set_global_variable("${FFDC_CHECK}", ffdc_check)
# For each program parameter, set the corresponding AUTOBOOT_ environment
# variable value. Also, set an AUTOBOOT_ environment variable for every
# element in additional_values.
additional_values = [
"program_pid",
"master_pid",
"ffdc_dir_path",
"status_dir_path",
"base_tool_dir_path",
"ffdc_list_file_path",
"ffdc_report_list_path",
"ffdc_summary_list_path",
"execdir",
"redfish_supported",
"redfish_rest_supported",
"redfish_support_trans_state",
]
plug_in_vars = parm_list + additional_values
for var_name in plug_in_vars:
var_value = BuiltIn().get_variable_value("${" + var_name + "}")
var_name = var_name.upper()
if var_value is None:
var_value = ""
os.environ["AUTOBOOT_" + var_name] = str(var_value)
BuiltIn().set_log_level(LOG_LEVEL)
# Make sure the ffdc list directory exists.
ffdc_list_dir_path = os.path.dirname(ffdc_list_file_path) + os.sep
if not os.path.exists(ffdc_list_dir_path):
os.makedirs(ffdc_list_dir_path)
def plug_in_setup():
r"""
Initialize all changing plug-in environment variables for use by the
plug-in programs.
"""
global LOG_LEVEL
global test_really_running
BuiltIn().set_log_level("NONE")
boot_pass, boot_fail = boot_results.return_total_pass_fail()
if boot_pass > 1:
test_really_running = 1
else:
test_really_running = 0
BuiltIn().set_global_variable(
"${test_really_running}", test_really_running
)
BuiltIn().set_global_variable("${boot_type_desc}", next_boot)
BuiltIn().set_global_variable("${boot_pass}", boot_pass)
BuiltIn().set_global_variable("${boot_fail}", boot_fail)
BuiltIn().set_global_variable("${boot_success}", boot_success)
BuiltIn().set_global_variable("${ffdc_prefix}", ffdc_prefix)
BuiltIn().set_global_variable("${boot_start_time}", boot_start_time)
BuiltIn().set_global_variable("${boot_end_time}", boot_end_time)
# For each program parameter, set the corresponding AUTOBOOT_ environment
# variable value. Also, set an AUTOBOOT_ environment variable for every
# element in additional_values.
additional_values = [
"boot_type_desc",
"boot_success",
"boot_pass",
"boot_fail",
"test_really_running",
"ffdc_prefix",
"boot_start_time",
"boot_end_time",
]
plug_in_vars = additional_values
for var_name in plug_in_vars:
var_value = BuiltIn().get_variable_value("${" + var_name + "}")
var_name = var_name.upper()
if var_value is None:
var_value = ""
os.environ["AUTOBOOT_" + var_name] = str(var_value)
if debug:
shell_rc, out_buf = gc.cmd_fnc_u(
"printenv | egrep AUTOBOOT_ | sort -u"
)
BuiltIn().set_log_level(LOG_LEVEL)
def pre_boot_plug_in_setup():
# Clear the ffdc_list_file_path file. Plug-ins may now write to it.
try:
os.remove(ffdc_list_file_path)
except OSError:
pass
# Clear the ffdc_report_list_path file. Plug-ins may now write to it.
try:
os.remove(ffdc_report_list_path)
except OSError:
pass
# Clear the ffdc_summary_list_path file. Plug-ins may now write to it.
try:
os.remove(ffdc_summary_list_path)
except OSError:
pass
global ffdc_prefix
seconds = time.time()
loc_time = time.localtime(seconds)
time_string = time.strftime("%y%m%d.%H%M%S.", loc_time)
ffdc_prefix = openbmc_nickname + "." + time_string
def default_sigusr1(signal_number=0, frame=None):
r"""
Handle SIGUSR1 by doing nothing.
This function assists in debugging SIGUSR1 processing by printing messages
to stdout and to the log.html file.
Description of argument(s):
signal_number The signal number (should always be 10 for SIGUSR1).
frame The frame data.
"""
gp.qprintn()
gp.qprint_executing()
gp.lprint_executing()
def set_default_siguser1():
r"""
Set the default_sigusr1 function to be the SIGUSR1 handler.
"""
gp.qprintn()
gp.qprint_executing()
gp.lprint_executing()
signal.signal(signal.SIGUSR1, default_sigusr1)
def setup():
r"""
Do general program setup tasks.
"""
global cp_setup_called
global transitional_boot_selected
gp.qprintn()
if redfish_supported:
redfish.login()
set_default_siguser1()
transitional_boot_selected = False
robot_pgm_dir_path = os.path.dirname(__file__) + os.sep
repo_bin_path = robot_pgm_dir_path.replace("/lib/", "/bin/")
# If we can't find process_plug_in_packages.py, ssh_pw or
# validate_plug_ins.py, then we don't have our repo bin in PATH.
shell_rc, out_buf = gc.cmd_fnc_u(
"which process_plug_in_packages.py" + " ssh_pw validate_plug_ins.py",
quiet=1,
print_output=0,
show_err=0,
)
if shell_rc != 0:
os.environ["PATH"] = repo_bin_path + ":" + os.environ.get("PATH", "")
# Likewise, our repo lib subdir needs to be in sys.path and PYTHONPATH.
if robot_pgm_dir_path not in sys.path:
sys.path.append(robot_pgm_dir_path)
PYTHONPATH = os.environ.get("PYTHONPATH", "")
if PYTHONPATH == "":
os.environ["PYTHONPATH"] = robot_pgm_dir_path
else:
os.environ["PYTHONPATH"] = robot_pgm_dir_path + ":" + PYTHONPATH
validate_parms()
gp.qprint_pgm_header()
grk.run_key_u(default_set_power_policy, ignore=1)
initial_plug_in_setup()
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="setup"
)
if rc != 0:
error_message = "Plug-in setup failed.\n"
gp.print_error_report(error_message)
BuiltIn().fail(error_message)
# Setting cp_setup_called lets our Teardown know that it needs to call
# the cleanup plug-in call point.
cp_setup_called = 1
# Keyword "FFDC" will fail if TEST_MESSAGE is not set.
BuiltIn().set_global_variable("${TEST_MESSAGE}", "${EMPTY}")
# FFDC_LOG_PATH is used by "FFDC" keyword.
BuiltIn().set_global_variable("${FFDC_LOG_PATH}", ffdc_dir_path)
# Also printed by FFDC.
global host_name
global host_ip
host = socket.gethostname()
host_name, host_ip = gm.get_host_name_ip(host)
gp.dprint_var(boot_table)
gp.dprint_var(boot_lists)
def validate_parms():
r"""
Validate all program parameters.
"""
process_pgm_parms()
gp.qprintn()
global openbmc_model
if openbmc_model == "":
status, ret_values = grk.run_key_u("Get BMC System Model", ignore=1)
# Set the model to default "OPENBMC" if getting it from BMC fails.
if status == "FAIL":
openbmc_model = "OPENBMC"
else:
openbmc_model = ret_values
BuiltIn().set_global_variable("${openbmc_model}", openbmc_model)
gv.set_exit_on_error(True)
gv.valid_value(openbmc_host)
gv.valid_value(openbmc_username)
gv.valid_value(openbmc_password)
gv.valid_value(rest_username)
gv.valid_value(rest_password)
gv.valid_value(ipmi_username)
gv.valid_value(ipmi_password)
if os_host != "":
gv.valid_value(os_username)
gv.valid_value(os_password)
if pdu_host != "":
gv.valid_value(pdu_username)
gv.valid_value(pdu_password)
gv.valid_integer(pdu_slot_no)
if openbmc_serial_host != "":
gv.valid_integer(openbmc_serial_port)
gv.valid_value(openbmc_model)
gv.valid_integer(max_num_tests)
gv.valid_integer(boot_pass)
gv.valid_integer(boot_fail)
plug_in_packages_list = grpi.rvalidate_plug_ins(plug_in_dir_paths)
BuiltIn().set_global_variable(
"${plug_in_packages_list}", plug_in_packages_list
)
gv.valid_value(stack_mode, valid_values=["normal", "skip"])
gv.set_exit_on_error(False)
if len(boot_list) == 0 and len(boot_stack) == 0 and not ffdc_only:
error_message = (
"You must provide either a value for either the"
+ " boot_list or the boot_stack parm.\n"
)
BuiltIn().fail(gp.sprint_error(error_message))
valid_boot_list(boot_list, valid_boot_types)
valid_boot_list(boot_stack, valid_boot_types)
selected_PDU_boots = list(
set(boot_list + boot_stack) & set(boot_lists["PDU_reboot"])
)
if len(selected_PDU_boots) > 0 and pdu_host == "":
error_message = (
"You have selected the following boots which"
+ " require a PDU host but no value for pdu_host:\n"
)
error_message += gp.sprint_var(selected_PDU_boots)
error_message += gp.sprint_var(pdu_host, fmt=gp.blank())
BuiltIn().fail(gp.sprint_error(error_message))
return
def my_get_state():
r"""
Get the system state plus a little bit of wrapping.
"""
global state
req_states = ["epoch_seconds"] + st.default_req_states
gp.qprint_timen("Getting system state.")
if test_mode:
state["epoch_seconds"] = int(time.time())
else:
state = st.get_state(req_states=req_states, quiet=quiet)
gp.qprint_var(state)
def valid_state():
r"""
Verify that our state dictionary contains no blank values. If we don't get
valid state data, we cannot continue to work.
"""
if st.compare_states(state, st.invalid_state_match, "or"):
error_message = (
"The state dictionary contains blank fields which"
+ " is illegal.\n"
+ gp.sprint_var(state)
)
BuiltIn().fail(gp.sprint_error(error_message))
def select_boot():
r"""
Select a boot test to be run based on our current state and return the
chosen boot type.
Description of arguments:
state The state of the machine.
"""
global transitional_boot_selected
global boot_stack
gp.qprint_timen("Selecting a boot test.")
if transitional_boot_selected and not boot_success:
prior_boot = next_boot
boot_candidate = boot_stack.pop()
gp.qprint_timen(
"The prior '"
+ next_boot
+ "' was chosen to"
+ " transition to a valid state for '"
+ boot_candidate
+ "' which was at the top of the boot_stack. Since"
+ " the '"
+ next_boot
+ "' failed, the '"
+ boot_candidate
+ "' has been removed from the stack"
+ " to avoid and endless failure loop."
)
if len(boot_stack) == 0:
return ""
my_get_state()
valid_state()
transitional_boot_selected = False
stack_popped = 0
if len(boot_stack) > 0:
stack_popped = 1
gp.qprint_dashes()
gp.qprint_var(boot_stack)
gp.qprint_dashes()
skip_boot_printed = 0
while len(boot_stack) > 0:
boot_candidate = boot_stack.pop()
if stack_mode == "normal":
break
else:
if st.compare_states(state, boot_table[boot_candidate]["end"]):
if not skip_boot_printed:
gp.qprint_var(stack_mode)
gp.qprintn()
gp.qprint_timen(
"Skipping the following boot tests"
+ " which are unnecessary since their"
+ " required end states match the"
+ " current machine state:"
)
skip_boot_printed = 1
gp.qprint_var(boot_candidate)
boot_candidate = ""
if boot_candidate == "":
gp.qprint_dashes()
gp.qprint_var(boot_stack)
gp.qprint_dashes()
return boot_candidate
if st.compare_states(state, boot_table[boot_candidate]["start"]):
gp.qprint_timen(
"The machine state is valid for a '"
+ boot_candidate
+ "' boot test."
)
gp.qprint_dashes()
gp.qprint_var(boot_stack)
gp.qprint_dashes()
return boot_candidate
else:
gp.qprint_timen(
"The machine state does not match the required"
+ " starting state for a '"
+ boot_candidate
+ "' boot test:"
)
gp.qprint_varx(
"boot_table_start_entry", boot_table[boot_candidate]["start"]
)
boot_stack.append(boot_candidate)
transitional_boot_selected = True
popped_boot = boot_candidate
# Loop through your list selecting a boot_candidates
boot_candidates = []
for boot_candidate in boot_list:
if st.compare_states(state, boot_table[boot_candidate]["start"]):
if stack_popped:
if st.compare_states(
boot_table[boot_candidate]["end"],
boot_table[popped_boot]["start"],
):
boot_candidates.append(boot_candidate)
else:
boot_candidates.append(boot_candidate)
if len(boot_candidates) == 0:
gp.qprint_timen(
"The user's boot list contained no boot tests"
+ " which are valid for the current machine state."
)
boot_candidate = default_power_on
if not st.compare_states(state, boot_table[default_power_on]["start"]):
boot_candidate = default_power_off
boot_candidates.append(boot_candidate)
gp.qprint_timen(
"Using default '"
+ boot_candidate
+ "' boot type to transition to valid state."
)
gp.dprint_var(boot_candidates)
# Randomly select a boot from the candidate list.
boot = random.choice(boot_candidates)
return boot
def print_defect_report(ffdc_file_list):
r"""
Print a defect report.
Description of argument(s):
ffdc_file_list A list of files which were collected by our ffdc functions.
"""
# Making deliberate choice to NOT run plug_in_setup(). We don't want
# ffdc_prefix updated.
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="ffdc_report", stop_on_plug_in_failure=0
)
# Get additional header data which may have been created by ffdc plug-ins.
# Also, delete the individual header files to cleanup.
cmd_buf = (
"file_list=$(cat "
+ ffdc_report_list_path
+ " 2>/dev/null)"
+ ' ; [ ! -z "${file_list}" ] && cat ${file_list}'
+ " 2>/dev/null ; rm -rf ${file_list} 2>/dev/null || :"
)
shell_rc, more_header_info = gc.cmd_fnc_u(
cmd_buf, print_output=0, show_err=0
)
# Get additional summary data which may have been created by ffdc plug-ins.
# Also, delete the individual header files to cleanup.
cmd_buf = (
"file_list=$(cat "
+ ffdc_summary_list_path
+ " 2>/dev/null)"
+ ' ; [ ! -z "${file_list}" ] && cat ${file_list}'
+ " 2>/dev/null ; rm -rf ${file_list} 2>/dev/null || :"
)
shell_rc, ffdc_summary_info = gc.cmd_fnc_u(
cmd_buf, print_output=0, show_err=0
)
# ffdc_list_file_path contains a list of any ffdc files created by plug-
# ins, etc. Read that data into a list.
try:
plug_in_ffdc_list = (
open(ffdc_list_file_path, "r").read().rstrip("\n").split("\n")
)
plug_in_ffdc_list = list(filter(None, plug_in_ffdc_list))
except IOError:
plug_in_ffdc_list = []
# Combine the files from plug_in_ffdc_list with the ffdc_file_list passed
# in. Eliminate duplicates and sort the list.
ffdc_file_list = sorted(set(ffdc_file_list + plug_in_ffdc_list))
if status_file_path != "":
ffdc_file_list.insert(0, status_file_path)
# Convert the list to a printable list.
printable_ffdc_file_list = "\n".join(ffdc_file_list)
# Open ffdc_file_list for writing. We will write a complete list of
# FFDC files to it for possible use by plug-ins like cp_stop_check.
ffdc_list_file = open(ffdc_list_file_path, "w")
ffdc_list_file.write(printable_ffdc_file_list + "\n")
ffdc_list_file.close()
indent = 0
width = 90
linefeed = 1
char = "="
gp.qprintn()
gp.qprint_dashes(indent, width, linefeed, char)
gp.qprintn("Copy this data to the defect:\n")
if len(more_header_info) > 0:
gp.qprintn(more_header_info)
gp.qpvars(
host_name,
host_ip,
openbmc_nickname,
openbmc_host,
openbmc_host_name,
openbmc_ip,
openbmc_username,
openbmc_password,
rest_username,
rest_password,
ipmi_username,
ipmi_password,
os_host,
os_host_name,
os_ip,
os_username,
os_password,
pdu_host,
pdu_host_name,
pdu_ip,
pdu_username,
pdu_password,
pdu_slot_no,
openbmc_serial_host,
openbmc_serial_host_name,
openbmc_serial_ip,
openbmc_serial_port,
)
gp.qprintn()
print_boot_history(boot_history)
gp.qprintn()
gp.qprint_var(state)
gp.qprintn()
gp.qprintn("FFDC data files:")
gp.qprintn(printable_ffdc_file_list)
gp.qprintn()
if len(ffdc_summary_info) > 0:
gp.qprintn(ffdc_summary_info)
gp.qprint_dashes(indent, width, linefeed, char)
def my_ffdc():
r"""
Collect FFDC data.
"""
global state
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="ffdc", stop_on_plug_in_failure=0
)
AUTOBOOT_FFDC_PREFIX = os.environ["AUTOBOOT_FFDC_PREFIX"]
status, ffdc_file_list = grk.run_key_u(
"FFDC ffdc_prefix="
+ AUTOBOOT_FFDC_PREFIX
+ " ffdc_function_list="
+ ffdc_function_list,
ignore=1,
)
if status != "PASS":
gp.qprint_error("Call to ffdc failed.\n")
if type(ffdc_file_list) is not list:
ffdc_file_list = []
# Leave a record for caller that "soft" errors occurred.
soft_errors = 1
gpu.save_plug_in_value(soft_errors, pgm_name)
my_get_state()
print_defect_report(ffdc_file_list)
def print_test_start_message(boot_keyword):
r"""
Print a message indicating what boot test is about to run.
Description of arguments:
boot_keyword The name of the boot which is to be run
(e.g. "BMC Power On").
"""
global boot_history
global boot_start_time
doing_msg = gp.sprint_timen('Doing "' + boot_keyword + '".')
# Set boot_start_time for use by plug-ins.
boot_start_time = doing_msg[1:33]
gp.qprint_var(boot_start_time)
gp.qprint(doing_msg)
update_boot_history(boot_history, doing_msg, max_boot_history)
def stop_boot_test(signal_number=0, frame=None):
r"""
Handle SIGUSR1 by aborting the boot test that is running.
Description of argument(s):
signal_number The signal number (should always be 10 for SIGUSR1).
frame The frame data.
"""
gp.qprintn()
gp.qprint_executing()
gp.lprint_executing()
# Restore original sigusr1 handler.
set_default_siguser1()
message = "The caller has asked that the boot test be stopped and marked"
message += " as a failure."
function_stack = gm.get_function_stack()
if "wait_state" in function_stack:
st.set_exit_wait_early_message(message)
else:
BuiltIn().fail(gp.sprint_error(message))
def run_boot(boot):
r"""
Run the specified boot.
Description of arguments:
boot The name of the boot test to be performed.
"""
global state
signal.signal(signal.SIGUSR1, stop_boot_test)
gp.qprint_timen("stop_boot_test is armed.")
print_test_start_message(boot)
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="pre_boot"
)
if rc != 0:
error_message = (
"Plug-in failed with non-zero return code.\n"
+ gp.sprint_var(rc, fmt=gp.hexa())
)
set_default_siguser1()
BuiltIn().fail(gp.sprint_error(error_message))
if test_mode:
# In test mode, we'll pretend the boot worked by assigning its
# required end state to the default state value.
state = st.strip_anchor_state(boot_table[boot]["end"])
else:
# Assertion: We trust that the state data was made fresh by the
# caller.
gp.qprintn()
if boot_table[boot]["method_type"] == "keyword":
rk.my_run_keywords(
boot_table[boot].get("lib_file_path", ""),
boot_table[boot]["method"],
quiet=quiet,
)
if boot_table[boot]["bmc_reboot"]:
st.wait_for_comm_cycle(int(state["epoch_seconds"]))
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="post_reboot"
)
if rc != 0:
error_message = "Plug-in failed with non-zero return code.\n"
error_message += gp.sprint_var(rc, fmt=gp.hexa())
set_default_siguser1()
BuiltIn().fail(gp.sprint_error(error_message))
else:
match_state = st.anchor_state(state)
del match_state["epoch_seconds"]
# Wait for the state to change in any way.
st.wait_state(
match_state,
wait_time=state_change_timeout,
interval="10 seconds",
invert=1,
)
gp.qprintn()
if boot_table[boot]["end"]["chassis"] == "Off":
boot_timeout = power_off_timeout
else:
boot_timeout = power_on_timeout
st.wait_state(
boot_table[boot]["end"],
wait_time=boot_timeout,
interval="10 seconds",
)
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="post_boot"
)
if rc != 0:
error_message = (
"Plug-in failed with non-zero return code.\n"
+ gp.sprint_var(rc, fmt=gp.hexa())
)
set_default_siguser1()
BuiltIn().fail(gp.sprint_error(error_message))
# Restore original sigusr1 handler.
set_default_siguser1()
def test_loop_body():
r"""
The main loop body for the loop in main_py.
Description of arguments:
boot_count The iteration number (starts at 1).
"""
global boot_count
global state
global next_boot
global boot_success
global boot_end_time
# The flag can be enabled or disabled on the go
redfish_delete_sessions = int(
BuiltIn().get_variable_value("${REDFISH_DELETE_SESSIONS}", default=1)
)
gp.qprintn()
next_boot = select_boot()
if next_boot == "":
return True
boot_count += 1
gp.qprint_timen("Starting boot " + str(boot_count) + ".")
pre_boot_plug_in_setup()
cmd_buf = ["run_boot", next_boot]
boot_status, msg = BuiltIn().run_keyword_and_ignore_error(*cmd_buf)
if boot_status == "FAIL":
gp.qprint(msg)
gp.qprintn()
if boot_status == "PASS":
boot_success = 1
completion_msg = gp.sprint_timen(
'BOOT_SUCCESS: "' + next_boot + '" succeeded.'
)
else:
boot_success = 0
completion_msg = gp.sprint_timen(
'BOOT_FAILED: "' + next_boot + '" failed.'
)
# Set boot_end_time for use by plug-ins.
boot_end_time = completion_msg[1:33]
gp.qprint_var(boot_end_time)
gp.qprint(completion_msg)
boot_results.update(next_boot, boot_status)
plug_in_setup()
# NOTE: A post_test_case call point failure is NOT counted as a boot
# failure.
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="post_test_case", stop_on_plug_in_failure=0
)
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="ffdc_check",
shell_rc=dump_ffdc_rc(),
stop_on_plug_in_failure=1,
stop_on_non_zero_rc=1,
)
if ffdc_check == "All" or shell_rc == dump_ffdc_rc():
status, ret_values = grk.run_key_u("my_ffdc", ignore=1)
if status != "PASS":
gp.qprint_error("Call to my_ffdc failed.\n")
# Leave a record for caller that "soft" errors occurred.
soft_errors = 1
gpu.save_plug_in_value(soft_errors, pgm_name)
if delete_errlogs:
# print error logs before delete
if redfish_support_trans_state:
status, error_logs = grk.run_key_u("Get Redfish Event Logs")
log.print_error_logs(
error_logs, "AdditionalDataURI Message Severity"
)
else:
status, error_logs = grk.run_key_u("Get Error Logs")
log.print_error_logs(error_logs, "AdditionalData Message Severity")
pels = pel.peltool("-l", ignore_err=1)
gp.qprint_var(pels)
# We need to purge error logs between boots or they build up.
grk.run_key(delete_errlogs_cmd, ignore=1)
grk.run_key(delete_bmcdump_cmd, ignore=1)
if redfish_support_trans_state:
grk.run_key(delete_sysdump_cmd, ignore=1)
boot_results.print_report()
gp.qprint_timen("Finished boot " + str(boot_count) + ".")
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="stop_check", shell_rc=stop_test_rc(), stop_on_non_zero_rc=1
)
if shell_rc == stop_test_rc():
message = "Stopping as requested by user.\n"
gp.qprint_time(message)
BuiltIn().fail(message)
# This should help prevent ConnectionErrors.
# Purge all redfish and REST connection sessions.
if redfish_delete_sessions:
grk.run_key_u("Close All Connections", ignore=1)
grk.run_key_u("Delete All Redfish Sessions", ignore=1)
return True
def obmc_boot_test_teardown():
r"""
Clean up after the main keyword.
"""
gp.qprint_executing()
if ga.psutil_imported:
ga.terminate_descendants()
if cp_setup_called:
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="cleanup", stop_on_plug_in_failure=0
)
if "boot_results_file_path" in globals():
# Save boot_results and boot_history objects to a file in case they are
# needed again.
gp.qprint_timen("Saving boot_results to the following path.")
gp.qprint_var(boot_results_file_path)
pickle.dump(
(boot_results, boot_history),
open(boot_results_file_path, "wb"),
pickle.HIGHEST_PROTOCOL,
)
global save_stack
# Restore any global values saved on the save_stack.
for parm_name in main_func_parm_list:
# Get the parm_value if it was saved on the stack.
try:
parm_value = save_stack.pop(parm_name)
except BaseException:
# If it was not saved, no further action is required.
continue
# Restore the saved value.
cmd_buf = (
'BuiltIn().set_global_variable("${' + parm_name + '}", parm_value)'
)
gp.dpissuing(cmd_buf)
exec(cmd_buf)
gp.dprintn(save_stack.sprint_obj())
def test_teardown():
r"""
Clean up after this test case.
"""
gp.qprintn()
gp.qprint_executing()
if ga.psutil_imported:
ga.terminate_descendants()
cmd_buf = [
"Print Error",
"A keyword timeout occurred ending this program.\n",
]
BuiltIn().run_keyword_if_timeout_occurred(*cmd_buf)
if redfish_supported:
redfish.logout()
gp.qprint_pgm_footer()
def post_stack():
r"""
Process post_stack plug-in programs.
"""
if not call_post_stack_plug:
# The caller does not wish to have post_stack plug-in processing done.
return
global boot_success
# NOTE: A post_stack call-point failure is NOT counted as a boot failure.
pre_boot_plug_in_setup()
# For the purposes of the following plug-ins, mark the "boot" as a success.
boot_success = 1
plug_in_setup()
(
rc,
shell_rc,
failed_plug_in_name,
history,
) = grpi.rprocess_plug_in_packages(
call_point="post_stack", stop_on_plug_in_failure=0, return_history=True
)
for doing_msg in history:
update_boot_history(boot_history, doing_msg, max_boot_history)
if rc != 0:
boot_success = 0
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="ffdc_check",
shell_rc=dump_ffdc_rc(),
stop_on_plug_in_failure=1,
stop_on_non_zero_rc=1,
)
if shell_rc == dump_ffdc_rc():
status, ret_values = grk.run_key_u("my_ffdc", ignore=1)
if status != "PASS":
gp.qprint_error("Call to my_ffdc failed.\n")
# Leave a record for caller that "soft" errors occurred.
soft_errors = 1
gpu.save_plug_in_value(soft_errors, pgm_name)
plug_in_setup()
rc, shell_rc, failed_plug_in_name = grpi.rprocess_plug_in_packages(
call_point="stop_check", shell_rc=stop_test_rc(), stop_on_non_zero_rc=1
)
if shell_rc == stop_test_rc():
message = "Stopping as requested by user.\n"
gp.qprint_time(message)
BuiltIn().fail(message)
def obmc_boot_test_py(
loc_boot_stack=None, loc_stack_mode=None, loc_quiet=None
):
r"""
Do main program processing.
"""
global save_stack
ga.set_term_options(
term_requests={"pgm_names": ["process_plug_in_packages.py"]}
)
gp.dprintn()
# Process function parms.
for parm_name in main_func_parm_list:
# Get parm's value.
parm_value = eval("loc_" + parm_name)
gp.dpvars(parm_name, parm_value)
if parm_value is not None:
# Save the global value on a stack.
cmd_buf = (
'save_stack.push(BuiltIn().get_variable_value("${'
+ parm_name
+ '}"), "'
+ parm_name
+ '")'
)
gp.dpissuing(cmd_buf)
exec(cmd_buf)
# Set the global value to the passed value.
cmd_buf = (
'BuiltIn().set_global_variable("${'
+ parm_name
+ '}", loc_'
+ parm_name
+ ")"
)
gp.dpissuing(cmd_buf)
exec(cmd_buf)
gp.dprintn(save_stack.sprint_obj())
setup()
init_boot_pass, init_boot_fail = boot_results.return_total_pass_fail()
if ffdc_only:
gp.qprint_timen("Caller requested ffdc_only.")
if do_pre_boot_plug_in_setup:
pre_boot_plug_in_setup()
grk.run_key_u("my_ffdc")
return
if delete_errlogs:
# print error logs before delete
if redfish_support_trans_state:
status, error_logs = grk.run_key_u("Get Redfish Event Logs")
log.print_error_logs(
error_logs, "AdditionalDataURI Message Severity"
)
else:
status, error_logs = grk.run_key_u("Get Error Logs")
log.print_error_logs(error_logs, "AdditionalData Message Severity")
pels = pel.peltool("-l", ignore_err=1)
gp.qprint_var(pels)
# Delete errlogs prior to doing any boot tests.
grk.run_key(delete_errlogs_cmd, ignore=1)
grk.run_key(delete_bmcdump_cmd, ignore=1)
if redfish_support_trans_state:
grk.run_key(delete_sysdump_cmd, ignore=1)
# Process caller's boot_stack.
while len(boot_stack) > 0:
test_loop_body()
gp.qprint_timen("Finished processing stack.")
post_stack()
# Process caller's boot_list.
if len(boot_list) > 0:
for ix in range(1, max_num_tests + 1):
test_loop_body()
gp.qprint_timen("Completed all requested boot tests.")
boot_pass, boot_fail = boot_results.return_total_pass_fail()
new_fail = boot_fail - init_boot_fail
if new_fail > boot_fail_threshold:
error_message = (
"Boot failures exceed the boot failure"
+ " threshold:\n"
+ gp.sprint_var(new_fail)
+ gp.sprint_var(boot_fail_threshold)
)
BuiltIn().fail(gp.sprint_error(error_message))