Matthew Barth | 86458e3 | 2019-11-06 10:04:44 -0600 | [diff] [blame^] | 1 | #!/usr/bin/python |
| 2 | |
| 3 | """ |
| 4 | This script reads in JSON files containing a set of expected entries to be |
| 5 | found within a given input JSON file. An optional "filtering" JSON |
| 6 | file may also be provided that contains a set of data that will be used to |
| 7 | filter configured expected entries from being checked in the input JSON. |
| 8 | """ |
| 9 | |
| 10 | import os |
| 11 | import sys |
| 12 | import json |
| 13 | from argparse import ArgumentParser |
| 14 | |
| 15 | |
| 16 | def findEntry(entry, jsonObject): |
| 17 | |
| 18 | if isinstance(jsonObject, dict): |
| 19 | for key in jsonObject: |
| 20 | if key == entry: |
| 21 | return jsonObject[key] |
| 22 | else: |
| 23 | found = findEntry(entry, jsonObject[key]) |
| 24 | if found: |
| 25 | return found |
| 26 | |
| 27 | |
| 28 | def buildDict(entry, jsonObject, resultDict): |
| 29 | |
| 30 | key = list(entry)[0] |
| 31 | jsonObject = findEntry(key, jsonObject) |
| 32 | if jsonObject is None: |
| 33 | return {} |
| 34 | entry = entry[key] |
| 35 | if isinstance(entry, dict): |
| 36 | resultDict[key] = buildDict(entry, jsonObject, resultDict) |
| 37 | else: |
| 38 | return {key: jsonObject} |
| 39 | |
| 40 | |
| 41 | def doAnd(andList, filters): |
| 42 | |
| 43 | allTrue = True |
| 44 | for entry in andList: |
| 45 | # $and entries must be atleast a single layer dict |
| 46 | value = dict() |
| 47 | buildDict(entry, filters, value) |
| 48 | if value != entry: |
| 49 | allTrue = False |
| 50 | |
| 51 | return allTrue |
| 52 | |
| 53 | |
| 54 | def doOr(orList, filters): |
| 55 | |
| 56 | anyTrue = False |
| 57 | for entry in orList: |
| 58 | # $or entries must be atleast a single layer dict |
| 59 | value = dict() |
| 60 | buildDict(entry, filters, value) |
| 61 | if value == entry: |
| 62 | anyTrue = True |
| 63 | break |
| 64 | |
| 65 | return anyTrue |
| 66 | |
| 67 | |
| 68 | def doNor(norList, filters): |
| 69 | |
| 70 | allFalse = True |
| 71 | for entry in norList: |
| 72 | # $nor entries must be atleast a single layer dict |
| 73 | value = dict() |
| 74 | buildDict(entry, filters, value) |
| 75 | if value == entry: |
| 76 | allFalse = False |
| 77 | |
| 78 | return allFalse |
| 79 | |
| 80 | |
| 81 | def doNot(notDict, filters): |
| 82 | |
| 83 | # $not entry must be atleast a single layer dict |
| 84 | value = dict() |
| 85 | buildDict(notDict, filters, value) |
| 86 | if value == notDict: |
| 87 | return False |
| 88 | |
| 89 | return True |
| 90 | |
| 91 | |
| 92 | def applyFilters(expected, filters): |
| 93 | |
| 94 | switch = { |
| 95 | # $and - Performs an AND operation on an array with at least two |
| 96 | # expressions and returns the document that meets all the |
| 97 | # expressions. i.e.) {"$and": [{"age": 5}, {"name": "Joe"}]} |
| 98 | "$and": doAnd, |
| 99 | # $or - Performs an OR operation on an array with at least two |
| 100 | # expressions and returns the documents that meet at least one of |
| 101 | # the expressions. i.e.) {"$or": [{"age": 4}, {"name": "Joe"}]} |
| 102 | "$or": doOr, |
| 103 | # $nor - Performs a NOR operation on an array with at least two |
| 104 | # expressions and returns the documents that do not meet any of |
| 105 | # the expressions. i.e.) {"$nor": [{"age": 3}, {"name": "Moe"}]} |
| 106 | "$nor": doNor, |
| 107 | # $not - Performs a NOT operation on the specified expression and |
| 108 | # returns the documents that do not meet the expression. |
| 109 | # i.e.) {"$not": {"age": 4}} |
| 110 | "$not": doNot |
| 111 | } |
| 112 | |
| 113 | isExpected = {} |
| 114 | for entry in expected: |
| 115 | expectedList = list() |
| 116 | if entry == "$op": |
| 117 | addInput = True |
| 118 | for op in expected[entry]: |
| 119 | if op != "$input": |
| 120 | func = switch.get(op) |
| 121 | if not func(expected[entry][op], filters): |
| 122 | addInput = False |
| 123 | if addInput: |
| 124 | expectedList = expected[entry]["$input"] |
| 125 | else: |
| 126 | expectedList = [dict({entry: expected[entry]})] |
| 127 | |
| 128 | for i in expectedList: |
| 129 | for key in i: |
| 130 | isExpected[key] = i[key] |
| 131 | |
| 132 | return isExpected |
| 133 | |
| 134 | |
| 135 | def findExpected(expected, input): |
| 136 | |
| 137 | result = {} |
| 138 | for key in expected: |
| 139 | jsonObject = findEntry(key, input) |
| 140 | if isinstance(expected[key], dict) and expected[key] and jsonObject: |
| 141 | notExpected = findExpected(expected[key], jsonObject) |
| 142 | if notExpected: |
| 143 | result[key] = notExpected |
| 144 | else: |
| 145 | # If expected value is not "dont care" and |
| 146 | # does not equal what's expected |
| 147 | if str(expected[key]) != "{}" and expected[key] != jsonObject: |
| 148 | if jsonObject is None: |
| 149 | result[key] = None |
| 150 | else: |
| 151 | result[key] = expected[key] |
| 152 | return result |
| 153 | |
| 154 | |
| 155 | if __name__ == '__main__': |
| 156 | parser = ArgumentParser( |
| 157 | description="Expected JSON cross-checker. Similar to a JSON schema \ |
| 158 | validator, however this cross-checks a set of expected \ |
| 159 | property states against the contents of a JSON input \ |
| 160 | file with the ability to apply an optional set of \ |
| 161 | filters against what's expected based on the property \ |
| 162 | states within the provided filter JSON.") |
| 163 | |
| 164 | parser.add_argument('index', |
| 165 | help='Index name into a set of entries within the \ |
| 166 | expected JSON file') |
| 167 | parser.add_argument('expected_json', |
| 168 | help='JSON input file containing the expected set of \ |
| 169 | entries, by index name, to be contained within \ |
| 170 | the JSON input file') |
| 171 | parser.add_argument('input_json', |
| 172 | help='JSON input file containing the JSON data to be \ |
| 173 | cross-checked against what is expected') |
| 174 | parser.add_argument('-f', '--filters', dest='filter_json', |
| 175 | help='JSON file containing path:property:value \ |
| 176 | associations to optional filters configured \ |
| 177 | within the expected set of JSON entries') |
| 178 | |
| 179 | args = parser.parse_args() |
| 180 | |
| 181 | with open(args.expected_json, 'r') as expected_json: |
| 182 | expected = json.load(expected_json) or {} |
| 183 | with open(args.input_json, 'r') as input_json: |
| 184 | input = json.load(input_json) or {} |
| 185 | |
| 186 | filters = {} |
| 187 | if args.filter_json: |
| 188 | with open(args.filter_json, 'r') as filters_json: |
| 189 | filters = json.load(filters_json) or {} |
| 190 | |
| 191 | if args.index in expected and expected[args.index] is not None: |
| 192 | expected = applyFilters(expected[args.index], filters) |
| 193 | result = findExpected(expected, input) |
| 194 | if result: |
| 195 | print("NOT FOUND:") |
| 196 | for key in result: |
| 197 | print(key + ": " + str(result[key])) |
| 198 | else: |
| 199 | print("Error: " + args.index + " not found in " + args.expected_json) |
| 200 | sys.exit(1) |