• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindflayer / python-mocket / 9841431065

08 Jul 2024 02:23PM UTC coverage: 98.824%. Remained the same
9841431065

Pull #242

github

web-flow
Merge 2fb1e1212 into 6e3fbad16
Pull Request #242: Testing Python 3.13

840 of 850 relevant lines covered (98.82%)

3.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

100.0
/mocket/mockredis.py
1
from itertools import chain
4✔
2

3
from .compat import byte_type, decode_from_bytes, encode_to_bytes, shsplit, text_type
4✔
4
from .mocket import Mocket, MocketEntry
4✔
5

6

7
class Request:
4✔
8
    def __init__(self, data):
4✔
9
        self.data = data
4✔
10

11

12
class Response:
4✔
13
    def __init__(self, data=None):
4✔
14
        self.data = Redisizer.redisize(data or OK)
4✔
15

16

17
class Redisizer(byte_type):
4✔
18
    @staticmethod
4✔
19
    def tokens(iterable):
4✔
20
        iterable = [encode_to_bytes(x) for x in iterable]
4✔
21
        return [f"*{len(iterable)}".encode()] + list(
4✔
22
            chain(*zip([f"${len(x)}".encode() for x in iterable], iterable))
23
        )
24

25
    @staticmethod
4✔
26
    def redisize(data):
4✔
27
        def get_conversion(t):
4✔
28
            return {
4✔
29
                dict: lambda x: b"\r\n".join(
30
                    Redisizer.tokens(list(chain(*tuple(x.items()))))
31
                ),
32
                int: lambda x: f":{x}".encode(),
33
                text_type: lambda x: "${}\r\n{}".format(
34
                    len(x.encode("utf-8")), x
35
                ).encode("utf-8"),
36
                list: lambda x: b"\r\n".join(Redisizer.tokens(x)),
37
            }[t]
38

39
        if isinstance(data, Redisizer):
4✔
40
            return data
4✔
41
        if isinstance(data, byte_type):
4✔
42
            data = decode_from_bytes(data)
4✔
43
        return Redisizer(get_conversion(data.__class__)(data) + b"\r\n")
4✔
44

45
    @staticmethod
4✔
46
    def command(description, _type="+"):
4✔
47
        return Redisizer("{}{}{}".format(_type, description, "\r\n").encode("utf-8"))
4✔
48

49
    @staticmethod
4✔
50
    def error(description):
4✔
51
        return Redisizer.command(description, _type="-")
4✔
52

53

54
OK = Redisizer.command("OK")
4✔
55
QUEUED = Redisizer.command("QUEUED")
4✔
56
ERROR = Redisizer.error
4✔
57

58

59
class Entry(MocketEntry):
4✔
60
    request_cls = Request
4✔
61
    response_cls = Response
4✔
62

63
    def __init__(self, addr, command, responses):
4✔
64
        super().__init__(addr or ("localhost", 6379), responses)
4✔
65
        d = shsplit(command)
4✔
66
        d[0] = d[0].upper()
4✔
67
        self.command = Redisizer.tokens(d)
4✔
68

69
    def can_handle(self, data):
4✔
70
        return data.splitlines() == self.command
4✔
71

72
    @classmethod
4✔
73
    def register(cls, addr, command, *responses):
4✔
74
        responses = [
4✔
75
            r if isinstance(r, BaseException) else cls.response_cls(r)
76
            for r in responses
77
        ]
78
        Mocket.register(cls(addr, command, responses))
4✔
79

80
    @classmethod
4✔
81
    def register_response(cls, command, response, addr=None):
4✔
82
        cls.register(addr, command, response)
4✔
83

84
    @classmethod
4✔
85
    def register_responses(cls, command, responses, addr=None):
4✔
86
        cls.register(addr, command, *responses)
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc