Source code for pneumatics_animation.ArduinoProtocol

"""\
ArduinoProtocol.py : abstract class to manage serial communication with an Arduino sketch.

Copyright (c) 2015-2017, Garth Zeglin.  All rights reserved. Licensed under the terms
of the BSD 3-clause license.

"""

#================================================================
from __future__ import print_function
import time

# This requires a pySerial installation.
#  Package details: https://pypi.python.org/pypi/pyserial,
#  Documentation: http://pythonhosted.org/pyserial/
import serial

#================================================================
[docs] class ArduinoProtocol(object): """Abstract class to manage a serial connection to an Arduino sketch. This class is sufficient to send and receive structured data messages as line-delimited text, but is more useful as a parent class for an application-specific class which manages the specific data types in the stream. :param port: the name of the serial port device :param verbose: flag to increase console output :param debug: flag to print raw inputs on console :param rate: serial baud rate :param kwargs: collect any unused keyword arguments """ def __init__(self, port=None, verbose=False, debug=False, rate=115200, **kwargs ): # initialize the protocol state self.output = None #: Output port object (usually same as input). self.input = None #: Input port object (usually same as output). self.messages_received = 0 #: Count of total lines received. self.messages_sent = 0 #: Count of total lines sent. self._verbose = verbose self._debug = debug self._portname = port self._port = None self._start_time = 0.0 self._rate = rate return
[docs] def fileno(self): """Return the file descriptor for the input port, useful for blocking on available input with select.""" return self.input.fileno()
[docs] def is_connected(self): """Return true if the serial port device is open.""" return self._port is not None
[docs] def set_serial_port_name(self, name): """Set the name of the serial port device.""" self._portname = name return
[docs] def open_serial_port(self): """Open the serial connection to the Arduino and initialize communications.""" # open the serial port, which should also reset the Arduino self._port = serial.Serial( self._portname, self._rate, timeout=5 ) if self._verbose: print("Opened serial port named", self._port.name) # initialize timer to allow waiting briefly for the Arduino to finish booting self._start_time = time.time() # save separate copies of the file object; this will ease simulation using other sources self.output = self._port self.input = self._port return
[docs] def is_startup_delay_complete(self): """Check whether the boot time interval has completed after opening the serial port.""" return time.time() > self._start_time + 2.0
[docs] def flush_serial_input(self): """Clear the input buffer.""" if self.input is not None: self.input.flushInput()
[docs] def close_serial_port(self): """Shut down the serial connection to the Arduino, after which this object may no longer be used.""" self._port.close() self._port = None return
[docs] def message_received(self, tokens): """Method called whenever a new message is received from the Arduino. The default implentation is null but this may be overridden in subclasses as a hook for this event. :param tokens: list of strings representing whitespace-delimited tokens in received message """ pass
[docs] def wait_for_message(self): """Wait for a line of input from the Arduino and process it, updating status variables. This will block if data is not ready; if using in an event-driven system, it is important to check the port for available data prior to calling. """ # read one line of input, blocking if no data is available line = self._port.readline().rstrip() if line: self.messages_received += 1 elements = line.split() if self._debug: print("Received: '%s'" % line) self.message_received(elements) return
[docs] def send_message_string(self, string): """Issue a message string to the Arduino. :param string: message as single string without terminating newline """ if self.output is None: print("Port not open for output.") else: if self._debug: print("Sending: '%s'" % string) self.output.write( string+'\n') self.messages_sent += 1 return
[docs] def send_message(self, tokens): """Issue a message to the Arduino sketch. :param tokens: message as list of values """ string = " ".join([str(elem) for elem in tokens]) return self.send_message_string(string)
#================================================================