| #!/usr/bin/env python3 |
| |
| import argparse |
| import json |
| import sys |
| import jsonschema |
| |
| r""" |
| Validates the phosphor-regulators configuration file. Checks it against a JSON |
| schema as well as doing some extra checks that can't be encoded in the schema. |
| """ |
| |
| def handle_validation_error(): |
| sys.exit("Validation failed.") |
| |
| def get_values(json_element, key, result = None): |
| r""" |
| Finds all occurrences of a key within the specified JSON element and its |
| children. Returns the associated values. |
| To search the entire configuration file, pass the root JSON element |
| json_element: JSON element within the config file. |
| key: key name. |
| result: list of values found with the specified key. |
| """ |
| |
| if result is None: |
| result = [] |
| if type(json_element) is dict: |
| for json_key in json_element: |
| if json_key == key: |
| result.append(json_element[json_key]) |
| elif type(json_element[json_key]) in (list, dict): |
| get_values(json_element[json_key], key, result) |
| elif type(json_element) is list: |
| for item in json_element: |
| if type(item) in (list, dict): |
| get_values(item, key, result) |
| return result |
| |
| def get_rule_ids(config_json): |
| r""" |
| Get all rule IDs in the configuration file. |
| config_json: Configuration file JSON |
| """ |
| rule_ids = [] |
| for rule in config_json.get('rules', {}): |
| rule_ids.append(rule['id']) |
| return rule_ids |
| |
| def get_device_ids(config_json): |
| r""" |
| Get all device IDs in the configuration file. |
| config_json: Configuration file JSON |
| """ |
| device_ids = [] |
| for chassis in config_json.get('chassis', {}): |
| for device in chassis.get('devices', {}): |
| device_ids.append(device['id']) |
| return device_ids |
| |
| def check_number_of_elements_in_masks(config_json): |
| r""" |
| Check if the number of bit masks in the 'masks' property matches the number |
| of byte values in the 'values' property. |
| config_json: Configuration file JSON |
| """ |
| |
| i2c_write_bytes = get_values(config_json, 'i2c_write_bytes') |
| i2c_compare_bytes = get_values(config_json, 'i2c_compare_bytes') |
| |
| for object in i2c_write_bytes: |
| if 'masks' in object: |
| if len(object.get('masks', [])) != len(object.get('values', [])): |
| sys.stderr.write("Error: Invalid i2c_write_bytes action.\n"+\ |
| "The masks array must have the same size as the values array. "+\ |
| "masks: "+str(object.get('masks', []))+\ |
| ", values: "+str(object.get('values', []))+'.\n') |
| handle_validation_error() |
| |
| for object in i2c_compare_bytes: |
| if 'masks' in object: |
| if len(object.get('masks', [])) != len(object.get('values', [])): |
| sys.stderr.write("Error: Invalid i2c_compare_bytes action.\n"+\ |
| "The masks array must have the same size as the values array. "+\ |
| "masks: "+str(object.get('masks', []))+\ |
| ", values: "+str(object.get('values', []))+'.\n') |
| handle_validation_error() |
| |
| def check_rule_id_exists(config_json): |
| r""" |
| Check if a rule_id property specifies a rule ID that does not exist. |
| config_json: Configuration file JSON |
| """ |
| |
| rule_ids = get_values(config_json, 'rule_id') |
| valid_rule_ids = get_rule_ids(config_json) |
| for rule_id in rule_ids: |
| if rule_id not in valid_rule_ids: |
| sys.stderr.write("Error: Rule ID does not exist.\n"+\ |
| "Found rule_id value that specifies invalid rule ID "+\ |
| rule_id+'\n') |
| handle_validation_error() |
| |
| def check_set_device_value_exists(config_json): |
| r""" |
| Check if a set_device action specifies a device ID that does not exist. |
| config_json: Configuration file JSON |
| """ |
| |
| device_ids = get_values(config_json, 'set_device') |
| valid_device_ids = get_device_ids(config_json) |
| for device_id in device_ids: |
| if device_id not in valid_device_ids: |
| sys.stderr.write("Error: Device ID does not exist.\n"+\ |
| "Found set_device action that specifies invalid device ID "+\ |
| device_id+'\n') |
| handle_validation_error() |
| |
| def check_run_rule_value_exists(config_json): |
| r""" |
| Check if any run_rule actions specify a rule ID that does not exist. |
| config_json: Configuration file JSON |
| """ |
| |
| rule_ids = get_values(config_json, 'run_rule') |
| valid_rule_ids = get_rule_ids(config_json) |
| for rule_id in rule_ids: |
| if rule_id not in valid_rule_ids: |
| sys.stderr.write("Error: Rule ID does not exist.\n"+\ |
| "Found run_rule action that specifies invalid rule ID "+\ |
| rule_id+'\n') |
| handle_validation_error() |
| |
| def check_infinite_loops_in_rule(config_json, rule_json, call_stack=[]): |
| r""" |
| Check if a 'run_rule' action in the specified rule causes an |
| infinite loop. |
| config_json: Configuration file JSON. |
| rule_json: A rule in the JSON config file. |
| call_stack: Current call stack of rules. |
| """ |
| |
| call_stack.append(rule_json['id']) |
| for action in rule_json.get('actions', {}): |
| if 'run_rule' in action: |
| run_rule_id = action['run_rule'] |
| if run_rule_id in call_stack: |
| call_stack.append(run_rule_id) |
| sys.stderr.write(\ |
| "Infinite loop caused by run_rule actions.\n"+\ |
| str(call_stack)+'\n') |
| handle_validation_error() |
| else: |
| for rule in config_json.get('rules', {}): |
| if rule['id'] == run_rule_id: |
| check_infinite_loops_in_rule(\ |
| config_json, rule, call_stack) |
| call_stack.pop() |
| |
| def check_infinite_loops(config_json): |
| r""" |
| Check if rule in config file is called recursively, causing an |
| infinite loop. |
| config_json: Configuration file JSON |
| """ |
| |
| for rule in config_json.get('rules', {}): |
| check_infinite_loops_in_rule(config_json, rule) |
| |
| def check_duplicate_object_id(config_json): |
| r""" |
| Check that there aren't any JSON objects with the same 'id' property value. |
| config_json: Configuration file JSON |
| """ |
| |
| json_ids = get_values(config_json, 'id') |
| unique_ids = set() |
| for id in json_ids: |
| if id in unique_ids: |
| sys.stderr.write("Error: Duplicate ID.\n"+\ |
| "Found multiple objects with the ID "+id+'\n') |
| handle_validation_error() |
| else: |
| unique_ids.add(id) |
| |
| def check_duplicate_rule_id(config_json): |
| r""" |
| Check that there aren't any "rule" elements with the same 'id' field. |
| config_json: Configuration file JSON |
| """ |
| rule_ids = [] |
| for rule in config_json.get('rules', {}): |
| rule_id = rule['id'] |
| if rule_id in rule_ids: |
| sys.stderr.write("Error: Duplicate rule ID.\n"+\ |
| "Found multiple rules with the ID "+rule_id+'\n') |
| handle_validation_error() |
| else: |
| rule_ids.append(rule_id) |
| |
| def check_duplicate_chassis_number(config_json): |
| r""" |
| Check that there aren't any "chassis" elements with the same 'number' field. |
| config_json: Configuration file JSON |
| """ |
| numbers = [] |
| for chassis in config_json.get('chassis', {}): |
| number = chassis['number'] |
| if number in numbers: |
| sys.stderr.write("Error: Duplicate chassis number.\n"+\ |
| "Found multiple chassis with the number "+str(number)+'\n') |
| handle_validation_error() |
| else: |
| numbers.append(number) |
| |
| def check_duplicate_device_id(config_json): |
| r""" |
| Check that there aren't any "devices" with the same 'id' field. |
| config_json: Configuration file JSON |
| """ |
| device_ids = [] |
| for chassis in config_json.get('chassis', {}): |
| for device in chassis.get('devices', {}): |
| device_id = device['id'] |
| if device_id in device_ids: |
| sys.stderr.write("Error: Duplicate device ID.\n"+\ |
| "Found multiple devices with the ID "+device_id+'\n') |
| handle_validation_error() |
| else: |
| device_ids.append(device_id) |
| |
| def check_duplicate_rail_id(config_json): |
| r""" |
| Check that there aren't any "rails" with the same 'id' field. |
| config_json: Configuration file JSON |
| """ |
| rail_ids = [] |
| for chassis in config_json.get('chassis', {}): |
| for device in chassis.get('devices', {}): |
| for rail in device.get('rails', {}): |
| rail_id = rail['id'] |
| if rail_id in rail_ids: |
| sys.stderr.write("Error: Duplicate rail ID.\n"+\ |
| "Found multiple rails with the ID "+rail_id+'\n') |
| handle_validation_error() |
| else: |
| rail_ids.append(rail_id) |
| |
| def check_for_duplicates(config_json): |
| r""" |
| Check for duplicate ID. |
| """ |
| check_duplicate_rule_id(config_json) |
| |
| check_duplicate_chassis_number(config_json) |
| |
| check_duplicate_device_id(config_json) |
| |
| check_duplicate_rail_id(config_json) |
| |
| check_duplicate_object_id(config_json) |
| |
| def validate_schema(config, schema): |
| r""" |
| Validates the specified config file using the specified |
| schema file. |
| |
| config: Path of the file containing the config JSON |
| schema: Path of the file containing the schema JSON |
| """ |
| |
| with open(config) as config_handle: |
| config_json = json.load(config_handle) |
| |
| with open(schema) as schema_handle: |
| schema_json = json.load(schema_handle) |
| |
| try: |
| jsonschema.validate(config_json, schema_json) |
| except jsonschema.ValidationError as e: |
| print(e) |
| handle_validation_error() |
| |
| return config_json |
| |
| if __name__ == '__main__': |
| |
| parser = argparse.ArgumentParser( |
| description='phosphor-regulators configuration file validator') |
| |
| parser.add_argument('-s', '--schema-file', dest='schema_file', |
| help='The phosphor-regulators schema file') |
| |
| parser.add_argument('-c', '--configuration-file', dest='configuration_file', |
| help='The phosphor-regulators configuration file') |
| |
| args = parser.parse_args() |
| |
| if not args.schema_file: |
| parser.print_help() |
| sys.exit("Error: Schema file is required.") |
| if not args.configuration_file: |
| parser.print_help() |
| sys.exit("Error: Configuration file is required.") |
| |
| config_json = validate_schema(args.configuration_file, args.schema_file) |
| |
| check_for_duplicates(config_json) |
| |
| check_infinite_loops(config_json) |
| |
| check_run_rule_value_exists(config_json) |
| |
| check_set_device_value_exists(config_json) |
| |
| check_rule_id_exists(config_json) |
| |
| check_number_of_elements_in_masks(config_json) |