"""Abstract protocols for managing communications with Dragonframe motion control.
Copyright (c) 2018, Garth Zeglin. All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""
# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals
# Standard library modules.
import logging, collections
# Import modules from the course library.
import ase.base
################################################################
[docs]class DFMocoServerProtocol(ase.base.Protocol):
"""Communicate as a motion control device with Dragonframe using the DFMoco
protocol. This class by itself is a trivial simulation but is intended to be
subclassed to control specific hardware.
:param number_of_axes: number of channels offered for motion control
"""
major_version = 1
full_version_string = '1.3.0'
def __init__(self, number_of_axes = 4):
self.transport = None
self._input_messages = 0
self.number_of_axes = number_of_axes
self._buffer = b''
self.targets = [0]*number_of_axes #: array of desired stepper positions
self.positions = [0]*number_of_axes #: array of current stepper positions
self.speeds = [100]*number_of_axes #: array of maximum stepper speeds in pulses/second
self.moving = [False]*number_of_axes #: array of flags indicating whether each axis is in motion
# configure logging
self.logger = logging.getLogger(__name__) #: logging object for controlling or capturing an event log
self.logger.setLevel(logging.DEBUG)
return
# The following methods are set up to simplify future compatibility with Python 3 asyncio.
[docs] def connection_made(self, transport):
self.logger.info("Connection opened.")
self.transport = transport
self.send_hello()
return
[docs] def connection_lost(self):
self.logger.info("Connection shutting down.")
return
[docs] def data_received(self, data):
self.logger.debug('Command received: %r', data)
# Manage the possibility of partial reads by appending new data to any previously received partial line.
self._buffer += data
# If at least one full line has been received:
if b'\r\n' in self._buffer:
# Split out each line on CRLF; the last element will be any remainder.
lines = self._buffer.split(b'\r\n')
for line in lines[:-1]:
self.process_line(line.decode().split())
# Any extra text (possibly empty) will be saved for the next receive.
self._buffer = lines[-1]
return
# Transmit a status string to the client. It is encoded as UTF-8 and a CRLF ending appended.
def _send(self, status):
data = status.encode() + b'\r\n'
self.transport.write(data)
self.transport.flush()
self.logger.debug('Status sent: %r', data)
return
# Utility functions
def _motor_is_in_range(self, motor):
# This checks the 1-based packet data against the axes limits.
return (motor > 0) and (motor <= self.number_of_axes)
# Main entry point for interpreting the DFMoco protocol.
[docs] def process_line(self, tokens):
num_tokens = len(tokens)
if len(tokens) > 0:
if tokens[0] == 'hi': # hello
self.send_hello()
elif tokens[0] == 'ms': # motor status
self.send_motor_status()
elif tokens[0] == 'mm': # motor move
if num_tokens == 3:
motor = int(tokens[1])
position = int(tokens[2])
if self._motor_is_in_range(motor):
self.motor_move(motor-1, position)
self.send_motor_position(motor-1)
elif tokens[0] == 'mp': # motor position query
if num_tokens == 2:
motor = int(tokens[1])
if self._motor_is_in_range(motor):
self.send_motor_position(motor-1)
elif tokens[0] == 'sm': # stop motor
if num_tokens == 2:
motor = int(tokens[1])
if self._motor_is_in_range(motor):
self.stop_motor(motor-1)
self._send('sm %d' % motor)
elif tokens[0] == 'sa': # stop all
self.stop_all()
self._send('sa')
elif tokens[0] == 'jm':
# jog motor toward a destination, which is either a very large positive or negative integer
if num_tokens == 3:
motor = int(tokens[1])
destination = int(tokens[2])
direction = 1 if destination > 0 else -1
if self._motor_is_in_range(motor):
self.jog_motor(motor-1, direction)
self._send('jm %d' % motor)
elif tokens[0] == 'im':
# inch motor very slowly motor toward a destination, which is either a very large positive or negative integer
if num_tokens == 3:
motor = int(tokens[1])
destination = int(tokens[2])
direction = 1 if destination > 0 else -1
if self._motor_is_in_range(motor):
self.inch_motor(motor-1, direction)
self._send('im %d' % motor)
elif tokens[0] == 'pr': # pulse rate
if num_tokens == 3:
motor = int(tokens[1])
speed = int(tokens[2])
if self._motor_is_in_range(motor):
self.pulse_rate(motor-1, speed)
self._send('pr %d %d' % (motor, speed))
elif tokens[0] == 'zm': # zero motor
if num_tokens == 2:
motor = int(tokens[1])
if self._motor_is_in_range(motor):
self.zero_motor(motor-1)
self._send('zm %d' % motor)
elif tokens[0] == 'np': # new position
if num_tokens == 2:
motor = int(tokens[1])
position = int(tokens[2])
if self._motor_is_in_range(motor):
self.new_position(motor-1, position)
self._send('np %d %d' % (motor, position))
return
# Methods to transmit status messages to client.
[docs] def send_hello(self):
self._send('hi %d %d %s' % (self.major_version, self.number_of_axes, self.full_version_string))
return
[docs] def send_motor_status(self):
"""Transmit the motor status response with a binary string generated from self.moving."""
self._send('ms ' + ''.join(['1' if flag else '0' for flag in self.moving]))
return
[docs] def send_motor_position(self, motor):
"""Transmit a 'mp' Motor Position status.
:param motor: motor index, starting with zero
"""
self._send('mp %d %d' % (motor+1, self.positions[motor]))
return
# Overridable methods to handle specific client requests.
[docs] def motor_move(self, motor, position):
"""Process a 'mm' Motor Move command.
:param motor: motor index, starting with zero
:param position: target position in pulses.
"""
self.positions[motor] = position
self.targets[motor] = position
return
[docs] def stop_motor(self, motor):
"""Process a 'sm' Stop Motor command.
:param motor: motor index, starting with zero
"""
self.moving[motor] = False
return
[docs] def stop_all(self):
"""Process a 'sa' Stop All command."""
self.moving = [False]*self.number_of_axes
return
[docs] def jog_motor(self, motor, direction):
"""Process a 'jm' Jog Motor command.
:param motor: motor index, starting with zero
:param direction: -1 or 1 to indicate the direction of travel.
"""
self.targets[motor] += 10*direction
self.positions[motor] = self.targets[motor]
return
[docs] def inch_motor(self, motor, direction):
"""Process a 'im' Inch Motor command.
:param motor: motor index, starting with zero
:param direction: -1 or 1 to indicate the direction of travel.
"""
self.targets[motor] += direction
self.positions[motor] = self.targets[motor]
return
[docs] def pulse_rate(self, motor, speed):
"""Process a 'pr' Pulse Rate command.
:param motor: motor index, starting with zero
:param speed: maximum rate in steps/second
"""
self.speeds[motor] = speed
return
[docs] def zero_motor(self, motor):
"""Process a 'zm' Zero Motor command. This resets the position count without physical movement.
:param motor: motor index, starting with zero
"""
self.targets[motor] = 0
self.positions[motor] = 0
return
[docs] def new_position(self, motor, position):
"""Process a 'np' New Position command. This sets the position count without physical movement.
:param motor: motor index, starting with zero
:param position: target position in pulses
"""
self.targets[motor] = position
self.positions[motor] = position
return
################################################################
[docs]class DFMocoClientProtocol(ase.base.Protocol):
"""Communicate with a motion control device using the DFMoco protocol. This
class is intended to be subclassed for specific applications.
"""
major_version = 1
full_version_string = '1.3.0'
def __init__(self):
self.transport = None #: a file-like bidirectional object for the communication with the motion control device, e.g. socket or serial port
self.number_of_axes = 0 #: the number of axes reported by the server
self._input_messages = 0
self._buffer = b''
self._recreate_arrays()
self._requests = collections.deque() # keep track of pending requests for data
# configure logging
self.logger = logging.getLogger(__name__) #: logging object for controlling or capturing an event log
self.logger.setLevel(logging.DEBUG)
return
def _recreate_arrays(self):
self.targets = [0]*self.number_of_axes #: array of commanded stepper positions
self.positions = [0]*self.number_of_axes #: array of most recently reported stepper positions
self.speeds = [0]*self.number_of_axes #: array of maximum stepper speeds in pulses/second
self.moving = [False]*self.number_of_axes #: array of flags indicating whether each axis is in motion
return
[docs] def is_protocol_ready(self):
return self.number_of_axes > 0
# Utility functions
def _motor_is_in_range(self, motor):
# This checks the 1-based packet data against the axes limits.
return (motor > 0) and (motor <= self.number_of_axes)
# The following methods are set up to simplify future compatibility with Python 3 asyncio.
[docs] def connection_made(self, transport):
self.logger.info("Connection opened.")
self.transport = transport
return
[docs] def data_received(self, data):
"""Callback to receive a new bytestring of data, not necessarily line-terminated."""
self.logger.debug('Status received: %r', data)
# Manage the possibility of partial reads by appending new data to any previously received partial line.
self._buffer += data
# If at least one full line has been received:
if b'\r\n' in self._buffer:
# Split out each line on CRLF; the last element will be any remainder.
lines = self._buffer.split(b'\r\n')
for line in lines[:-1]:
self.logger.debug('Processing line: %r', line)
self.process_line(line.decode().split())
# Any extra text (possibly empty) will be saved for the next receive.
self._buffer = lines[-1]
return
# Transmit a command string to the server. It is encoded as UTF-8 and a CRLF ending appended.
def _send(self, command):
data = command.encode() + b'\r\n'
self.transport.write(data)
self.transport.flush()
self.logger.debug('Command sent: %r', data)
return
# Main entry point for interpreting the DFMoco protocol.
[docs] def process_line(self, tokens):
"""Protocol interpreter to process one status input line, supplied as a list of
token strings with whitespace removed."""
num_tokens = len(tokens)
if len(tokens) > 0:
if tokens[0] == 'hi': # hello
if num_tokens == 4:
self.major_version = int(tokens[1])
self.number_of_axes = int(tokens[2])
self.full_version_string = tokens[3]
if self.number_of_axes > 1000:
self.logger.error("Excessive number of axes specified in server response: %d", self.number_of_axes)
self.number_of_axes = 0
else:
self._recreate_arrays()
elif tokens[0] == 'ms': # motor status
if num_tokens == 2:
flags = tokens[1]
if len(flags) != self.number_of_axes:
self.logger.warning("Motor status response length does not match number_of_axes: %d", len(flags))
else:
self.moving = [c == '1' for c in flags]
try:
req = self._requests.popleft()
if req[0] == 'get-status':
if req[1] is not None:
req[1](self.moving)
elif req[0] == 'get-moving-state':
if req[1] is not None:
req[1](any(self.moving))
else:
self.logger.warning('Motor status reply received out of sync: %s', flags)
self._requests.clear()
except IndexError:
self.logger.warning('Unrequested motor status received: %s', flags)
elif tokens[0] == 'mp': # motor position status
if num_tokens == 3:
motor = int(tokens[1])
count = int(tokens[2])
if self._motor_is_in_range(motor):
self.positions[motor-1] = count
# the other responses are command acknowledgements which can be ignored for now
return
[docs] def send_hello(self):
self._send('hi')
return
[docs] def send_motor_move(self, motor, target):
if self._motor_is_in_range(motor+1):
self.targets[motor] = target
self._send('mm %d %d' % (motor+1, self.targets[motor]))
[docs] def send_target(self, motor, target):
return self.send_motor_move(motor, target)
[docs] def send_multiple_targets(self, first, targets):
for c, pos in enumerate(targets):
self.send_motor_move(first + c, pos)
[docs] def send_pulse_rate(self, motor, rate):
if self._motor_is_in_range(motor+1):
self.speeds[motor] = rate
self._send('pr %d %d' % (motor+1, self.speeds[motor]))
[docs] def send_motor_status_request(self, callback=None):
self._send('ms')
self._requests.append(("get-status", callback))
[docs] def send_get_moving_state(self, callback=None):
"""Send a request for the trajectory generator status. This is an asynchronous
request so a callback function is normally provided to receive the
return value. This is an informal protocol implemented by multiple
devices to support waiting for completion.
:param callback: callable to be called with True if any motors are still moving else False
"""
self._send('ms')
self._requests.append(("get-moving-state", callback))
################################################################