blob: 1467bfa78b4666cdf34b23096f1a675250680724 [file] [log] [blame]
#!/usr/bin/env python3
r"""
See class prolog below for details.
"""
import json
import logging
import os
import platform
import re
import subprocess
import sys
import time
from errno import EACCES, EPERM
import yaml
# Do not write .pyc files for this script or anything it imports.
sys.dont_write_bytecode = True

# Directory holding this script; added to the module search path.
script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(script_dir)
# Walk path and append every sub-directory to sys.path (presumably so the
# helper modules imported right after this block resolve by bare name).
for root, dirs, files in os.walk(script_dir):
    # "dir_name" rather than "dir": do not shadow the builtin dir() at
    # module scope.
    for dir_name in dirs:
        sys.path.append(os.path.join(root, dir_name))
from ssh_utility import SSHRemoteclient # NOQA
from telnet_utility import TelnetRemoteclient # NOQA
r"""
User-defined plugin Python functions.
Plugin files are imported from the plugins directory:
plugins
├── file1.py
└── file2.py
Example how to define in YAML:
- plugin:
- plugin_name: plugin.foo_func.foo_func_yaml
- plugin_args:
- arg1
- arg2
"""
# Directory that holds the user plugin modules; also put on sys.path.
plugin_dir = os.path.join(os.path.dirname(__file__), "plugins")
sys.path.append(plugin_dir)
# Plugin loading is best effort (a failing module is reported and skipped
# below), so a missing or unreadable plugins directory must not abort the
# import of this module either.
try:
    plugin_files = os.listdir(plugin_dir)
except OSError as e:
    print(f"PLUGIN: Exception: {e}")
    print(f"PLUGIN: Plugin directory not readable: {plugin_dir}")
    plugin_files = []
for module in plugin_files:
    # Only regular python sources are plugins; skip the package marker.
    if module == "__init__.py" or not module.endswith(".py"):
        continue
    plugin_module = f"plugins.{module[:-3]}"
    try:
        # With an empty fromlist __import__ returns the top-level package
        # ("plugins"), so "plugin" is rebound to that package each time.
        plugin = __import__(plugin_module, globals(), locals(), [], 0)
    except Exception as e:
        print(f"PLUGIN: Exception: {e}")
        print(f"PLUGIN: Module import failed: {module}")
        continue
r"""
This is for plugin functions returning data or responses to the caller
in YAML plugin setup.
Example:
- plugin:
- plugin_name: version = plugin.ssh_execution.ssh_execute_cmd
- plugin_args:
- ${hostname}
- ${username}
- ${password}
- "cat /etc/os-release | grep VERSION_ID | awk -F'=' '{print $2}'"
- plugin:
- plugin_name: plugin.print_vars.print_vars
- plugin_args:
- version
where first plugin "version" var is used by another plugin in the YAML
block or plugin
"""
# NOTE: these names are module-level state. A "global" statement is only
# meaningful inside a function body, so none is needed here.

# Hold the plugin return values in dict and plugin return vars in list.
# Dict is to reference and update vars processing in parser where as
# list is for current vars from the plugin block which needs processing.
global_plugin_dict = {}
global_plugin_list = []
# Hold the plugin return named declared if function returned values are
# list,dict.
# Refer this name list to look up the plugin dict for eval() args function
# Example ['version']
global_plugin_type_list = []
# Path where logs are to be stored or written.
global_log_store_path = ""
# Plugin error state defaults.
plugin_error_dict = {
    "exit_on_error": False,
    "continue_on_error": False,
}
class ffdc_collector:
r"""
Execute commands from configuration file to collect log files.
Fetch and store generated files at the specified location.
"""
def __init__(
    self,
    hostname,
    username,
    password,
    port_ssh,
    port_https,
    port_ipmi,
    ffdc_config,
    location,
    remote_type,
    remote_protocol,
    env_vars,
    econfig,
    log_level,
):
    r"""
    Initialize the collector and load the FFDC configuration.

    Stores the connection settings, creates the per-run storage
    directory, sets up logging, verifies the script environment and
    loads the FFDC YAML configuration. The process exits with -1 when
    the environment check fails, when the configuration cannot be read
    or parsed, or when the remote type is not listed in it.

    Parameters:
        hostname (str):        Name or IP address of the targeted
                               (remote) system.
        username (str):        User on the targeted system with access
                               to log files.
        password (str):        Password for the user on the targeted
                               system.
        port_ssh (int):        SSH port value (stored as a string).
        port_https (int):      HTTPS port value (stored as a string).
        port_ipmi (int):       IPMI port value (stored as a string).
        ffdc_config (str):     Configuration file listing commands
                               and files for FFDC.
        location (str):        Where to store collected log data.
        remote_type (str):     Block YAML type name of the remote
                               host (upper-cased before use).
        remote_protocol (str): Protocol to use to collect data
                               (upper-cased before use).
        env_vars (dict):       User-defined CLI environment variables.
                               A false value is replaced by {}.
        econfig (str):         User-defined environment variables
                               YAML file. A false value is replaced
                               by {}.
        log_level (str):       Name of a logging level attribute, for
                               example "INFO" (case-insensitive).
    """
    self.hostname = hostname
    self.username = username
    self.password = password
    self.port_ssh = str(port_ssh)
    self.port_https = str(port_https)
    self.port_ipmi = str(port_ipmi)
    self.ffdc_config = ffdc_config
    self.location = location + "/" + remote_type.upper()
    self.ssh_remoteclient = None
    self.telnet_remoteclient = None
    self.ffdc_dir_path = ""
    self.ffdc_prefix = ""
    self.target_type = remote_type.upper()
    self.remote_protocol = remote_protocol.upper()
    self.env_vars = env_vars if env_vars else {}
    self.econfig = econfig if econfig else {}
    self.start_time = 0
    self.elapsed_time = ""
    self.env_dict = {}
    self.logger = None

    # Set prefix values for SCP files and directories.
    # Since the time stamp is at second granularity, these values are set
    # here to be sure that all files for this run will have the same
    # timestamps and be saved in the same directory.
    # self.location == local system for now
    self.set_ffdc_default_store_path()

    # Logger for this run. Need to be after set_ffdc_default_store_path()
    self.script_logging(getattr(logging, log_level.upper()))

    # Verify top level directory exists for storage
    self.validate_local_store(self.location)

    if not self.verify_script_env():
        sys.exit(-1)

    try:
        with open(self.ffdc_config, "r") as file:
            self.ffdc_actions = yaml.safe_load(file)
    except (OSError, yaml.YAMLError) as e:
        # OSError: the configuration file is missing or unreadable.
        self.logger.error(e)
        sys.exit(-1)

    # safe_load() yields None for an empty document; treat anything that
    # is not a mapping like a configuration without the requested type.
    if (
        not isinstance(self.ffdc_actions, dict)
        or self.target_type not in self.ffdc_actions
    ):
        self.logger.error(
            "\n\tERROR: %s is not listed in %s.\n\n"
            % (self.target_type, self.ffdc_config)
        )
        sys.exit(-1)

    self.logger.info("\n\tENV: User define input YAML variables")
    self.env_dict = self.load_env()
def verify_script_env(self):
    r"""
    Log the script host environment and check package requirements.

    Logs the host name, OS, Python version and the versions of PyYAML,
    click, paramiko, redfishtool and ipmitool. The two CLI tools are
    optional: when their version cannot be obtained the error is logged
    and "Not Installed (optional)" is reported. The only hard requirement
    checked is PyYAML >= 5.3.0.

    Returns:
        bool: True if the PyYAML version requirement is met,
              False otherwise.
    """
    # Import to log version
    import click
    import paramiko

    run_env_ok = True

    try:
        redfishtool_version = (
            self.run_tool_cmd("redfishtool -V").split(" ")[2].strip("\n")
        )
    except Exception as e:
        self.logger.error("\tEXCEPTION redfishtool: %s", e)
        redfishtool_version = "Not Installed (optional)"

    try:
        ipmitool_version = self.run_tool_cmd("ipmitool -V").split(" ")[2]
    except Exception as e:
        self.logger.error("\tEXCEPTION ipmitool: %s", e)
        ipmitool_version = "Not Installed (optional)"

    self.logger.info("\n\t---- Script host environment ----")
    self.logger.info(
        "\t{:<10}  {:<10}".format("Script hostname", os.uname()[1])
    )
    self.logger.info(
        "\t{:<10}  {:<10}".format("Script host os", platform.platform())
    )
    self.logger.info(
        "\t{:<10}  {:>10}".format("Python", platform.python_version())
    )
    self.logger.info("\t{:<10}  {:>10}".format("PyYAML", yaml.__version__))
    self.logger.info("\t{:<10}  {:>10}".format("click", click.__version__))
    self.logger.info(
        "\t{:<10}  {:>10}".format("paramiko", paramiko.__version__)
    )
    self.logger.info(
        "\t{:<10}  {:>9}".format("redfishtool", redfishtool_version)
    )
    self.logger.info(
        "\t{:<10}  {:>12}".format("ipmitool", ipmitool_version)
    )

    # Parse the leading dotted-number part of the version instead of
    # eval()-ing it, and pad to three components: a bare "5.3" must
    # compare equal to 5.3.0 (the tuple (5, 3) sorts below (5, 3, 0)),
    # and suffixes such as "5.4b1" must not raise.
    version_match = re.match(r"\d+(?:\.\d+)*", yaml.__version__)
    yaml_version = (
        tuple(int(part) for part in version_match.group().split("."))
        if version_match
        else ()
    )
    yaml_version += (0,) * (3 - len(yaml_version))

    if yaml_version < (5, 3, 0):
        self.logger.error(
            "\n\tERROR: Python or python packages do not meet minimum"
            " version requirement."
        )
        self.logger.error(
            "\tERROR: PyYAML version 5.3.0 or higher is needed.\n"
        )
        run_env_ok = False

    self.logger.info("\t---- End script host environment ----")
    return run_env_ok
def script_logging(self, log_level_attr):
"""
Create a logger for the script with the specified log level.
This method creates a logger for the script with the specified
log level. The logger is configured to write log messages to a file
and the console.
self.logger = logging.getLogger(__name__)
Setting logger with __name__ will add the trace
Example:
INFO:ffdc_collector: System Type: OPENBMC
Currently, set to empty purposely to log as
System Type: OPENBMC
Parameters:
log_level_attr (str): The log level for the logger
(e.g., "DEBUG", "INFO", "WARNING",
"ERROR", "CRITICAL").
Returns:
None
"""
self.logger = logging.getLogger()
self.logger.setLevel(log_level_attr)
log_file_handler = logging.FileHandler(
self.ffdc_dir_path + "collector.log"
)
stdout_handler = logging.StreamHandler(sys.stdout)
self.logger.addHandler(log_file_handler)
self.logger.addHandler(stdout_handler)
# Turn off paramiko INFO logging
logging.getLogger("paramiko").setLevel(logging.WARNING)
def target_is_pingable(self):
r"""
Check if the target system is ping-able.
This method checks if the target system is reachable by sending an
ICMP echo request (ping). If the target system responds to the ping,
the method returns True. Otherwise, it returns False.
Returns:
bool: True if the target system is ping-able, False otherwise.
"""
response = os.system("ping -c 2 %s 2>&1 >/dev/null" % self.hostname)
if response == 0:
self.logger.info(
"\n\t[Check] %s is ping-able.\t\t [OK]" % self.hostname
)
return True
else:
self.logger.error(
"\n\tERROR: %s is not ping-able. FFDC collection aborted.\n"
% self.hostname
)
sys.exit(-1)
return False
def collect_ffdc(self):
    r"""
    Initiate FFDC collection based on the requested protocol.

    Gathers the distinct protocols named by the configuration blocks of
    this target type, lets verify_protocol() find out which of them
    work, and then hands the working list to generate_ffdc(). The process
    exits with -1 if a specific protocol was requested and it is not in
    the working list.

    Returns:
        None
    """
    self.logger.info(
        "\n\t---- Start communicating with %s ----" % self.hostname
    )
    self.start_time = time.time()

    # Find the list of target and protocol supported.
    check_protocol_list = []
    if self.target_type in self.ffdc_actions:
        for action_block in self.ffdc_actions[self.target_type].values():
            block_protocol = action_block["PROTOCOL"][0]
            if block_protocol not in check_protocol_list:
                check_protocol_list.append(block_protocol)

    self.logger.info(
        "\n\t %s protocol type: %s"
        % (self.target_type, check_protocol_list)
    )

    verified_working_protocol = self.verify_protocol(check_protocol_list)
    if verified_working_protocol:
        self.logger.info(
            "\n\t---- Completed protocol pre-requisite check ----\n"
        )

    # Verify top level directory exists for storage
    self.validate_local_store(self.location)

    request_is_usable = (
        self.remote_protocol in verified_working_protocol
        or self.remote_protocol == "ALL"
    )
    if request_is_usable:
        self.generate_ffdc(verified_working_protocol)
        return

    self.logger.info(
        "\n\tWorking protocol list: %s" % verified_working_protocol
    )
    self.logger.error(
        "\tERROR: Requested protocol %s is not in working protocol"
        " list.\n" % self.remote_protocol
    )
    sys.exit(-1)
def ssh_to_target_system(self):
    r"""
    Establish an SSH connection to the target system.

    Creates an SSHRemoteclient from the stored hostname, username,
    password and SSH port, keeps it in self.ssh_remoteclient and tries
    to log in. After a successful login the SCP connection is opened as
    well; its outcome does not affect the return value.

    Returns:
        bool: True if the SSH login succeeded, False otherwise.
    """
    self.ssh_remoteclient = SSHRemoteclient(
        self.hostname, self.username, self.password, self.port_ssh
    )

    logged_in = self.ssh_remoteclient.ssh_remoteclient_login()
    if not logged_in:
        self.logger.info(
            "\n\t[Check] %s SSH connection.\t [NOT AVAILABLE]"
            % self.hostname
        )
        return False

    self.logger.info(
        "\n\t[Check] %s SSH connection established.\t [OK]"
        % self.hostname
    )
    # Check scp connection.
    # If scp connection fails,
    # continue with FFDC generation but skip scp files to local host.
    self.ssh_remoteclient.scp_connection()
    return True
def telnet_to_target_system(self):
    r"""
    Establish a Telnet connection to the target system.

    Creates a TelnetRemoteclient from the stored hostname, username and
    password, keeps it in self.telnet_remoteclient and tries to log in.
    The outcome is logged either way.

    Returns:
        bool: True if the Telnet login succeeded, False otherwise.
    """
    self.telnet_remoteclient = TelnetRemoteclient(
        self.hostname, self.username, self.password
    )

    logged_in = self.telnet_remoteclient.tn_remoteclient_login()
    if not logged_in:
        self.logger.info(
            "\n\t[Check] %s Telnet connection.\t [NOT AVAILABLE]"
            % self.hostname
        )
        return False

    self.logger.info(
        "\n\t[Check] %s Telnet connection established.\t [OK]"
        % self.hostname
    )
    return True
def generate_ffdc(self, working_protocol_list):
r"""
Generate FFDC (First Failure Data Capture) based on the remote host
type and working protocols.
This method determines the actions to be performed for generating FFDC
based on the remote host type and the list of confirmed working
protocols. The method iterates through the available actions for the
remote host type and checks if any of the working protocols are
supported. If a supported protocol is found, the method executes the
corresponding FFDC generation action.
Parameters:
working_protocol_list (list): A list of confirmed working
protocols to connect to the
remote host.
Returns:
None
"""
self.logger.info(
"\n\t---- Executing commands on " + self.hostname + " ----"
)
self.logger.info(
"\n\tWorking protocol list: %s" % working_protocol_list
)
config_dict = self.ffdc_actions
for target_type in config_dict.keys():
if self.target_type != target_type:
continue
self.logger.info("\n\tFFDC Path: %s " % self.ffdc_dir_path)
global_plugin_dict["global_log_store_path"] = self.ffdc_dir_path
self.logger.info("\tSystem Type: %s" % target_type)
for k, v in config_dict[target_type].items():
protocol = v["PROTOCOL"][0]
if (
self.remote_protocol not in working_protocol_list
and self.remote_protocol != "ALL"
) or protocol not in working_protocol_list:
continue
if protocol in working_protocol_list:
if protocol in ["SSH", "SCP"]:
self.protocol_ssh(protocol, target_type, k)
elif protocol == "TELNET":
self.protocol_telnet(target_type, k)
elif protocol in ["REDFISH", "IPMI", "SHELL"]:
self.protocol_service_execute(protocol, target_type, k)
else:
self.logger.error(
"\n\tERROR: %s is not available for %s."
% (protocol, self.hostname)
)
# Close network connection after collecting all files
self.elapsed_time = time.strftime(
"%H:%M:%S", time.gmtime(time.time() - self.start_time)
)
self.logger.info("\n\tTotal time taken: %s" % self.elapsed_time)
if self.ssh_remoteclient:
self.ssh_remoteclient.ssh_remoteclient_disconnect()
if self.telnet_remoteclient:
self.telnet_remoteclient.tn_remoteclient_disconnect()
def protocol_ssh(self, protocol, target_type, sub_type):
r"""
Perform actions using SSH and SCP protocols.
This method executes a set of commands using the SSH protocol to
connect to the target system and collect FFDC data. The method takes
the protocol, target type, and sub-type as arguments and performs the
corresponding actions based on the provided parameters.
Parameters:
protocol (str): The protocol to execute (SSH or SCP).
target_type (str): The type group of the remote host.
sub_type (str): The group type of commands to execute.
Returns:
None
"""
if protocol == "SCP":
self.group_copy(self.ffdc_actions[target_type][sub_type])
else:
self.collect_and_copy_ffdc(
self.ffdc_actions[target_type][sub_type]
)
def protocol_telnet(self, target_type, sub_type):
r"""
Perform actions using the Telnet protocol.
This method executes a set of commands using the Telnet protocol to
connect to the target system and collect FFDC data. The method takes
the target type and sub-type as arguments and performs the
corresponding actions based on the provided parameters.
Parameters:
target_type (str): The type group of the remote host.
sub_type (str): The group type of commands to execute.
Returns:
None
"""
self.logger.info(
"\n\t[Run] Executing commands on %s using %s"
% (self.hostname, "TELNET")
)
telnet_files_saved = []
progress_counter = 0
list_of_commands = self.ffdc_actions[target_type][sub_type]["COMMANDS"]
for index, each_cmd in enumerate(list_of_commands, start=0):
command_txt, command_timeout = self.unpack_command(each_cmd)
result = self.telnet_remoteclient.execute_command(
command_txt, command_timeout
)
if result:
try:
targ_file = self.ffdc_actions[target_type][sub_type][
"FILES"
][index]
except IndexError:
targ_file = command_txt
self.logger.warning(
"\n\t[WARN] Missing filename to store data from"
" telnet %s." % each_cmd
)
self.logger.warning(
"\t[WARN] Data will be stored in %s." % targ_file
)
targ_file_with_path = (
self.ffdc_dir_path + self.ffdc_prefix + targ_file
)
# Creates a new file
with open(targ_file_with_path, "w") as fp:
fp.write(result)
fp.close
telnet_files_saved.append(targ_file)
progress_counter += 1
self.print_progress(progress_counter)
self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]")
for file in telnet_files_saved:
self.logger.info("\n\t\tSuccessfully save file " + file + ".")
def protocol_service_execute(self, protocol, target_type, sub_type):
r"""
Run the commands of one configuration sub-block and save their output.
Each entry of the sub-block's COMMANDS list is either a plugin block
(a dict holding a "plugin" key), executed via execute_plugin_block(),
or a command, which is first passed through
yaml_env_and_plugin_vars_populate() and then run with run_tool_cmd().
A non-empty result is written to
<ffdc_dir_path><ffdc_prefix><file name>, where the file name is the
FILES entry at the same index as the command. A FILES entry equal to
the string "None" means the result is not stored.
Parameters:
protocol (str): The protocol to execute
(REDFISH, IPMI, or SHELL). Only used
for the log message here.
target_type (str): The type group of the remote host.
sub_type (str): The group type of commands to execute.
Returns:
None
"""
self.logger.info(
"\n\t[Run] Executing commands to %s using %s"
% (self.hostname, protocol)
)
executed_files_saved = []
progress_counter = 0
list_of_cmd = self.get_command_list(
self.ffdc_actions[target_type][sub_type]
)
for index, each_cmd in enumerate(list_of_cmd, start=0):
plugin_call = False
if isinstance(each_cmd, dict):
if "plugin" in each_cmd:
# If the error is set and plugin explicitly
# requested to skip execution on error..
# (plugin_error_dict is module-level state.)
if plugin_error_dict[
"exit_on_error"
] and self.plugin_error_check(each_cmd["plugin"]):
self.logger.info(
"\n\t[PLUGIN-ERROR] exit_on_error: %s"
% plugin_error_dict["exit_on_error"]
)
self.logger.info(
"\t[PLUGIN-SKIP] %s" % each_cmd["plugin"][0]
)
# Skipped plugins do not advance the progress counter.
continue
plugin_call = True
# call the plugin
self.logger.info("\n\t[PLUGIN-START]")
result = self.execute_plugin_block(each_cmd["plugin"])
self.logger.info("\t[PLUGIN-END]\n")
else:
# Substitute variables into the command before it is run.
each_cmd = self.yaml_env_and_plugin_vars_populate(each_cmd)
# Anything that was not handled as a plugin is run as a tool command.
if not plugin_call:
result = self.run_tool_cmd(each_cmd)
if result:
try:
file_name = self.get_file_list(
self.ffdc_actions[target_type][sub_type]
)[index]
# If file is specified as None.
# NOTE(review): this continue also skips the progress update below.
if file_name == "None":
continue
targ_file = self.yaml_env_and_plugin_vars_populate(
file_name
)
except IndexError:
# No FILES entry for this index: fall back to the last "/"-separated
# part of the command.
# NOTE(review): assumes each_cmd is a string; a plugin dict would
# raise AttributeError here - confirm.
targ_file = each_cmd.split("/")[-1]
self.logger.warning(
"\n\t[WARN] Missing filename to store data from %s."
% each_cmd
)
self.logger.warning(
"\t[WARN] Data will be stored in %s." % targ_file
)
targ_file_with_path = (
self.ffdc_dir_path + self.ffdc_prefix + targ_file
)
# Creates a new file
with open(targ_file_with_path, "w") as fp:
# dict results are serialized as JSON; anything else is written as is.
if isinstance(result, dict):
fp.write(json.dumps(result))
else:
fp.write(result)
# NOTE(review): "fp.close" is a bare attribute reference, not a call;
# the with-block is what closes the file.
fp.close
executed_files_saved.append(targ_file)
progress_counter += 1
self.print_progress(progress_counter)
self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]")
for file in executed_files_saved:
self.logger.info("\n\t\tSuccessfully save file " + file + ".")
def collect_and_copy_ffdc(
self, ffdc_actions_for_target_type, form_filename=False
):
r"""
Send commands and collect FFDC data from the targeted system.
This method sends a set of commands and collects FFDC data from the
targeted system based on the provided ffdc_actions_for_target_type
dictionary. The method also has an optional form_filename parameter,
which, if set to True, prepends the target type to the output file
name.
Parameters:
ffdc_actions_for_target_type (dict): A dictionary containing
commands and files for the
selected remote host type.
form_filename (bool, optional): If True, prepends the target
type to the output file name.
Defaults to False.
Returns:
None
"""
# Executing commands, if any
self.ssh_execute_ffdc_commands(
ffdc_actions_for_target_type, form_filename
)
# Copying files
if self.ssh_remoteclient.scpclient:
self.logger.info(
"\n\n\tCopying FFDC files from remote system %s.\n"
% self.hostname
)
# Retrieving files from target system
list_of_files = self.get_file_list(ffdc_actions_for_target_type)
self.scp_ffdc(
self.ffdc_dir_path,
self.ffdc_prefix,
form_filename,
list_of_files,
)
else:
self.logger.info(
"\n\n\tSkip copying FFDC files from remote system %s.\n"
% self.hostname
)
def get_command_list(self, ffdc_actions_for_target_type):
r"""
Fetch a list of commands from the configuration file.
This method retrieves a list of commands from the
ffdc_actions_for_target_type dictionary, which contains commands and
files for the selected remote host type. The method returns the list
of commands.
Parameters:
ffdc_actions_for_target_type (dict): A dictionary containing
commands and files for the
selected remote host type.
Returns:
list: A list of commands.
"""
try:
list_of_commands = ffdc_actions_for_target_type["COMMANDS"]
except KeyError:
list_of_commands = []
return list_of_commands
def get_file_list(self, ffdc_actions_for_target_type):
r"""
Fetch a list of files from the configuration file.
This method retrieves a list of files from the
ffdc_actions_for_target_type dictionary, which contains commands and
files for the selected remote host type. The method returns the list
of files.
Parameters:
ffdc_actions_for_target_type (dict): A dictionary containing
commands and files for the
selected remote host type.
Returns:
list: A list of files.
"""
try:
list_of_files = ffdc_actions_for_target_type["FILES"]
except KeyError:
list_of_files = []
return list_of_files
def unpack_command(self, command):
r"""
Unpack a command from the configuration file, handling both dictionary
and string inputs.
This method takes a command from the configuration file, which can be
either a dictionary or a string. If the input is a dictionary, the
method extracts the command text and timeout from the dictionary.
If the input is a string, the method assumes a default timeout of
60 seconds.
The method returns a tuple containing the command text and timeout.
Parameters:
command (dict or str): A command from the configuration file,
which can be either a dictionary or a
string.
Returns:
tuple: A tuple containing the command text and timeout.
"""
if isinstance(command, dict):
command_txt = next(iter(command))
command_timeout = next(iter(command.values()))
elif isinstance(command, str):
command_txt = command
# Default command timeout 60 seconds
command_timeout = 60
return command_txt, command_timeout
def ssh_execute_ffdc_commands(
self, ffdc_actions_for_target_type, form_filename=False
):
r"""
Send commands in the ffdc_config file to the targeted system using SSH.
This method sends a set of commands and collects FFDC data from the
targeted system using the SSH protocol. The method takes the
ffdc_actions_for_target_type dictionary and an optional
form_filename parameter as arguments.
If form_filename is set to True, the method prepends the target type
to the output file name. The method returns the output of the executed
commands.
It also prints the progress counter string + on the console.
Parameters:
ffdc_actions_for_target_type (dict): A dictionary containing
commands and files for the
selected remote host type.
form_filename (bool, optional): If True, prepends the target
type to the output file name.
Defaults to False.
Returns:
None
"""
self.logger.info(
"\n\t[Run] Executing commands on %s using %s"
% (self.hostname, ffdc_actions_for_target_type["PROTOCOL"][0])
)
list_of_commands = self.get_command_list(ffdc_actions_for_target_type)
# If command list is empty, returns
if not list_of_commands:
return
progress_counter = 0
for command in list_of_commands:
command_txt, command_timeout = self.unpack_command(command)
if form_filename:
command_txt = str(command_txt % self.target_type)
(
cmd_exit_code,
err,
response,
) = self.ssh_remoteclient.execute_command(
command_txt, command_timeout
)
if cmd_exit_code:
self.logger.warning(
"\n\t\t[WARN] %s exits with code %s."
% (command_txt, str(cmd_exit_code))
)
self.logger.warning("\t\t[WARN] %s " % err)
progress_counter += 1
self.print_progress(progress_counter)
self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]")
def group_copy(self, ffdc_actions_for_target_type):
r"""
SCP a group of files (wildcard) from the remote host.
This method copies a group of files from the remote host using the SCP
protocol. The method takes the fdc_actions_for_target_type dictionary
as an argument, which contains commands and files for the selected
remote host type.
Parameters:
fdc_actions_for_target_type (dict): A dictionary containing
commands and files for the
selected remote host type.
Returns:
None
"""
if self.ssh_remoteclient.scpclient:
self.logger.info(
"\n\tCopying files from remote system %s via SCP.\n"
% self.hostname
)
list_of_commands = self.get_command_list(
ffdc_actions_for_target_type
)
# If command list is empty, returns
if not list_of_commands:
return
for command in list_of_commands:
try:
command = self.yaml_env_and_plugin_vars_populate(command)
except IndexError:
self.logger.error("\t\tInvalid command %s" % command)
continue
(
cmd_exit_code,
err,
response,
) = self.ssh_remoteclient.execute_command(command)
# If file does not exist, code take no action.
# cmd_exit_code is ignored for this scenario.
if response:
scp_result = self.ssh_remoteclient.scp_file_from_remote(
response.split("\n"), self.ffdc_dir_path
)
if scp_result:
self.logger.info(
"\t\tSuccessfully copied from "
+ self.hostname
+ ":"
+ command
)
else:
self.logger.info("\t\t%s has no result" % command)
else:
self.logger.info(
"\n\n\tSkip copying files from remote system %s.\n"
% self.hostname
)
def scp_ffdc(
self,
targ_dir_path,
targ_file_prefix,
form_filename,
file_list=None,
quiet=None,
):
r"""
SCP all files in the file_dict to the indicated directory on the local
system.
This method copies all files specified in the file_dict dictionary
from the targeted system to the local system using the SCP protocol.
The method takes the target directory path, target file prefix, and a
boolean flag form_filename as required arguments.
The file_dict argument is optional and contains the files to be copied.
The quiet argument is also optional and, if set to True, suppresses
the output of the SCP operation.
Parameters:
targ_dir_path (str): The path of the directory to receive
the files on the local system.
targ_file_prefix (str): Prefix which will be prepended to each
target file's name.
form_filename (bool): If True, prepends the target type to
the file names.
file_dict (dict, optional): A dictionary containing the files to
be copied. Defaults to None.
quiet (bool, optional): If True, suppresses the output of the
SCP operation. Defaults to None.
Returns:
None
"""
progress_counter = 0
for filename in file_list:
if form_filename:
filename = str(filename % self.target_type)
source_file_path = filename
targ_file_path = (
targ_dir_path + targ_file_prefix + filename.split("/")[-1]
)
# If source file name contains wild card, copy filename as is.
if "*" in source_file_path:
scp_result = self.ssh_remoteclient.scp_file_from_remote(
source_file_path, self.ffdc_dir_path
)
else:
scp_result = self.ssh_remoteclient.scp_file_from_remote(
source_file_path, targ_file_path
)
if not quiet:
if scp_result:
self.logger.info(
"\t\tSuccessfully copied from "
+ self.hostname
+ ":"
+ source_file_path
+ ".\n"
)
else:
self.logger.info(
"\t\tFail to copy from "
+ self.hostname
+ ":"
+ source_file_path
+ ".\n"
)
else:
progress_counter += 1
self.print_progress(progress_counter)
def set_ffdc_default_store_path(self):
r"""
Set default values for self.ffdc_dir_path and self.ffdc_prefix.
This method sets default values for the self.ffdc_dir_path and
self.ffdc_prefix class variables.
The collected FFDC files will be stored in the directory
/self.location/hostname_timestr/, with individual files having the
format timestr_filename where timestr is in %Y%m%d-%H%M%S.
Returns:
None
"""
timestr = time.strftime("%Y%m%d-%H%M%S")
self.ffdc_dir_path = (
self.location + "/" + self.hostname + "_" + timestr + "/"
)
self.ffdc_prefix = timestr + "_"
self.validate_local_store(self.ffdc_dir_path)
# Need to verify local store path exists prior to instantiate this class.
# This class method to validate log path before referencing this class.
@classmethod
def validate_local_store(cls, dir_path):
r"""
Ensure the specified directory exists to store FFDC files locally.
This method checks if the provided dir_path exists. If the directory
does not exist, the method creates it. The method does not return any
value.
Parameters:
dir_path (str): The directory path where collected FFDC data files
will be stored.
Returns:
None
"""
if not os.path.exists(dir_path):
try:
os.makedirs(dir_path, 0o755)
except (IOError, OSError) as e:
# PermissionError
if e.errno == EPERM or e.errno == EACCES:
print(
"\tERROR: os.makedirs %s failed with"
" PermissionError.\n" % dir_path
)
else:
print(
"\tERROR: os.makedirs %s failed with %s.\n"
% (dir_path, e.strerror)
)
sys.exit(-1)
def print_progress(self, progress):
r"""
Print the current activity progress.
This method prints the current activity progress using the provided
progress counter. The method does not return any value.
Parameters:
progress (int): The current activity progress counter.
Returns:
None
"""
sys.stdout.write("\r\t" + "+" * progress)
sys.stdout.flush()
time.sleep(0.1)
def verify_redfish(self):
r"""
Verify if the remote host has the Redfish service active.
This method checks if the remote host has the Redfish service active
by sending a GET request to the Redfish base URL /redfish/v1/.
If the request is successful (status code 200), the method returns
stdout output of the run else error message.
Returns:
str: Redfish service executed output.
"""
redfish_parm = (
"redfishtool -r "
+ self.hostname
+ ":"
+ self.port_https
+ " -S Always raw GET /redfish/v1/"
)
return self.run_tool_cmd(redfish_parm, True)
def verify_ipmi(self):
r"""
Verify if the remote host has the IPMI LAN service active.
This method checks if the remote host has the IPMI LAN service active
by sending an IPMI "power status" command.
If the command is successful (returns a non-empty response),
else error message.
Returns:
str: IPMI LAN service executed output.
"""
if self.target_type == "OPENBMC":
ipmi_parm = (
"ipmitool -I lanplus -C 17 -U "
+ self.username
+ " -P "
+ self.password
+ " -H "
+ self.hostname
+ " -p "
+ str(self.port_ipmi)
+ " power status"
)
else:
ipmi_parm = (
"ipmitool -I lanplus -P "
+ self.password
+ " -H "
+ self.hostname
+ " -p "
+ str(self.port_ipmi)
+ " power status"
)
return self.run_tool_cmd(ipmi_parm, True)
def run_tool_cmd(self, parms_string, quiet=False):
r"""
Run a CLI standard tool or script with the provided command options.
This method runs a CLI standard tool or script with the provided
parms_string command options. If the quiet parameter is set to True,
the method suppresses the output of the command.
The method returns the output of the command as a string.
Parameters:
parms_string (str): The command options for the CLI tool or
script.
quiet (bool, optional): If True, suppresses the output of the
command. Defaults to False.
Returns:
str: The output of the command as a string.
"""
result = subprocess.run(
[parms_string],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
universal_newlines=True,
)
if result.stderr and not quiet:
if self.password in parms_string:
parms_string = parms_string.replace(self.password, "********")
self.logger.error("\n\t\tERROR with %s " % parms_string)
self.logger.error("\t\t" + result.stderr)
return result.stdout
def verify_protocol(self, protocol_list):
    r"""
    Check which of the requested protocols work against the remote host.

    "SHELL" is reported when the target is pingable. Every entry of
    protocol_list is then probed, unless self.remote_protocol restricts
    the run to one protocol, in which case all other entries are
    skipped. SSH and SCP share one SSH connection check; a protocol
    that fails its check is simply left out of the result.

    Parameters:
        protocol_list (list): A list of protocol names to check.

    Returns:
        list: The protocols that were found to be available.
    """
    tmp_list = []
    if self.target_is_pingable():
        tmp_list.append("SHELL")

    for protocol in protocol_list:
        # Honour a user restriction to one single protocol.
        if (
            self.remote_protocol != "ALL"
            and self.remote_protocol != protocol
        ):
            continue

        if protocol in ("SSH", "SCP"):
            # One successful SSH login covers both SSH and SCP, so do
            # not connect again once the protocol is already recorded.
            # (The test must apply to SSH as well as SCP; "or" binds
            # weaker than "and", hence the explicit grouping here.)
            if protocol in tmp_list:
                continue
            if self.ssh_to_target_system():
                # Add only what user asked.
                if self.remote_protocol != "ALL":
                    tmp_list.append(self.remote_protocol)
                else:
                    tmp_list.append("SSH")
                    tmp_list.append("SCP")
        elif protocol == "TELNET":
            if self.telnet_to_target_system():
                tmp_list.append(protocol)
        elif protocol == "REDFISH":
            if self.verify_redfish():
                tmp_list.append(protocol)
                self.logger.info(
                    "\n\t[Check] %s Redfish Service.\t\t [OK]"
                    % self.hostname
                )
            else:
                self.logger.info(
                    "\n\t[Check] %s Redfish Service.\t\t [NOT AVAILABLE]"
                    % self.hostname
                )
        elif protocol == "IPMI":
            if self.verify_ipmi():
                tmp_list.append(protocol)
                self.logger.info(
                    "\n\t[Check] %s IPMI LAN Service.\t\t [OK]"
                    % self.hostname
                )
            else:
                self.logger.info(
                    "\n\t[Check] %s IPMI LAN Service.\t\t [NOT AVAILABLE]"
                    % self.hostname
                )

    return tmp_list
def load_env(self):
    r"""
    Export the default and user supplied environment variables.

    The variables come from three sources, applied in this order so
    that a later source overrides an earlier one:

    1. Target defaults taken from this object: hostname, username,
       password, port_ssh, port_https and port_ipmi.
    2. self.env_vars, a JSON object string of NAME: VALUE pairs.
    3. self.econfig, the path of a YAML file that lists the variables
       under the top level "env_params" key:

    .. code-block:: yaml

        env_params:
            VAR_NAME: VAR_VALUE

    Every variable is exported to os.environ and recorded in
    self.env_dict. Finally the collected variables are logged with the
    value of every key containing "password" masked.

    Malformed JSON, a YAML parse error or a missing YAML file is logged
    and terminates the program with sys.exit(-1).
    """
    os.environ["hostname"] = self.hostname
    os.environ["username"] = self.username
    os.environ["password"] = self.password
    os.environ["port_ssh"] = self.port_ssh
    os.environ["port_https"] = self.port_https
    os.environ["port_ipmi"] = self.port_ipmi

    # Append default Env.
    self.env_dict["hostname"] = self.hostname
    self.env_dict["username"] = self.username
    self.env_dict["password"] = self.password
    self.env_dict["port_ssh"] = self.port_ssh
    self.env_dict["port_https"] = self.port_https
    self.env_dict["port_ipmi"] = self.port_ipmi

    try:
        tmp_env_dict = {}
        if self.env_vars:
            tmp_env_dict = json.loads(self.env_vars)
            # Export ENV vars default.
            for key, value in tmp_env_dict.items():
                # os.environ only accepts strings; JSON values may be
                # numbers or booleans, so convert like the YAML branch.
                os.environ[key] = str(value)
                self.env_dict[key] = str(value)

        # Load user specified ENV config YAML.
        if self.econfig:
            with open(self.econfig, "r") as file:
                try:
                    tmp_env_dict = yaml.load(file, Loader=yaml.SafeLoader)
                except yaml.YAMLError as e:
                    self.logger.error(e)
                    sys.exit(-1)
            # Export ENV vars.
            for key, value in tmp_env_dict["env_params"].items():
                os.environ[key] = str(value)
                self.env_dict[key] = str(value)
    except json.decoder.JSONDecodeError as e:
        self.logger.error("\n\tERROR: %s " % e)
        sys.exit(-1)
    except FileNotFoundError as e:
        self.logger.error("\n\tERROR: %s " % e)
        sys.exit(-1)

    # This to mask the password from displaying on the console. The
    # whole value is replaced, whatever its content or length.
    mask_dict = {
        key: "********" if "password" in key.lower() else value
        for key, value in self.env_dict.items()
    }
    self.logger.info(json.dumps(mask_dict, indent=8, sort_keys=False))
def execute_python_eval(self, eval_string):
    r"""
    Evaluate a fully qualified plugin call string and hand back its value.

    The string is passed unchanged to the built-in eval(), which runs it
    in the scope of this method, so it must come from a trusted plugin
    definition.

    Example:
        eval(plugin.foo_func.foo_func(10))

    Parameters:
        eval_string (str): A valid Python function call string.

    Returns:
        The value produced by the evaluated call. When the evaluation
        raises ValueError, SyntaxError, NameError, AttributeError or
        TypeError, the plugin error state is set and the string
        "PLUGIN_EVAL_ERROR" is returned instead.
    """
    # NOTE: no extra local names are introduced ahead of eval() because
    # the evaluated expression can see this method's local variables.
    try:
        self.logger.info("\tExecuting plugin func()")
        self.logger.debug("\tCall func: %s" % eval_string)
        outcome = eval(eval_string)
        self.logger.info("\treturn: %s" % str(outcome))
    except (
        ValueError,
        SyntaxError,
        NameError,
        AttributeError,
        TypeError,
    ) as err:
        self.logger.error("\tERROR: execute_python_eval: %s" % err)
        # Flag the failure so the caller can stop on plugin errors.
        plugin_error_dict["exit_on_error"] = True
        self.logger.info("\treturn: PLUGIN_EVAL_ERROR")
        return "PLUGIN_EVAL_ERROR"
    else:
        return outcome
def execute_plugin_block(self, plugin_cmd_list):
    r"""
    Build the Python call for one YAML plugin block, run it and return
    the plugin response.

    The block is a list of single-key dictionaries read from YAML. The
    'plugin_name' entry names the function to call and may declare
    return variables in front of " = "; these are registered in
    global_plugin_list / global_plugin_dict. The optional 'plugin_args'
    entry supplies the call arguments and the optional
    'plugin_expects_return' entry names the data type the response is
    checked against.

    Parameters:
        plugin_cmd_list (list): A list of plugin commands containing
                                plugin names and arguments.

        Plugin block read from YAML
        [
            {'plugin_name':'plugin.foo_func.my_func'},
            {'plugin_args':[10]},
        ]

    Example:
        Execute and no return response
        - plugin:
          - plugin_name: plugin.foo_func.my_func
          - plugin_args:
            - arg1
            - arg2

        Execute and return a response
        - plugin:
          - plugin_name: result = plugin.foo_func.my_func
          - plugin_args:
            - arg1
            - arg2

        Execute and return multiple values response
        - plugin:
          - plugin_name: result1,result2 = plugin.foo_func.my_func
          - plugin_args:
            - arg1
            - arg2

    Returns:
        The plugin function response, or the string "PLUGIN_EVAL_ERROR"
        when the plugin call could not be built or evaluated.
    """
    # Start from the error marker so that a failure before the plugin
    # call (for example a block without 'plugin_name') still leaves a
    # defined response instead of an unbound local variable.
    resp = "PLUGIN_EVAL_ERROR"
    try:
        idx = self.key_index_list_dict("plugin_name", plugin_cmd_list)
        if idx is None or idx < 0:
            # A "not found" index of -1 would silently address the last
            # list item, so report the real problem instead.
            raise ValueError("plugin_name is missing in the plugin block")
        plugin_name = plugin_cmd_list[idx]["plugin_name"]

        # Equal separator means plugin function returns result.
        if " = " in plugin_name:
            # Ex. ['result', 'plugin.foo_func.my_func']
            *return_groups, plugin_name = plugin_name.split(" = ")
            for group in return_groups:
                # 'result1,result2' -> ['result1','result2']
                for var in group.split(","):
                    global_plugin_list.append(var)
                    global_plugin_dict[var] = ""

        # Walk the plugin args ['arg1,'arg2']. The YAML plugin statement
        # 'plugin_args' may be absent or declared without any value.
        plugin_args = []
        idx = self.key_index_list_dict("plugin_args", plugin_cmd_list)
        if idx is not None and idx >= 0:
            plugin_args = plugin_cmd_list[idx].get("plugin_args") or []
        plugin_args = self.yaml_args_populate(plugin_args)

        # Pack the args list to string parameters for plugin function.
        parm_args_str = self.yaml_args_string(plugin_args)

        # Example of plugin_func:
        #     plugin.redfish.enumerate_request(
        #         "xx.xx.xx.xx:443",
        #         "root",
        #         "********",
        #         "/redfish/v1/",
        #         "json")
        if parm_args_str:
            plugin_func = f"{plugin_name}({parm_args_str})"
        else:
            plugin_func = f"{plugin_name}()"

        # Execute plugin function.
        resp = self.execute_python_eval(plugin_func)
        # Update plugin vars dict if there is any.
        if global_plugin_dict and resp != "PLUGIN_EVAL_ERROR":
            self.response_args_data(resp)
    except Exception as e:
        # Set the plugin error state. A response obtained before the
        # failure is kept; otherwise resp still holds the error marker.
        plugin_error_dict["exit_on_error"] = True
        self.logger.error("\tERROR: execute_plugin_block: %s" % e)

    # There is a real error executing the plugin function.
    if resp == "PLUGIN_EVAL_ERROR":
        return resp

    # Check if plugin_expects_return (int, string, list,dict etc)
    idx = self.key_index_list_dict("plugin_expects_return", plugin_cmd_list)
    if idx is None or idx < 0:
        return resp
    plugin_expects = plugin_cmd_list[idx]["plugin_expects_return"]
    if not plugin_expects:
        return resp

    if not resp:
        self.logger.error("\tERROR: Plugin func failed to return data")
        plugin_error_dict["exit_on_error"] = True
        return resp

    # Evaluate the type check once and branch on its three outcomes.
    type_check = self.plugin_expect_type(plugin_expects, resp)
    if type_check == "INVALID":
        self.logger.error("\tWARN: Plugin error check skipped")
    elif not type_check:
        self.logger.error(
            "\tERROR: Plugin expects return data: %s" % plugin_expects
        )
        plugin_error_dict["exit_on_error"] = True
    return resp
def response_args_data(self, plugin_resp):
    r"""
    Store a plugin function response in the pending plugin return
    variables.

    The names collected in global_plugin_list for the current plugin
    block are paired, in order, with the response values and written to
    global_plugin_dict:

    - str / bytes: one value; bytes are decoded as UTF-8 and leading or
      trailing CR, LF and TAB characters are stripped.
    - tuple / list: with exactly one pending variable the whole
      sequence is stored in it, otherwise the items are distributed
      over the variables. String items are stripped like above, items
      of any other type are stored unchanged.
    - int / float / dict: stored as is.

    A response of None or the string "None" updates nothing. In every
    case global_plugin_list is emptied afterwards so that the next
    plugin block starts without stale names.

    Parameters:
        plugin_resp: Response data from plugin function.
    """
    # There is nothing to update the plugin response.
    if not global_plugin_list:
        return

    def _clean(value):
        # Only text carries stray line endings; leave other types alone.
        return value.strip("\r\n\t") if isinstance(value, str) else value

    single_var = len(global_plugin_list) == 1
    resp_list = []

    if plugin_resp is None or (
        isinstance(plugin_resp, str) and plugin_resp == "None"
    ):
        pass
    elif isinstance(plugin_resp, str):
        resp_list.append(_clean(plugin_resp))
    elif isinstance(plugin_resp, bytes):
        resp_list.append(_clean(str(plugin_resp, "UTF-8")))
    elif isinstance(plugin_resp, tuple):
        if single_var:
            resp_list.append(plugin_resp)
        else:
            resp_list = [_clean(x) for x in plugin_resp]
    elif isinstance(plugin_resp, list):
        cleaned = [_clean(x) for x in plugin_resp]
        if single_var:
            resp_list.append(cleaned)
        else:
            resp_list = cleaned
    elif isinstance(plugin_resp, (int, float, dict)):
        resp_list.append(plugin_resp)

    # zip() stops at the shorter side, so surplus response values or
    # surplus return variables are ignored.
    for var_name, item in zip(global_plugin_list, resp_list):
        global_plugin_dict[var_name] = item

    # Done updating plugin dict irrespective of pass or failed,
    # clear all the list element for next plugin block execute.
    global_plugin_list.clear()
def yaml_args_string(self, plugin_args):
    r"""
    Pack the plugin arguments into the argument text of a Python call.

    Every list item becomes one comma separated call argument:

    - int / float (including 0 and booleans): its str() value.
    - str naming a plugin variable listed in global_plugin_type_list:
      the str() of the value stored in global_plugin_dict, so that list
      and dict data is passed as a Python literal.
    - any other str: the text with leading or trailing CR, LF and TAB
      characters removed, wrapped in double quotes. An empty string is
      passed as "".
    - anything else (for example None, list or dict): its str() value.

    NOTE: double quotes embedded in a string argument are not escaped.

    Example:
        From
        ['xx.xx.xx.xx:443', 'root', '********', '/redfish/v1/', 'json']
        to
        "xx.xx.xx.xx:443","root","********","/redfish/v1/","json"

    Parameters:
        plugin_args (list): A list of arguments to be packed into
                            a string.

    Returns:
        str: A string representation of the arguments.
    """
    packed = []
    for arg in plugin_args:
        if isinstance(arg, (int, float)):
            # Falsy numbers such as 0 are real arguments; skipping them
            # would leave a dangling comma in the generated call.
            packed.append(str(arg))
        elif isinstance(arg, str):
            if arg in global_plugin_type_list:
                packed.append(str(global_plugin_dict[arg]))
            else:
                # Strip outside of the f-string: a backslash inside an
                # f-string expression is a syntax error before 3.12.
                stripped = arg.strip("\r\n\t")
                packed.append(f'"{stripped}"')
        else:
            packed.append(str(arg))
    return ",".join(packed)
def yaml_args_populate(self, yaml_arg_list):
    r"""
    Resolve environment and plugin variables in a YAML argument list.

    String items are run through yaml_env_and_plugin_vars_populate();
    items of every other type are copied to the result unchanged.

    .. code-block:: yaml

        - plugin_args:
          - arg1
          - arg2

    Example:
        ['${hostname}:${port_https}', '${username}', '/redfish/v1/', 'json']
        is returned as
        ['xx.xx.xx.xx:443', 'root', '/redfish/v1/', 'json']

    Parameters:
        yaml_arg_list (list): A list of arguments containing environment
                              and plugin variables.

    Returns:
        list: A new list with the variables of the string items
              resolved, or None when yaml_arg_list is not a list.
    """
    if not isinstance(yaml_arg_list, list):
        return None

    return [
        self.yaml_env_and_plugin_vars_populate(str(item))
        if isinstance(item, str)
        else item
        for item in yaml_arg_list
    ]
def yaml_env_and_plugin_vars_populate(self, yaml_arg_str):
    r"""
    Resolve environment and plugin variables in one YAML argument string.

    Two substitutions are applied in this order:

    1. Every ${NAME} whose NAME is set to a non-empty value in
       os.environ is replaced by that value. Unknown or empty variables
       are left as written.
    2. If the resulting string as a whole is the name of a plugin
       variable in global_plugin_dict (and is not one of the ${NAME}
       names handled in step 1), it is replaced by the str() of the
       stored value. A list or dict value cannot be embedded in the
       string; the name is kept and registered in
       global_plugin_type_list so the value is used directly when the
       call is built.

    .. code-block:: yaml

        - cat ${MY_VAR}
        - my_plugin_var

    Parameters:
        yaml_arg_str (str): A string containing environment variables
                            or naming a plugin variable.

    Returns:
        str: The argument string with the variables replaced.
    """
    # Example: ${my_hostname}:${port_https} -> ['my_hostname', 'port_https']
    # Defined up front so the plugin step below can rely on it even if
    # the environment step fails.
    env_var_names_list = []
    try:
        env_var_regex = r"\$\{([^\}]+)\}"
        env_var_names_list = re.findall(env_var_regex, yaml_arg_str)
        for var in env_var_names_list:
            env_var = os.environ.get(var)
            if env_var:
                env_replace = "${" + var + "}"
                yaml_arg_str = yaml_arg_str.replace(env_replace, env_var)
    except Exception as e:
        # Best effort: report the problem and continue with the string
        # as far as it was resolved.
        self.logger.error("\tERROR:yaml_env_vars_populate: %s" % e)

    # Plugin variables are only recognised when the complete argument is
    # the variable name. Look that one name up directly instead of
    # replacing every known name as a substring, which would corrupt
    # names that contain another one (e.g. "ver" inside "version").
    if (
        yaml_arg_str in global_plugin_dict
        and yaml_arg_str not in env_var_names_list
    ):
        plugin_var_value = global_plugin_dict[yaml_arg_str]
        if isinstance(plugin_var_value, (list, dict)):
            # List data type or dict can't be replaced, use directly in
            # eval function call. Register each name only once.
            if yaml_arg_str not in global_plugin_type_list:
                global_plugin_type_list.append(yaml_arg_str)
        else:
            yaml_arg_str = str(plugin_var_value)

    # Example: ${my_hostname}:${port_https} is returned as xx.xx.xx.xx:443
    return yaml_arg_str
def plugin_error_check(self, plugin_dict):
    r"""
    Look up the error state requested by a plugin block.

    The first dictionary of plugin_dict that carries a "plugin_error"
    key decides the result: its value is used as the key into
    self.plugin_error_dict.

    Parameters:
        plugin_dict (list of dict): A list of dictionaries containing
                                    plugin error information.

    Returns:
        The self.plugin_error_dict entry for the "plugin_error" value,
        or None if no dictionary has a "plugin_error" key or the value
        is not present in self.plugin_error_dict.
    """
    not_found = object()
    error_key = next(
        (
            entry["plugin_error"]
            for entry in plugin_dict
            if "plugin_error" in entry
        ),
        not_found,
    )
    if error_key is not_found:
        return None
    return self.plugin_error_dict.get(error_key, None)
def key_index_list_dict(self, key, list_dict):
    r"""
    Locate the first dictionary in a list that has the given key.

    Parameters:
        key (str):                  The key to search for in the
                                    dictionaries.
        list_dict (list of dict):   A list of dictionaries to search
                                    through.

    Returns:
        int: Position of the first dictionary containing the key, or -1
             when none of them does.
    """
    positions = (pos for pos, entry in enumerate(list_dict) if key in entry)
    return next(positions, -1)
def plugin_expect_type(self, type, data):
    r"""
    Tell whether data is an instance of the data type named by type.

    The recognised type names are "int", "float", "str", "list", "dict"
    and "tuple". Any other name is reported with an info log message.

    Parameters:
        type (str): The expected data type.
        data:       The data to check against the expected type.

    Returns:
        bool or str: True if the data matches the expected type, False if
                     not, or "INVALID" if the type is not recognized.
    """
    known_types = (
        ("int", int),
        ("float", float),
        ("str", str),
        ("list", list),
        ("dict", dict),
        ("tuple", tuple),
    )
    # Compare by equality, in declaration order, against each name.
    for label, python_type in known_types:
        if type == label:
            return isinstance(data, python_type)

    self.logger.info("\tInvalid data type requested: %s" % type)
    return "INVALID"