• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Problematy / goodmap / 10249557159

05 Aug 2024 01:15PM UTC coverage: 94.737% (+1.5%) from 93.258%
10249557159

push

github

web-flow
 Create script checking data validity #92 (#110)

72 of 74 branches covered (97.3%)

Branch coverage included in aggregate %.

64 of 66 new or added lines in 1 file covered. (96.97%)

180 of 192 relevant lines covered (93.75%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.73
/goodmap/data_validator.py
1
import json
1✔
2
from enum import Enum
1✔
3
from sys import argv, stderr
1✔
4

5

6
class ViolationType(Enum):
    """Kinds of problems the validator can report about a map database."""

    INVALID_JSON_FORMAT = 0
    MISSING_OBLIGATORY_FIELD = 1
    INVALID_VALUE_IN_CATEGORY = 2
    NULL_VALUE = 3

    def get_error_message(self):
        """Return the human-readable description of this violation type."""
        # Keyed by member rather than by bare integer, so renumbering or
        # reordering the members cannot pair one with the wrong text.
        # Built inside the method because a dict assigned in an Enum class
        # body would itself become an enum member.
        error_messages = {
            ViolationType.INVALID_JSON_FORMAT: "invalid json format",
            ViolationType.MISSING_OBLIGATORY_FIELD: "missing obligatory field",
            ViolationType.INVALID_VALUE_IN_CATEGORY: "invalid value in category",
            ViolationType.NULL_VALUE: "attribute has null value",
        }
        return error_messages[self]
20

21

22
class DataViolation:
    """Base record for a single problem found while validating data."""

    def __init__(self, violation_type: "ViolationType"):
        # Which kind of problem this record describes.
        self.violation_type = violation_type

    def __repr__(self):
        """Return a debugging representation naming the concrete class."""
        return f"{type(self).__name__}(violation_type={self.violation_type!r})"
25

26

27
class FormatViolation(DataViolation):
    """Violation recorded when the input could not be decoded as JSON."""

    def __init__(self, decoding_error):
        super().__init__(ViolationType.INVALID_JSON_FORMAT)
        # The error object captured while decoding the input.
        self.decoding_error = decoding_error

    def __repr__(self):
        """Return a debugging representation that includes the decoding error."""
        return f"{type(self).__name__}(decoding_error={self.decoding_error!r})"
31

32

33
class FieldViolation(DataViolation):
    """Violation tied to one field of one datapoint."""

    def __init__(self, violation_type: ViolationType, datapoint, violating_field):
        super().__init__(violation_type)
        # The datapoint in which the problem was found.
        self.datapoint = datapoint
        # Name of the field (key) that caused the violation.
        self.violating_field = violating_field

    def __repr__(self):
        """Return a debugging representation with the type, field and datapoint."""
        return (
            f"{type(self).__name__}("
            f"violation_type={self.violation_type!r}, "
            f"violating_field={self.violating_field!r}, "
            f"datapoint={self.datapoint!r})"
        )
38

39

40
def get_missing_obligatory_fields_violations(p, obligatory_fields):
    """Report every obligatory field that is absent from datapoint ``p``.

    Returns a list with one ``MISSING_OBLIGATORY_FIELD`` violation per entry
    of ``obligatory_fields`` that is not a key of ``p``, in the order the
    fields are listed.
    """
    return [
        FieldViolation(ViolationType.MISSING_OBLIGATORY_FIELD, p, required_field)
        for required_field in obligatory_fields
        if required_field not in p.keys()
    ]
46

47

48
def get_invalid_value_in_category_violations(p, categories):
    """Report category values of datapoint ``p`` that are not allowed.

    ``categories`` maps a category name to its collection of valid values.
    For every category that is also a key of ``p``, the datapoint's value is
    checked against that collection; a list value is checked element by
    element.

    Returns a list with one ``INVALID_VALUE_IN_CATEGORY`` violation per
    offending value, so a list with two bad elements yields two violations
    for the same category.
    """
    violations = []
    # Walk ``categories`` in its own order instead of a set intersection:
    # set iteration order for strings changes between interpreter runs, which
    # made the order of reported violations unstable.
    for category in categories:
        if category not in p:
            continue
        value_in_point = p[category]
        valid_values = categories[category]
        # isinstance (not an exact type check) so list subclasses are still
        # checked element by element; scalars are wrapped to share one path.
        candidates = value_in_point if isinstance(value_in_point, list) else [value_in_point]
        violations.extend(
            FieldViolation(ViolationType.INVALID_VALUE_IN_CATEGORY, p, category)
            for candidate in candidates
            if candidate not in valid_values
        )
    return violations
65

66

67
def get_null_values_violations(p):
    """Report every attribute of datapoint ``p`` whose value is ``None``.

    Returns a list with one ``NULL_VALUE`` violation per such attribute, in
    the iteration order of ``p.items()``.
    """
    return [
        FieldViolation(ViolationType.NULL_VALUE, p, name)
        for name, content in p.items()
        if content is None
    ]
73

74

75
def report_data_violations_from_json(json_database):
    """Collect all violations found in an already-parsed database.

    Reads ``data``, ``categories`` and ``obligatory_fields`` from
    ``json_database["map"]`` and runs the three per-datapoint checks
    (missing obligatory fields, invalid category values, null values) on
    every datapoint.

    Returns the combined list of violations, grouped per datapoint in the
    order the checks are run.
    """
    map_section = json_database["map"]
    points = map_section["data"]
    category_spec = map_section["categories"]
    required_fields = map_section["obligatory_fields"]

    found = []
    for point in points:
        found.extend(get_missing_obligatory_fields_violations(point, required_fields))
        found.extend(get_invalid_value_in_category_violations(point, category_spec))
        found.extend(get_null_values_violations(point))
    return found
89

90

91
def report_data_violations_from_json_file(path_to_json_file):
    """Validate the JSON database stored at ``path_to_json_file``.

    Returns the list of violations found in the data. If the file content
    cannot be decoded as JSON, returns a single-element list holding a
    ``FormatViolation`` that wraps the decoding error.

    Errors from opening the file (for example a missing path) are not
    handled here and propagate to the caller.
    """
    # Read as UTF-8 explicitly: without ``encoding`` the locale default is
    # used, which can fail or mis-decode non-ASCII JSON on non-UTF-8 locales.
    with open(path_to_json_file, encoding="utf-8") as json_file:
        try:
            # Only parsing sits inside the try, so a format violation can
            # come from nothing but the decode step.
            json_database = json.load(json_file)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Bytes that are not valid UTF-8 are reported the same way as
            # malformed JSON instead of escaping as an unhandled exception.
            return [FormatViolation(e)]
    return report_data_violations_from_json(json_database)
97

98

99
def print_reported_violations(data_violations):  # pragma: no cover
    """Write a description of each violation to standard error.

    A JSON format violation prints a fixed header followed by the decoding
    error. Any other violation prints its type's error message together with
    the violating field, followed by the datapoint.
    """
    for entry in data_violations:
        kind = entry.violation_type
        if kind == ViolationType.INVALID_JSON_FORMAT:
            print("DATA ERROR: invalid json format", file=stderr)
            print(entry.decoding_error, file=stderr)
            continue

        # Remaining kinds are field-level and carry a field name and datapoint.
        field_name = entry.violating_field
        description = kind.get_error_message()
        print(f"DATA ERROR: {description} {field_name} in datapoint:", file=stderr)
        print(entry.datapoint, file=stderr)
112

113

114
if __name__ == "__main__":  # pragma: no cover
    # Command-line entry point: validate the JSON file named by the first
    # argument and report the outcome on standard error.
    if len(argv) < 2:
        # Exit with a usage line instead of an IndexError traceback;
        # SystemExit with a string prints it to stderr and exits with status 1.
        raise SystemExit(f"usage: {argv[0]} PATH_TO_JSON_FILE")
    data_violations = report_data_violations_from_json_file(argv[1])
    if not data_violations:
        print("All data is valid", file=stderr)
    else:
        # NOTE(review): the process still exits with status 0 when violations
        # are found - confirm whether callers expect a non-zero status.
        print_reported_violations(data_violations)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc