|  | #!/usr/bin/env python3 | 
|  |  | 
|  | r""" | 
|  | BMC redfish utility functions. | 
|  | """ | 
|  |  | 
|  | import json | 
|  | import re | 
|  | from robot.libraries.BuiltIn import BuiltIn | 
|  | import gen_print as gp | 
|  |  | 
|  | MTLS_ENABLED = BuiltIn().get_variable_value("${MTLS_ENABLED}") | 
|  |  | 
|  |  | 
|  | class bmc_redfish_utils(object): | 
|  |  | 
|  | ROBOT_LIBRARY_SCOPE = 'TEST SUITE' | 
|  |  | 
|  | def __init__(self): | 
|  | r""" | 
|  | Initialize the bmc_redfish_utils object. | 
|  | """ | 
|  | # Obtain a reference to the global redfish object. | 
|  | self.__inited__ = False | 
|  | self._redfish_ = BuiltIn().get_library_instance('redfish') | 
|  |  | 
|  | if MTLS_ENABLED == 'True': | 
|  | self.__inited__ = True | 
|  | else: | 
|  | # There is a possibility that a given driver support both redfish and | 
|  | # legacy REST. | 
|  | self._redfish_.login() | 
|  | self._rest_response_ = \ | 
|  | self._redfish_.get("/xyz/openbmc_project/", valid_status_codes=[200, 404]) | 
|  |  | 
|  | # If REST URL /xyz/openbmc_project/ is supported. | 
|  | if self._rest_response_.status == 200: | 
|  | self.__inited__ = True | 
|  |  | 
|  | BuiltIn().set_global_variable("${REDFISH_REST_SUPPORTED}", self.__inited__) | 
|  |  | 
|  | def get_redfish_session_info(self): | 
|  | r""" | 
|  | Returns redfish sessions info dictionary. | 
|  |  | 
|  | { | 
|  | 'key': 'yLXotJnrh5nDhXj5lLiH' , | 
|  | 'location': '/redfish/v1/SessionService/Sessions/nblYY4wlz0' | 
|  | } | 
|  | """ | 
|  | session_dict = { | 
|  | "key": self._redfish_.get_session_key(), | 
|  | "location": self._redfish_.get_session_location() | 
|  | } | 
|  | return session_dict | 
|  |  | 
|  | def get_attribute(self, resource_path, attribute, verify=None): | 
|  | r""" | 
|  | Get resource attribute. | 
|  |  | 
|  | Description of argument(s): | 
|  | resource_path               URI resource absolute path (e.g. | 
|  | "/redfish/v1/Systems/1"). | 
|  | attribute                   Name of the attribute (e.g. 'PowerState'). | 
|  | """ | 
|  |  | 
|  | resp = self._redfish_.get(resource_path) | 
|  |  | 
|  | if verify: | 
|  | if resp.dict[attribute] == verify: | 
|  | return resp.dict[attribute] | 
|  | else: | 
|  | raise ValueError("Attribute value is not equal") | 
|  | elif attribute in resp.dict: | 
|  | return resp.dict[attribute] | 
|  |  | 
|  | return None | 
|  |  | 
|  | def get_properties(self, resource_path): | 
|  | r""" | 
|  | Returns dictionary of attributes for the resource. | 
|  |  | 
|  | Description of argument(s): | 
|  | resource_path               URI resource absolute path (e.g. | 
|  | /redfish/v1/Systems/1"). | 
|  | """ | 
|  |  | 
|  | resp = self._redfish_.get(resource_path) | 
|  | return resp.dict | 
|  |  | 
|  | def get_members_uri(self, resource_path, attribute): | 
|  | r""" | 
|  | Returns the list of valid path which has a given attribute. | 
|  |  | 
|  | Description of argument(s): | 
|  | resource_path            URI resource base path (e.g. | 
|  | '/redfish/v1/Systems/', | 
|  | '/redfish/v1/Chassis/'). | 
|  | attribute                Name of the attribute (e.g. 'PowerSupplies'). | 
|  | """ | 
|  |  | 
|  | # Set quiet variable to keep subordinate get() calls quiet. | 
|  | quiet = 1 | 
|  |  | 
|  | # Get the member id list. | 
|  | # e.g. ['/redfish/v1/Chassis/foo', '/redfish/v1/Chassis/bar'] | 
|  | resource_path_list = self.get_member_list(resource_path) | 
|  |  | 
|  | valid_path_list = [] | 
|  |  | 
|  | for path_idx in resource_path_list: | 
|  | # Get all the child object path under the member id e.g. | 
|  | # ['/redfish/v1/Chassis/foo/Power','/redfish/v1/Chassis/bar/Power'] | 
|  | child_path_list = self.list_request(path_idx) | 
|  |  | 
|  | # Iterate and check if path object has the attribute. | 
|  | for child_path_idx in child_path_list: | 
|  | if ('JsonSchemas' in child_path_idx)\ | 
|  | or ('SessionService' in child_path_idx)\ | 
|  | or ('#' in child_path_idx): | 
|  | continue | 
|  | if self.get_attribute(child_path_idx, attribute): | 
|  | valid_path_list.append(child_path_idx) | 
|  |  | 
|  | BuiltIn().log_to_console(valid_path_list) | 
|  | return valid_path_list | 
|  |  | 
|  | def get_endpoint_path_list(self, resource_path, end_point_prefix): | 
|  | r""" | 
|  | Returns list with entries ending in "/endpoint". | 
|  |  | 
|  | Description of argument(s): | 
|  | resource_path      URI resource base path (e.g. "/redfish/v1/Chassis/"). | 
|  | end_point_prefix   Name of the endpoint (e.g. 'Power'). | 
|  |  | 
|  | Find all list entries ending in "/endpoint" combination such as | 
|  | /redfish/v1/Chassis/<foo>/Power | 
|  | /redfish/v1/Chassis/<bar>/Power | 
|  | """ | 
|  |  | 
|  | end_point_list = self.list_request(resource_path) | 
|  |  | 
|  | # Regex to match entries ending in "/prefix" with optional underscore. | 
|  | regex = ".*/" + end_point_prefix + "[_]?[0-9]*$" | 
|  | return [x for x in end_point_list if re.match(regex, x, re.IGNORECASE)] | 
|  |  | 
|  | def get_target_actions(self, resource_path, target_attribute): | 
|  | r""" | 
|  | Returns resource target entry of the searched target attribute. | 
|  |  | 
|  | Description of argument(s): | 
|  | resource_path      URI resource absolute path | 
|  | (e.g. "/redfish/v1/Systems/system"). | 
|  | target_attribute   Name of the attribute (e.g. 'ComputerSystem.Reset'). | 
|  |  | 
|  | Example: | 
|  | "Actions": { | 
|  | "#ComputerSystem.Reset": { | 
|  | "ResetType@Redfish.AllowableValues": [ | 
|  | "On", | 
|  | "ForceOff", | 
|  | "GracefulRestart", | 
|  | "GracefulShutdown" | 
|  | ], | 
|  | "target": "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset" | 
|  | } | 
|  | } | 
|  | """ | 
|  |  | 
|  | global target_list | 
|  | target_list = [] | 
|  |  | 
|  | resp_dict = self.get_attribute(resource_path, "Actions") | 
|  | if resp_dict is None: | 
|  | return None | 
|  |  | 
|  | # Recursively search the "target" key in the nested dictionary. | 
|  | # Populate the target_list of target entries. | 
|  | self.get_key_value_nested_dict(resp_dict, "target") | 
|  | # Return the matching target URL entry. | 
|  | for target in target_list: | 
|  | # target "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset" | 
|  | attribute_in_uri = target.rsplit('/', 1)[-1] | 
|  | # attribute_in_uri "ComputerSystem.Reset" | 
|  | if target_attribute == attribute_in_uri: | 
|  | return target | 
|  |  | 
|  | return None | 
|  |  | 
|  | def get_member_list(self, resource_path): | 
|  | r""" | 
|  | Perform a GET list request and return available members entries. | 
|  |  | 
|  | Description of argument(s): | 
|  | resource_path  URI resource absolute path | 
|  | (e.g. "/redfish/v1/SessionService/Sessions"). | 
|  |  | 
|  | "Members": [ | 
|  | { | 
|  | "@odata.id": "/redfish/v1/SessionService/Sessions/Z5HummWPZ7" | 
|  | } | 
|  | { | 
|  | "@odata.id": "/redfish/v1/SessionService/Sessions/46CmQmEL7H" | 
|  | } | 
|  | ], | 
|  | """ | 
|  |  | 
|  | member_list = [] | 
|  | resp_list_dict = self.get_attribute(resource_path, "Members") | 
|  | if resp_list_dict is None: | 
|  | return member_list | 
|  |  | 
|  | for member_id in range(0, len(resp_list_dict)): | 
|  | member_list.append(resp_list_dict[member_id]["@odata.id"]) | 
|  |  | 
|  | return member_list | 
|  |  | 
|  | def list_request(self, resource_path): | 
|  | r""" | 
|  | Perform a GET list request and return available resource paths. | 
|  | Description of argument(s): | 
|  | resource_path  URI resource absolute path | 
|  | (e.g. "/redfish/v1/SessionService/Sessions"). | 
|  | """ | 
|  | gp.qprint_executing(style=gp.func_line_style_short) | 
|  | # Set quiet variable to keep subordinate get() calls quiet. | 
|  | quiet = 1 | 
|  | self.__pending_enumeration = set() | 
|  | self._rest_response_ = \ | 
|  | self._redfish_.get(resource_path, | 
|  | valid_status_codes=[200, 404, 500]) | 
|  |  | 
|  | # Return empty list. | 
|  | if self._rest_response_.status != 200: | 
|  | return self.__pending_enumeration | 
|  | self.walk_nested_dict(self._rest_response_.dict) | 
|  | if not self.__pending_enumeration: | 
|  | return resource_path | 
|  | for resource in self.__pending_enumeration.copy(): | 
|  | self._rest_response_ = \ | 
|  | self._redfish_.get(resource, | 
|  | valid_status_codes=[200, 404, 500]) | 
|  |  | 
|  | if self._rest_response_.status != 200: | 
|  | continue | 
|  | self.walk_nested_dict(self._rest_response_.dict) | 
|  | return list(sorted(self.__pending_enumeration)) | 
|  |  | 
|  | def enumerate_request(self, resource_path, return_json=1, | 
|  | include_dead_resources=False): | 
|  | r""" | 
|  | Perform a GET enumerate request and return available resource paths. | 
|  |  | 
|  | Description of argument(s): | 
|  | resource_path               URI resource absolute path (e.g. | 
|  | "/redfish/v1/SessionService/Sessions"). | 
|  | return_json                 Indicates whether the result should be | 
|  | returned as a json string or as a | 
|  | dictionary. | 
|  | include_dead_resources      Check and return a list of dead/broken URI | 
|  | resources. | 
|  | """ | 
|  |  | 
|  | gp.qprint_executing(style=gp.func_line_style_short) | 
|  |  | 
|  | return_json = int(return_json) | 
|  |  | 
|  | # Set quiet variable to keep subordinate get() calls quiet. | 
|  | quiet = 1 | 
|  |  | 
|  | # Variable to hold enumerated data. | 
|  | self.__result = {} | 
|  |  | 
|  | # Variable to hold the pending list of resources for which enumeration. | 
|  | # is yet to be obtained. | 
|  | self.__pending_enumeration = set() | 
|  |  | 
|  | self.__pending_enumeration.add(resource_path) | 
|  |  | 
|  | # Variable having resources for which enumeration is completed. | 
|  | enumerated_resources = set() | 
|  |  | 
|  | if include_dead_resources: | 
|  | dead_resources = {} | 
|  |  | 
|  | resources_to_be_enumerated = (resource_path,) | 
|  |  | 
|  | while resources_to_be_enumerated: | 
|  | for resource in resources_to_be_enumerated: | 
|  | # JsonSchemas, SessionService or URLs containing # are not | 
|  | # required in enumeration. | 
|  | # Example: '/redfish/v1/JsonSchemas/' and sub resources. | 
|  | #          '/redfish/v1/SessionService' | 
|  | #          '/redfish/v1/Managers/bmc#/Oem' | 
|  | if ('JsonSchemas' in resource) or ('SessionService' in resource)\ | 
|  | or ('PostCodes' in resource) or ('Registries' in resource)\ | 
|  | or ('Journal' in resource)\ | 
|  | or ('#' in resource): | 
|  | continue | 
|  |  | 
|  | self._rest_response_ = \ | 
|  | self._redfish_.get(resource, valid_status_codes=[200, 404, 405, 500]) | 
|  | # Enumeration is done for available resources ignoring the | 
|  | # ones for which response is not obtained. | 
|  | if self._rest_response_.status != 200: | 
|  | if include_dead_resources: | 
|  | try: | 
|  | dead_resources[self._rest_response_.status].append( | 
|  | resource) | 
|  | except KeyError: | 
|  | dead_resources[self._rest_response_.status] = \ | 
|  | [resource] | 
|  | continue | 
|  |  | 
|  | self.walk_nested_dict(self._rest_response_.dict, url=resource) | 
|  |  | 
|  | enumerated_resources.update(set(resources_to_be_enumerated)) | 
|  | resources_to_be_enumerated = \ | 
|  | tuple(self.__pending_enumeration - enumerated_resources) | 
|  |  | 
|  | if return_json: | 
|  | if include_dead_resources: | 
|  | return json.dumps(self.__result, sort_keys=True, | 
|  | indent=4, separators=(',', ': ')), dead_resources | 
|  | else: | 
|  | return json.dumps(self.__result, sort_keys=True, | 
|  | indent=4, separators=(',', ': ')) | 
|  | else: | 
|  | if include_dead_resources: | 
|  | return self.__result, dead_resources | 
|  | else: | 
|  | return self.__result | 
|  |  | 
|  | def walk_nested_dict(self, data, url=''): | 
|  | r""" | 
|  | Parse through the nested dictionary and get the resource id paths. | 
|  | Description of argument(s): | 
|  | data    Nested dictionary data from response message. | 
|  | url     Resource for which the response is obtained in data. | 
|  | """ | 
|  | url = url.rstrip('/') | 
|  |  | 
|  | for key, value in data.items(): | 
|  |  | 
|  | # Recursion if nested dictionary found. | 
|  | if isinstance(value, dict): | 
|  | self.walk_nested_dict(value) | 
|  | else: | 
|  | # Value contains a list of dictionaries having member data. | 
|  | if 'Members' == key: | 
|  | if isinstance(value, list): | 
|  | for memberDict in value: | 
|  | if isinstance(memberDict, str): | 
|  | self.__pending_enumeration.add(memberDict) | 
|  | else: | 
|  | self.__pending_enumeration.add(memberDict['@odata.id']) | 
|  |  | 
|  | if '@odata.id' == key: | 
|  | value = value.rstrip('/') | 
|  | # Data for the given url. | 
|  | if value == url: | 
|  | self.__result[url] = data | 
|  | # Data still needs to be looked up, | 
|  | else: | 
|  | self.__pending_enumeration.add(value) | 
|  |  | 
|  | def get_key_value_nested_dict(self, data, key): | 
|  | r""" | 
|  | Parse through the nested dictionary and get the searched key value. | 
|  |  | 
|  | Description of argument(s): | 
|  | data    Nested dictionary data from response message. | 
|  | key     Search dictionary key element. | 
|  | """ | 
|  |  | 
|  | for k, v in data.items(): | 
|  | if isinstance(v, dict): | 
|  | self.get_key_value_nested_dict(v, key) | 
|  |  | 
|  | if k == key: | 
|  | target_list.append(v) |