Source code for CncShieldGUI.CncShieldProtocol

#!/usr/bin/env python

"""\
CncShieldProtocol.py : communications logic to control an Arduino running CNC_Shield_Server

Copyright (c) 2015, Garth Zeglin.  All rights reserved. Licensed under the terms
of the BSD 3-clause license.

"""

#================================================================
from __future__ import print_function
import argparse
import time

# This requires a pySerial installation.
#  Package details: https://pypi.python.org/pypi/pyserial,
#  Documentation: http://pythonhosted.org/pyserial/
import serial

#================================================================
[docs]class CncShieldClient(object): """Class to manage a connection to a CNC_Shield_Server running on a serial-connected Arduino. :param port: the name of the serial port device :param verbose: flag to increase console output :param debug: flag to print raw inputs on sconsole :param kwargs: collect any unused keyword arguments """ def __init__(self, port=None, verbose=False, debug=False, **kwargs ): # initialize the client state self.arduino_time = 0 self.position = [0, 0, 0] self.target = [0, 0, 0] self.verbose = verbose self.message_count = 0 self.position_message_count = 0 self.debug = debug self.awake = False self.portname = port self.port = None self.output = None self.input = None self.start_time = 0.0 return
[docs] def is_connected(self): """Return true if the serial port device is open.""" return self.port is not None
[docs] def set_serial_port_name(self, name): """Set the name of the serial port device.""" self.portname = name return
[docs] def open_serial_port(self): """Open the serial connection to the Arduino and initialize communciations.""" # open the serial port, which should also reset the Arduino self.port = serial.Serial( self.portname, 115200, timeout=5 ) if self.verbose: print("Opened serial port named", self.port.name) # initialize timer to allow waiting briefly for the Arduino to finish booting self.start_time = time.time() # save separate copies of the file object; this will ease simulation using other sources self.output = self.port self.input = self.port return
[docs] def is_startup_delay_complete(self): """Check whether the boot time interval has completed after opening the serial port.""" return time.time() > self.start_time + 2.0
[docs] def flush_serial_input(self): """Clear the input buffer.""" if self.input is not None: self.input.flushInput()
[docs] def close_serial_port(self): """Shut down the serial connection to the Arduino, after which this object may no longer be used.""" self.port.close() self.port = None return
[docs] def position_updated(self): """Method called whenever a new position update is received from the Arduino. The default implentation is null but this may be overridden in subclasses as a hook for this event. """ pass
[docs] def wait_for_input(self): """Wait for a line of input from the Arduino and process it, updating status variables. This will block if data is not ready; if using in an event-driven system, it is important to check the port for available data prior to calling. """ # read one line of input, blocking if no data is available line = self.port.readline().rstrip() if line: self.message_count += 1 elements = line.split() if self.debug: print("Received: '%s'" % line) if elements[0] == 'txyz': self.arduino_time = int(elements[1]) self.position = [int(s) for s in elements[2:]] self.position_message_count += 1 self.position_updated() elif elements[0] == 'awake': self.awake = True elif elements[0] == 'dbg': print("Received debugging message: '%s'" % line) else: print("Unknown status message: '%s'" % line) return
[docs] def wait_for_position_update(self): """Block until a position status message has been received.""" last_position_count = self.position_message_count if self.debug: print("Waiting for status update...") while True: self.wait_for_input() if self.position_message_count > last_position_count: return if self.verbose: print("Still waiting for status update...")
[docs] def send_command(self, string): """Issue a command string to the Arduino.""" if self.output is None: print("Port not open for output.") else: if self.debug: print("Sending: '%s'" % string) self.output.write( string+'\n') return
[docs] def motor_enable( self, value=True): """Issue a command to enable or disable the stepper motor drivers.""" self.send_command( "enable 1" if value is True else "enable 0" ) return
[docs] def issue_move(self, position): """Issue a command to move to a [x, y, z] absolute position (specified in microsteps). Returns immediately. :param position: a list or tuple with at least three elements """ self.send_command( "goto %d %d %d" % tuple(position)) self.target = position return
[docs] def distance_to_target(self): """Return the estimated total number of microsteps remaining to reach the target position, based on the most recently received position status. """ # this returns the maximum individual integer error distances = [abs(target - position) for target,position in zip(self.target, self.position)] return max(distances)
#================================================================