• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stlehmann / pyads / 8404357596

23 Mar 2024 08:36PM UTC coverage: 94.404%. First build
8404357596

push

github

web-flow
Merge pull request #373 from chrisbeardy/3.4.0

changed workflow to pip pytest 7.4.4 - fixes CI

1704 of 1805 relevant lines covered (94.4%)

3.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.57
/pyads/testserver/advanced_handler.py
1
"""Advanced handler module for testserver.
2

3
:author: Stefan Lehmann <stlm@posteo.de>
4
:license: MIT, see license file or https://opensource.org/licenses/MIT
5
:created on: 2017-09-15
6

7
"""
8
from typing import Optional, Union, Dict, Tuple, List
4✔
9
import struct
4✔
10
import ctypes
4✔
11
from datetime import datetime
4✔
12

13
from .handler import AbstractHandler, AmsPacket, AmsResponseData, logger
4✔
14
from pyads import constants, structs
4✔
15
from pyads.filetimes import dt_to_filetime
4✔
16
from pyads.pyads_ex import callback_store
4✔
17

18

19
class PLCVariable:
4✔
20
    """Storage item for named data.
21

22
    Also include variable type so it can be retrieved later.
23
    This basically mirrors SAdsSymbolEntry or AdsSymbol, however we want to
24
    avoid using those directly since they are test subjects.
25
    """
26

27
    handle_count = 10000  # Keep track of the latest awarded handle
4✔
28
    notification_count = 10  # Keep track the latest notification handle
4✔
29

30
    INDEX_GROUP = 12345
4✔
31
    INDEX_OFFSET_BASE = 10000
4✔
32

33
    def __init__(
4✔
34
        self,
35
        name: str,
36
        value: Union[int, float, bytes],
37
        ads_type: int,
38
        symbol_type: str,
39
        index_group: Optional[int] = None,
40
        index_offset: Optional[int] = None,
41
    ) -> None:
42
        """
43
        Handle and indices are set by default (to random but safe values)
44

45
        :param str name: variable name
46
        :param bytes value: variable value as bytes
47
        :param int ads_type: constants.ADST_*
48
        :param str symbol_type: PLC-style name of type
49
        :param Optional[int] index_group: set index_group manually
50
        :param Optional[int] index_offset: set index_offset manually
51
        """
52
        self.name = name.strip("\x00")
4✔
53

54
        # value is stored in binary!
55
        if isinstance(value, bytes):
4✔
56
            self.value = value
4✔
57
        else:
58
            # try to pack value according to ads_type
59
            fmt = constants.DATATYPE_MAP[constants.ads_type_to_ctype[ads_type]]
4✔
60
            self.value = struct.pack(fmt, value)
4✔
61

62
        self.ads_type = ads_type
4✔
63
        self.symbol_type = symbol_type
4✔
64

65
        self.handle = PLCVariable.handle_count
4✔
66
        PLCVariable.handle_count += 1
4✔
67

68
        if index_group is None:
4✔
69
            self.index_group = (
4✔
70
                PLCVariable.INDEX_GROUP
71
            )  # default value - shouldn't matter much
72
        else:
73
            self.index_group = index_group
4✔
74

75
        if index_offset is None:
4✔
76
            # cheat by using the handle as offset (since we know it will be unique)
77
            self.index_offset = PLCVariable.INDEX_OFFSET_BASE + self.handle
4✔
78
        else:
79
            self.index_offset = index_offset
4✔
80

81
        self.comment: str = ""
4✔
82

83
        self.notifications: List[int] = []  # List of associated notification handles
4✔
84

85
    @property
4✔
86
    def size(self) -> int:
4✔
87
        """Return size of value."""
88
        return len(self.value)
4✔
89

90
    def get_packed_info(self) -> bytes:
4✔
91
        """Get bytes array of symbol info"""
92
        if self.comment is None:
4✔
93
            self.comment = ""
×
94
        name_bytes = self.name.encode("utf-8")
4✔
95
        symbol_type_bytes = self.symbol_type.encode("utf-8")
4✔
96
        comment_bytes = self.comment.encode("utf-8")
4✔
97

98
        entry_length = (
4✔
99
            6 * 4
100
            + 3 * 2
101
            + len(name_bytes)
102
            + 1
103
            + len(symbol_type_bytes)
104
            + 1
105
            + len(comment_bytes)
106
        )
107

108
        read_data = (
4✔
109
            struct.pack(
110
                "<IIIIIIHHH",
111
                entry_length,  # Number of packed bytes
112
                self.index_group,
113
                self.index_offset,
114
                self.size,
115
                self.ads_type,
116
                0,  # Flags
117
                len(name_bytes),
118
                len(symbol_type_bytes),
119
                len(comment_bytes),
120
            )
121
            + name_bytes
122
            + b"\x20"
123
            + symbol_type_bytes
124
            + b"\x20"
125
            + comment_bytes
126
        )
127

128
        return read_data
4✔
129

130
    def write(self, value: bytes, request: AmsPacket = None):
4✔
131
        """Update the variable value, respecting notifications"""
132

133
        if self.value != value:
4✔
134
            if self.notifications:
4✔
135

136
                header = structs.SAdsNotificationHeader()
4✔
137
                header.hNotification = 0
4✔
138
                header.nTimeStamp = dt_to_filetime(datetime.now())
4✔
139
                header.cbSampleSize = len(value)
4✔
140

141
                # Perform byte-write into the header
142
                dst = ctypes.addressof(header) + structs.SAdsNotificationHeader.data.offset
4✔
143
                ctypes.memmove(dst, value, len(value))
4✔
144

145
                for notification_handle in self.notifications:
4✔
146

147
                    # It's hard to guess the exact AmsAddr from here, so instead
148
                    # ignore the address and search for the note_handle
149

150
                    for key, func in callback_store.items():
4✔
151

152
                        # callback_store is keyed by (AmsAddr, int)
153
                        if key[1] != notification_handle:
4✔
154
                            continue
×
155

156
                        header.hNotification = notification_handle
4✔
157
                        addr = key[0]
4✔
158

159
                        # Call c-wrapper for user callback
160
                        func(addr.amsAddrStruct(), header, 0)
4✔
161

162
        self.value = value
4✔
163

164
    def register_notification(self) -> int:
4✔
165
        """Register a new notification."""
166

167
        handle = self.notification_count
4✔
168
        self.notifications.append(handle)
4✔
169
        self.notification_count += 1
4✔
170
        return handle
4✔
171

172
    def unregister_notification(self, handle: int = None):
4✔
173
        """Unregister a notification.
174

175
        :param handle: Set to `None` (default) to unregister all notifications
176
        """
177

178
        if handle is None:
4✔
179
            self.notifications = []
×
180
        else:
181
            if handle in self.notifications:
4✔
182
                self.notifications.remove(handle)
4✔
183

184

185
class AdvancedHandler(AbstractHandler):
4✔
186
    """The advanced handler allows to store and restore data.
187

188
    The advanced handler allows to store and restore data via read, write and
189
    read_write functions. There is a storage area for each symbol. The
190
    purpose of this handler to test read/write access and test basic
191
    interaction.
192
    Variables can be read/write through indices, name and handle.
193

194
    An error will be thrown when an attempt is made to read from a
195
    non-existent variable. You can either: i) write the variable first (it
196
    is implicitly created) or ii) create the variable yourself and place it
197
    in the handler.
198
    Note that the variable type cannot be set correctly in the implicit
199
    creation! (It will default to UINT16.) Use explicit creation if a
200
    non-default type is important.
201
    """
202

203
    def __init__(self) -> None:
4✔
204
        self._data: Dict[Tuple[int, int], PLCVariable] = {}
4✔
205
        # This will be our variables database
206
        # We won't both with indexing it by handle or name, speed is not
207
        # important. We store by group + offset index and will have to
208
        # search inefficiently for name or handle. (Unlike real ADS!)
209

210
        self.reset()
4✔
211

212
    def reset(self) -> None:
4✔
213
        """Clear saved variables in handler"""
214
        self._data = {}
4✔
215

216
    def handle_request(self, request: AmsPacket) -> AmsResponseData:
4✔
217
        """Handle incoming requests and create a response."""
218
        # Extract command id from the request
219
        command_id_bytes = request.ams_header.command_id
4✔
220
        command_id = struct.unpack("<H", command_id_bytes)[0]
4✔
221

222
        # Set AMS state correctly for response
223
        state = struct.unpack("<H", request.ams_header.state_flags)[0]
4✔
224
        state = state | 0x0001  # Set response flag
4✔
225
        state = struct.pack("<H", state)
4✔
226

227
        def handle_read_device_info() -> bytes:
4✔
228
            """Create dummy response: version 1.2.3, device name 'TestServer'."""
229
            logger.info("Command received: READ_DEVICE_INFO")
4✔
230

231
            major_version = "\x01".encode("utf-8")
4✔
232
            minor_version = "\x02".encode("utf-8")
4✔
233
            version_build = "\x03\x00".encode("utf-8")
4✔
234
            device_name = "TestServer\x00".encode("utf-8")
4✔
235

236
            response_content = (
4✔
237
                major_version + minor_version + version_build + device_name
238
            )
239

240
            return response_content
4✔
241

242
        def handle_read() -> bytes:
4✔
243
            """Handle read request."""
244
            data = request.ams_header.data
4✔
245

246
            index_group = struct.unpack("<I", data[:4])[0]
4✔
247
            index_offset = struct.unpack("<I", data[4:8])[0]
4✔
248
            plc_datatype = struct.unpack("<I", data[8:12])[0]
4✔
249

250
            logger.info(
4✔
251
                (
252
                    "Command received: READ (index group={}, index offset={}, "
253
                    "data length={})"
254
                ).format(hex(index_group), hex(index_offset), plc_datatype)
255
            )
256

257
            # value by handle is demanded return from named data store
258
            if index_group == constants.ADSIGRP_SYM_VALBYHND:
4✔
259
                response_value = self.get_variable_by_handle(index_offset).value
4✔
260

261
            elif index_group == constants.ADSIGRP_SYM_UPLOADINFO2:
4✔
262
                symbol_count = len(self._data)
4✔
263
                response_length = 120 * symbol_count
4✔
264
                response_value = struct.pack("II", symbol_count, response_length)
4✔
265

266
            elif index_group == constants.ADSIGRP_SYM_UPLOAD:
4✔
267
                response_value = b""
4✔
268
                for (group, offset) in self._data.keys():
4✔
269
                    response_value += struct.pack("III", 120, group, offset)
4✔
270
                    response_value += b"\x00" * 108
4✔
271

272
            else:
273
                # Create response of repeated 0x0F with a null
274
                # terminator for strings
275
                var = self.get_variable_by_indices(index_group, index_offset)
4✔
276
                response_value = var.value[:plc_datatype]
4✔
277

278
            return struct.pack("<I", len(response_value)) + response_value
4✔
279

280
        def handle_write() -> bytes:
4✔
281
            """Handle write request."""
282
            data = request.ams_header.data
4✔
283

284
            index_group = struct.unpack("<I", data[:4])[0]
4✔
285
            index_offset = struct.unpack("<I", data[4:8])[0]
4✔
286
            plc_datatype = struct.unpack("<I", data[8:12])[0]
4✔
287
            value = data[12 : (12 + plc_datatype)]
4✔
288

289
            logger.info(
4✔
290
                (
291
                    "Command received: WRITE (index group={}, index offset={}, "
292
                    "data length={}, value={}"
293
                ).format(hex(index_group), hex(index_offset), plc_datatype, value)
294
            )
295

296
            if index_group == constants.ADSIGRP_SYM_RELEASEHND:
4✔
297
                return b""
4✔
298

299
            elif index_group == constants.ADSIGRP_SYM_VALBYHND:
4✔
300
                var = self.get_variable_by_handle(index_offset)
4✔
301
                var.write(value, request)
4✔
302
                return b""
4✔
303

304
            var = self.get_variable_by_indices(index_group, index_offset)
4✔
305

306
            var.write(value, request)
4✔
307

308
            # no return value needed
309
            return b""
4✔
310

311
        def handle_read_write() -> bytes:
4✔
312
            """Handle read-write request."""
313
            data = request.ams_header.data
4✔
314

315
            # parse the request
316
            index_group = struct.unpack("<I", data[:4])[0]
4✔
317
            index_offset = struct.unpack("<I", data[4:8])[0]
4✔
318
            read_length = struct.unpack("<I", data[8:12])[0]
4✔
319
            write_length = struct.unpack("<I", data[12:16])[0]
4✔
320
            write_data = data[16 : (16 + write_length)]
4✔
321

322
            logger.info(
4✔
323
                (
324
                    "Command received: READWRITE "
325
                    "(index group={}, index offset={}, read length={}, "
326
                    "write length={}, write data={})"
327
                ).format(
328
                    hex(index_group),
329
                    hex(index_offset),
330
                    read_length,
331
                    write_length,
332
                    write_data,
333
                )
334
            )
335

336
            # Get variable handle by name if demanded
337
            if index_group == constants.ADSIGRP_SYM_HNDBYNAME:
4✔
338

339
                var_name = write_data.decode()
4✔
340

341
                # This could be part of a write-by-name, so create the
342
                # variable if it does not yet exist
343
                var = self.get_variable_by_name(var_name)
4✔
344

345
                read_data = struct.pack("<I", var.handle)
4✔
346

347
            # Get the symbol if requested
348
            elif index_group == constants.ADSIGRP_SYM_INFOBYNAMEEX:
4✔
349

350
                var_name = write_data.decode()
4✔
351
                var = self.get_variable_by_name(var_name)
4✔
352

353
                read_data = var.get_packed_info()
4✔
354

355
            # Write to a list of variables
356
            elif index_group == constants.ADSIGRP_SUMUP_WRITE:
4✔
357
                num_requests = index_offset  # number of requests is coded in the offset for sumup_write
4✔
358
                rq_list = [
4✔
359
                    (
360
                        struct.unpack("<I", write_data[i : i + 4])[0],  # index_group
361
                        struct.unpack("<I", write_data[i + 4 : i + 8])[
362
                            0
363
                        ],  # index_offset
364
                        struct.unpack("<I", write_data[i + 8 : i + 12])[0],  # size
365
                    )
366
                    for i in range(0, num_requests * 12, 12)
367
                ]
368

369
                data = write_data[num_requests * 12 :]
4✔
370
                offset = 0
4✔
371

372
                for index_group, index_offset, size in rq_list:
4✔
373
                    var = self.get_variable_by_indices(index_group, index_offset)
4✔
374
                    var.write(data[offset : offset + size], request)
4✔
375
                    offset += size
4✔
376

377
                read_data = struct.pack("<" + num_requests * "I", *(num_requests * [0]))
4✔
378

379
            # Read a list of variables
380
            elif index_group == constants.ADSIGRP_SUMUP_READ:
4✔
381
                num_requests = index_offset
4✔
382
                rq_list = [
4✔
383
                    (
384
                        struct.unpack("<I", write_data[i : i + 4])[0],  # index_group
385
                        struct.unpack("<I", write_data[i + 4 : i + 8])[
386
                            0
387
                        ],  # index_offset
388
                        struct.unpack("<I", write_data[i + 8 : i + 12])[0],  # size
389
                    )
390
                    for i in range(0, num_requests * 12, 12)
391
                ]
392

393
                read_data = struct.pack("<" + num_requests * "I", *(num_requests * [0]))
4✔
394
                for index_group, index_offset, size in rq_list:
4✔
395
                    var = self.get_variable_by_indices(index_group, index_offset)
4✔
396
                    read_data += var.value
4✔
397

398
            # Else just return the value stored
399
            else:
400

401
                # read stored data
402
                var = self.get_variable_by_indices(index_group, index_offset)
4✔
403
                read_data = var.value[:read_length]
4✔
404

405
                # store write data
406
                var.write(write_data, request)
4✔
407

408
            return struct.pack("<I", len(read_data)) + read_data
4✔
409

410
        def handle_read_state() -> bytes:
4✔
411
            """Handle read-state request."""
412
            logger.info("Command received: READ_STATE")
4✔
413
            ads_state = struct.pack("<H", constants.ADSSTATE_RUN)
4✔
414
            # I don't know what an appropriate value for device state is.
415
            # I suspect it may be unused..
416
            device_state = struct.pack("<H", 0)
4✔
417
            return ads_state + device_state
4✔
418

419
        def handle_writectrl() -> bytes:
4✔
420
            """Handle writectrl request."""
421
            logger.info("Command received: WRITE_CONTROL")
4✔
422
            # No response data required
423
            return b""
4✔
424

425
        def handle_add_devicenote() -> bytes:
4✔
426
            """Handle add_devicenode request.
427

428
            The actual callback is stored in `pyads_ex.callback_store`. All we need to do
429
            here is remember to prompt the client with an updated value if a callback was
430
            placed. The client will remember which callback belongs to it.
431
            """
432

433
            data = request.ams_header.data
4✔
434

435
            index_group, index_offset, length, mode, max_delay, cycle_time = \
4✔
436
                struct.unpack("<IIIIII", data[:24])
437

438
            logger.info(
4✔
439
                "Command received: ADD_DEVICE_NOTIFICATION (index_group={}, "
440
                "index_group={})".format(index_group, index_offset)
441
            )
442

443
            # Return value is the notification_handle
444
            # The notification handle is an incrementing value
445

446
            var = self.get_variable_by_indices(index_group, index_offset)
4✔
447

448
            handle = var.register_notification()
4✔
449

450
            return handle.to_bytes(4, byteorder='little')
4✔
451

452
        def handle_delete_devicenote() -> bytes:
4✔
453
            """Handle delete_devicenode request."""
454

455
            data = request.ams_header.data
4✔
456

457
            handle = struct.unpack("<I", data[:4])[0]
4✔
458

459
            logger.info("Command received: DELETE_DEVICE_NOTIFICATION (handle={})".format(handle))
4✔
460

461
            var = self.get_variable_by_notification_handle(handle)
4✔
462
            var.unregister_notification(handle)
4✔
463

464
            # No response data required
465
            return b""
4✔
466

467
        def handle_devicenote() -> bytes:
4✔
468
            """Handle a device notification."""
469
            logger.info("Command received: DEVICE_NOTIFICATION")
×
470
            # No response data required
471
            return b""
×
472

473
        # Function map
474
        function_map = {
4✔
475
            constants.ADSCOMMAND_READDEVICEINFO: handle_read_device_info,
476
            constants.ADSCOMMAND_READ: handle_read,
477
            constants.ADSCOMMAND_WRITE: handle_write,
478
            constants.ADSCOMMAND_READWRITE: handle_read_write,
479
            constants.ADSCOMMAND_READSTATE: handle_read_state,
480
            constants.ADSCOMMAND_WRITECTRL: handle_writectrl,
481
            constants.ADSCOMMAND_ADDDEVICENOTE: handle_add_devicenote,
482
            constants.ADSCOMMAND_DELDEVICENOTE: handle_delete_devicenote,
483
            constants.ADSCOMMAND_DEVICENOTE: handle_devicenote,
484
        }
485

486
        # Try to map the command id to a function, else return error code
487
        if command_id in function_map:
4✔
488
            content = function_map[command_id]()
4✔
489

490
        else:
491
            logger.info("Unknown Command: {0}".format(hex(command_id)))
×
492
            # Set error code to 'unknown command ID'
493
            error_code = "\x08\x00\x00\x00".encode("utf-8")
×
494
            return AmsResponseData(state, error_code, "".encode("utf-8"))
×
495

496
        # Set no error in response
497
        error_code = ("\x00" * 4).encode("utf-8")
4✔
498
        response_data = error_code + content
4✔
499

500
        return AmsResponseData(state, request.ams_header.error_code, response_data)
4✔
501

502
    def get_variable_by_handle(self, handle: int) -> PLCVariable:
4✔
503
        """Get PLC variable by handle, throw error when not found"""
504
        for idx, var in self._data.items():
4✔
505
            if var.handle == handle:
4✔
506
                return var
4✔
507

508
        raise KeyError(
×
509
            "Variable with handle `{}` not found - Create it first "
510
            "explicitly or write to it".format(handle)
511
        )
512

513
    def get_variable_by_indices(
4✔
514
        self, index_group: int, index_offset: int
515
    ) -> PLCVariable:
516
        """Get PLC variable by handle, throw error when not found"""
517
        tup = (index_group, index_offset)
4✔
518
        if tup in self._data:
4✔
519
            return self._data[tup]
4✔
520
        raise KeyError(
×
521
            "Variable with indices ({}, {}) not found - Create "
522
            "it first explicitly or write to it".format(index_group, index_offset)
523
        )
524

525
    def get_variable_by_name(self, name: str) -> PLCVariable:
4✔
526
        """Get variable by name, throw error if not found"""
527
        name = name.strip("\x00")
4✔
528
        for key, var in self._data.items():
4✔
529
            if var.name == name:
4✔
530
                return var
4✔
531
        raise KeyError(
×
532
            "Variable with name `{}` not found - Create it first "
533
            "explicitly or write to it".format(name)
534
        )
535

536
    def get_variable_by_notification_handle(self, handle: int) -> PLCVariable:
4✔
537
        """Get variable by a notification handle, throw error if not found"""
538
        for _, var in self._data.items():
4✔
539
            if handle in var.notifications:
4✔
540
                return var
4✔
541

542
        raise KeyError("Notification handle `{}` could not be resolved".format(handle))
×
543

544
    def add_variable(self, var: PLCVariable) -> None:
4✔
545
        """Add a new variable."""
546
        tup = (var.index_group, var.index_offset)
4✔
547
        self._data[tup] = var
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc