Source code for kf.valve
"""Interfaces to hardware valve systems. Uses the QtSerialPort module for
communication event processing using the Qt event loop.
"""
################################################################
# Written in 2018-2020 by Garth Zeglin <garthz@cmu.edu>
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
# You should have received a copy of the CC0 Public Domain Dedication along with this software.
# If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
################################################################
# standard Python libraries
import logging
# for documentation on the PyQt5 API, see http://pyqt.sourceforge.net/Docs/PyQt5/index.html
from PyQt5 import QtCore, QtSerialPort
# set up logger for module
log = logging.getLogger('valve')
# filter out most logging; the default is NOTSET which passes along everything
# log.setLevel(logging.WARNING)
################################################################
[docs]class QtSerialValve(object):
"""Class to manage a serial connection to a hardware valve system using Qt
QSerialPort object for data transport. The data protocol is based on the
ValveControl Arduino sketch.
"""
def __init__(self):
self._portname = None
self._buffer = b''
self._port = None
return
[docs] def status_message(self):
if self._port is None:
return "<not open>"
else:
# ignore any input
pass
[docs] def available_ports(self):
"""Return a list of names of available serial ports."""
return [port.portName() for port in QtSerialPort.QSerialPortInfo.availablePorts()]
[docs] def set_port(self, name):
if name == "<no selection>":
log.debug("User picked the null serial port entry.")
self._portname = None
else:
self._portname = name
[docs] def open(self):
"""Open the serial port and initialize communications. If the port is already
open, this will close it first. If the current name is None, this will not open
anything. Returns True if the port is open, else False."""
if self._port is not None:
self.close()
if self._portname is None:
log.debug("No port name provided so not opening port.")
return False
return
self._port = QtSerialPort.QSerialPort()
self._port.setBaudRate(115200)
self._port.setPortName(self._portname)
# open the serial port, which should also reset the Arduino
if self._port.open(QtCore.QIODevice.ReadWrite):
log.info("Opened valve serial port %s", self._port.portName())
# always process data as it becomes available
self._port.readyRead.connect(self.read_input)
return True
else:
# Error codes: https://doc.qt.io/qt-5/qserialport.html#SerialPortError-enum
errcode = self._port.error()
if errcode == QtSerialPort.QSerialPort.PermissionError:
log.warning("Failed to open valve serial port %s with a QSerialPort PermissionError, which could involve an already running control process, a stale lock file, or dialout group permissions.", self._port.portName())
else:
log.warning("Failed to open valve serial port %s with a QSerialPort error code %d.", self._port.portName(), errcode)
self._port = None
return False
[docs] def set_and_open_port(self, name):
self.set_port(name)
self.open()
[docs] def close(self):
"""Shut down the serial connection to the Arduino."""
if self._port is not None:
log.info("Closing valve serial port %s", self._port.portName())
self._port.close()
self._port = None
return
# --- manage input from serial port ------------------
[docs] def data_received(self, data):
# Manage the possibility of partial reads by appending new data to any previously received partial line.
# The data arrives as a PyQT5.QtCore.QByteArray.
self._buffer += bytes(data)
while b'\n' in self._buffer:
first, self._buffer = self._buffer.split(b'\n', 1)
first = first.rstrip()
self._parse_status_line(first)
def _parse_status_line(self, data):
# parse a single line of status input provided as a bytestring
# tokens = data.split()
# just ignore
pass
# --- manage output to serial port ------------------
[docs] def write(self, data):
if self._port is not None:
self._port.write(data)
else:
log.debug("Valve serial port not open during write.")
def _send_command(self, string):
log.debug("Sending to valve: %s", string)
self.write(string.encode()+b'\n')
return
# --- ad hoc valve protocol ----------------
# This generates messages following the command protocol in ValveControl.ino
[docs] def ping(self):
self._send_command("ping")
return
[docs] def stop(self):
"""Send a stop command to shut off all valve flow."""
self._send_command("stop")
return
[docs] def set_speed(self, axis, velocity):
"""Set the constant velocity of one or more target positions.
:param axis: an integer axis index starting with 0
:param velocity: a valve PWM rate ranging from -100 to 100
"""
self._send_command("speed %d %d" % (axis + 1, velocity))
return