• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ARMmbed / mbed-os-tools / #457

24 Aug 2024 09:15PM UTC coverage: 0.0% (-59.9%) from 59.947%
#457

push

coveralls-python

web-flow
Merge 7c6dbce13 into c467d6f14

0 of 4902 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/mbed_os_tools/test/host_tests_conn_proxy/conn_primitive_serial.py
1
# Copyright (c) 2018, Arm Limited and affiliates.
2
# SPDX-License-Identifier: Apache-2.0
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16

17
import time
×
18
from serial import Serial, SerialException
×
19

20
from .. import host_tests_plugins
×
21
from ..host_tests_plugins.host_test_plugins import HostTestPluginBase
×
22
from .conn_primitive import ConnectorPrimitive, ConnectorPrimitiveException
×
23

24

25
class SerialConnectorPrimitive(ConnectorPrimitive):
    """! Connector primitive that talks to a device over a serial port.

    @details The port is opened with pyserial in non-blocking mode
             (timeout=0). Serial failures are logged, recorded in
             self.LAST_ERROR and leave self.serial set to None, which makes
             connected() return False.
    """

    def __init__(self, name, port, baudrate, config):
        """! Resolve the serial port, open it and optionally reset the device.

        @param name Name passed on to ConnectorPrimitive.__init__
        @param port Serial port to use, or None to look it up via target_id
        @param baudrate Baud rate, converted with int()
        @param config Dictionary of options. Keys read here: 'target_id',
                      'mcu', 'polling_timeout' (default 60),
                      'forced_reset_timeout' (default 1) and
                      'skip_reset' (default False)
        @raise ConnectorPrimitiveException when check_serial_port_ready()
               returns None
        @details If the port cannot be opened within polling_timeout seconds
                 no exception is raised; self.serial simply stays None.
        """
        ConnectorPrimitive.__init__(self, name)
        # Define the handle before anything below can raise, so finish() and
        # __del__ always find the attribute on a partially built object.
        self.serial = None
        self.port = port
        self.baudrate = int(baudrate)
        self.read_timeout = 0.01  # 10 milli sec
        self.write_timeout = 5
        self.config = config
        self.target_id = self.config.get('target_id', None)
        self.mcu = self.config.get('mcu', None)
        self.polling_timeout = config.get('polling_timeout', 60)
        self.forced_reset_timeout = config.get('forced_reset_timeout', 1)
        self.skip_reset = config.get('skip_reset', False)

        # Assume the provided serial port is good. Don't attempt to use the
        # target_id to re-discover the serial port, as the board may not be a
        # fully valid DAPLink-compatible or Mbed Enabled board (it may be
        # missing a mount point). Do not attempt to check if the serial port
        # for given target_id changed. We will attempt to open the port and
        # pass the already opened port object (not name) to the reset plugin.
        if self.port is not None:
            # A serial port was provided.
            # Don't pass in the target_id, so that no change in serial port via
            # auto-discovery happens.
            self.logger.prn_inf("using specified port '%s'" % (self.port))
            lookup_target_id = None
        else:
            # No serial port was provided.
            # Fallback to auto-discovery via target_id.
            self.logger.prn_inf("getting serial port via mbedls)")
            lookup_target_id = self.target_id

        serial_port = HostTestPluginBase().check_serial_port_ready(
            self.port,
            target_id=lookup_target_id,
            timeout=self.polling_timeout)

        if serial_port is None:
            raise ConnectorPrimitiveException("Serial port not ready!")

        if serial_port != self.port:
            # Serial port changed for given targetID
            self.logger.prn_inf("serial port changed from '%s to '%s')"% (self.port, serial_port))
            self.port = serial_port

        start_time = time.time()
        self.logger.prn_inf("serial(port=%s, baudrate=%d, read_timeout=%s, write_timeout=%d)"% (self.port, self.baudrate, self.read_timeout, self.write_timeout))
        # Retry opening the port once per second until polling_timeout expires.
        while time.time() - start_time < self.polling_timeout:
            try:
                # TIMEOUT: While creating the Serial object the timeout is
                # deliberately passed as 0, because blocking in Serial.read
                # impacts thread and multiprocess functioning in Python.
                # Instead, self.read() inserts a delay (sleep()) to let the
                # serial buffer collect data and avoid spinning on a
                # non-blocking read().
                self.serial = Serial(self.port, baudrate=self.baudrate, timeout=0, write_timeout=self.write_timeout)
            except SerialException as e:
                self.serial = None
                # read_timeout is a float (0.01); %s keeps its real value
                # where %d would truncate it to 0.
                self.LAST_ERROR = "connection lost, serial.Serial(%s, %d, %s, %d): %s"% (self.port,
                    self.baudrate,
                    self.read_timeout,
                    self.write_timeout,
                    str(e))
                self.logger.prn_err(str(e))
                self.logger.prn_err("Retry after 1 sec until %s seconds" % self.polling_timeout)
            else:
                if not self.skip_reset:
                    self.reset_dev_via_serial(delay=self.forced_reset_timeout)
                break
            time.sleep(1)

    def reset_dev_via_serial(self, delay=1):
        """! Reset device using selected method, calls one of the reset plugins

        @param delay Seconds to sleep after the plugin call; a falsy value
                     skips the sleep
        @return Whatever host_tests_plugins.call_plugin() returns
        @details The plugin name comes from config['reset_type']; a missing
                 or falsy value selects 'default'.
        """
        reset_type = self.config.get('reset_type', 'default') or 'default'
        disk = self.config.get('disk', None)

        self.logger.prn_inf("reset device using '%s' plugin..."% reset_type)
        # NOTE(review): polling_timeout is re-read from config without a
        # default, so the plugin receives None when the key is absent even
        # though __init__ falls back to 60 - confirm this is intended.
        result = host_tests_plugins.call_plugin('ResetMethod',
            reset_type,
            serial=self.serial,
            disk=disk,
            mcu=self.mcu,
            target_id=self.target_id,
            polling_timeout=self.config.get('polling_timeout'))
        # Post-reset sleep
        if delay:
            self.logger.prn_inf("waiting %.2f sec after reset"% delay)
            time.sleep(delay)
        self.logger.prn_inf("wait for it...")
        return result

    def read(self, count):
        """! Read data from serial port RX buffer

        @param count Maximum number of bytes requested from Serial.read()
        @return Result of Serial.read(count), or an empty str when the port
                is not open or the read raised SerialException
        """
        # TIMEOUT: Since read is called in a loop, wait for the
        # self.read_timeout period before calling serial.read(). See the
        # comment on the serial.Serial() call in __init__ about timeout.
        time.sleep(self.read_timeout)
        c = str()
        try:
            if self.serial:
                c = self.serial.read(count)
        except SerialException as e:
            # Drop the handle so connected() reports the lost connection
            self.serial = None
            self.LAST_ERROR = "connection lost, serial.read(%d): %s"% (count, str(e))
            self.logger.prn_err(str(e))
        return c

    def write(self, payload, log=False):
        """! Write data to serial port TX buffer

        @param payload Text to send; it is UTF-8 encoded before writing
        @param log When True the payload is also passed to logger.prn_txd()
        @return True if the payload was written, False when the port is not
                open or the write raised SerialException
        """
        try:
            if self.serial:
                self.serial.write(payload.encode('utf-8'))
                if log:
                    self.logger.prn_txd(payload)
                return True
        except SerialException as e:
            # Drop the handle so connected() reports the lost connection
            self.serial = None
            self.LAST_ERROR = "connection lost, serial.write(%d bytes): %s"% (len(payload), str(e))
            self.logger.prn_err(str(e))
        return False

    def flush(self):
        """! Flush the serial port if it is open """
        if self.serial:
            self.serial.flush()

    def connected(self):
        """! Report whether a serial port object is currently held

        @return True if self.serial is set, False otherwise
        """
        return bool(self.serial)

    def finish(self):
        """! Close the serial port if it is open """
        # getattr keeps this safe when called from __del__ on an object
        # whose construction failed before self.serial was assigned.
        serial_port = getattr(self, 'serial', None)
        if serial_port:
            serial_port.close()

    def reset(self):
        """! Reset the device, waiting forced_reset_timeout seconds afterwards """
        self.reset_dev_via_serial(self.forced_reset_timeout)

    def __del__(self):
        """! Close the serial port when the object is garbage collected """
        self.finish()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc