Source code for CncShieldGUI.CncShieldProtocol
#!/usr/bin/env python
"""\
CncShieldProtocol.py : communications logic to control an Arduino running CNC_Shield_Server
Copyright (c) 2015, Garth Zeglin. All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""
#================================================================
from __future__ import print_function
import argparse
import time
# This requires a pySerial installation.
# Package details: https://pypi.python.org/pypi/pyserial,
# Documentation: http://pythonhosted.org/pyserial/
import serial
#================================================================
[docs]class CncShieldClient(object):
"""Class to manage a connection to a CNC_Shield_Server running on a
serial-connected Arduino.
:param port: the name of the serial port device
:param verbose: flag to increase console output
:param debug: flag to print raw inputs on sconsole
:param kwargs: collect any unused keyword arguments
"""
def __init__(self, port=None, verbose=False, debug=False, **kwargs ):
# initialize the client state
self.arduino_time = 0
self.position = [0, 0, 0]
self.target = [0, 0, 0]
self.verbose = verbose
self.message_count = 0
self.position_message_count = 0
self.debug = debug
self.awake = False
self.portname = port
self.port = None
self.output = None
self.input = None
self.start_time = 0.0
return
[docs] def is_connected(self):
"""Return true if the serial port device is open."""
return self.port is not None
[docs] def set_serial_port_name(self, name):
"""Set the name of the serial port device."""
self.portname = name
return
[docs] def open_serial_port(self):
"""Open the serial connection to the Arduino and initialize communciations."""
# open the serial port, which should also reset the Arduino
self.port = serial.Serial( self.portname, 115200, timeout=5 )
if self.verbose:
print("Opened serial port named", self.port.name)
# initialize timer to allow waiting briefly for the Arduino to finish booting
self.start_time = time.time()
# save separate copies of the file object; this will ease simulation using other sources
self.output = self.port
self.input = self.port
return
[docs] def is_startup_delay_complete(self):
"""Check whether the boot time interval has completed after opening the serial port."""
return time.time() > self.start_time + 2.0
[docs] def close_serial_port(self):
"""Shut down the serial connection to the Arduino, after which this object may no longer be used."""
self.port.close()
self.port = None
return
[docs] def position_updated(self):
"""Method called whenever a new position update is received from the Arduino.
The default implentation is null but this may be overridden in
subclasses as a hook for this event.
"""
pass
[docs] def wait_for_position_update(self):
"""Block until a position status message has been received."""
last_position_count = self.position_message_count
if self.debug:
print("Waiting for status update...")
while True:
self.wait_for_input()
if self.position_message_count > last_position_count:
return
if self.verbose:
print("Still waiting for status update...")
[docs] def send_command(self, string):
"""Issue a command string to the Arduino."""
if self.output is None:
print("Port not open for output.")
else:
if self.debug:
print("Sending: '%s'" % string)
self.output.write( string+'\n')
return
[docs] def motor_enable( self, value=True):
"""Issue a command to enable or disable the stepper motor drivers."""
self.send_command( "enable 1" if value is True else "enable 0" )
return
[docs] def issue_move(self, position):
"""Issue a command to move to a [x, y, z] absolute position (specified in microsteps). Returns immediately.
:param position: a list or tuple with at least three elements
"""
self.send_command( "goto %d %d %d" % tuple(position))
self.target = position
return
[docs] def distance_to_target(self):
"""Return the estimated total number of microsteps remaining to reach the target
position, based on the most recently received position status.
"""
# this returns the maximum individual integer error
distances = [abs(target - position) for target,position in zip(self.target, self.position)]
return max(distances)
#================================================================