|  | #!/usr/bin/env python3 | 
|  |  | 
|  | r""" | 
|  | See class prolog below for details. | 
|  | """ | 
|  |  | 
|  | import json | 
|  | import re | 
|  | import sys | 
|  | from json.decoder import JSONDecodeError | 
|  |  | 
|  | import func_args as fa | 
|  | import gen_print as gp | 
|  | from redfish.rest.v1 import InvalidCredentialsError | 
|  | from redfish_plus import redfish_plus | 
|  | from robot.libraries.BuiltIn import BuiltIn | 
|  |  | 
|  | MTLS_ENABLED = BuiltIn().get_variable_value("${MTLS_ENABLED}") | 
|  |  | 
|  |  | 
|  | class bmc_redfish(redfish_plus): | 
|  | r""" | 
|  | bmc_redfish is a child class of redfish_plus that is designed to provide | 
|  | benefits specifically for using redfish to communicate with an OpenBMC. | 
|  |  | 
|  | See the prologs of the methods below for details. | 
|  | """ | 
|  |  | 
|  | def __init__(self, *args, **kwargs): | 
|  | r""" | 
|  | Do BMC-related redfish initialization. | 
|  |  | 
|  | Presently, older versions of BMC code may not support redfish | 
|  | requests.  This can lead to unsightly error text being printed out for | 
|  | programs that may use lib/bmc_redfish_resource.robot even though they | 
|  | don't necessarily intend to make redfish requests. | 
|  |  | 
|  | This class method will make an attempt to tolerate this situation.  At | 
|  | some future point, when all BMCs can be expected to support redfish, | 
|  | this class method may be considered for deletion.  If it is deleted, | 
|  | the self.__inited__ test code in the login() class method below should | 
|  | likewise be deleted. | 
|  | """ | 
|  | self.__inited__ = False | 
|  | try: | 
|  | if MTLS_ENABLED == "True": | 
|  | self.__inited__ = True | 
|  | else: | 
|  | super(bmc_redfish, self).__init__(*args, **kwargs) | 
|  | self.__inited__ = True | 
|  | except ValueError as get_exception: | 
|  | except_type, except_value, except_traceback = sys.exc_info() | 
|  | regex = r"The HTTP status code was not valid:[\r\n]+status:[ ]+502" | 
|  | result = re.match(regex, str(except_value), flags=re.MULTILINE) | 
|  | if not result: | 
|  | gp.lprint_var(except_type) | 
|  | gp.lprint_varx("except_value", str(except_value)) | 
|  | raise (get_exception) | 
|  | except AttributeError as e: | 
|  | BuiltIn().log_to_console( | 
|  | "AttributeError: Error response from server" | 
|  | ) | 
|  | except Exception as e: | 
|  | error_response = type(e).__name__ + " from bmc_redfish class" | 
|  | BuiltIn().log_to_console(error_response) | 
|  |  | 
|  | BuiltIn().set_global_variable("${REDFISH_SUPPORTED}", self.__inited__) | 
|  |  | 
|  | def login(self, *args, **kwargs): | 
|  | r""" | 
|  | Assign BMC default values for username, password and auth arguments | 
|  | and call parent class login method. | 
|  |  | 
|  | Description of argument(s): | 
|  | args                        See parent class method prolog for details. | 
|  | kwargs                      See parent class method prolog for details. | 
|  | """ | 
|  |  | 
|  | if MTLS_ENABLED == "True": | 
|  | return None | 
|  | if not self.__inited__: | 
|  | message = "bmc_redfish.__init__() was never successfully run.  It " | 
|  | message += "is likely that the target BMC firmware code level " | 
|  | message += "does not support redfish.\n" | 
|  | raise ValueError(message) | 
|  | # Assign default values for username, password, auth where necessary. | 
|  | openbmc_username = BuiltIn().get_variable_value("${OPENBMC_USERNAME}") | 
|  | openbmc_password = BuiltIn().get_variable_value("${OPENBMC_PASSWORD}") | 
|  | username, args, kwargs = fa.pop_arg(openbmc_username, *args, **kwargs) | 
|  | password, args, kwargs = fa.pop_arg(openbmc_password, *args, **kwargs) | 
|  | auth, args, kwargs = fa.pop_arg("session", *args, **kwargs) | 
|  |  | 
|  | try: | 
|  | super(bmc_redfish, self).login( | 
|  | username, password, auth, *args, **kwargs | 
|  | ) | 
|  | # Handle InvalidCredentialsError. | 
|  | # (raise redfish.rest.v1.InvalidCredentialsError if not [200, 201, 202, 204]) | 
|  | except InvalidCredentialsError: | 
|  | except_type, except_value, except_traceback = sys.exc_info() | 
|  | BuiltIn().log_to_console(str(except_type)) | 
|  | BuiltIn().log_to_console(str(except_value)) | 
|  | e_message = "Re-try login due to exception and " | 
|  | e_message += "it is likely error response from server side." | 
|  | BuiltIn().log_to_console(e_message) | 
|  | super(bmc_redfish, self).login( | 
|  | username, password, auth, *args, **kwargs | 
|  | ) | 
|  | # Handle JSONDecodeError and others. | 
|  | except JSONDecodeError: | 
|  | except_type, except_value, except_traceback = sys.exc_info() | 
|  | BuiltIn().log_to_console(str(except_type)) | 
|  | BuiltIn().log_to_console(str(except_value)) | 
|  | e_message = "Re-try login due to JSONDecodeError exception and " | 
|  | e_message += "it is likely error response from server side." | 
|  | BuiltIn().log_to_console(e_message) | 
|  | super(bmc_redfish, self).login( | 
|  | username, password, auth, *args, **kwargs | 
|  | ) | 
|  | except ValueError: | 
|  | except_type, except_value, except_traceback = sys.exc_info() | 
|  | BuiltIn().log_to_console(str(except_type)) | 
|  | BuiltIn().log_to_console(str(except_value)) | 
|  | e_message = "Unexpected exception." | 
|  | BuiltIn().log_to_console(e_message) | 
|  | except KeyError: | 
|  | except_type, except_value, except_traceback = sys.exc_info() | 
|  | BuiltIn().log_to_console(str(except_type)) | 
|  | BuiltIn().log_to_console(str(except_value)) | 
|  | e_message = "Login failed, something went wrong during request. " | 
|  | e_message += "Error usually due to SessionCreationError or " | 
|  | e_message += "InvalidCredentialsError resulting in failure" | 
|  | BuiltIn().log_to_console(e_message) | 
|  | # Custom BaseException error message from KeyError exception. | 
|  | raise KeyError("SessionCreationError or InvalidCredentialsError") | 
|  |  | 
|  | def logout(self): | 
|  | if MTLS_ENABLED == "True": | 
|  | return None | 
|  | else: | 
|  | super(bmc_redfish, self).logout() | 
|  |  | 
|  | def get_properties(self, *args, **kwargs): | 
|  | r""" | 
|  | Return dictionary of attributes for a given path. | 
|  |  | 
|  | The difference between calling this function and calling get() | 
|  | directly is that this function returns ONLY the dictionary portion of | 
|  | the response object. | 
|  |  | 
|  | Example robot code: | 
|  |  | 
|  | ${properties}=  Get Properties  /redfish/v1/Systems/system/ | 
|  | Rprint Vars  properties | 
|  |  | 
|  | Output: | 
|  |  | 
|  | properties: | 
|  | [PowerState]:      Off | 
|  | [Processors]: | 
|  | [@odata.id]:     /redfish/v1/Systems/system/Processors | 
|  | [SerialNumber]:    1234567 | 
|  | ... | 
|  |  | 
|  | Description of argument(s): | 
|  | args                        See parent class get() prolog for details. | 
|  | kwargs                      See parent class get() prolog for details. | 
|  | """ | 
|  |  | 
|  | resp = self.get(*args, **kwargs) | 
|  | return resp.dict if hasattr(resp, "dict") else {} | 
|  |  | 
|  | def get_attribute(self, path, attribute, default=None, *args, **kwargs): | 
|  | r""" | 
|  | Get and return the named attribute from the properties for a given | 
|  | path. | 
|  |  | 
|  | This method has the following advantages over calling get_properties | 
|  | directly: | 
|  | - The caller can specify a default value to be returned if the | 
|  | attribute does not exist. | 
|  |  | 
|  | Example robot code: | 
|  |  | 
|  | ${attribute}=  Get Attribute  /redfish/v1/AccountService | 
|  | ...  MaxPasswordLength  default=600 | 
|  | Rprint Vars  attribute | 
|  |  | 
|  | Output: | 
|  |  | 
|  | attribute:           31 | 
|  |  | 
|  | Description of argument(s): | 
|  | path                        The path (e.g. | 
|  | "/redfish/v1/AccountService"). | 
|  | attribute                   The name of the attribute to be retrieved | 
|  | (e.g. "MaxPasswordLength"). | 
|  | default                     The default value to be returned if the | 
|  | attribute does not exist (e.g. ""). | 
|  | args                        See parent class get() prolog for details. | 
|  | kwargs                      See parent class get() prolog for details. | 
|  | """ | 
|  |  | 
|  | return self.get_properties(path, *args, **kwargs).get( | 
|  | attribute, default | 
|  | ) | 
|  |  | 
|  | def get_session_info(self): | 
|  | r""" | 
|  | Get and return session info as a tuple consisting of session_key and | 
|  | session_location. | 
|  | """ | 
|  |  | 
|  | return self.get_session_key(), self.get_session_location() | 
|  |  | 
|  | def get_session_response(self): | 
|  | r""" | 
|  | Return session response dictionary data. | 
|  | """ | 
|  |  | 
|  | return self.__dict__ | 
|  |  | 
|  | def enumerate( | 
|  | self, resource_path, return_json=1, include_dead_resources=False | 
|  | ): | 
|  | r""" | 
|  | Perform a GET enumerate request and return available resource paths. | 
|  |  | 
|  | Description of argument(s): | 
|  | resource_path               URI resource absolute path (e.g. "/redfish/v1/SessionService/Sessions"). | 
|  | return_json                 Indicates whether the result should be returned as a json string or as a | 
|  | dictionary. | 
|  | include_dead_resources      Check and return a list of dead/broken URI resources. | 
|  | """ | 
|  |  | 
|  | gp.qprint_executing(style=gp.func_line_style_short) | 
|  | # Set quiet variable to keep subordinate get() calls quiet. | 
|  | quiet = 1 | 
|  |  | 
|  | self.__result = {} | 
|  | # Variable to hold the pending list of resources for which enumeration is yet to be obtained. | 
|  | self.__pending_enumeration = set() | 
|  | self.__pending_enumeration.add(resource_path) | 
|  |  | 
|  | # Variable having resources for which enumeration is completed. | 
|  | enumerated_resources = set() | 
|  | dead_resources = {} | 
|  | resources_to_be_enumerated = (resource_path,) | 
|  | while resources_to_be_enumerated: | 
|  | for resource in resources_to_be_enumerated: | 
|  | # JsonSchemas, SessionService or URLs containing # are not required in enumeration. | 
|  | # Example: '/redfish/v1/JsonSchemas/' and sub resources. | 
|  | #          '/redfish/v1/SessionService' | 
|  | #          '/redfish/v1/Managers/${MANAGER_ID}#/Oem' | 
|  | if ( | 
|  | ("JsonSchemas" in resource) | 
|  | or ("SessionService" in resource) | 
|  | or ("#" in resource) | 
|  | ): | 
|  | continue | 
|  |  | 
|  | self._rest_response_ = self.get( | 
|  | resource, valid_status_codes=[200, 404, 500] | 
|  | ) | 
|  | # Enumeration is done for available resources ignoring the ones for which response is not | 
|  | # obtained. | 
|  | if self._rest_response_.status != 200: | 
|  | if include_dead_resources: | 
|  | try: | 
|  | dead_resources[self._rest_response_.status].append( | 
|  | resource | 
|  | ) | 
|  | except KeyError: | 
|  | dead_resources[self._rest_response_.status] = [ | 
|  | resource | 
|  | ] | 
|  | continue | 
|  | self.walk_nested_dict(self._rest_response_.dict, url=resource) | 
|  |  | 
|  | enumerated_resources.update(set(resources_to_be_enumerated)) | 
|  | resources_to_be_enumerated = tuple( | 
|  | self.__pending_enumeration - enumerated_resources | 
|  | ) | 
|  |  | 
|  | if return_json: | 
|  | if include_dead_resources: | 
|  | return ( | 
|  | json.dumps( | 
|  | self.__result, | 
|  | sort_keys=True, | 
|  | indent=4, | 
|  | separators=(",", ": "), | 
|  | ), | 
|  | dead_resources, | 
|  | ) | 
|  | else: | 
|  | return json.dumps( | 
|  | self.__result, | 
|  | sort_keys=True, | 
|  | indent=4, | 
|  | separators=(",", ": "), | 
|  | ) | 
|  | else: | 
|  | if include_dead_resources: | 
|  | return self.__result, dead_resources | 
|  | else: | 
|  | return self.__result | 
|  |  | 
|  | def walk_nested_dict(self, data, url=""): | 
|  | r""" | 
|  | Parse through the nested dictionary and get the resource id paths. | 
|  |  | 
|  | Description of argument(s): | 
|  | data                        Nested dictionary data from response message. | 
|  | url                         Resource for which the response is obtained in data. | 
|  | """ | 
|  | url = url.rstrip("/") | 
|  |  | 
|  | for key, value in data.items(): | 
|  | # Recursion if nested dictionary found. | 
|  | if isinstance(value, dict): | 
|  | self.walk_nested_dict(value) | 
|  | else: | 
|  | # Value contains a list of dictionaries having member data. | 
|  | if "Members" == key: | 
|  | if isinstance(value, list): | 
|  | for memberDict in value: | 
|  | self.__pending_enumeration.add( | 
|  | memberDict["@odata.id"] | 
|  | ) | 
|  | if "@odata.id" == key: | 
|  | value = value.rstrip("/") | 
|  | # Data for the given url. | 
|  | if value == url: | 
|  | self.__result[url] = data | 
|  | # Data still needs to be looked up, | 
|  | else: | 
|  | self.__pending_enumeration.add(value) | 
|  |  | 
|  | def get_members_list(self, resource_path, filter=None): | 
|  | r""" | 
|  | Return members list in a given URL. | 
|  |  | 
|  | Description of argument(s): | 
|  | resource_path    URI resource absolute path (e.g. "/redfish/v1/AccountService/Accounts"). | 
|  | filter           strings or regex | 
|  |  | 
|  | /redfish/v1/AccountService/Accounts/ | 
|  | { | 
|  | "@odata.id": "/redfish/v1/AccountService/Accounts", | 
|  | "@odata.type": "#ManagerAccountCollection.ManagerAccountCollection", | 
|  | "Description": "BMC User Accounts", | 
|  | "Members": [ | 
|  | { | 
|  | "@odata.id": "/redfish/v1/AccountService/Accounts/root" | 
|  | }, | 
|  | { | 
|  | "@odata.id": "/redfish/v1/AccountService/Accounts/admin" | 
|  | } | 
|  | ], | 
|  | "Members@odata.count": 2, | 
|  | "Name": "Accounts Collection" | 
|  | } | 
|  |  | 
|  | Return list of members if no filter is applied as: | 
|  | ['/redfish/v1/AccountService/Accounts/root', "/redfish/v1/AccountService/Accounts/admin"] | 
|  |  | 
|  | Return list of members if filter (e.g "root") is applied as: | 
|  | ['/redfish/v1/AccountService/Accounts/root'] | 
|  |  | 
|  |  | 
|  | Calling from robot code: | 
|  | ${resp}=  Redfish.Get Members List  /redfish/v1/AccountService/Accounts | 
|  | ${resp}=  Redfish.Get Members List  /redfish/v1/AccountService/Accounts  filter=root | 
|  | """ | 
|  |  | 
|  | member_list = [] | 
|  | self._rest_response_ = self.get( | 
|  | resource_path, valid_status_codes=[200] | 
|  | ) | 
|  |  | 
|  | try: | 
|  | for member in self._rest_response_.dict["Members"]: | 
|  | member_list.append(member["@odata.id"]) | 
|  | except KeyError: | 
|  | # Non Members child objects at the top level, ignore. | 
|  | pass | 
|  |  | 
|  | # Filter elements in the list and return matched elements. | 
|  | if filter is not None: | 
|  | regex = ".*/" + filter + "[^/]*$" | 
|  | return [x for x in member_list if re.match(regex, x)] | 
|  |  | 
|  | return member_list |