blob: 54a16a4ad32c431dd38fb209b0871bd95892ab04 [file] [log] [blame]
#!/usr/bin/env python3
import argparse
import json
import os
import sys
import jsonschema
r"""
Validates the phosphor-regulators configuration file. Checks it against a JSON
schema as well as doing some extra checks that can't be encoded in the schema.
"""
def handle_validation_error():
sys.exit("Validation failed.")
def get_values(json_element, key, result=None):
r"""
Finds all occurrences of a key within the specified JSON element and its
children. Returns the associated values.
To search the entire configuration file, pass the root JSON element
json_element: JSON element within the config file.
key: key name.
result: list of values found with the specified key.
"""
if result is None:
result = []
if type(json_element) is dict:
for json_key in json_element:
if json_key == key:
result.append(json_element[json_key])
elif type(json_element[json_key]) in (list, dict):
get_values(json_element[json_key], key, result)
elif type(json_element) is list:
for item in json_element:
if type(item) in (list, dict):
get_values(item, key, result)
return result
def get_rule_ids(config_json):
r"""
Get all rule IDs in the configuration file.
config_json: Configuration file JSON
"""
rule_ids = []
for rule in config_json.get("rules", {}):
rule_ids.append(rule["id"])
return rule_ids
def get_device_ids(config_json):
r"""
Get all device IDs in the configuration file.
config_json: Configuration file JSON
"""
device_ids = []
for chassis in config_json.get("chassis", {}):
for device in chassis.get("devices", {}):
device_ids.append(device["id"])
return device_ids
def check_number_of_elements_in_masks(config_json):
r"""
Check if the number of bit masks in the 'masks' property matches the number
of byte values in the 'values' property.
config_json: Configuration file JSON
"""
i2c_write_bytes = get_values(config_json, "i2c_write_bytes")
i2c_compare_bytes = get_values(config_json, "i2c_compare_bytes")
for object in i2c_write_bytes:
if "masks" in object:
if len(object.get("masks", [])) != len(object.get("values", [])):
sys.stderr.write(
"Error: Invalid i2c_write_bytes action.\n"
+ "The masks array must have the same size as the values"
+ " array. masks: "
+ str(object.get("masks", []))
+ ", values: "
+ str(object.get("values", []))
+ ".\n"
)
handle_validation_error()
for object in i2c_compare_bytes:
if "masks" in object:
if len(object.get("masks", [])) != len(object.get("values", [])):
sys.stderr.write(
"Error: Invalid i2c_compare_bytes action.\n"
+ "The masks array must have the same size as the values "
+ "array. masks: "
+ str(object.get("masks", []))
+ ", values: "
+ str(object.get("values", []))
+ ".\n"
)
handle_validation_error()
def check_rule_id_exists(config_json):
r"""
Check if a rule_id property specifies a rule ID that does not exist.
config_json: Configuration file JSON
"""
rule_ids = get_values(config_json, "rule_id")
valid_rule_ids = get_rule_ids(config_json)
for rule_id in rule_ids:
if rule_id not in valid_rule_ids:
sys.stderr.write(
"Error: Rule ID does not exist.\n"
+ "Found rule_id value that specifies invalid rule ID "
+ rule_id
+ "\n"
)
handle_validation_error()
def check_device_id_exists(config_json):
r"""
Check if a device_id property specifies a device ID that does not exist.
config_json: Configuration file JSON
"""
device_ids = get_values(config_json, "device_id")
valid_device_ids = get_device_ids(config_json)
for device_id in device_ids:
if device_id not in valid_device_ids:
sys.stderr.write(
"Error: Device ID does not exist.\n"
+ "Found device_id value that specifies invalid device ID "
+ device_id
+ "\n"
)
handle_validation_error()
def check_set_device_value_exists(config_json):
r"""
Check if a set_device action specifies a device ID that does not exist.
config_json: Configuration file JSON
"""
device_ids = get_values(config_json, "set_device")
valid_device_ids = get_device_ids(config_json)
for device_id in device_ids:
if device_id not in valid_device_ids:
sys.stderr.write(
"Error: Device ID does not exist.\n"
+ "Found set_device action that specifies invalid device ID "
+ device_id
+ "\n"
)
handle_validation_error()
def check_run_rule_value_exists(config_json):
r"""
Check if any run_rule actions specify a rule ID that does not exist.
config_json: Configuration file JSON
"""
rule_ids = get_values(config_json, "run_rule")
valid_rule_ids = get_rule_ids(config_json)
for rule_id in rule_ids:
if rule_id not in valid_rule_ids:
sys.stderr.write(
"Error: Rule ID does not exist.\n"
+ "Found run_rule action that specifies invalid rule ID "
+ rule_id
+ "\n"
)
handle_validation_error()
def check_infinite_loops_in_rule(config_json, rule_json, call_stack=[]):
r"""
Check if a 'run_rule' action in the specified rule causes an
infinite loop.
config_json: Configuration file JSON.
rule_json: A rule in the JSON config file.
call_stack: Current call stack of rules.
"""
call_stack.append(rule_json["id"])
for action in rule_json.get("actions", {}):
if "run_rule" in action:
run_rule_id = action["run_rule"]
if run_rule_id in call_stack:
call_stack.append(run_rule_id)
sys.stderr.write(
"Infinite loop caused by run_rule actions.\n"
+ str(call_stack)
+ "\n"
)
handle_validation_error()
else:
for rule in config_json.get("rules", {}):
if rule["id"] == run_rule_id:
check_infinite_loops_in_rule(
config_json, rule, call_stack
)
call_stack.pop()
def check_infinite_loops(config_json):
r"""
Check if rule in config file is called recursively, causing an
infinite loop.
config_json: Configuration file JSON
"""
for rule in config_json.get("rules", {}):
check_infinite_loops_in_rule(config_json, rule)
def check_duplicate_object_id(config_json):
r"""
Check that there aren't any JSON objects with the same 'id' property value.
config_json: Configuration file JSON
"""
json_ids = get_values(config_json, "id")
unique_ids = set()
for id in json_ids:
if id in unique_ids:
sys.stderr.write(
"Error: Duplicate ID.\n"
+ "Found multiple objects with the ID "
+ id
+ "\n"
)
handle_validation_error()
else:
unique_ids.add(id)
def check_duplicate_rule_id(config_json):
r"""
Check that there aren't any "rule" elements with the same 'id' field.
config_json: Configuration file JSON
"""
rule_ids = []
for rule in config_json.get("rules", {}):
rule_id = rule["id"]
if rule_id in rule_ids:
sys.stderr.write(
"Error: Duplicate rule ID.\n"
+ "Found multiple rules with the ID "
+ rule_id
+ "\n"
)
handle_validation_error()
else:
rule_ids.append(rule_id)
def check_duplicate_chassis_number(config_json):
r"""
Check that there aren't any "chassis" elements with the same 'number'
field.
config_json: Configuration file JSON
"""
numbers = []
for chassis in config_json.get("chassis", {}):
number = chassis["number"]
if number in numbers:
sys.stderr.write(
"Error: Duplicate chassis number.\n"
+ "Found multiple chassis with the number "
+ str(number)
+ "\n"
)
handle_validation_error()
else:
numbers.append(number)
def check_duplicate_device_id(config_json):
r"""
Check that there aren't any "devices" with the same 'id' field.
config_json: Configuration file JSON
"""
device_ids = []
for chassis in config_json.get("chassis", {}):
for device in chassis.get("devices", {}):
device_id = device["id"]
if device_id in device_ids:
sys.stderr.write(
"Error: Duplicate device ID.\n"
+ "Found multiple devices with the ID "
+ device_id
+ "\n"
)
handle_validation_error()
else:
device_ids.append(device_id)
def check_duplicate_rail_id(config_json):
r"""
Check that there aren't any "rails" with the same 'id' field.
config_json: Configuration file JSON
"""
rail_ids = []
for chassis in config_json.get("chassis", {}):
for device in chassis.get("devices", {}):
for rail in device.get("rails", {}):
rail_id = rail["id"]
if rail_id in rail_ids:
sys.stderr.write(
"Error: Duplicate rail ID.\n"
+ "Found multiple rails with the ID "
+ rail_id
+ "\n"
)
handle_validation_error()
else:
rail_ids.append(rail_id)
def check_for_duplicates(config_json):
r"""
Check for duplicate ID.
"""
check_duplicate_rule_id(config_json)
check_duplicate_chassis_number(config_json)
check_duplicate_device_id(config_json)
check_duplicate_rail_id(config_json)
check_duplicate_object_id(config_json)
def validate_schema(config, schema):
r"""
Validates the specified config file using the specified
schema file.
config: Path of the file containing the config JSON
schema: Path of the file containing the schema JSON
"""
with open(config) as config_handle:
config_json = json.load(config_handle)
with open(schema) as schema_handle:
schema_json = json.load(schema_handle)
try:
jsonschema.validate(config_json, schema_json)
except jsonschema.ValidationError as e:
print(e)
handle_validation_error()
return config_json
def validate_JSON_format(file):
with open(file) as json_data:
try:
return json.load(json_data)
except ValueError:
return False
return True
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="phosphor-regulators configuration file validator"
)
parser.add_argument(
"-s",
"--schema-file",
dest="schema_file",
help="The phosphor-regulators schema file",
)
parser.add_argument(
"-c",
"--configuration-file",
dest="configuration_file",
help="The phosphor-regulators configuration file",
)
args = parser.parse_args()
if not args.schema_file:
parser.print_help()
sys.exit("Error: Schema file is required.")
if not os.path.exists(args.schema_file):
parser.print_help()
sys.exit("Error: Schema file does not exist.")
if not os.access(args.schema_file, os.R_OK):
parser.print_help()
sys.exit("Error: Schema file is not readable.")
if not validate_JSON_format(args.schema_file):
parser.print_help()
sys.exit("Error: Schema file is not in the JSON format.")
if not args.configuration_file:
parser.print_help()
sys.exit("Error: Configuration file is required.")
if not os.path.exists(args.configuration_file):
parser.print_help()
sys.exit("Error: Configuration file does not exist.")
if not os.access(args.configuration_file, os.R_OK):
parser.print_help()
sys.exit("Error: Configuration file is not readable.")
if not validate_JSON_format(args.configuration_file):
parser.print_help()
sys.exit("Error: Configuration file is not in the JSON format.")
config_json = validate_schema(args.configuration_file, args.schema_file)
check_for_duplicates(config_json)
check_infinite_loops(config_json)
check_run_rule_value_exists(config_json)
check_set_device_value_exists(config_json)
check_rule_id_exists(config_json)
check_device_id_exists(config_json)
check_number_of_elements_in_masks(config_json)