blob: 125bd0c4071d5dfda3a1e71b850d4c3b589f0799 [file] [log] [blame]
#!/usr/bin/env python3
import argparse
import json
import jsonschema
import os
import sys
r"""
Validates the phosphor-regulators configuration file. Checks it against a JSON
schema as well as doing some extra checks that can't be encoded in the schema.
"""
def handle_validation_error():
sys.exit("Validation failed.")
def get_values(json_element, key, result = None):
r"""
Finds all occurrences of a key within the specified JSON element and its
children. Returns the associated values.
To search the entire configuration file, pass the root JSON element
json_element: JSON element within the config file.
key: key name.
result: list of values found with the specified key.
"""
if result is None:
result = []
if type(json_element) is dict:
for json_key in json_element:
if json_key == key:
result.append(json_element[json_key])
elif type(json_element[json_key]) in (list, dict):
get_values(json_element[json_key], key, result)
elif type(json_element) is list:
for item in json_element:
if type(item) in (list, dict):
get_values(item, key, result)
return result
def get_rule_ids(config_json):
r"""
Get all rule IDs in the configuration file.
config_json: Configuration file JSON
"""
rule_ids = []
for rule in config_json.get('rules', {}):
rule_ids.append(rule['id'])
return rule_ids
def get_device_ids(config_json):
r"""
Get all device IDs in the configuration file.
config_json: Configuration file JSON
"""
device_ids = []
for chassis in config_json.get('chassis', {}):
for device in chassis.get('devices', {}):
device_ids.append(device['id'])
return device_ids
def check_number_of_elements_in_masks(config_json):
r"""
Check if the number of bit masks in the 'masks' property matches the number
of byte values in the 'values' property.
config_json: Configuration file JSON
"""
i2c_write_bytes = get_values(config_json, 'i2c_write_bytes')
i2c_compare_bytes = get_values(config_json, 'i2c_compare_bytes')
for object in i2c_write_bytes:
if 'masks' in object:
if len(object.get('masks', [])) != len(object.get('values', [])):
sys.stderr.write("Error: Invalid i2c_write_bytes action.\n"+\
"The masks array must have the same size as the values array. "+\
"masks: "+str(object.get('masks', []))+\
", values: "+str(object.get('values', []))+'.\n')
handle_validation_error()
for object in i2c_compare_bytes:
if 'masks' in object:
if len(object.get('masks', [])) != len(object.get('values', [])):
sys.stderr.write("Error: Invalid i2c_compare_bytes action.\n"+\
"The masks array must have the same size as the values array. "+\
"masks: "+str(object.get('masks', []))+\
", values: "+str(object.get('values', []))+'.\n')
handle_validation_error()
def check_rule_id_exists(config_json):
r"""
Check if a rule_id property specifies a rule ID that does not exist.
config_json: Configuration file JSON
"""
rule_ids = get_values(config_json, 'rule_id')
valid_rule_ids = get_rule_ids(config_json)
for rule_id in rule_ids:
if rule_id not in valid_rule_ids:
sys.stderr.write("Error: Rule ID does not exist.\n"+\
"Found rule_id value that specifies invalid rule ID "+\
rule_id+'\n')
handle_validation_error()
def check_set_device_value_exists(config_json):
r"""
Check if a set_device action specifies a device ID that does not exist.
config_json: Configuration file JSON
"""
device_ids = get_values(config_json, 'set_device')
valid_device_ids = get_device_ids(config_json)
for device_id in device_ids:
if device_id not in valid_device_ids:
sys.stderr.write("Error: Device ID does not exist.\n"+\
"Found set_device action that specifies invalid device ID "+\
device_id+'\n')
handle_validation_error()
def check_run_rule_value_exists(config_json):
r"""
Check if any run_rule actions specify a rule ID that does not exist.
config_json: Configuration file JSON
"""
rule_ids = get_values(config_json, 'run_rule')
valid_rule_ids = get_rule_ids(config_json)
for rule_id in rule_ids:
if rule_id not in valid_rule_ids:
sys.stderr.write("Error: Rule ID does not exist.\n"+\
"Found run_rule action that specifies invalid rule ID "+\
rule_id+'\n')
handle_validation_error()
def check_infinite_loops_in_rule(config_json, rule_json, call_stack=[]):
r"""
Check if a 'run_rule' action in the specified rule causes an
infinite loop.
config_json: Configuration file JSON.
rule_json: A rule in the JSON config file.
call_stack: Current call stack of rules.
"""
call_stack.append(rule_json['id'])
for action in rule_json.get('actions', {}):
if 'run_rule' in action:
run_rule_id = action['run_rule']
if run_rule_id in call_stack:
call_stack.append(run_rule_id)
sys.stderr.write(\
"Infinite loop caused by run_rule actions.\n"+\
str(call_stack)+'\n')
handle_validation_error()
else:
for rule in config_json.get('rules', {}):
if rule['id'] == run_rule_id:
check_infinite_loops_in_rule(\
config_json, rule, call_stack)
call_stack.pop()
def check_infinite_loops(config_json):
r"""
Check if rule in config file is called recursively, causing an
infinite loop.
config_json: Configuration file JSON
"""
for rule in config_json.get('rules', {}):
check_infinite_loops_in_rule(config_json, rule)
def check_duplicate_object_id(config_json):
r"""
Check that there aren't any JSON objects with the same 'id' property value.
config_json: Configuration file JSON
"""
json_ids = get_values(config_json, 'id')
unique_ids = set()
for id in json_ids:
if id in unique_ids:
sys.stderr.write("Error: Duplicate ID.\n"+\
"Found multiple objects with the ID "+id+'\n')
handle_validation_error()
else:
unique_ids.add(id)
def check_duplicate_rule_id(config_json):
r"""
Check that there aren't any "rule" elements with the same 'id' field.
config_json: Configuration file JSON
"""
rule_ids = []
for rule in config_json.get('rules', {}):
rule_id = rule['id']
if rule_id in rule_ids:
sys.stderr.write("Error: Duplicate rule ID.\n"+\
"Found multiple rules with the ID "+rule_id+'\n')
handle_validation_error()
else:
rule_ids.append(rule_id)
def check_duplicate_chassis_number(config_json):
r"""
Check that there aren't any "chassis" elements with the same 'number' field.
config_json: Configuration file JSON
"""
numbers = []
for chassis in config_json.get('chassis', {}):
number = chassis['number']
if number in numbers:
sys.stderr.write("Error: Duplicate chassis number.\n"+\
"Found multiple chassis with the number "+str(number)+'\n')
handle_validation_error()
else:
numbers.append(number)
def check_duplicate_device_id(config_json):
r"""
Check that there aren't any "devices" with the same 'id' field.
config_json: Configuration file JSON
"""
device_ids = []
for chassis in config_json.get('chassis', {}):
for device in chassis.get('devices', {}):
device_id = device['id']
if device_id in device_ids:
sys.stderr.write("Error: Duplicate device ID.\n"+\
"Found multiple devices with the ID "+device_id+'\n')
handle_validation_error()
else:
device_ids.append(device_id)
def check_duplicate_rail_id(config_json):
r"""
Check that there aren't any "rails" with the same 'id' field.
config_json: Configuration file JSON
"""
rail_ids = []
for chassis in config_json.get('chassis', {}):
for device in chassis.get('devices', {}):
for rail in device.get('rails', {}):
rail_id = rail['id']
if rail_id in rail_ids:
sys.stderr.write("Error: Duplicate rail ID.\n"+\
"Found multiple rails with the ID "+rail_id+'\n')
handle_validation_error()
else:
rail_ids.append(rail_id)
def check_for_duplicates(config_json):
r"""
Check for duplicate ID.
"""
check_duplicate_rule_id(config_json)
check_duplicate_chassis_number(config_json)
check_duplicate_device_id(config_json)
check_duplicate_rail_id(config_json)
check_duplicate_object_id(config_json)
def validate_schema(config, schema):
r"""
Validates the specified config file using the specified
schema file.
config: Path of the file containing the config JSON
schema: Path of the file containing the schema JSON
"""
with open(config) as config_handle:
config_json = json.load(config_handle)
with open(schema) as schema_handle:
schema_json = json.load(schema_handle)
try:
jsonschema.validate(config_json, schema_json)
except jsonschema.ValidationError as e:
print(e)
handle_validation_error()
return config_json
def validate_JSON_format(file):
with open(file) as json_data:
try:
return json.load(json_data)
except ValueError as err:
return False
return True
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='phosphor-regulators configuration file validator')
parser.add_argument('-s', '--schema-file', dest='schema_file',
help='The phosphor-regulators schema file')
parser.add_argument('-c', '--configuration-file', dest='configuration_file',
help='The phosphor-regulators configuration file')
args = parser.parse_args()
if not args.schema_file:
parser.print_help()
sys.exit("Error: Schema file is required.")
if not os.path.exists(args.schema_file):
parser.print_help()
sys.exit("Error: Schema file does not exist.")
if not os.access(args.schema_file, os.R_OK):
parser.print_help()
sys.exit("Error: Schema file is not readable.")
if not validate_JSON_format(args.schema_file):
parser.print_help()
sys.exit("Error: Schema file is not in the JSON format.")
if not args.configuration_file:
parser.print_help()
sys.exit("Error: Configuration file is required.")
if not os.path.exists(args.configuration_file):
parser.print_help()
sys.exit("Error: Configuration file does not exist.")
if not os.access(args.configuration_file, os.R_OK):
parser.print_help()
sys.exit("Error: Configuration file is not readable.")
if not validate_JSON_format(args.configuration_file):
parser.print_help()
sys.exit("Error: Configuration file is not in the JSON format.")
config_json = validate_schema(args.configuration_file, args.schema_file)
check_for_duplicates(config_json)
check_infinite_loops(config_json)
check_run_rule_value_exists(config_json)
check_set_device_value_exists(config_json)
check_rule_id_exists(config_json)
check_number_of_elements_in_masks(config_json)