|  | #!/usr/bin/env python3 | 
|  |  | 
|  | r""" | 
|  | See class prolog below for details. | 
|  | """ | 
|  |  | 
|  | import json | 
|  | import logging | 
|  | import os | 
|  | import platform | 
|  | import re | 
|  | import subprocess | 
|  | import sys | 
|  | import time | 
|  | from errno import EACCES, EPERM | 
|  |  | 
|  | import yaml | 
|  |  | 
# Do not litter the tree with .pyc files while collecting data.
sys.dont_write_bytecode = True


# Make this script's directory, and every directory beneath it,
# importable so the sibling utility modules resolve without installing
# a package.
script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(script_dir)
# Walk path and append to sys.path.  The loop variable is named
# "subdir" rather than "dir" to avoid shadowing the builtin dir().
for root, dirs, files in os.walk(script_dir):
    for subdir in dirs:
        sys.path.append(os.path.join(root, subdir))
|  |  | 
|  | from ssh_utility import SSHRemoteclient  # NOQA | 
|  | from telnet_utility import TelnetRemoteclient  # NOQA | 
|  |  | 
|  | r""" | 
|  | User define plugins python functions. | 
|  |  | 
|  | It will imports files from directory plugins | 
|  |  | 
|  | plugins | 
|  | ├── file1.py | 
|  | └── file2.py | 
|  |  | 
|  | Example how to define in YAML: | 
|  | - plugin: | 
|  | - plugin_name: plugin.foo_func.foo_func_yaml | 
|  | - plugin_args: | 
|  | - arg1 | 
|  | - arg2 | 
|  | """ | 
|  | plugin_dir = os.path.join(os.path.dirname(__file__), "plugins") | 
|  | sys.path.append(plugin_dir) | 
|  |  | 
|  | for module in os.listdir(plugin_dir): | 
|  | if module == "__init__.py" or not module.endswith(".py"): | 
|  | continue | 
|  |  | 
|  | plugin_module = f"plugins.{module[:-3]}" | 
|  | try: | 
|  | plugin = __import__(plugin_module, globals(), locals(), [], 0) | 
|  | except Exception as e: | 
|  | print(f"PLUGIN: Exception: {e}") | 
|  | print(f"PLUGIN: Module import failed: {module}") | 
|  | continue | 
|  |  | 
|  | r""" | 
|  | This is for plugin functions returning data or responses to the caller | 
|  | in YAML plugin setup. | 
|  |  | 
|  | Example: | 
|  |  | 
|  | - plugin: | 
|  | - plugin_name: version = plugin.ssh_execution.ssh_execute_cmd | 
|  | - plugin_args: | 
|  | - ${hostname} | 
|  | - ${username} | 
|  | - ${password} | 
|  | - "cat /etc/os-release | grep VERSION_ID | awk -F'=' '{print $2}'" | 
|  | - plugin: | 
|  | - plugin_name: plugin.print_vars.print_vars | 
|  | - plugin_args: | 
|  | - version | 
|  |  | 
|  | where first plugin "version" var is used by another plugin in the YAML | 
|  | block or plugin | 
|  |  | 
|  | """ | 
|  | global global_log_store_path | 
|  | global global_plugin_dict | 
|  | global global_plugin_list | 
|  |  | 
|  | # Hold the plugin return values in dict and plugin return vars in list. | 
|  | # Dict is to reference and update vars processing in parser where as | 
|  | # list is for current vars from the plugin block which needs processing. | 
|  | global_plugin_dict = {} | 
|  | global_plugin_list = [] | 
|  |  | 
|  | # Hold the plugin return named declared if function returned values are | 
|  | # list,dict. | 
|  | # Refer this name list to look up the plugin dict for eval() args function | 
|  | # Example ['version'] | 
|  | global_plugin_type_list = [] | 
|  |  | 
|  | # Path where logs are to be stored or written. | 
|  | global_log_store_path = "" | 
|  |  | 
|  | # Plugin error state defaults. | 
|  | plugin_error_dict = { | 
|  | "exit_on_error": False, | 
|  | "continue_on_error": False, | 
|  | } | 
|  |  | 
|  |  | 
|  | class ffdc_collector: | 
|  | r""" | 
|  | Execute commands from configuration file to collect log files. | 
|  | Fetch and store generated files at the specified location. | 
|  |  | 
|  | """ | 
|  |  | 
    def __init__(
        self,
        hostname,
        username,
        password,
        port_ssh,
        port_https,
        port_ipmi,
        ffdc_config,
        location,
        remote_type,
        remote_protocol,
        env_vars,
        econfig,
        log_level,
    ):
        r"""
        Initialize the FFDCCollector object with the provided parameters.

        This method initializes an FFDCCollector object with the given
        attributes. The attributes represent the configuration for connecting
        to a remote system, collecting log data, and storing the collected
        data.

        Parameters:
            hostname (str):             Name or IP address of the targeted
                                        (remote) system.
            username (str):             User on the targeted system with access
                                        to log files.
            password (str):             Password for the user on the targeted
                                        system.
            port_ssh (int, optional):   SSH port value. Defaults to 22.
            port_https (int, optional): HTTPS port value. Defaults to 443.
            port_ipmi (int, optional):  IPMI port value. Defaults to 623.
            ffdc_config (str):          Configuration file listing commands
                                        and files for FFDC.
            location (str):             Where to store collected log data.
            remote_type (str):          Block YAML type name of the remote
                                        host.
            remote_protocol (str):      Protocol to use to collect data.
            env_vars (dict, optional):  User-defined CLI environment variables.
                                        Defaults to None.
            econfig (str, optional):    User-defined environment variables
                                        YAML file. Defaults to None.
            log_level (str, optional):  Log level for the collector.
                                        Defaults to "INFO".
        """

        self.hostname = hostname
        self.username = username
        self.password = password
        # Ports are stored as strings because they are interpolated into
        # command lines and YAML variable substitution later.
        self.port_ssh = str(port_ssh)
        self.port_https = str(port_https)
        self.port_ipmi = str(port_ipmi)
        self.ffdc_config = ffdc_config
        # Collected data lands under <location>/<REMOTE_TYPE>.
        self.location = location + "/" + remote_type.upper()
        self.ssh_remoteclient = None
        self.telnet_remoteclient = None
        self.ffdc_dir_path = ""
        self.ffdc_prefix = ""
        self.target_type = remote_type.upper()
        self.remote_protocol = remote_protocol.upper()
        # Normalize optional args: treat None/empty as an empty mapping.
        self.env_vars = env_vars if env_vars else {}
        self.econfig = econfig if econfig else {}
        self.start_time = 0
        self.elapsed_time = ""
        self.env_dict = {}
        self.logger = None

        """
        Set prefix values for SCP files and directories.
        Since the time stamp is at second granularity, these values are set
        here to be sure that all files for this run will have the same
        timestamps and be saved in the same directory.
        self.location == local system for now
        """
        self.set_ffdc_default_store_path()

        # Logger for this run.  Must run after
        # set_ffdc_default_store_path() so the log file directory exists.
        self.script_logging(getattr(logging, log_level.upper()))

        # Verify top level directory exists for storage
        self.validate_local_store(self.location)

        if self.verify_script_env():
            try:
                with open(self.ffdc_config, "r") as file:
                    self.ffdc_actions = yaml.safe_load(file)
            except yaml.YAMLError as e:
                # Malformed config file: nothing sensible to collect.
                self.logger.error(e)
                sys.exit(-1)

            if self.target_type not in self.ffdc_actions:
                self.logger.error(
                    "\n\tERROR: %s is not listed in %s.\n\n"
                    % (self.target_type, self.ffdc_config)
                )
                sys.exit(-1)

            self.logger.info("\n\tENV: User define input YAML variables")
            self.env_dict = self.load_env()
        else:
            # Host environment did not meet minimum requirements.
            sys.exit(-1)
|  |  | 
|  | def verify_script_env(self): | 
|  | r""" | 
|  | Verify that all required environment variables are set. | 
|  |  | 
|  | This method checks if all required environment variables are set. | 
|  | If any required variable is missing, the method returns False. | 
|  | Otherwise, it returns True. | 
|  |  | 
|  | Returns: | 
|  | bool: True if all required environment variables are set, | 
|  | False otherwise. | 
|  | """ | 
|  | # Import to log version | 
|  | import click | 
|  | import paramiko | 
|  |  | 
|  | run_env_ok = True | 
|  |  | 
|  | try: | 
|  | redfishtool_version = ( | 
|  | self.run_tool_cmd("redfishtool -V").split(" ")[2].strip("\n") | 
|  | ) | 
|  | except Exception as e: | 
|  | self.logger.error("\tEXCEPTION redfishtool: %s", e) | 
|  | redfishtool_version = "Not Installed (optional)" | 
|  |  | 
|  | try: | 
|  | ipmitool_version = self.run_tool_cmd("ipmitool -V").split(" ")[2] | 
|  | except Exception as e: | 
|  | self.logger.error("\tEXCEPTION ipmitool: %s", e) | 
|  | ipmitool_version = "Not Installed (optional)" | 
|  |  | 
|  | self.logger.info("\n\t---- Script host environment ----") | 
|  | self.logger.info( | 
|  | "\t{:<10}  {:<10}".format("Script hostname", os.uname()[1]) | 
|  | ) | 
|  | self.logger.info( | 
|  | "\t{:<10}  {:<10}".format("Script host os", platform.platform()) | 
|  | ) | 
|  | self.logger.info( | 
|  | "\t{:<10}  {:>10}".format("Python", platform.python_version()) | 
|  | ) | 
|  | self.logger.info("\t{:<10}  {:>10}".format("PyYAML", yaml.__version__)) | 
|  | self.logger.info("\t{:<10}  {:>10}".format("click", click.__version__)) | 
|  | self.logger.info( | 
|  | "\t{:<10}  {:>10}".format("paramiko", paramiko.__version__) | 
|  | ) | 
|  | self.logger.info( | 
|  | "\t{:<10}  {:>9}".format("redfishtool", redfishtool_version) | 
|  | ) | 
|  | self.logger.info( | 
|  | "\t{:<10}  {:>12}".format("ipmitool", ipmitool_version) | 
|  | ) | 
|  |  | 
|  | if eval(yaml.__version__.replace(".", ",")) < (5, 3, 0): | 
|  | self.logger.error( | 
|  | "\n\tERROR: Python or python packages do not meet minimum" | 
|  | " version requirement." | 
|  | ) | 
|  | self.logger.error( | 
|  | "\tERROR: PyYAML version 5.3.0 or higher is needed.\n" | 
|  | ) | 
|  | run_env_ok = False | 
|  |  | 
|  | self.logger.info("\t---- End script host environment ----") | 
|  | return run_env_ok | 
|  |  | 
|  | def script_logging(self, log_level_attr): | 
|  | """ | 
|  | Create a logger for the script with the specified log level. | 
|  |  | 
|  | This method creates a logger for the script with the specified | 
|  | log level. The logger is configured to write log messages to a file | 
|  | and the console. | 
|  |  | 
|  | self.logger = logging.getLogger(__name__) | 
|  |  | 
|  | Setting logger with __name__ will add the trace | 
|  | Example: | 
|  |  | 
|  | INFO:ffdc_collector:    System Type: OPENBMC | 
|  |  | 
|  | Currently, set to empty purposely to log as | 
|  | System Type: OPENBMC | 
|  |  | 
|  | Parameters: | 
|  | log_level_attr (str):   The log level for the logger | 
|  | (e.g., "DEBUG", "INFO", "WARNING", | 
|  | "ERROR", "CRITICAL"). | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | self.logger = logging.getLogger() | 
|  | self.logger.setLevel(log_level_attr) | 
|  |  | 
|  | log_file_handler = logging.FileHandler( | 
|  | self.ffdc_dir_path + "collector.log" | 
|  | ) | 
|  | stdout_handler = logging.StreamHandler(sys.stdout) | 
|  |  | 
|  | self.logger.addHandler(log_file_handler) | 
|  | self.logger.addHandler(stdout_handler) | 
|  |  | 
|  | # Turn off paramiko INFO logging | 
|  | logging.getLogger("paramiko").setLevel(logging.WARNING) | 
|  |  | 
|  | def target_is_pingable(self): | 
|  | r""" | 
|  | Check if the target system is ping-able. | 
|  |  | 
|  | This method checks if the target system is reachable by sending an | 
|  | ICMP echo request (ping). If the target system responds to the ping, | 
|  | the method returns True. Otherwise, it returns False. | 
|  |  | 
|  | Returns: | 
|  | bool: True if the target system is ping-able, False otherwise. | 
|  | """ | 
|  | response = os.system("ping -c 2 %s  2>&1 >/dev/null" % self.hostname) | 
|  | if response == 0: | 
|  | self.logger.info( | 
|  | "\n\t[Check] %s is ping-able.\t\t [OK]" % self.hostname | 
|  | ) | 
|  | return True | 
|  | else: | 
|  | self.logger.error( | 
|  | "\n\tERROR: %s is not ping-able. FFDC collection aborted.\n" | 
|  | % self.hostname | 
|  | ) | 
|  | sys.exit(-1) | 
|  | return False | 
|  |  | 
|  | def collect_ffdc(self): | 
|  | r""" | 
|  | Initiate FFDC collection based on the requested protocol. | 
|  |  | 
|  | This method initiates FFDC (First Failure Data Capture) collection | 
|  | based on the requested protocol (SSH,SCP, TELNET, REDFISH, IPMI). | 
|  | The method establishes a connection to the target system using the | 
|  | specified protocol and collects the required FFDC data. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | self.logger.info( | 
|  | "\n\t---- Start communicating with %s ----" % self.hostname | 
|  | ) | 
|  | self.start_time = time.time() | 
|  |  | 
|  | # Find the list of target and protocol supported. | 
|  | check_protocol_list = [] | 
|  | config_dict = self.ffdc_actions | 
|  |  | 
|  | for target_type in config_dict.keys(): | 
|  | if self.target_type != target_type: | 
|  | continue | 
|  |  | 
|  | for k, v in config_dict[target_type].items(): | 
|  | if v["PROTOCOL"][0] not in check_protocol_list: | 
|  | check_protocol_list.append(v["PROTOCOL"][0]) | 
|  |  | 
|  | self.logger.info( | 
|  | "\n\t %s protocol type: %s" | 
|  | % (self.target_type, check_protocol_list) | 
|  | ) | 
|  |  | 
|  | verified_working_protocol = self.verify_protocol(check_protocol_list) | 
|  |  | 
|  | if verified_working_protocol: | 
|  | self.logger.info( | 
|  | "\n\t---- Completed protocol pre-requisite check ----\n" | 
|  | ) | 
|  |  | 
|  | # Verify top level directory exists for storage | 
|  | self.validate_local_store(self.location) | 
|  |  | 
|  | if (self.remote_protocol not in verified_working_protocol) and ( | 
|  | self.remote_protocol != "ALL" | 
|  | ): | 
|  | self.logger.info( | 
|  | "\n\tWorking protocol list: %s" % verified_working_protocol | 
|  | ) | 
|  | self.logger.error( | 
|  | "\tERROR: Requested protocol %s is not in working protocol" | 
|  | " list.\n" % self.remote_protocol | 
|  | ) | 
|  | sys.exit(-1) | 
|  | else: | 
|  | self.generate_ffdc(verified_working_protocol) | 
|  |  | 
|  | def ssh_to_target_system(self): | 
|  | r""" | 
|  | Establish an SSH connection to the target system. | 
|  |  | 
|  | This method establishes an SSH connection to the target system using | 
|  | the provided hostname, username, password, and SSH port. If the | 
|  | connection is successful, the method returns True. Otherwise, it logs | 
|  | an error message and returns False. | 
|  |  | 
|  | Returns: | 
|  | bool: True if the connection is successful, False otherwise. | 
|  | """ | 
|  |  | 
|  | self.ssh_remoteclient = SSHRemoteclient( | 
|  | self.hostname, self.username, self.password, self.port_ssh | 
|  | ) | 
|  |  | 
|  | if self.ssh_remoteclient.ssh_remoteclient_login(): | 
|  | self.logger.info( | 
|  | "\n\t[Check] %s SSH connection established.\t [OK]" | 
|  | % self.hostname | 
|  | ) | 
|  |  | 
|  | # Check scp connection. | 
|  | # If scp connection fails, | 
|  | # continue with FFDC generation but skip scp files to local host. | 
|  | self.ssh_remoteclient.scp_connection() | 
|  | return True | 
|  | else: | 
|  | self.logger.info( | 
|  | "\n\t[Check] %s SSH connection.\t [NOT AVAILABLE]" | 
|  | % self.hostname | 
|  | ) | 
|  | return False | 
|  |  | 
|  | def telnet_to_target_system(self): | 
|  | r""" | 
|  | Establish a Telnet connection to the target system. | 
|  |  | 
|  | This method establishes a Telnet connection to the target system using | 
|  | the provided hostname, username, and Telnet port. If the connection is | 
|  | successful, the method returns True. Otherwise, it logs an error | 
|  | message and returns False. | 
|  |  | 
|  | Returns: | 
|  | bool: True if the connection is successful, False otherwise. | 
|  | """ | 
|  | self.telnet_remoteclient = TelnetRemoteclient( | 
|  | self.hostname, self.username, self.password | 
|  | ) | 
|  | if self.telnet_remoteclient.tn_remoteclient_login(): | 
|  | self.logger.info( | 
|  | "\n\t[Check] %s Telnet connection established.\t [OK]" | 
|  | % self.hostname | 
|  | ) | 
|  | return True | 
|  | else: | 
|  | self.logger.info( | 
|  | "\n\t[Check] %s Telnet connection.\t [NOT AVAILABLE]" | 
|  | % self.hostname | 
|  | ) | 
|  | return False | 
|  |  | 
|  | def generate_ffdc(self, working_protocol_list): | 
|  | r""" | 
|  | Generate FFDC (First Failure Data Capture) based on the remote host | 
|  | type and working protocols. | 
|  |  | 
|  | This method determines the actions to be performed for generating FFDC | 
|  | based on the remote host type and the list of confirmed working | 
|  | protocols. The method iterates through the available actions for the | 
|  | remote host type and checks if any of the working protocols are | 
|  | supported. If a supported protocol is found, the method executes the | 
|  | corresponding FFDC generation action. | 
|  |  | 
|  | Parameters: | 
|  | working_protocol_list (list):  A list of confirmed working | 
|  | protocols to connect to the | 
|  | remote host. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | self.logger.info( | 
|  | "\n\t---- Executing commands on " + self.hostname + " ----" | 
|  | ) | 
|  | self.logger.info( | 
|  | "\n\tWorking protocol list: %s" % working_protocol_list | 
|  | ) | 
|  |  | 
|  | config_dict = self.ffdc_actions | 
|  | for target_type in config_dict.keys(): | 
|  | if self.target_type != target_type: | 
|  | continue | 
|  |  | 
|  | self.logger.info("\n\tFFDC Path: %s " % self.ffdc_dir_path) | 
|  | global_plugin_dict["global_log_store_path"] = self.ffdc_dir_path | 
|  | self.logger.info("\tSystem Type: %s" % target_type) | 
|  | for k, v in config_dict[target_type].items(): | 
|  | protocol = v["PROTOCOL"][0] | 
|  |  | 
|  | if ( | 
|  | self.remote_protocol not in working_protocol_list | 
|  | and self.remote_protocol != "ALL" | 
|  | ) or protocol not in working_protocol_list: | 
|  | continue | 
|  |  | 
|  | if protocol in working_protocol_list: | 
|  | if protocol in ["SSH", "SCP"]: | 
|  | self.protocol_ssh(protocol, target_type, k) | 
|  | elif protocol == "TELNET": | 
|  | self.protocol_telnet(target_type, k) | 
|  | elif protocol in ["REDFISH", "IPMI", "SHELL"]: | 
|  | self.protocol_service_execute(protocol, target_type, k) | 
|  | else: | 
|  | self.logger.error( | 
|  | "\n\tERROR: %s is not available for %s." | 
|  | % (protocol, self.hostname) | 
|  | ) | 
|  |  | 
|  | # Close network connection after collecting all files | 
|  | self.elapsed_time = time.strftime( | 
|  | "%H:%M:%S", time.gmtime(time.time() - self.start_time) | 
|  | ) | 
|  | self.logger.info("\n\tTotal time taken: %s" % self.elapsed_time) | 
|  | if self.ssh_remoteclient: | 
|  | self.ssh_remoteclient.ssh_remoteclient_disconnect() | 
|  | if self.telnet_remoteclient: | 
|  | self.telnet_remoteclient.tn_remoteclient_disconnect() | 
|  |  | 
|  | def protocol_ssh(self, protocol, target_type, sub_type): | 
|  | r""" | 
|  | Perform actions using SSH and SCP protocols. | 
|  |  | 
|  | This method executes a set of commands using the SSH protocol to | 
|  | connect to the target system and collect FFDC data. The method takes | 
|  | the protocol, target type, and sub-type as arguments and performs the | 
|  | corresponding actions based on the provided parameters. | 
|  |  | 
|  | Parameters: | 
|  | protocol (str):    The protocol to execute (SSH or SCP). | 
|  | target_type (str): The type group of the remote host. | 
|  | sub_type (str):    The group type of commands to execute. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | if protocol == "SCP": | 
|  | self.group_copy(self.ffdc_actions[target_type][sub_type]) | 
|  | else: | 
|  | self.collect_and_copy_ffdc( | 
|  | self.ffdc_actions[target_type][sub_type] | 
|  | ) | 
|  |  | 
|  | def protocol_telnet(self, target_type, sub_type): | 
|  | r""" | 
|  | Perform actions using the Telnet protocol. | 
|  |  | 
|  | This method executes a set of commands using the Telnet protocol to | 
|  | connect to the target system and collect FFDC data. The method takes | 
|  | the target type and sub-type as arguments and performs the | 
|  | corresponding actions based on the provided parameters. | 
|  |  | 
|  | Parameters: | 
|  | target_type (str): The type group of the remote host. | 
|  | sub_type (str):    The group type of commands to execute. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | self.logger.info( | 
|  | "\n\t[Run] Executing commands on %s using %s" | 
|  | % (self.hostname, "TELNET") | 
|  | ) | 
|  | telnet_files_saved = [] | 
|  | progress_counter = 0 | 
|  | list_of_commands = self.ffdc_actions[target_type][sub_type]["COMMANDS"] | 
|  | for index, each_cmd in enumerate(list_of_commands, start=0): | 
|  | command_txt, command_timeout = self.unpack_command(each_cmd) | 
|  | result = self.telnet_remoteclient.execute_command( | 
|  | command_txt, command_timeout | 
|  | ) | 
|  | if result: | 
|  | try: | 
|  | targ_file = self.ffdc_actions[target_type][sub_type][ | 
|  | "FILES" | 
|  | ][index] | 
|  | except IndexError: | 
|  | targ_file = command_txt | 
|  | self.logger.warning( | 
|  | "\n\t[WARN] Missing filename to store data from" | 
|  | " telnet %s." % each_cmd | 
|  | ) | 
|  | self.logger.warning( | 
|  | "\t[WARN] Data will be stored in %s." % targ_file | 
|  | ) | 
|  | targ_file_with_path = ( | 
|  | self.ffdc_dir_path + self.ffdc_prefix + targ_file | 
|  | ) | 
|  | # Creates a new file | 
|  | with open(targ_file_with_path, "w") as fp: | 
|  | fp.write(result) | 
|  | fp.close | 
|  | telnet_files_saved.append(targ_file) | 
|  | progress_counter += 1 | 
|  | self.print_progress(progress_counter) | 
|  | self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]") | 
|  | for file in telnet_files_saved: | 
|  | self.logger.info("\n\t\tSuccessfully save file " + file + ".") | 
|  |  | 
|  | def protocol_service_execute(self, protocol, target_type, sub_type): | 
|  | r""" | 
|  | Perform actions for a given protocol. | 
|  |  | 
|  | This method executes a set of commands using the specified protocol to | 
|  | connect to the target system and collect FFDC data. The method takes | 
|  | the protocol, target type, and sub-type as arguments and performs the | 
|  | corresponding actions based on the provided parameters. | 
|  |  | 
|  | Parameters: | 
|  | protocol (str):    The protocol to execute | 
|  | (REDFISH, IPMI, or SHELL). | 
|  | target_type (str): The type group of the remote host. | 
|  | sub_type (str):    The group type of commands to execute. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | self.logger.info( | 
|  | "\n\t[Run] Executing commands to %s using %s" | 
|  | % (self.hostname, protocol) | 
|  | ) | 
|  | executed_files_saved = [] | 
|  | progress_counter = 0 | 
|  | list_of_cmd = self.get_command_list( | 
|  | self.ffdc_actions[target_type][sub_type] | 
|  | ) | 
|  | for index, each_cmd in enumerate(list_of_cmd, start=0): | 
|  | plugin_call = False | 
|  | if isinstance(each_cmd, dict): | 
|  | if "plugin" in each_cmd: | 
|  | # If the error is set and plugin explicitly | 
|  | # requested to skip execution on error.. | 
|  | if plugin_error_dict[ | 
|  | "exit_on_error" | 
|  | ] and self.plugin_error_check(each_cmd["plugin"]): | 
|  | self.logger.info( | 
|  | "\n\t[PLUGIN-ERROR] exit_on_error: %s" | 
|  | % plugin_error_dict["exit_on_error"] | 
|  | ) | 
|  | self.logger.info( | 
|  | "\t[PLUGIN-SKIP] %s" % each_cmd["plugin"][0] | 
|  | ) | 
|  | continue | 
|  | plugin_call = True | 
|  | # call the plugin | 
|  | self.logger.info("\n\t[PLUGIN-START]") | 
|  | result = self.execute_plugin_block(each_cmd["plugin"]) | 
|  | self.logger.info("\t[PLUGIN-END]\n") | 
|  | else: | 
|  | each_cmd = self.yaml_env_and_plugin_vars_populate(each_cmd) | 
|  |  | 
|  | if not plugin_call: | 
|  | result = self.run_tool_cmd(each_cmd) | 
|  | if result: | 
|  | try: | 
|  | file_name = self.get_file_list( | 
|  | self.ffdc_actions[target_type][sub_type] | 
|  | )[index] | 
|  | # If file is specified as None. | 
|  | if file_name == "None": | 
|  | continue | 
|  | targ_file = self.yaml_env_and_plugin_vars_populate( | 
|  | file_name | 
|  | ) | 
|  | except IndexError: | 
|  | targ_file = each_cmd.split("/")[-1] | 
|  | self.logger.warning( | 
|  | "\n\t[WARN] Missing filename to store data from %s." | 
|  | % each_cmd | 
|  | ) | 
|  | self.logger.warning( | 
|  | "\t[WARN] Data will be stored in %s." % targ_file | 
|  | ) | 
|  |  | 
|  | targ_file_with_path = ( | 
|  | self.ffdc_dir_path + self.ffdc_prefix + targ_file | 
|  | ) | 
|  |  | 
|  | # Creates a new file | 
|  | with open(targ_file_with_path, "w") as fp: | 
|  | if isinstance(result, dict): | 
|  | fp.write(json.dumps(result)) | 
|  | else: | 
|  | fp.write(result) | 
|  | fp.close | 
|  | executed_files_saved.append(targ_file) | 
|  |  | 
|  | progress_counter += 1 | 
|  | self.print_progress(progress_counter) | 
|  |  | 
|  | self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]") | 
|  |  | 
|  | for file in executed_files_saved: | 
|  | self.logger.info("\n\t\tSuccessfully save file " + file + ".") | 
|  |  | 
|  | def collect_and_copy_ffdc( | 
|  | self, ffdc_actions_for_target_type, form_filename=False | 
|  | ): | 
|  | r""" | 
|  | Send commands and collect FFDC data from the targeted system. | 
|  |  | 
|  | This method sends a set of commands and collects FFDC data from the | 
|  | targeted system based on the provided ffdc_actions_for_target_type | 
|  | dictionary. The method also has an optional form_filename parameter, | 
|  | which, if set to True, prepends the target type to the output file | 
|  | name. | 
|  |  | 
|  | Parameters: | 
|  | ffdc_actions_for_target_type (dict): A dictionary containing | 
|  | commands and files for the | 
|  | selected remote host type. | 
|  | form_filename (bool, optional):      If True, prepends the target | 
|  | type to the output file name. | 
|  | Defaults to False. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | # Executing commands, if any | 
|  | self.ssh_execute_ffdc_commands( | 
|  | ffdc_actions_for_target_type, form_filename | 
|  | ) | 
|  |  | 
|  | # Copying files | 
|  | if self.ssh_remoteclient.scpclient: | 
|  | self.logger.info( | 
|  | "\n\n\tCopying FFDC files from remote system %s.\n" | 
|  | % self.hostname | 
|  | ) | 
|  |  | 
|  | # Retrieving files from target system | 
|  | list_of_files = self.get_file_list(ffdc_actions_for_target_type) | 
|  | self.scp_ffdc( | 
|  | self.ffdc_dir_path, | 
|  | self.ffdc_prefix, | 
|  | form_filename, | 
|  | list_of_files, | 
|  | ) | 
|  | else: | 
|  | self.logger.info( | 
|  | "\n\n\tSkip copying FFDC files from remote system %s.\n" | 
|  | % self.hostname | 
|  | ) | 
|  |  | 
|  | def get_command_list(self, ffdc_actions_for_target_type): | 
|  | r""" | 
|  | Fetch a list of commands from the configuration file. | 
|  |  | 
|  | This method retrieves a list of commands from the | 
|  | ffdc_actions_for_target_type dictionary, which contains commands and | 
|  | files for the selected remote host type. The method returns the list | 
|  | of commands. | 
|  |  | 
|  | Parameters: | 
|  | ffdc_actions_for_target_type (dict): A dictionary containing | 
|  | commands and files for the | 
|  | selected remote host type. | 
|  |  | 
|  | Returns: | 
|  | list: A list of commands. | 
|  | """ | 
|  | try: | 
|  | list_of_commands = ffdc_actions_for_target_type["COMMANDS"] | 
|  | except KeyError: | 
|  | list_of_commands = [] | 
|  | return list_of_commands | 
|  |  | 
|  | def get_file_list(self, ffdc_actions_for_target_type): | 
|  | r""" | 
|  | Fetch a list of files from the configuration file. | 
|  |  | 
|  | This method retrieves a list of files from the | 
|  | ffdc_actions_for_target_type dictionary, which contains commands and | 
|  | files for the selected remote host type. The method returns the list | 
|  | of files. | 
|  |  | 
|  | Parameters: | 
|  | ffdc_actions_for_target_type (dict): A dictionary containing | 
|  | commands and files for the | 
|  | selected remote host type. | 
|  |  | 
|  | Returns: | 
|  | list: A list of files. | 
|  | """ | 
|  | try: | 
|  | list_of_files = ffdc_actions_for_target_type["FILES"] | 
|  | except KeyError: | 
|  | list_of_files = [] | 
|  | return list_of_files | 
|  |  | 
|  | def unpack_command(self, command): | 
|  | r""" | 
|  | Unpack a command from the configuration file, handling both dictionary | 
|  | and string inputs. | 
|  |  | 
|  | This method takes a command from the configuration file, which can be | 
|  | either a dictionary or a string. If the input is a dictionary, the | 
|  | method extracts the command text and timeout from the dictionary. | 
|  | If the input is a string, the method assumes a default timeout of | 
|  | 60 seconds. | 
|  | The method returns a tuple containing the command text and timeout. | 
|  |  | 
|  | Parameters: | 
|  | command (dict or str): A command from the configuration file, | 
|  | which can be either a dictionary or a | 
|  | string. | 
|  |  | 
|  | Returns: | 
|  | tuple: A tuple containing the command text and timeout. | 
|  | """ | 
|  | if isinstance(command, dict): | 
|  | command_txt = next(iter(command)) | 
|  | command_timeout = next(iter(command.values())) | 
|  | elif isinstance(command, str): | 
|  | command_txt = command | 
|  | # Default command timeout 60 seconds | 
|  | command_timeout = 60 | 
|  |  | 
|  | return command_txt, command_timeout | 
|  |  | 
|  | def ssh_execute_ffdc_commands( | 
|  | self, ffdc_actions_for_target_type, form_filename=False | 
|  | ): | 
|  | r""" | 
|  | Send commands in the ffdc_config file to the targeted system using SSH. | 
|  |  | 
|  | This method sends a set of commands and collects FFDC data from the | 
|  | targeted system using the SSH protocol. The method takes the | 
|  | ffdc_actions_for_target_type dictionary and an optional | 
|  | form_filename parameter as arguments. | 
|  |  | 
|  | If form_filename is set to True, the method prepends the target type | 
|  | to the output file name. The method returns the output of the executed | 
|  | commands. | 
|  |  | 
|  | It also prints the progress counter string + on the console. | 
|  |  | 
|  | Parameters: | 
|  | ffdc_actions_for_target_type (dict): A dictionary containing | 
|  | commands and files for the | 
|  | selected remote host type. | 
|  | form_filename (bool, optional):      If True, prepends the target | 
|  | type to the output file name. | 
|  | Defaults to False. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | self.logger.info( | 
|  | "\n\t[Run] Executing commands on %s using %s" | 
|  | % (self.hostname, ffdc_actions_for_target_type["PROTOCOL"][0]) | 
|  | ) | 
|  |  | 
|  | list_of_commands = self.get_command_list(ffdc_actions_for_target_type) | 
|  | # If command list is empty, returns | 
|  | if not list_of_commands: | 
|  | return | 
|  |  | 
|  | progress_counter = 0 | 
|  | for command in list_of_commands: | 
|  | command_txt, command_timeout = self.unpack_command(command) | 
|  |  | 
|  | if form_filename: | 
|  | command_txt = str(command_txt % self.target_type) | 
|  |  | 
|  | ( | 
|  | cmd_exit_code, | 
|  | err, | 
|  | response, | 
|  | ) = self.ssh_remoteclient.execute_command( | 
|  | command_txt, command_timeout | 
|  | ) | 
|  |  | 
|  | if cmd_exit_code: | 
|  | self.logger.warning( | 
|  | "\n\t\t[WARN] %s exits with code %s." | 
|  | % (command_txt, str(cmd_exit_code)) | 
|  | ) | 
|  | self.logger.warning("\t\t[WARN] %s " % err) | 
|  |  | 
|  | progress_counter += 1 | 
|  | self.print_progress(progress_counter) | 
|  |  | 
|  | self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]") | 
|  |  | 
    def group_copy(self, ffdc_actions_for_target_type):
        r"""
        SCP a group of files (wildcard) from the remote host.

        This method copies a group of files from the remote host using
        the SCP protocol.  Each configured command is executed remotely
        to expand the wildcard; its stdout (one path per line) is then
        fed to the SCP client.  Without an SCP channel the whole step is
        skipped.

        Parameters:
            ffdc_actions_for_target_type (dict): A dictionary containing
                                                 commands and files for
                                                 the selected remote
                                                 host type.

        Returns:
            None
        """
        if self.ssh_remoteclient.scpclient:
            self.logger.info(
                "\n\tCopying files from remote system %s via SCP.\n"
                % self.hostname
            )

            list_of_commands = self.get_command_list(
                ffdc_actions_for_target_type
            )
            # If command list is empty, returns
            if not list_of_commands:
                return

            for command in list_of_commands:
                try:
                    # Expand ${...} env/plugin variables in the command.
                    command = self.yaml_env_and_plugin_vars_populate(command)
                except IndexError:
                    self.logger.error("\t\tInvalid command %s" % command)
                    continue

                (
                    cmd_exit_code,
                    err,
                    response,
                ) = self.ssh_remoteclient.execute_command(command)

                # If file does not exist, code take no action.
                # cmd_exit_code is ignored for this scenario.
                if response:
                    # Remote stdout lists the matched paths, one per line.
                    scp_result = self.ssh_remoteclient.scp_file_from_remote(
                        response.split("\n"), self.ffdc_dir_path
                    )
                    if scp_result:
                        self.logger.info(
                            "\t\tSuccessfully copied from "
                            + self.hostname
                            + ":"
                            + command
                        )
                else:
                    self.logger.info("\t\t%s has no result" % command)

        else:
            self.logger.info(
                "\n\n\tSkip copying files from remote system %s.\n"
                % self.hostname
            )
|  |  | 
|  | def scp_ffdc( | 
|  | self, | 
|  | targ_dir_path, | 
|  | targ_file_prefix, | 
|  | form_filename, | 
|  | file_list=None, | 
|  | quiet=None, | 
|  | ): | 
|  | r""" | 
|  | SCP all files in the file_dict to the indicated directory on the local | 
|  | system. | 
|  |  | 
|  | This method copies all files specified in the file_dict dictionary | 
|  | from the targeted system to the local system using the SCP protocol. | 
|  | The method takes the target directory path, target file prefix, and a | 
|  | boolean flag form_filename as required arguments. | 
|  |  | 
|  | The file_dict argument is optional and contains the files to be copied. | 
|  | The quiet argument is also optional and, if set to True, suppresses | 
|  | the output of the SCP operation. | 
|  |  | 
|  | Parameters: | 
|  | targ_dir_path (str):        The path of the directory to receive | 
|  | the files on the local system. | 
|  | targ_file_prefix (str):     Prefix which will be prepended to each | 
|  | target file's name. | 
|  | form_filename (bool):       If True, prepends the target type to | 
|  | the file names. | 
|  | file_dict (dict, optional): A dictionary containing the files to | 
|  | be copied. Defaults to None. | 
|  | quiet (bool, optional):     If True, suppresses the output of the | 
|  | SCP operation. Defaults to None. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | progress_counter = 0 | 
|  | for filename in file_list: | 
|  | if form_filename: | 
|  | filename = str(filename % self.target_type) | 
|  | source_file_path = filename | 
|  | targ_file_path = ( | 
|  | targ_dir_path + targ_file_prefix + filename.split("/")[-1] | 
|  | ) | 
|  |  | 
|  | # If source file name contains wild card, copy filename as is. | 
|  | if "*" in source_file_path: | 
|  | scp_result = self.ssh_remoteclient.scp_file_from_remote( | 
|  | source_file_path, self.ffdc_dir_path | 
|  | ) | 
|  | else: | 
|  | scp_result = self.ssh_remoteclient.scp_file_from_remote( | 
|  | source_file_path, targ_file_path | 
|  | ) | 
|  |  | 
|  | if not quiet: | 
|  | if scp_result: | 
|  | self.logger.info( | 
|  | "\t\tSuccessfully copied from " | 
|  | + self.hostname | 
|  | + ":" | 
|  | + source_file_path | 
|  | + ".\n" | 
|  | ) | 
|  | else: | 
|  | self.logger.info( | 
|  | "\t\tFail to copy from " | 
|  | + self.hostname | 
|  | + ":" | 
|  | + source_file_path | 
|  | + ".\n" | 
|  | ) | 
|  | else: | 
|  | progress_counter += 1 | 
|  | self.print_progress(progress_counter) | 
|  |  | 
|  | def set_ffdc_default_store_path(self): | 
|  | r""" | 
|  | Set default values for self.ffdc_dir_path and self.ffdc_prefix. | 
|  |  | 
|  | This method sets default values for the self.ffdc_dir_path and | 
|  | self.ffdc_prefix class variables. | 
|  |  | 
|  | The collected FFDC files will be stored in the directory | 
|  | /self.location/hostname_timestr/, with individual files having the | 
|  | format timestr_filename where timestr is in %Y%m%d-%H%M%S. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | timestr = time.strftime("%Y%m%d-%H%M%S") | 
|  | self.ffdc_dir_path = ( | 
|  | self.location + "/" + self.hostname + "_" + timestr + "/" | 
|  | ) | 
|  | self.ffdc_prefix = timestr + "_" | 
|  | self.validate_local_store(self.ffdc_dir_path) | 
|  |  | 
|  | # Need to verify local store path exists prior to instantiate this class. | 
|  | # This class method to validate log path before referencing this class. | 
|  | @classmethod | 
|  | def validate_local_store(cls, dir_path): | 
|  | r""" | 
|  | Ensure the specified directory exists to store FFDC files locally. | 
|  |  | 
|  | This method checks if the provided dir_path exists. If the directory | 
|  | does not exist, the method creates it. The method does not return any | 
|  | value. | 
|  |  | 
|  | Parameters: | 
|  | dir_path (str): The directory path where collected FFDC data files | 
|  | will be stored. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | if not os.path.exists(dir_path): | 
|  | try: | 
|  | os.makedirs(dir_path, 0o755) | 
|  | except (IOError, OSError) as e: | 
|  | # PermissionError | 
|  | if e.errno == EPERM or e.errno == EACCES: | 
|  | print( | 
|  | "\tERROR: os.makedirs %s failed with" | 
|  | " PermissionError.\n" % dir_path | 
|  | ) | 
|  | else: | 
|  | print( | 
|  | "\tERROR: os.makedirs %s failed with %s.\n" | 
|  | % (dir_path, e.strerror) | 
|  | ) | 
|  | sys.exit(-1) | 
|  |  | 
|  | def print_progress(self, progress): | 
|  | r""" | 
|  | Print the current activity progress. | 
|  |  | 
|  | This method prints the current activity progress using the provided | 
|  | progress counter. The method does not return any value. | 
|  |  | 
|  | Parameters: | 
|  | progress (int): The current activity progress counter. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | """ | 
|  | sys.stdout.write("\r\t" + "+" * progress) | 
|  | sys.stdout.flush() | 
|  | time.sleep(0.1) | 
|  |  | 
|  | def verify_redfish(self): | 
|  | r""" | 
|  | Verify if the remote host has the Redfish service active. | 
|  |  | 
|  | This method checks if the remote host has the Redfish service active | 
|  | by sending a GET request to the Redfish base URL /redfish/v1/. | 
|  | If the request is successful (status code 200), the method returns | 
|  | stdout output of the run else error message. | 
|  |  | 
|  | Returns: | 
|  | str: Redfish service executed output. | 
|  | """ | 
|  | redfish_parm = ( | 
|  | "redfishtool -r " | 
|  | + self.hostname | 
|  | + ":" | 
|  | + self.port_https | 
|  | + " -S Always raw GET /redfish/v1/" | 
|  | ) | 
|  | return self.run_tool_cmd(redfish_parm, True) | 
|  |  | 
|  | def verify_ipmi(self): | 
|  | r""" | 
|  | Verify if the remote host has the IPMI LAN service active. | 
|  |  | 
|  | This method checks if the remote host has the IPMI LAN service active | 
|  | by sending an IPMI "power status" command. | 
|  |  | 
|  | If the command is successful (returns a non-empty response), | 
|  | else error message. | 
|  |  | 
|  | Returns: | 
|  | str: IPMI LAN service executed output. | 
|  | """ | 
|  | if self.target_type == "OPENBMC": | 
|  | ipmi_parm = ( | 
|  | "ipmitool -I lanplus -C 17  -U " | 
|  | + self.username | 
|  | + " -P " | 
|  | + self.password | 
|  | + " -H " | 
|  | + self.hostname | 
|  | + " -p " | 
|  | + str(self.port_ipmi) | 
|  | + " power status" | 
|  | ) | 
|  | else: | 
|  | ipmi_parm = ( | 
|  | "ipmitool -I lanplus  -P " | 
|  | + self.password | 
|  | + " -H " | 
|  | + self.hostname | 
|  | + " -p " | 
|  | + str(self.port_ipmi) | 
|  | + " power status" | 
|  | ) | 
|  |  | 
|  | return self.run_tool_cmd(ipmi_parm, True) | 
|  |  | 
|  | def run_tool_cmd(self, parms_string, quiet=False): | 
|  | r""" | 
|  | Run a CLI standard tool or script with the provided command options. | 
|  |  | 
|  | This method runs a CLI standard tool or script with the provided | 
|  | parms_string command options. If the quiet parameter is set to True, | 
|  | the method suppresses the output of the command. | 
|  | The method returns the output of the command as a string. | 
|  |  | 
|  | Parameters: | 
|  | parms_string (str):     The command options for the CLI tool or | 
|  | script. | 
|  | quiet (bool, optional): If True, suppresses the output of the | 
|  | command. Defaults to False. | 
|  |  | 
|  | Returns: | 
|  | str: The output of the command as a string. | 
|  | """ | 
|  |  | 
|  | result = subprocess.run( | 
|  | [parms_string], | 
|  | stdout=subprocess.PIPE, | 
|  | stderr=subprocess.PIPE, | 
|  | shell=True, | 
|  | universal_newlines=True, | 
|  | ) | 
|  |  | 
|  | if result.stderr and not quiet: | 
|  | if self.password in parms_string: | 
|  | parms_string = parms_string.replace(self.password, "********") | 
|  | self.logger.error("\n\t\tERROR with %s " % parms_string) | 
|  | self.logger.error("\t\t" + result.stderr) | 
|  |  | 
|  | return result.stdout | 
|  |  | 
|  | def verify_protocol(self, protocol_list): | 
|  | r""" | 
|  | Perform a working check for the provided list of protocols. | 
|  |  | 
|  | This method checks if the specified protocols are available on the | 
|  | remote host. The method iterates through the protocol_list and | 
|  | attempts to establish a connection using each protocol. | 
|  |  | 
|  | If a connection is successfully established, the method append to the | 
|  | list and if any protocol fails to connect, the method ignores it. | 
|  |  | 
|  | Parameters: | 
|  | protocol_list (list):   A list of protocols to check. | 
|  |  | 
|  | Returns: | 
|  | list: All protocols are available list. | 
|  | """ | 
|  |  | 
|  | tmp_list = [] | 
|  | if self.target_is_pingable(): | 
|  | tmp_list.append("SHELL") | 
|  |  | 
|  | for protocol in protocol_list: | 
|  | if self.remote_protocol != "ALL": | 
|  | if self.remote_protocol != protocol: | 
|  | continue | 
|  |  | 
|  | # Only check SSH/SCP once for both protocols | 
|  | if ( | 
|  | protocol == "SSH" | 
|  | or protocol == "SCP" | 
|  | and protocol not in tmp_list | 
|  | ): | 
|  | if self.ssh_to_target_system(): | 
|  | # Add only what user asked. | 
|  | if self.remote_protocol != "ALL": | 
|  | tmp_list.append(self.remote_protocol) | 
|  | else: | 
|  | tmp_list.append("SSH") | 
|  | tmp_list.append("SCP") | 
|  |  | 
|  | if protocol == "TELNET": | 
|  | if self.telnet_to_target_system(): | 
|  | tmp_list.append(protocol) | 
|  |  | 
|  | if protocol == "REDFISH": | 
|  | if self.verify_redfish(): | 
|  | tmp_list.append(protocol) | 
|  | self.logger.info( | 
|  | "\n\t[Check] %s Redfish Service.\t\t [OK]" | 
|  | % self.hostname | 
|  | ) | 
|  | else: | 
|  | self.logger.info( | 
|  | "\n\t[Check] %s Redfish Service.\t\t [NOT AVAILABLE]" | 
|  | % self.hostname | 
|  | ) | 
|  |  | 
|  | if protocol == "IPMI": | 
|  | if self.verify_ipmi(): | 
|  | tmp_list.append(protocol) | 
|  | self.logger.info( | 
|  | "\n\t[Check] %s IPMI LAN Service.\t\t [OK]" | 
|  | % self.hostname | 
|  | ) | 
|  | else: | 
|  | self.logger.info( | 
|  | "\n\t[Check] %s IPMI LAN Service.\t\t [NOT AVAILABLE]" | 
|  | % self.hostname | 
|  | ) | 
|  |  | 
|  | return tmp_list | 
|  |  | 
|  | def load_env(self): | 
|  | r""" | 
|  | Load the user environment variables from a YAML file. | 
|  |  | 
|  | This method reads the environment variables from a YAML file specified | 
|  | in the ENV_FILE environment variable. If the file is not found or | 
|  | there is an error reading the file, an exception is raised. | 
|  |  | 
|  | The YAML file should have the following format: | 
|  |  | 
|  | .. code-block:: yaml | 
|  |  | 
|  | VAR_NAME: VAR_VALUE | 
|  |  | 
|  | Where VAR_NAME is the name of the environment variable, and | 
|  | VAR_VALUE is its value. | 
|  |  | 
|  | After loading the environment variables, they are stored in the | 
|  | self.env attribute for later use. | 
|  | """ | 
|  |  | 
|  | os.environ["hostname"] = self.hostname | 
|  | os.environ["username"] = self.username | 
|  | os.environ["password"] = self.password | 
|  | os.environ["port_ssh"] = self.port_ssh | 
|  | os.environ["port_https"] = self.port_https | 
|  | os.environ["port_ipmi"] = self.port_ipmi | 
|  |  | 
|  | # Append default Env. | 
|  | self.env_dict["hostname"] = self.hostname | 
|  | self.env_dict["username"] = self.username | 
|  | self.env_dict["password"] = self.password | 
|  | self.env_dict["port_ssh"] = self.port_ssh | 
|  | self.env_dict["port_https"] = self.port_https | 
|  | self.env_dict["port_ipmi"] = self.port_ipmi | 
|  |  | 
|  | try: | 
|  | tmp_env_dict = {} | 
|  | if self.env_vars: | 
|  | tmp_env_dict = json.loads(self.env_vars) | 
|  | # Export ENV vars default. | 
|  | for key, value in tmp_env_dict.items(): | 
|  | os.environ[key] = value | 
|  | self.env_dict[key] = str(value) | 
|  |  | 
|  | # Load user specified ENV config YAML. | 
|  | if self.econfig: | 
|  | with open(self.econfig, "r") as file: | 
|  | try: | 
|  | tmp_env_dict = yaml.load(file, Loader=yaml.SafeLoader) | 
|  | except yaml.YAMLError as e: | 
|  | self.logger.error(e) | 
|  | sys.exit(-1) | 
|  | # Export ENV vars. | 
|  | for key, value in tmp_env_dict["env_params"].items(): | 
|  | os.environ[key] = str(value) | 
|  | self.env_dict[key] = str(value) | 
|  | except json.decoder.JSONDecodeError as e: | 
|  | self.logger.error("\n\tERROR: %s " % e) | 
|  | sys.exit(-1) | 
|  | except FileNotFoundError as e: | 
|  | self.logger.error("\n\tERROR: %s " % e) | 
|  | sys.exit(-1) | 
|  |  | 
|  | # This to mask the password from displaying on the console. | 
|  | mask_dict = self.env_dict.copy() | 
|  | for k, v in mask_dict.items(): | 
|  | if k.lower().find("password") != -1: | 
|  | hidden_text = [] | 
|  | hidden_text.append(v) | 
|  | password_regex = ( | 
|  | "(" + "|".join([re.escape(x) for x in hidden_text]) + ")" | 
|  | ) | 
|  | mask_dict[k] = re.sub(password_regex, "********", v) | 
|  |  | 
|  | self.logger.info(json.dumps(mask_dict, indent=8, sort_keys=False)) | 
|  |  | 
|  | def execute_python_eval(self, eval_string): | 
|  | r""" | 
|  | Execute a qualified Python function string using the eval() function. | 
|  |  | 
|  | This method executes a provided Python function string using the | 
|  | eval() function. | 
|  |  | 
|  | The method takes the eval_string as an argument, which is expected to | 
|  | be a valid Python function call. | 
|  |  | 
|  | The method returns the result of the executed function. | 
|  |  | 
|  | Example: | 
|  | eval(plugin.foo_func.foo_func(10)) | 
|  |  | 
|  | Parameters: | 
|  | eval_string (str): A valid Python function call string. | 
|  |  | 
|  | Returns: | 
|  | str: The result of the executed function and on failure return | 
|  | PLUGIN_EVAL_ERROR. | 
|  | """ | 
|  | try: | 
|  | self.logger.info("\tExecuting plugin func()") | 
|  | self.logger.debug("\tCall func: %s" % eval_string) | 
|  | result = eval(eval_string) | 
|  | self.logger.info("\treturn: %s" % str(result)) | 
|  | except ( | 
|  | ValueError, | 
|  | SyntaxError, | 
|  | NameError, | 
|  | AttributeError, | 
|  | TypeError, | 
|  | ) as e: | 
|  | self.logger.error("\tERROR: execute_python_eval: %s" % e) | 
|  | # Set the plugin error state. | 
|  | plugin_error_dict["exit_on_error"] = True | 
|  | self.logger.info("\treturn: PLUGIN_EVAL_ERROR") | 
|  | return "PLUGIN_EVAL_ERROR" | 
|  |  | 
|  | return result | 
|  |  | 
|  | def execute_plugin_block(self, plugin_cmd_list): | 
|  | r""" | 
|  | Pack the plugin commands into qualified Python string objects. | 
|  |  | 
|  | This method processes the plugin_cmd_list argument, which is expected | 
|  | to contain a list of plugin commands read from a YAML file. The method | 
|  | iterates through the list, constructs a qualified Python string object | 
|  | for each plugin command, and returns a list of these string objects. | 
|  |  | 
|  | Parameters: | 
|  | plugin_cmd_list (list): A list of plugin commands containing | 
|  | plugin names and arguments. | 
|  | Plugin block read from YAML | 
|  | [ | 
|  | {'plugin_name':'plugin.foo_func.my_func'}, | 
|  | {'plugin_args':[10]}, | 
|  | ] | 
|  |  | 
|  | Example: | 
|  | Execute and no return response | 
|  | - plugin: | 
|  | - plugin_name: plugin.foo_func.my_func | 
|  | - plugin_args: | 
|  | - arg1 | 
|  | - arg2 | 
|  |  | 
|  | Execute and return a response | 
|  | - plugin: | 
|  | - plugin_name: result = plugin.foo_func.my_func | 
|  | - plugin_args: | 
|  | - arg1 | 
|  | - arg2 | 
|  |  | 
|  | Execute and return multiple values response | 
|  | - plugin: | 
|  | - plugin_name: result1,result2 = plugin.foo_func.my_func | 
|  | - plugin_args: | 
|  | - arg1 | 
|  | - arg2 | 
|  |  | 
|  | Returns: | 
|  | str: Execute and not response or a string value(s) responses, | 
|  |  | 
|  | """ | 
|  | try: | 
|  | idx = self.key_index_list_dict("plugin_name", plugin_cmd_list) | 
|  | plugin_name = plugin_cmd_list[idx]["plugin_name"] | 
|  | # Equal separator means plugin function returns result. | 
|  | if " = " in plugin_name: | 
|  | # Ex. ['result', 'plugin.foo_func.my_func'] | 
|  | plugin_name_args = plugin_name.split(" = ") | 
|  | # plugin func return data. | 
|  | for arg in plugin_name_args: | 
|  | if arg == plugin_name_args[-1]: | 
|  | plugin_name = arg | 
|  | else: | 
|  | plugin_resp = arg.split(",") | 
|  | # ['result1','result2'] | 
|  | for x in plugin_resp: | 
|  | global_plugin_list.append(x) | 
|  | global_plugin_dict[x] = "" | 
|  |  | 
|  | # Walk the plugin args ['arg1,'arg2'] | 
|  | # If the YAML plugin statement 'plugin_args' is not declared. | 
|  | plugin_args = [] | 
|  | if any("plugin_args" in d for d in plugin_cmd_list): | 
|  | idx = self.key_index_list_dict("plugin_args", plugin_cmd_list) | 
|  | if idx is not None: | 
|  | plugin_args = plugin_cmd_list[idx].get("plugin_args", []) | 
|  | plugin_args = self.yaml_args_populate(plugin_args) | 
|  | else: | 
|  | plugin_args = self.yaml_args_populate([]) | 
|  |  | 
|  | # Pack the args list to string parameters for plugin function. | 
|  | parm_args_str = self.yaml_args_string(plugin_args) | 
|  |  | 
|  | """ | 
|  | Example of plugin_func: | 
|  | plugin.redfish.enumerate_request( | 
|  | "xx.xx.xx.xx:443", | 
|  | "root", | 
|  | "********", | 
|  | "/redfish/v1/", | 
|  | "json") | 
|  | """ | 
|  | if parm_args_str: | 
|  | plugin_func = f"{plugin_name}({parm_args_str})" | 
|  | else: | 
|  | plugin_func = f"{plugin_name}()" | 
|  |  | 
|  | # Execute plugin function. | 
|  | if global_plugin_dict: | 
|  | resp = self.execute_python_eval(plugin_func) | 
|  | # Update plugin vars dict if there is any. | 
|  | if resp != "PLUGIN_EVAL_ERROR": | 
|  | self.response_args_data(resp) | 
|  | else: | 
|  | resp = self.execute_python_eval(plugin_func) | 
|  | except Exception as e: | 
|  | # Set the plugin error state. | 
|  | plugin_error_dict["exit_on_error"] = True | 
|  | self.logger.error("\tERROR: execute_plugin_block: %s" % e) | 
|  | pass | 
|  |  | 
|  | # There is a real error executing the plugin function. | 
|  | if resp == "PLUGIN_EVAL_ERROR": | 
|  | return resp | 
|  |  | 
|  | # Check if plugin_expects_return (int, string, list,dict etc) | 
|  | if any("plugin_expects_return" in d for d in plugin_cmd_list): | 
|  | idx = self.key_index_list_dict( | 
|  | "plugin_expects_return", plugin_cmd_list | 
|  | ) | 
|  | plugin_expects = plugin_cmd_list[idx]["plugin_expects_return"] | 
|  | if plugin_expects: | 
|  | if resp: | 
|  | if ( | 
|  | self.plugin_expect_type(plugin_expects, resp) | 
|  | == "INVALID" | 
|  | ): | 
|  | self.logger.error("\tWARN: Plugin error check skipped") | 
|  | elif not self.plugin_expect_type(plugin_expects, resp): | 
|  | self.logger.error( | 
|  | "\tERROR: Plugin expects return data: %s" | 
|  | % plugin_expects | 
|  | ) | 
|  | plugin_error_dict["exit_on_error"] = True | 
|  | elif not resp: | 
|  | self.logger.error( | 
|  | "\tERROR: Plugin func failed to return data" | 
|  | ) | 
|  | plugin_error_dict["exit_on_error"] = True | 
|  |  | 
|  | return resp | 
|  |  | 
|  | def response_args_data(self, plugin_resp): | 
|  | r""" | 
|  | Parse the plugin function response and update plugin return variable. | 
|  |  | 
|  | plugin_resp       Response data from plugin function. | 
|  | """ | 
|  | resp_list = [] | 
|  | resp_data = "" | 
|  |  | 
|  | # There is nothing to update the plugin response. | 
|  | if len(global_plugin_list) == 0 or plugin_resp == "None": | 
|  | return | 
|  |  | 
|  | if isinstance(plugin_resp, str): | 
|  | resp_data = plugin_resp.strip("\r\n\t") | 
|  | resp_list.append(resp_data) | 
|  | elif isinstance(plugin_resp, bytes): | 
|  | resp_data = str(plugin_resp, "UTF-8").strip("\r\n\t") | 
|  | resp_list.append(resp_data) | 
|  | elif isinstance(plugin_resp, tuple): | 
|  | if len(global_plugin_list) == 1: | 
|  | resp_list.append(plugin_resp) | 
|  | else: | 
|  | resp_list = list(plugin_resp) | 
|  | resp_list = [x.strip("\r\n\t") for x in resp_list] | 
|  | elif isinstance(plugin_resp, list): | 
|  | if len(global_plugin_list) == 1: | 
|  | resp_list.append([x.strip("\r\n\t") for x in plugin_resp]) | 
|  | else: | 
|  | resp_list = [x.strip("\r\n\t") for x in plugin_resp] | 
|  | elif isinstance(plugin_resp, int) or isinstance(plugin_resp, float): | 
|  | resp_list.append(plugin_resp) | 
|  |  | 
|  | # Iterate if there is a list of plugin return vars to update. | 
|  | for idx, item in enumerate(resp_list, start=0): | 
|  | # Exit loop, done required loop. | 
|  | if idx >= len(global_plugin_list): | 
|  | break | 
|  | # Find the index of the return func in the list and | 
|  | # update the global func return dictionary. | 
|  | try: | 
|  | dict_idx = global_plugin_list[idx] | 
|  | global_plugin_dict[dict_idx] = item | 
|  | except (IndexError, ValueError) as e: | 
|  | self.logger.warn("\tWARN: response_args_data: %s" % e) | 
|  | pass | 
|  |  | 
|  | # Done updating plugin dict irrespective of pass or failed, | 
|  | # clear all the list element for next plugin block execute. | 
|  | global_plugin_list.clear() | 
|  |  | 
|  | def yaml_args_string(self, plugin_args): | 
|  | r""" | 
|  | Pack the arguments into a string representation. | 
|  |  | 
|  | This method processes the plugin_arg argument, which is expected to | 
|  | contain a list of arguments. The method iterates through the list, | 
|  | converts each argument to a string, and concatenates them into a | 
|  | single string. Special handling is applied for integer, float, and | 
|  | predefined plugin variable types. | 
|  |  | 
|  | Ecample: | 
|  | From | 
|  | ['xx.xx.xx.xx:443', 'root', '********', '/redfish/v1/', 'json'] | 
|  | to | 
|  | "xx.xx.xx.xx:443","root","********","/redfish/v1/","json" | 
|  |  | 
|  | Parameters: | 
|  | plugin_args (list):   A list of arguments to be packed into | 
|  | a string. | 
|  |  | 
|  | Returns: | 
|  | str:   A string representation of the arguments. | 
|  | """ | 
|  | args_str = "" | 
|  |  | 
|  | for i, arg in enumerate(plugin_args): | 
|  | if arg: | 
|  | if isinstance(arg, (int, float)): | 
|  | args_str += str(arg) | 
|  | elif arg in global_plugin_type_list: | 
|  | args_str += str(global_plugin_dict[arg]) | 
|  | else: | 
|  | args_str += f'"{arg.strip("\r\n\t")}"' | 
|  |  | 
|  | # Skip last list element. | 
|  | if i != len(plugin_args) - 1: | 
|  | args_str += "," | 
|  |  | 
|  | return args_str | 
|  |  | 
|  | def yaml_args_populate(self, yaml_arg_list): | 
|  | r""" | 
|  | Decode environment and plugin variables and populate the argument list. | 
|  |  | 
|  | This method processes the yaml_arg_list argument, which is expected to | 
|  | contain a list of arguments read from a YAML file. The method iterates | 
|  | through the list, decodes environment and plugin variables, and | 
|  | returns a populated list of arguments. | 
|  |  | 
|  | .. code-block:: yaml | 
|  |  | 
|  | - plugin_args: | 
|  | - arg1 | 
|  | - arg2 | 
|  |  | 
|  | ['${hostname}:${port_https}', '${username}', '/redfish/v1/', 'json'] | 
|  |  | 
|  | Returns the populated plugin list | 
|  | ['xx.xx.xx.xx:443', 'root', '/redfish/v1/', 'json'] | 
|  |  | 
|  | Parameters: | 
|  | yaml_arg_list (list):   A list of arguments containing environment | 
|  | and plugin variables. | 
|  |  | 
|  | Returns: | 
|  | list:   A populated list of arguments with decoded environment and | 
|  | plugin variables. | 
|  | """ | 
|  | if isinstance(yaml_arg_list, list): | 
|  | populated_list = [] | 
|  | for arg in yaml_arg_list: | 
|  | if isinstance(arg, (int, float)): | 
|  | populated_list.append(arg) | 
|  | elif isinstance(arg, str): | 
|  | arg_str = self.yaml_env_and_plugin_vars_populate(str(arg)) | 
|  | populated_list.append(arg_str) | 
|  | else: | 
|  | populated_list.append(arg) | 
|  |  | 
|  | return populated_list | 
|  |  | 
    def yaml_env_and_plugin_vars_populate(self, yaml_arg_str):
        r"""
        Update environment variables and plugin variables based on the
        provided YAML argument string.

        This method processes the yaml_arg_str argument, which is expected
        to contain a string representing environment variables and plugin
        variables in the format:

        .. code-block:: yaml

             - cat ${MY_VAR}
             - ls -AX my_plugin_var

        The method parses the string, extracts the variable names, and updates
        the corresponding environment variables and plugin variables.

        Parameters:
            yaml_arg_str (str):   A string containing environment and plugin
                                  variable definitions in YAML format.

        Returns:
            str:   The updated YAML argument string with plugin variables
                   replaced.
        """

        # Phase 1: substitute ${name} references with values from
        # os.environ. Names with no matching environment variable are
        # left untouched in the string.
        # Example:
        #   ${my_hostname}:${port_https} -> ['my_hostname', 'port_https']
        try:
            # Example, list of matching
            # env vars ['username', 'password', 'hostname']
            # Extra escape \ for special symbols. '\$\{([^\}]+)\}' works good.
            env_var_regex = r"\$\{([^\}]+)\}"
            env_var_names_list = re.findall(env_var_regex, yaml_arg_str)

            for var in env_var_names_list:
                env_var = os.environ.get(var)
                if env_var:
                    env_replace = "${" + var + "}"
                    yaml_arg_str = yaml_arg_str.replace(env_replace, env_var)
        except Exception as e:
            # Best effort: on any failure keep the partially-substituted
            # string and continue with plugin var substitution.
            self.logger.error("\tERROR:yaml_env_vars_populate: %s" % e)
            pass

        """
        Parse the string for plugin vars.
        Implement the logic to update environment variables based on the
        extracted variable names.
        """
        # Phase 2: substitute bare plugin variable names (no ${} syntax)
        # with the values recorded in global_plugin_dict by earlier
        # plugin executions.
        try:
            # Example, list of plugin vars env_var_names_list
            #    ['my_hostname', 'port_https']
            global_plugin_dict_keys = set(global_plugin_dict.keys())
            # Skip env var list already populated above code block list.
            plugin_var_name_list = [
                var
                for var in global_plugin_dict_keys
                if var not in env_var_names_list
            ]

            for var in plugin_var_name_list:
                plugin_var_value = global_plugin_dict[var]
                # NOTE(review): this tests the WHOLE argument string
                # against the dict keys, so substitution only happens
                # when yaml_arg_str is itself a registered plugin var
                # name. It looks like `var in global_plugin_dict` (or a
                # substring check) may have been intended — confirm
                # against callers before changing.
                if yaml_arg_str in global_plugin_dict:
                    """
                    If this plugin var exist but empty in dict, don't replace.
                    his is either a YAML plugin statement incorrectly used or
                    user added a plugin var which is not going to be populated.
                    """
                    if isinstance(plugin_var_value, (list, dict)):
                        """
                        List data type or dict can't be replaced, use
                        directly in eval function call.
                        """
                        global_plugin_type_list.append(var)
                    else:
                        yaml_arg_str = yaml_arg_str.replace(
                            str(var), str(plugin_var_value)
                        )
        except (IndexError, ValueError) as e:
            self.logger.error("\tERROR: yaml_plugin_vars_populate: %s" % e)
            pass

        # From ${my_hostname}:${port_https} -> ['my_hostname', 'port_https']
        # to populated values string as
        # Example:  xx.xx.xx.xx:443 and return the string
        return yaml_arg_str
|  |  | 
|  | def plugin_error_check(self, plugin_dict): | 
|  | r""" | 
|  | Process plugin error dictionary and return the corresponding error | 
|  | message. | 
|  |  | 
|  | This method checks if any dictionary in the plugin_dict list contains | 
|  | a "plugin_error" key. If such a dictionary is found, it retrieves the | 
|  | value associated with the "plugin_error" key and returns the | 
|  | corresponding error message from the plugin_error_dict attribute. | 
|  |  | 
|  | Parameters: | 
|  | plugin_dict (list of dict): A list of dictionaries containing | 
|  | plugin error information. | 
|  |  | 
|  | Returns: | 
|  | str: The error message corresponding to the "plugin_error" value, | 
|  | or None if no error is found. | 
|  | """ | 
|  | if any("plugin_error" in d for d in plugin_dict): | 
|  | for d in plugin_dict: | 
|  | if "plugin_error" in d: | 
|  | value = d["plugin_error"] | 
|  | return self.plugin_error_dict.get(value, None) | 
|  | return None | 
|  |  | 
|  | def key_index_list_dict(self, key, list_dict): | 
|  | r""" | 
|  | Find the index of the first dictionary in the list that contains | 
|  | the specified key. | 
|  |  | 
|  | Parameters: | 
|  | key (str):                 The key to search for in the | 
|  | dictionaries. | 
|  | list_dict (list of dict):  A list of dictionaries to search | 
|  | through. | 
|  |  | 
|  | Returns: | 
|  | int: The index of the first dictionary containing the key, or -1 | 
|  | if no match is found. | 
|  | """ | 
|  | for i, d in enumerate(list_dict): | 
|  | if key in d: | 
|  | return i | 
|  | return -1 | 
|  |  | 
|  | def plugin_expect_type(self, type, data): | 
|  | r""" | 
|  | Check if the provided data matches the expected type. | 
|  |  | 
|  | This method checks if the data argument matches the specified type. | 
|  | It supports the following types: "int", "float", "str", "list", "dict", | 
|  | and "tuple". | 
|  |  | 
|  | If the type is not recognized, it logs an info message and returns | 
|  | "INVALID". | 
|  |  | 
|  | Parameters: | 
|  | type (str): The expected data type. | 
|  | data:       The data to check against the expected type. | 
|  |  | 
|  | Returns: | 
|  | bool or str: True if the data matches the expected type, False if | 
|  | not, or "INVALID" if the type is not recognized. | 
|  | """ | 
|  | if type == "int": | 
|  | return isinstance(data, int) | 
|  | elif type == "float": | 
|  | return isinstance(data, float) | 
|  | elif type == "str": | 
|  | return isinstance(data, str) | 
|  | elif type == "list": | 
|  | return isinstance(data, list) | 
|  | elif type == "dict": | 
|  | return isinstance(data, dict) | 
|  | elif type == "tuple": | 
|  | return isinstance(data, tuple) | 
|  | else: | 
|  | self.logger.info("\tInvalid data type requested: %s" % type) | 
|  | return "INVALID" |