• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

unioslo / mreg / 6249718570

20 Sep 2023 02:08PM UTC coverage: 99.176% (-0.01%) from 99.19%
6249718570

Pull #519

github

oyvindhagberg
Bugfix: Add a type check to filter_sensitive_data

The filter_sensitive_data function must check whether the content is a
dict or a str before attempting to replace the password with dots.
Pull Request #519: Fix curl auth issue

4 of 5 new or added lines in 1 file covered. (80.0%)

6981 of 7039 relevant lines covered (99.18%)

4.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.33
/mreg/log_processors.py
1
"""Logging processors for mreg."""
2

3
from collections import defaultdict
5✔
4
from typing import Any
5✔
5
import re
5✔
6

7
from rich.console import Console
5✔
8
from rich.text import Text
5✔
9
from structlog.typing import EventDict
5✔
10

11

12
def _replace_token(token: str) -> str:
5✔
13
    """Replace a token with a shortened and safe version of it."""
14
    if len(token) > 10:
5✔
15
        return token[:3] + "..." + token[-3:]
5✔
16

17
    return "..."
5✔
18

19

20
# Quoted key/value pair as found in JSON-like bodies, e.g. "password": "secret".
# Both ":" and "=" are accepted as separator. The value part steps over
# backslash-escaped characters so an escaped quote does not end the match early.
_QUOTED_PASSWORD_RE = re.compile(r'("password"\s*[:=]\s*")(?:\\.|[^"\\])*(")')

# Form-encoded pair, e.g. username=u&password=secret.
_FORM_PASSWORD_RE = re.compile(r'(password=)[^&\s"]*')

# Token value inside a serialized response body, e.g. "token":"abcdef".
_TOKEN_RE = re.compile(r'"token"\s*:\s*"([^"]*)"')


def _mask_password(content: Any) -> Any:
    """Return ``content`` with any password value replaced by ``...``."""
    if isinstance(content, dict):
        if "password" not in content:
            return content
        # Work on a copy so the caller's own dict is not modified by logging.
        masked = dict(content)
        masked["password"] = "..."
        return masked

    if isinstance(content, str):
        content = _QUOTED_PASSWORD_RE.sub(r"\1...\2", content)
        return _FORM_PASSWORD_RE.sub(r"\1...", content)

    return content


def _mask_token(content: Any) -> Any:
    """Return ``content`` with any token value shortened by ``_replace_token``."""
    if isinstance(content, dict):
        token = content.get("token")
        if isinstance(token, str) and token:
            masked = dict(content)
            masked["token"] = _replace_token(token)
            return masked
        return content

    if isinstance(content, str):
        match = _TOKEN_RE.search(content)
        # An empty token must be skipped: str.replace("", x) would insert x
        # between every character of the content.
        if match and match.group(1):
            token = match.group(1)
            return content.replace(token, _replace_token(token))

    return content


def filter_sensitive_data(_: Any, __: Any, event_dict: EventDict) -> EventDict:
    """Filter sensitive data from a structlogs event_dict.

    Two kinds of events are rewritten:

    * Events whose ``model`` is ``ExpiringToken`` or ``Session`` get their
      ``_str`` and ``id`` values replaced by a shortened token.
    * ``POST`` events on the login path get the password (``request`` events)
      or the issued token (``response`` events) masked in ``content``. The
      content may be a dict or a string; other types are left untouched.

    :param _: Unused parameter
    :param __: Unused parameter
    :param event_dict: Dictionary containing event data.

    :returns: Event dictionary with sensitive data filtered.
    """
    LOGIN_PATH = "/api/token-auth/"

    if event_dict.get("model") in ("ExpiringToken", "Session"):
        clean_token = _replace_token(event_dict["_str"])
        event_dict["_str"] = clean_token
        event_dict["id"] = clean_token

    is_login_event = (
        event_dict.get("path") == LOGIN_PATH and event_dict.get("method") == "POST"
    )

    # Only touch "content" when it is present, so no new key is added.
    if is_login_event and "content" in event_dict:
        content = event_dict["content"]
        event = event_dict.get("event")

        if event == "request":
            # The substitution result must be stored back; strings are immutable.
            event_dict["content"] = _mask_password(content)
        elif event == "response":
            event_dict["content"] = _mask_token(content)

    return event_dict
5✔
57

58

59
def collapse_request_id_processor(_: Any, __: Any, event_dict: EventDict) -> EventDict:
    """Shorten the ``request_id`` of an event, if it has one.

    :param _: Unused parameter
    :param __: Unused parameter
    :param event_dict: Dictionary containing event data.

    :returns: The same event dictionary, with ``request_id`` abbreviated by
        ``_replace_token`` when the key is present.
    """
    if "request_id" not in event_dict:
        return event_dict

    shortened = _replace_token(event_dict["request_id"])
    event_dict["request_id"] = shortened
    return event_dict
5✔
64

65

66
def reorder_keys_processor(_: Any, __: Any, event_dict: EventDict) -> EventDict:
    """Return a copy of the event dict in which ``request_id`` comes first.

    All other keys keep their original relative order. A new dict is built;
    the one passed in is not modified.

    :param _: Unused parameter
    :param __: Unused parameter
    :param event_dict: Dictionary containing event data.

    :returns: A new dictionary with ``request_id`` (when present) as first key.
    """
    reordered = {}

    if "request_id" in event_dict:
        reordered["request_id"] = event_dict["request_id"]

    for key in event_dict.keys():
        if key != "request_id":
            reordered[key] = event_dict[key]

    return reordered
5✔
73

74

75
class RequestColorTracker:
    """Prefix log events with a colored bubble keyed on their request_id.

    :ivar COLORS: Color names handed out, in order, to newly seen request_ids.
    :ivar console: Rich console used to render styled text into a string.
    :ivar request_to_color: Mapping from request_id to its assigned color.
    """

    COLORS = ["red", "white", "green", "yellow", "blue", "magenta", "cyan"]

    def __init__(self):
        """Initialize a new RequestColorTracker.

        Creates the console and an empty request_id-to-color mapping. The
        mapping is a defaultdict that draws the next color from the color
        generator the first time a request_id is looked up.
        """
        self.console = Console()
        palette = self._color_generator()
        self.request_to_color = defaultdict(lambda: next(palette))

    def _colorize(self, color: str, s: str) -> str:
        """Render a string in bold with the given color using Rich.

        :param color: The name of the color to use.
        :param s: The string to colorize.
        :return: The captured console output, with trailing whitespace removed.
        """
        styled = Text(s, style=f"bold {color}")

        # Print into a capture so the styled output is returned, not displayed.
        with self.console.capture() as captured:
            self.console.print(styled)

        # Strip trailing whitespace, such as the newline at the end of the output.
        return captured.get().rstrip()

    def _color_generator(self):
        """Create a generator that endlessly cycles through COLORS in order.

        :yield: A color from the COLORS list.
        """
        position = 0
        while True:
            yield self.COLORS[position % len(self.COLORS)]
            position += 1

    def __call__(self, _: Any, __: Any, event_dict: EventDict) -> EventDict:
        """Prepend a colored bubble to the event message.

        Events with a truthy ``request_id`` get the color assigned to that id;
        all other events get a black bubble.

        :param _: The logger instance. This argument is ignored.
        :param __: The log level. This argument is ignored.
        :param event_dict: The event dictionary of the log entry.
        :return: The modified event dictionary.
        """
        request_id = event_dict.get("request_id")
        color = self.request_to_color[request_id] if request_id else "black"

        bubble = self._colorize(color, " • ")
        event_dict["event"] = bubble + event_dict.get("event", "")

        return event_dict
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc