|  | #!/usr/bin/env python3 | 
|  |  | 
|  | r""" | 
|  | PEL functions. | 
|  | """ | 
|  |  | 
|  | import json | 
|  | import os | 
|  | import sys | 
|  | from datetime import datetime | 
|  |  | 
|  | import bmc_ssh_utils as bsu | 
|  | import func_args as fa | 
|  |  | 
|  | base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | 
|  | sys.path.append(base_path + "/data/") | 
|  |  | 
|  | import pel_variables  # NOQA | 
|  |  | 
|  |  | 
|  | class PeltoolException(Exception): | 
|  | r""" | 
|  | Base class for peltool related exceptions. | 
|  | """ | 
|  |  | 
|  | def __init__(self, message): | 
|  | self.message = message | 
|  | super().__init__(self.message) | 
|  |  | 
|  |  | 
|  | def peltool( | 
|  | option_string, peltool_extension=None, parse_json=True, **bsu_options | 
|  | ): | 
|  | r""" | 
|  | Run peltool on the BMC with the caller's option string and return the result. | 
|  |  | 
|  | Example: | 
|  |  | 
|  | ${pel_results}=  Peltool  -l | 
|  | Rprint Vars  pel_results | 
|  |  | 
|  | pel_results: | 
|  | [0x50000031]: | 
|  | [CompID]:                       0x1000 | 
|  | [PLID]:                         0x50000031 | 
|  | [Subsystem]:                    BMC Firmware | 
|  | [Message]:                      An application had an internal failure | 
|  | [SRC]:                          BD8D1002 | 
|  | [Commit Time]:                  02/25/2020  04:51:31 | 
|  | [Sev]:                          Unrecoverable Error | 
|  | [CreatorID]:                    BMC | 
|  |  | 
|  | Description of argument(s): | 
|  | option_string                   A string of options which are to be | 
|  | processed by the peltool command. | 
|  | peltool_extension               Provide peltool extension format. | 
|  | Default: None. | 
|  | parse_json                      Indicates that the raw JSON data should | 
|  | parsed into a list of dictionaries. | 
|  | bsu_options                     Options to be passed directly to | 
|  | bmc_execute_command. See its prolog for | 
|  | details. | 
|  | """ | 
|  |  | 
|  | bsu_options = fa.args_to_objects(bsu_options) | 
|  | peltool_cmd = "peltool" | 
|  | if peltool_extension: | 
|  | peltool_cmd = peltool_cmd + peltool_extension | 
|  |  | 
|  | out_buf, _, _ = bsu.bmc_execute_command( | 
|  | peltool_cmd + " " + option_string, **bsu_options | 
|  | ) | 
|  | if parse_json: | 
|  | try: | 
|  | return json.loads(out_buf) | 
|  | except ValueError as e: | 
|  | if type(out_buf) is str: | 
|  | return out_buf | 
|  | else: | 
|  | print(str(e)) | 
|  | return {} | 
|  | return out_buf | 
|  |  | 
|  |  | 
|  | def get_pel_data_from_bmc( | 
|  | include_hidden_pels=False, include_informational_pels=False | 
|  | ): | 
|  | r""" | 
|  | Returns PEL data from BMC else throws exception. | 
|  |  | 
|  | Description of arguments: | 
|  | include_hidden_pels           True/False (default: False). | 
|  | Set True to get hidden PELs else False. | 
|  | include_informational_pels    True/False (default: False). | 
|  | Set True to get informational PELs else False. | 
|  | """ | 
|  | try: | 
|  | pel_cmd = " -l" | 
|  | if include_hidden_pels: | 
|  | pel_cmd = pel_cmd + " -h" | 
|  | if include_informational_pels: | 
|  | pel_cmd = pel_cmd + " -f" | 
|  | pel_data = peltool(pel_cmd) | 
|  | if not pel_data: | 
|  | print("No PEL data present in BMC ...") | 
|  | except Exception as exception: | 
|  | raise PeltoolException( | 
|  | "Failed to get PEL data from BMC : " + str(exception) | 
|  | ) from exception | 
|  | return pel_data | 
|  |  | 
|  |  | 
|  | def verify_no_pel_exists_on_bmc(): | 
|  | r""" | 
|  | Verify that no PEL exists in BMC. Raise an exception if it does. | 
|  | """ | 
|  |  | 
|  | try: | 
|  | pel_data = get_pel_data_from_bmc() | 
|  |  | 
|  | if len(pel_data) == 0: | 
|  | return True | 
|  |  | 
|  | print("PEL data present. \n", pel_data) | 
|  | raise PeltoolException("PEL data present in BMC") | 
|  | except Exception as exception: | 
|  | raise PeltoolException( | 
|  | "Failed to get PEL data from BMC : " + str(exception) | 
|  | ) from exception | 
|  |  | 
|  |  | 
|  | def compare_pel_and_redfish_event_log(pel_record, event_record): | 
|  | r""" | 
|  | Compare PEL log attributes like "SRC", "Created at" with Redfish | 
|  | event log attributes like "EventId", "Created". | 
|  | Return False if they do not match. | 
|  |  | 
|  | Description of arguments: | 
|  | pel_record     PEL record. | 
|  | event_record   Redfish event which is equivalent of PEL record. | 
|  | """ | 
|  |  | 
|  | try: | 
|  | # Below is format of PEL record / event record and following | 
|  | # i.e. "SRC", "Created at" from | 
|  | # PEL record is compared with "EventId", "Created" from event record. | 
|  |  | 
|  | # PEL Log attributes | 
|  | # SRC        : XXXXXXXX | 
|  | # Created at : 11/14/2022 12:38:04 | 
|  |  | 
|  | # Event log attributes | 
|  | # EventId : XXXXXXXX XXXXXXXX XXXXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX | 
|  |  | 
|  | # Created : 2022-11-14T12:38:04+00:00 | 
|  |  | 
|  | print(f"\nPEL records : {pel_record}") | 
|  | print(f"\nEvent records : {event_record}") | 
|  |  | 
|  | pel_src = pel_record["pel_data"]["SRC"] | 
|  | pel_created_time = pel_record["pel_detail_data"]["Private Header"][ | 
|  | "Created at" | 
|  | ] | 
|  |  | 
|  | event_ids = (event_record["EventId"]).split(" ") | 
|  |  | 
|  | event_time_format = (event_record["Created"]).split("T") | 
|  | event_date = (event_time_format[0]).split("-") | 
|  | event_date = datetime( | 
|  | int(event_date[0]), int(event_date[1]), int(event_date[2]) | 
|  | ) | 
|  | event_date = event_date.strftime("%m/%d/%Y") | 
|  | event_sub_time_format = (event_time_format[1]).split("+") | 
|  | event_date_time = event_date + " " + event_sub_time_format[0] | 
|  |  | 
|  | event_created_time = event_date_time.replace("-", "/") | 
|  |  | 
|  | print(f"\nPEL SRC : {pel_src} | PEL Created Time : {pel_created_time}") | 
|  |  | 
|  | print( | 
|  | f"\nError event ID : {event_ids[0]} | Error Log Created Time " | 
|  | + ": {event_created_time}" | 
|  | ) | 
|  |  | 
|  | if pel_src == event_ids[0] and pel_created_time == event_created_time: | 
|  | print( | 
|  | "\nPEL SRC and created date time match with " | 
|  | "event ID, created time" | 
|  | ) | 
|  | else: | 
|  | raise PeltoolException( | 
|  | "\nPEL SRC and created date time did not " | 
|  | "match with event ID, created time" | 
|  | ) | 
|  | except Exception as exception: | 
|  | raise PeltoolException( | 
|  | "Exception occurred during PEL and Event log " | 
|  | "comparison for SRC or event ID and created " | 
|  | "time : " + str(exception) | 
|  | ) from exception | 
|  |  | 
|  |  | 
|  | def fetch_all_pel_ids_for_src(src_id, severity, include_hidden_pels=False): | 
|  | r""" | 
|  | Fetch all PEL IDs for the input SRC ID based on the severity type | 
|  | in the list format. | 
|  |  | 
|  | Description of arguments: | 
|  | src_id                SRC ID (e.g. BCXXYYYY). | 
|  | severity              PEL severity (e.g. "Predictive Error" | 
|  | "Recovered Error"). | 
|  | include_hidden_pels   True/False (default: False). | 
|  | Set True to get hidden PELs else False. | 
|  | """ | 
|  |  | 
|  | try: | 
|  | src_pel_ids = [] | 
|  | pel_data = get_pel_data_from_bmc(include_hidden_pels) | 
|  | pel_id_list = pel_data.keys() | 
|  | for pel_id in pel_id_list: | 
|  | # Check if required SRC ID with severity is present | 
|  | if src_id in pel_data[pel_id]["SRC"]: | 
|  | if pel_data[pel_id]["Sev"] == severity: | 
|  | src_pel_ids.append(pel_id) | 
|  |  | 
|  | if not src_pel_ids: | 
|  | raise PeltoolException( | 
|  | src_id + " with severity " + severity + " not present" | 
|  | ) | 
|  | except Exception as exception: | 
|  | raise PeltoolException( | 
|  | "Failed to fetch PEL ID for required SRC : " + str(exception) | 
|  | ) from exception | 
|  | return src_pel_ids | 
|  |  | 
|  |  | 
|  | def fetch_all_src(include_hidden_pels=False): | 
|  | r""" | 
|  | Fetch all SRC IDs from peltool in the list format. | 
|  |  | 
|  | include_hidden_pels       True/False (default: False). | 
|  | Set True to get hidden PELs else False. | 
|  | """ | 
|  | try: | 
|  | src_id = [] | 
|  | pel_data = get_pel_data_from_bmc(include_hidden_pels) | 
|  | if pel_data: | 
|  | pel_id_list = pel_data.keys() | 
|  | for pel_id in pel_id_list: | 
|  | src_id.append(pel_data[pel_id]["SRC"]) | 
|  | print("SRC IDs: " + str(src_id)) | 
|  | except Exception as exception: | 
|  | raise PeltoolException( | 
|  | "Failed to fetch all SRCs : " + str(exception) | 
|  | ) from exception | 
|  | return src_id | 
|  |  | 
|  |  | 
|  | def check_for_unexpected_src( | 
|  | unexpected_src_list=None, include_hidden_pels=False | 
|  | ): | 
|  | r""" | 
|  | From the given unexpected SRC list, check if any unexpected SRC created | 
|  | on the BMC. Returns 0 if no SRC found else throws exception. | 
|  |  | 
|  | Description of arguments: | 
|  | unexpected_src_list       Give unexpected SRCs in the list format. | 
|  | e.g.: ["BBXXYYYY", "AAXXYYYY"]. | 
|  |  | 
|  | include_hidden_pels       True/False (default: False). | 
|  | Set True to get hidden PELs else False. | 
|  | """ | 
|  | try: | 
|  | unexpected_src_count = 0 | 
|  | if not unexpected_src_list: | 
|  | print("Unexpected SRC list is empty.") | 
|  | src_data = fetch_all_src(include_hidden_pels) | 
|  | for src in unexpected_src_list: | 
|  | if src in src_data: | 
|  | print("Found an unexpected SRC : " + src) | 
|  | unexpected_src_count = unexpected_src_count + 1 | 
|  | if unexpected_src_count >= 1: | 
|  | raise PeltoolException("Unexpected SRC found.") | 
|  |  | 
|  | except Exception as exception: | 
|  | raise PeltoolException( | 
|  | "Failed to verify unexpected SRC list : " + str(exception) | 
|  | ) from exception | 
|  | return unexpected_src_count | 
|  |  | 
|  |  | 
|  | def filter_unexpected_srcs(expected_srcs=None): | 
|  | r""" | 
|  | Return list of SRCs found in BMC after filtering expected SRCs. | 
|  | If expected_srcs is None then all SRCs found in system are returned. | 
|  |  | 
|  | Description of arguments: | 
|  | expected_srcs       List of expected SRCs. E.g. ["BBXXYYYY", "AAXXYYYY"]. | 
|  | """ | 
|  |  | 
|  | temp_srcs_found = fetch_all_src() | 
|  |  | 
|  | if not expected_srcs: | 
|  | expected_srcs = [] | 
|  | srcs_found = list() | 
|  | # Remove any extra space after SRC ID | 
|  | for item in temp_srcs_found: | 
|  | var = item.split() | 
|  | srcs_found.append(var[0]) | 
|  | print(srcs_found) | 
|  | print(expected_srcs) | 
|  | return list(set(srcs_found) - set(expected_srcs)) | 
|  |  | 
|  |  | 
|  | def get_bmc_event_log_id_for_pel(pel_id): | 
|  | r""" | 
|  | Return BMC event log ID for the given PEL ID. | 
|  |  | 
|  | Description of arguments: | 
|  | pel_id       PEL ID. E.g. 0x50000021. | 
|  | """ | 
|  |  | 
|  | pel_data = peltool("-i " + pel_id) | 
|  | print(pel_data) | 
|  | bmc_id_for_pel = pel_data["Private Header"]["BMC Event Log Id"] | 
|  | return bmc_id_for_pel | 
|  |  | 
|  |  | 
|  | def get_latest_pels( | 
|  | number_of_pels=1, | 
|  | include_hidden_pels=False, | 
|  | include_informational_pels=False, | 
|  | ): | 
|  | r""" | 
|  | Return latest PEL IDs. | 
|  |  | 
|  | Description of arguments: | 
|  | number_of_pels               Number of PELS to be returned. | 
|  | include_hidden_pels          True/False  (default: False). | 
|  | Set True to get hidden PELs else False. | 
|  | include_informational_pels   True/False (default: False). | 
|  | Set True to get informational PELs else False. | 
|  | """ | 
|  |  | 
|  | pel_cmd = " -lr" | 
|  | if include_hidden_pels: | 
|  | pel_cmd = pel_cmd + " -h" | 
|  | if include_informational_pels: | 
|  | pel_cmd = pel_cmd + " -f" | 
|  | pel_data = peltool(pel_cmd) | 
|  | pel_ids = list(pel_data.keys()) | 
|  | return pel_ids[:number_of_pels] | 
|  |  | 
|  |  | 
|  | def fetch_all_pel_ids_based_on_error_message( | 
|  | error_msg, include_hidden_pels=False, include_informational_pels=False | 
|  | ): | 
|  | r""" | 
|  | Fetch all PEL IDs based on the input error message and return | 
|  | in the list format. | 
|  |  | 
|  | Description of arguments: | 
|  | error_msg                     Error message | 
|  | include_hidden_pels           True/False (default: False). | 
|  | Set True to get hidden PELs else False. | 
|  | include_informational_pels    True/False (default: False). | 
|  | Set True to get informational PELs else False | 
|  | """ | 
|  |  | 
|  | try: | 
|  | err_pel_ids = [] | 
|  | pel_data = get_pel_data_from_bmc( | 
|  | include_hidden_pels, include_informational_pels | 
|  | ) | 
|  | pel_id_list = pel_data.keys() | 
|  | for pel_id in pel_id_list: | 
|  | # Check if required PEL with error message is created. | 
|  | if error_msg in pel_data[pel_id]["Message"]: | 
|  | err_pel_ids.append(pel_id) | 
|  |  | 
|  | if not err_pel_ids: | 
|  | raise PeltoolException( | 
|  | "Failed to get PEL ID with error message : " + error_msg | 
|  | ) | 
|  | except Exception as exception: | 
|  | raise PeltoolException( | 
|  | "Failed to fetch PEL ID for required SRC : " + str(exception) | 
|  | ) from exception | 
|  | return err_pel_ids | 
|  |  | 
|  |  | 
|  | def check_if_pel_transmitted_to_host( | 
|  | pel_id, expected_pel_trans_state_to_host="Acked" | 
|  | ): | 
|  | r""" | 
|  | Return True if PEL is transmitted to Host else False. | 
|  |  | 
|  | Description of arguments: | 
|  | pel_id                              PEL ID. E.g. 0x50000021. | 
|  | expected_pel_trans_state_to_host    Expected host transmission state for the PEL data. | 
|  | Default is "Acked". | 
|  | """ | 
|  |  | 
|  | try: | 
|  | pel_data = peltool("-i " + pel_id) | 
|  | print(pel_data) | 
|  | host_state = pel_data["User Header"]["Host Transmission"] | 
|  | if host_state != expected_pel_trans_state_to_host: | 
|  | return False | 
|  | except Exception as exception: | 
|  | raise PeltoolException( | 
|  | "Failed to parse PEL data : " + str(exception) | 
|  | ) from exception | 
|  | return True |