Source code for ase.SerialTransport

"""Encapsulating a pySerial object as a transport class.

Copyright (c) 2015-2018, Garth Zeglin.  All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""
# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals

# Standard library modules.
import os, sys, time, argparse, select, logging

# This requires a pySerial installation.
import serial
import serial.tools.list_ports

# Import modules from the course library.
import ase.base

#================================================================
class SerialTransport(ase.base.Transport):
    """Manage a serial port as a bidirectional transport.

    Data received from the port is passed along to a Protocol object, and write
    requests are passed along to the underlying port.

    The port is not opened by the constructor; call :meth:`open` before using
    any of the I/O methods, and :meth:`close` when finished.

    :param protocol: object to receive streamed data and generate messages
    :param portname: string naming the serial device
    :param rate: standard integer baud rate for communication. Default is 57600.
    """

    def __init__(self, protocol, portname, rate=57600):
        self.protocol = protocol
        self._portname = portname
        self._baud_rate = rate

        # No pySerial object exists until open() is called.  Defining the
        # attribute here lets close() detect the "not open" state instead of
        # failing with an AttributeError on a missing attribute.
        self._port = None

        # configure logging
        self.logger = logging.getLogger(__name__)  #: logging object for controlling or capturing an event log
        self.logger.setLevel(logging.DEBUG)

    def open(self):
        """Open the serial port and initialize communications.

        After the port is opened, this transport is handed to the protocol
        object via ``protocol.connection_made(self)``.
        """
        # open the serial port, which should also reset the Arduino
        self._port = serial.Serial(self._portname, self._baud_rate, timeout=5)
        self.logger.debug("Opened serial port named %s", self._port.name)

        # pass this transport object to the protocol object
        self.protocol.connection_made(self)

    def close(self):
        """Shut down the serial connection to the Arduino, after which this
        object may no longer be used.

        The protocol object is notified via ``protocol.connection_lost()``.
        Calling this when no port is open (never opened, or already closed) is
        a no-op, so the protocol is notified at most once per open port.
        """
        port = self._port
        if port is None:
            # Nothing to release and no connection to report as lost.
            return

        self.logger.debug("Closing serial port named %s", port.name)

        # Drop our reference first so the transport is marked closed even if
        # the underlying close raises.
        self._port = None
        try:
            port.close()
        finally:
            # Always tell the protocol the connection is gone, including when
            # the underlying close failed.
            self.protocol.connection_lost()

    def write(self, data):
        """Write ``data`` to the underlying serial port.

        :param data: data passed unchanged to the port's ``write`` method
        """
        self._port.write(data)

    def fileno(self):
        """Return the file descriptor of the underlying serial port."""
        # N.B. Windows does not support file descriptors for serial ports.
        return self._port.fileno()

    def flush(self):
        """Flush the underlying serial port by calling its ``flush`` method."""
        self._port.flush()

    def flush_input(self):
        """Discard pending input by resetting the port's input buffer."""
        self._port.reset_input_buffer()

    def wait_for_data(self):
        """Wait for input from the serial port and pass it to the protocol.

        Reads as many bytes as are already waiting, or a single byte if none
        are.  This will block if data is not ready; if using in an event-driven
        system, it is important to check the port for available data prior to
        calling.  Because :meth:`open` configures the port with a 5 second read
        timeout, the read can return without any data; whatever was read, even
        if empty, is handed to ``protocol.data_received``.
        """
        # read as much input as available, blocking if no data is available
        pending = max(1, self._port.in_waiting)
        data = self._port.read(pending)
        self.protocol.data_received(data)
#================================================================ #================================================================