# Source code for pneumatics_animation.ArduinoProtocol
"""\
ArduinoProtocol.py : abstract class to manage serial communication with an Arduino sketch.
Copyright (c) 2015-2017, Garth Zeglin. All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""
#================================================================
from __future__ import print_function
import time
# This requires a pySerial installation.
# Package details: https://pypi.python.org/pypi/pyserial,
# Documentation: http://pythonhosted.org/pyserial/
import serial
#================================================================
class ArduinoProtocol(object):
    """Abstract class to manage a serial connection to an Arduino sketch.

    This class is sufficient to send and receive structured data messages as
    line-delimited text, but is more useful as a parent class for an
    application-specific class which manages the specific data types in the
    stream.

    :param port: the name of the serial port device
    :param verbose: flag to increase console output
    :param debug: flag to print raw inputs on console
    :param rate: serial baud rate
    :param kwargs: collect any unused keyword arguments
    """

    def __init__(self, port=None, verbose=False, debug=False, rate=115200, **kwargs):
        # initialize the protocol state
        self.output = None           #: Output port object (usually same as input).
        self.input = None            #: Input port object (usually same as output).
        self.messages_received = 0   #: Count of total lines received.
        self.messages_sent = 0       #: Count of total lines sent.

        self._verbose = verbose
        self._debug = debug
        self._portname = port
        self._port = None            # serial.Serial object while the port is open
        self._start_time = 0.0       # time.time() value recorded when the port was opened
        self._rate = rate

    def fileno(self):
        """Return the file descriptor for the input port, useful for blocking on available input with select."""
        return self.input.fileno()

    def is_connected(self):
        """Return true if the serial port device is open."""
        return self._port is not None

    def set_serial_port_name(self, name):
        """Set the name of the serial port device."""
        self._portname = name

    def open_serial_port(self):
        """Open the serial connection to the Arduino and initialize communications."""
        # open the serial port, which should also reset the Arduino
        self._port = serial.Serial(self._portname, self._rate, timeout=5)

        if self._verbose:
            print("Opened serial port named", self._port.name)

        # initialize timer to allow waiting briefly for the Arduino to finish booting
        self._start_time = time.time()

        # save separate copies of the file object; this will ease simulation using other sources
        self.output = self._port
        self.input = self._port

    def is_startup_delay_complete(self):
        """Check whether the boot time interval has completed after opening the serial port."""
        return time.time() > self._start_time + 2.0

    def close_serial_port(self):
        """Shut down the serial connection to the Arduino, after which this object may no longer be used.

        Calling this when no port is open is a no-op.
        """
        port = self._port
        if port is None:
            return

        port.close()
        self._port = None

        # Drop the stream references that alias the closed port so that
        # send_message_string() reports "not open" instead of writing to a
        # closed device.  Streams substituted by a caller are left alone.
        if self.input is port:
            self.input = None
        if self.output is port:
            self.output = None

    def message_received(self, tokens):
        """Method called whenever a new message is received from the Arduino.

        The default implementation is null but this may be overridden in
        subclasses as a hook for this event.

        :param tokens: list of strings representing whitespace-delimited tokens in received message
        """
        pass

    def wait_for_message(self):
        """Wait for a line of input from the Arduino and process it, updating status
        variables.  This will block if data is not ready; if using in an
        event-driven system, it is important to check the port for available
        data prior to calling.

        A non-empty line increments ``messages_received`` and is passed to
        :meth:`message_received` as a list of whitespace-delimited text tokens.
        """
        # Read through self.input so a substituted input source is honored,
        # falling back to the raw port if only that has been set.
        source = self.input if self.input is not None else self._port

        # read one line of input, blocking if no data is available
        line = source.readline().rstrip()

        # Under Python 3 a serial port yields bytes; convert to text so the
        # tokens are strings as documented.  Undecodable bytes (e.g. noise
        # emitted while the board resets) are replaced rather than raising.
        # Under Python 2 bytes is str, so the line is passed through unchanged.
        if isinstance(line, bytes) and not isinstance(line, str):
            line = line.decode('utf-8', 'replace')

        if line:
            self.messages_received += 1
            elements = line.split()
            if self._debug:
                print("Received: '%s'" % line)
            self.message_received(elements)

    def send_message_string(self, string):
        """Issue a message string to the Arduino.

        :param string: message as single string without terminating newline
        """
        if self.output is None:
            print("Port not open for output.")
            return

        if self._debug:
            print("Sending: '%s'" % string)

        text = string + '\n'
        try:
            # Text-mode outputs (e.g. a simulation stream) accept the string as-is.
            self.output.write(text)
        except TypeError:
            # Binary outputs, including a pySerial port under Python 3, reject
            # text strings with TypeError; retry with the encoded bytes.
            if isinstance(text, bytes):
                raise
            self.output.write(text.encode('utf-8'))

        self.messages_sent += 1

    def send_message(self, tokens):
        """Issue a message to the Arduino sketch.

        :param tokens: message as list of values
        """
        string = " ".join([str(elem) for elem in tokens])
        return self.send_message_string(string)
#================================================================