• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stlehmann / pyads / 8404357596

23 Mar 2024 08:36PM UTC coverage: 94.404%. First build
8404357596

push

github

web-flow
Merge pull request #373 from chrisbeardy/3.4.0

changed workflow to pip pytest 7.4.4 - fixes CI

1704 of 1805 relevant lines covered (94.4%)

3.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.95
/pyads/testserver/basic_handler.py
1
"""Basic handler module for testserver.
2

3
:author: Stefan Lehmann <stlm@posteo.de>
4
:license: MIT, see license file or https://opensource.org/licenses/MIT
5
:created on: 2017-09-15
6

7
"""
8
from typing import List, Union
4✔
9
import struct
4✔
10

11
from .handler import AbstractHandler, AmsPacket, AmsResponseData, logger
4✔
12
from pyads import constants
4✔
13

14

15
class BasicHandler(AbstractHandler):
    """Basic request handler.

    Basic request handler to print the request data and return some default values.

    Each supported ADS command is logged and answered with a fixed dummy
    payload. A command id that is not handled is answered with error code
    0x08 and no payload.

    """

    @staticmethod
    def _filler(length: int) -> bytes:
        """Return `length` dummy bytes: repeated 0x0F plus a null terminator.

        The trailing null byte lets the payload double as a terminated
        string. A non-positive `length` yields an empty payload, so a
        zero-length request is never answered with a stray terminator byte.
        """
        if length <= 0:
            return b""
        return b"\x0F" * (length - 1) + b"\x00"

    def _read_write_payload(self, data: bytes) -> bytes:
        """Build the dummy value answering a READ_WRITE request.

        :param data: request payload; the code reads the index group from
            bytes 0-3, the requested read length from bytes 8-11, the write
            length from bytes 12-15 and the written data from byte 16 on
        :return: the response value, without its length prefix
        """
        index_group = struct.unpack("<I", data[:4])[0]
        response_length = struct.unpack("<I", data[8:12])[0]
        write_length = struct.unpack("<I", data[12:16])[0]
        write_data = data[16:16 + write_length]

        if index_group == constants.ADSIGRP_SYM_INFOBYNAMEEX:
            # Pack the structure in the same format as SAdsSymbolEntry.
            # The first field (entry size, fixed at 30), the fourth field and
            # the type field are filled; everything else stays zero.
            # NOTE(review): the fourth field looks like the symbol's data
            # size (5 for strings, 2 for arrays, 1 for UINT8) - confirm.
            # The name is matched on raw bytes: for valid UTF-8 this gives
            # the same result as decoding first, without raising on names
            # that are not valid UTF-8.
            entry_fmt = "<IIIIIIHHH"
            if b"str_" in write_data:
                return struct.pack(
                    entry_fmt, 30, 0, 0, 5, constants.ADST_STRING, 0, 0, 0, 0
                )
            # Non-existent type
            if b"no_type" in write_data:
                return struct.pack(entry_fmt, 30, 0, 0, 5, 1, 0, 0, 0, 0)
            # Array
            if b"ar_" in write_data:
                return struct.pack(
                    entry_fmt, 30, 0, 0, 2, constants.ADST_UINT8, 0, 0, 0, 0
                )
            logger.info("Packing ADST_UINT8...")
            return struct.pack(
                entry_fmt, 30, 0, 0, 1, constants.ADST_UINT8, 0, 0, 0, 0
            )

        if index_group == constants.ADSIGRP_SUMUP_READ:
            # The written data is treated as a sequence of 12-byte entries.
            # The response starts with one zero uint32 per entry, followed by
            # one dummy value per entry.
            n_reads = len(write_data) // 12
            fmt = "<" + n_reads * "I"
            vals: List[Union[int, bytes]] = [0] * n_reads

            for i in range(n_reads):
                # The last 4 bytes of an entry are compared against 5 to
                # decide whether a string is requested.
                # NOTE(review): presumably this field is the read length.
                (entry_field,) = struct.unpack_from("<I", write_data, i * 12 + 8)
                if entry_field == 5:
                    fmt += "5s"
                    vals.append(b"test\x00")
                else:
                    fmt += "B"
                    vals.append(i + 1)
            return struct.pack(fmt, *vals)

        if index_group == constants.ADSIGRP_SUMUP_WRITE:
            # One zero uint32 per 12-byte entry of the written data.
            n_writes = len(write_data) // 12
            return struct.pack("<" + n_writes * "I", *(n_writes * [0]))

        # Any other index group: repeated 0x0F with a null terminator for
        # strings, or nothing at all when no data was requested.
        return self._filler(response_length)

    def handle_request(self, request: AmsPacket) -> AmsResponseData:
        """Handle incoming requests and send a response.

        :param request: the received packet; its AMS header supplies the
            command id, the state flags, the error code and the payload
        :return: response carrying the request's state flags with the
            response bit set, and a 4-byte result code followed by the
            command-specific dummy payload
        """
        header = request.ams_header

        # Extract command id from the request
        command_id = struct.unpack("<H", header.command_id)[0]

        # Set AMS state correctly for response
        state = struct.unpack("<H", header.state_flags)[0]
        state = struct.pack("<H", state | 0x0001)  # Set response flag

        # Handle request
        if command_id == constants.ADSCOMMAND_READDEVICEINFO:
            logger.info("Command received: READ_DEVICE_INFO")
            # Create dummy response: version 1.2.3, device name 'TestServer'
            # (major, minor, 2-byte build, null-terminated name)
            response_content = b"\x01" + b"\x02" + b"\x03\x00" + b"TestServer\x00"

        elif command_id == constants.ADSCOMMAND_READ:
            logger.info("Command received: READ")
            # Parse requested data length
            response_length = struct.unpack("<I", header.data[8:12])[0]
            # Create response of repeated 0x0F with a null terminator for
            # strings; a zero-length read gets an empty value.
            response_value = self._filler(response_length)
            response_content = (
                struct.pack("<I", len(response_value)) + response_value
            )

        elif command_id == constants.ADSCOMMAND_WRITE:
            logger.info("Command received: WRITE")
            # No response data required
            response_content = b""

        elif command_id == constants.ADSCOMMAND_READSTATE:
            logger.info("Command received: READ_STATE")
            ads_state = struct.pack("<H", constants.ADSSTATE_RUN)
            # I don't know what an appropriate value for device state is.
            # I suspect it may be unused..
            device_state = struct.pack("<H", 0)
            response_content = ads_state + device_state

        elif command_id == constants.ADSCOMMAND_WRITECTRL:
            logger.info("Command received: WRITE_CONTROL")
            # No response data required
            response_content = b""

        elif command_id == constants.ADSCOMMAND_ADDDEVICENOTE:
            logger.info("Command received: ADD_DEVICE_NOTIFICATION")
            # Fixed dummy 4-byte notification handle
            response_content = b"\x0F" * 4

        elif command_id == constants.ADSCOMMAND_DELDEVICENOTE:
            logger.info("Command received: DELETE_DEVICE_NOTIFICATION")
            # No response data required
            response_content = b""

        elif command_id == constants.ADSCOMMAND_DEVICENOTE:
            logger.info("Command received: DEVICE_NOTIFICATION")
            # No response data required
            response_content = b""

        elif command_id == constants.ADSCOMMAND_READWRITE:
            logger.info("Command received: READ_WRITE")
            response_value = self._read_write_payload(header.data)
            response_content = (
                struct.pack("<I", len(response_value)) + response_value
            )

        else:
            logger.info("Unknown Command: %s", hex(command_id))
            # Set error code to 'unknown command ID'
            return AmsResponseData(state, b"\x08\x00\x00\x00", b"")

        # Set no error in response
        response_data = b"\x00" * 4 + response_content

        return AmsResponseData(state, header.error_code, response_data)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc