Source code for ase.SerialTransport
"""Encapsulating a pySerial object as a transport class.
Copyright (c) 2015-2018, Garth Zeglin. All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""
# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals
# Standard library modules.
import os, sys, time, argparse, select, logging
# This requires a pySerial installation.
import serial
import serial.tools.list_ports
# Import modules from the course library.
import ase.base
#================================================================
[docs]class SerialTransport(ase.base.Transport):
"""Manage a serial port as a bidirectional transport which passes along received
data to a Protocol object, and can pass along write requests to the
underlying port.
:param protocol: object to received streamed data and generate messages
:param portname: string naming the serial device
:param rate: standard integer baud rate for communication. Default is 57600.
"""
def __init__(self, protocol, portname, rate = 57600):
self.protocol = protocol
self._portname = portname
self._baud_rate = rate
# configure logging
self.logger = logging.getLogger(__name__) #: logging object for controlling or capturing an event log
self.logger.setLevel(logging.DEBUG)
return
[docs] def open(self):
"""Open the serial port and initialize communications."""
# open the serial port, which should also reset the Arduino
self._port = serial.Serial(self._portname, self._baud_rate, timeout=5 )
self.logger.debug("Opened serial port named %s", self._port.name)
# pass this transport object to the protocol object
self.protocol.connection_made(self)
return
[docs] def close(self):
"""Shut down the serial connection to the Arduino, after which this object may no longer be used."""
self.logger.debug("Closing serial port named %s", self._port.name)
self._port.close()
self._port = None
self.protocol.connection_lost()
return
[docs] def write(self, data):
self._port.write(data)
[docs] def fileno(self):
# N.B. Windows does not support file descriptors for serial ports.
return self._port.fileno()
[docs] def flush(self):
self._port.flush()
[docs] def wait_for_data(self):
"""Wait for at least one character of input from the serial port and process it,
updating status variables. This will block if data is not ready; if
using in an event-driven system, it is important to check the port for
available data prior to calling.
"""
# read as much input as available, blocking if no data is available
data = self._port.read(max(1, self._port.in_waiting))
self.protocol.data_received(data)
return
#================================================================
[docs]def print_serial_port_list():
print("Available serial ports:")
for p in serial.tools.list_ports.comports():
print(" ", p.device)
#================================================================