Mbed Host Tests
conn_primitive_serial.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 """
3 mbed SDK
4 Copyright (c) 2011-2016 ARM Limited
5 
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9 
10  http://www.apache.org/licenses/LICENSE-2.0
11 
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 """
18 
19 
20 import time
21 from serial import Serial, SerialException
22 from mbed_host_tests import host_tests_plugins
23 from mbed_host_tests.host_tests_plugins.host_test_plugins import HostTestPluginBase
24 from .conn_primitive import ConnectorPrimitive, ConnectorPrimitiveException
25 
26 
28  def __init__(self, name, port, baudrate, config):
29  ConnectorPrimitive.__init__(self, name)
30  self.port = port
31  self.baudrate = int(baudrate)
32  self.read_timeout = 0.01 # 10 milli sec
33  self.write_timeout = 5
34  self.config = config
35  self.target_id = self.config.get('target_id', None)
36  self.polling_timeout = config.get('polling_timeout', 60)
37  self.forced_reset_timeout = config.get('forced_reset_timeout', 1)
38  self.skip_reset = config.get('skip_reset', False)
39  self.serial = None
40 
41  # Check if serial port for given target_id changed
42  # If it does we will use new port to open connections and make sure reset plugin
43  # later can reuse opened already serial port
44  #
45  # Note: This listener opens serial port and keeps connection so reset plugin uses
46  # serial port object not serial port name!
47  serial_port = HostTestPluginBase().check_serial_port_ready(self.port, target_id=self.target_id, timeout=self.polling_timeout)
48  if serial_port is None:
49  raise ConnectorPrimitiveException("Serial port not ready!")
50 
51  if serial_port != self.port:
52  # Serial port changed for given targetID
53  self.logger.prn_inf("serial port changed from '%s to '%s')"% (self.port, serial_port))
54  self.port = serial_port
55 
56  startTime = time.time()
57  self.logger.prn_inf("serial(port=%s, baudrate=%d, read_timeout=%s, write_timeout=%d)"% (self.port, self.baudrate, self.read_timeout, self.write_timeout))
58  while time.time() - startTime < self.polling_timeout:
59  try:
60  # TIMEOUT: While creating Serial object timeout is delibrately passed as 0. Because blocking in Serial.read
61  # impacts thread and mutliprocess functioning in Python. Hence, instead in self.read() s delay (sleep()) is
62  # inserted to let serial buffer collect data and avoid spinning on non blocking read().
63  self.serial = Serial(self.port, baudrate=self.baudrate, timeout=0, write_timeout=self.write_timeout)
64  except SerialException as e:
65  self.serial = None
66  self.LAST_ERROR = "connection lost, serial.Serial(%s, %d, %d, %d): %s"% (self.port,
67  self.baudrate,
68  self.read_timeout,
69  self.write_timeout,
70  str(e))
71  self.logger.prn_err(str(e))
72  self.logger.prn_err("Retry after 1 sec until %s seconds" % self.polling_timeout)
73  else:
74  if not self.skip_reset:
76  break
77  time.sleep(1)
78 
79  def reset_dev_via_serial(self, delay=1):
80  """! Reset device using selected method, calls one of the reset plugins """
81  reset_type = self.config.get('reset_type', 'default')
82  if not reset_type:
83  reset_type = 'default'
84  disk = self.config.get('disk', None)
85 
86  self.logger.prn_inf("reset device using '%s' plugin..."% reset_type)
87  result = host_tests_plugins.call_plugin('ResetMethod',
88  reset_type,
89  serial=self.serial,
90  disk=disk,
91  target_id=self.target_id,
92  polling_timeout=self.config.get('polling_timeout'))
93  # Post-reset sleep
94  if delay:
95  self.logger.prn_inf("waiting %.2f sec after reset"% delay)
96  time.sleep(delay)
97  self.logger.prn_inf("wait for it...")
98  return result
99 
100  def read(self, count):
101  """! Read data from serial port RX buffer """
102  # TIMEOUT: Since read is called in a loop, wait for self.timeout period before calling serial.read(). See
103  # comment on serial.Serial() call above about timeout.
104  time.sleep(self.read_timeout)
105  c = str()
106  try:
107  if self.serial:
108  c = self.serial.read(count)
109  except SerialException as e:
110  self.serial = None
111  self.LAST_ERROR = "connection lost, serial.read(%d): %s"% (count, str(e))
112  self.logger.prn_err(str(e))
113  return c
114 
115  def write(self, payload, log=False):
116  """! Write data to serial port TX buffer """
117  try:
118  if self.serial:
119  self.serial.write(payload.encode('utf-8'))
120  if log:
121  self.logger.prn_txd(payload)
122  return True
123  except SerialException as e:
124  self.serial = None
125  self.LAST_ERROR = "connection lost, serial.write(%d bytes): %s"% (len(payload), str(e))
126  self.logger.prn_err(str(e))
127  return False
128 
129  def flush(self):
130  if self.serial:
131  self.serial.flush()
132 
133  def connected(self):
134  return bool(self.serial)
135 
136  def finish(self):
137  if self.serial:
138  self.serial.close()
139 
140  def reset(self):
142 
143  def __del__(self):
144  self.finish()
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.__del__
def __del__(self)
Definition: conn_primitive_serial.py:143
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.write
def write(self, payload, log=False)
Write data to serial port TX buffer.
Definition: conn_primitive_serial.py:115
mbed_host_tests.host_tests_conn_proxy.conn_primitive.ConnectorPrimitive.LAST_ERROR
LAST_ERROR
Definition: conn_primitive.py:32
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.connected
def connected(self)
Check if there is a connection to DUT.
Definition: conn_primitive_serial.py:133
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.write_timeout
write_timeout
Definition: conn_primitive_serial.py:33
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.forced_reset_timeout
forced_reset_timeout
Definition: conn_primitive_serial.py:37
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.finish
def finish(self)
Handle DUT dtor like (close resource) operations here.
Definition: conn_primitive_serial.py:136
mbed_host_tests.host_tests_plugins.host_test_plugins.HostTestPluginBase
Base class for all plugins used with host tests.
Definition: host_test_plugins.py:32
mbed_host_tests.host_tests_conn_proxy.conn_primitive.ConnectorPrimitive.finish
def finish(self)
Handle DUT dtor like (close resource) operations here.
Definition: conn_primitive.py:87
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.serial
serial
Definition: conn_primitive_serial.py:39
mbed_host_tests.host_tests_plugins.host_test_plugins
Definition: host_test_plugins.py:1
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.reset
def reset(self)
Reset the dut.
Definition: conn_primitive_serial.py:140
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.read
def read(self, count)
Read data from serial port RX buffer.
Definition: conn_primitive_serial.py:100
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.read_timeout
read_timeout
Definition: conn_primitive_serial.py:32
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.__init__
def __init__(self, name, port, baudrate, config)
Definition: conn_primitive_serial.py:28
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.port
port
Definition: conn_primitive_serial.py:30
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.baudrate
baudrate
Definition: conn_primitive_serial.py:31
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.reset_dev_via_serial
def reset_dev_via_serial(self, delay=1)
Reset device using selected method, calls one of the reset plugins.
Definition: conn_primitive_serial.py:79
mbed_host_tests.host_tests_conn_proxy.conn_primitive.ConnectorPrimitive.logger
logger
Definition: conn_primitive.py:33
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.target_id
target_id
Definition: conn_primitive_serial.py:35
mbed_host_tests.host_tests_conn_proxy.conn_primitive.ConnectorPrimitive.polling_timeout
polling_timeout
Definition: conn_primitive.py:34
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.config
config
Definition: conn_primitive_serial.py:34
mbed_host_tests.host_tests_conn_proxy.conn_primitive.ConnectorPrimitive
Definition: conn_primitive.py:29
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.skip_reset
skip_reset
Definition: conn_primitive_serial.py:38
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive
Definition: conn_primitive_serial.py:27
mbed_host_tests.host_tests_conn_proxy.conn_primitive_serial.SerialConnectorPrimitive.flush
def flush(self)
Flush read/write channels of DUT.
Definition: conn_primitive_serial.py:129
mbed_host_tests.host_tests_conn_proxy.conn_primitive.ConnectorPrimitiveException
Definition: conn_primitive.py:22