| #!/usr/bin/env python3 | 
 | # SPDX-License-Identifier: Apache-2.0 | 
 | """ | 
 | A tool for validating entity manager configurations. | 
 | """ | 
 | import argparse | 
 | import json | 
 | import jsonschema.validators | 
 | import os | 
 | import sys | 
 |  | 
 | DEFAULT_SCHEMA_FILENAME = "global.json" | 
 |  | 
 |  | 
 | def main(): | 
 |     parser = argparse.ArgumentParser( | 
 |         description="Entity manager configuration validator", | 
 |     ) | 
 |     parser.add_argument( | 
 |         "-s", "--schema", help=( | 
 |             "Use the specified schema file instead of the default " | 
 |             "(__file__/../../schemas/global.json)")) | 
 |     parser.add_argument( | 
 |         "-c", "--config", action='append', help=( | 
 |             "Validate the specified configuration files (can be " | 
 |             "specified more than once) instead of the default " | 
 |             "(__file__/../../configurations/**.json)")) | 
 |     parser.add_argument( | 
 |         "-e", "--expected-fails", help=( | 
 |             "A file with a list of configurations to ignore should " | 
 |             "they fail to validate")) | 
 |     parser.add_argument( | 
 |         "-k", "--continue", action='store_true', help=( | 
 |             "keep validating after a failure")) | 
 |     parser.add_argument( | 
 |         "-v", "--verbose", action='store_true', help=( | 
 |             "be noisy")) | 
 |     args = parser.parse_args() | 
 |  | 
 |     schema_file = args.schema | 
 |     if schema_file is None: | 
 |         try: | 
 |             source_dir = os.path.realpath(__file__).split(os.sep)[:-2] | 
 |             schema_file = os.sep + os.path.join( | 
 |                 *source_dir, 'schemas', DEFAULT_SCHEMA_FILENAME) | 
 |         except Exception as e: | 
 |             sys.stderr.write( | 
 |                 "Could not guess location of {}\n".format( | 
 |                     DEFAULT_SCHEMA_FILENAME)) | 
 |             sys.exit(2) | 
 |  | 
 |     schema = {} | 
 |     try: | 
 |         with open(schema_file) as fd: | 
 |             schema = json.load(fd) | 
 |     except FileNotFoundError as e: | 
 |         sys.stderr.write( | 
 |             "Could not read schema file '{}'\n".format(schema_file)) | 
 |         sys.exit(2) | 
 |  | 
 |     config_files = args.config or [] | 
 |     if len(config_files) == 0: | 
 |         try: | 
 |             source_dir = os.path.realpath(__file__).split(os.sep)[:-2] | 
 |             configs_dir = os.sep + os.path.join(*source_dir, 'configurations') | 
 |             data = os.walk(configs_dir) | 
 |             for root, _, files in data: | 
 |                 for f in files: | 
 |                     if f.endswith('.json'): | 
 |                         config_files.append(os.path.join(root, f)) | 
 |         except Exception as e: | 
 |             sys.stderr.write( | 
 |                 "Could not guess location of configurations\n") | 
 |             sys.exit(2) | 
 |  | 
 |     configs = [] | 
 |     for config_file in config_files: | 
 |         try: | 
 |             with open(config_file) as fd: | 
 |                 configs.append(json.load(fd)) | 
 |         except FileNotFoundError as e: | 
 |             sys.stderr.write( | 
 |                     "Could not parse config file '{}'\n".format(config_file)) | 
 |             sys.exit(2) | 
 |  | 
 |     expected_fails = [] | 
 |     if args.expected_fails: | 
 |         try: | 
 |             with open(args.expected_fails) as fd: | 
 |                 for line in fd: | 
 |                     expected_fails.append(line.strip()) | 
 |         except Exception as e: | 
 |             sys.stderr.write( | 
 |                     "Could not read expected fails file '{}'\n".format( | 
 |                         args.expected_fails)) | 
 |             sys.exit(2) | 
 |  | 
 |     base_uri = "file://{}/".format( | 
 |         os.path.split(os.path.realpath(schema_file))[0]) | 
 |     resolver = jsonschema.RefResolver(base_uri, schema) | 
 |     validator = jsonschema.Draft7Validator(schema, resolver=resolver) | 
 |  | 
 |     results = { | 
 |         "invalid": [], | 
 |         "unexpected_pass": [], | 
 |     } | 
 |     for config_file, config in zip(config_files, configs): | 
 |         name = os.path.split(config_file)[1] | 
 |         expect_fail = name in expected_fails | 
 |         try: | 
 |             validator.validate(config) | 
 |             if expect_fail: | 
 |                 results["unexpected_pass"].append(name) | 
 |                 if not getattr(args, "continue"): | 
 |                     break | 
 |         except jsonschema.exceptions.ValidationError as e: | 
 |             if not expect_fail: | 
 |                 results["invalid"].append(name) | 
 |                 if args.verbose: | 
 |                     print(e) | 
 |             if expect_fail or getattr(args, "continue"): | 
 |                 continue | 
 |             break | 
 |  | 
 |     exit_status = 0 | 
 |     if len(results["invalid"]) + len(results["unexpected_pass"]): | 
 |         exit_status = 1 | 
 |         unexpected_pass_suffix = " **" | 
 |         show_suffix_explanation = False | 
 |         print("results:") | 
 |         for f in config_files: | 
 |             if any([x in f for x in results["unexpected_pass"]]): | 
 |                 show_suffix_explanation = True | 
 |                 print("  '{}' passed!{}".format(f, unexpected_pass_suffix)) | 
 |             if any([x in f for x in results["invalid"]]): | 
 |                 print("  '{}' failed!".format(f)) | 
 |  | 
 |         if show_suffix_explanation: | 
 |             print("\n** configuration expected to fail") | 
 |  | 
 |     sys.exit(exit_status) | 
 |  | 
 |  | 
 | if __name__ == "__main__": | 
 |     main() |