Source code for ase.DFMocoProtocol

"""Abstract protocols for managing communications with Dragonframe motion control.

Copyright (c) 2018, Garth Zeglin.  All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""

# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals

# Standard library modules.
import logging, collections

# Import modules from the course library.
import ase.base

################################################################
[docs]class DFMocoServerProtocol(ase.base.Protocol): """Communicate as a motion control device with Dragonframe using the DFMoco protocol. This class by itself is a trivial simulation but is intended to be subclassed to control specific hardware. :param number_of_axes: number of channels offered for motion control """ major_version = 1 full_version_string = '1.3.0' def __init__(self, number_of_axes = 4): self.transport = None self._input_messages = 0 self.number_of_axes = number_of_axes self._buffer = b'' self.targets = [0]*number_of_axes #: array of desired stepper positions self.positions = [0]*number_of_axes #: array of current stepper positions self.speeds = [100]*number_of_axes #: array of maximum stepper speeds in pulses/second self.moving = [False]*number_of_axes #: array of flags indicating whether each axis is in motion # configure logging self.logger = logging.getLogger(__name__) #: logging object for controlling or capturing an event log self.logger.setLevel(logging.DEBUG) return # The following methods are set up to simplify future compatibility with Python 3 asyncio.
[docs] def connection_made(self, transport): self.logger.info("Connection opened.") self.transport = transport self.send_hello() return
[docs] def connection_lost(self): self.logger.info("Connection shutting down.") return
[docs] def data_received(self, data): self.logger.debug('Command received: %r', data) # Manage the possibility of partial reads by appending new data to any previously received partial line. self._buffer += data # If at least one full line has been received: if b'\r\n' in self._buffer: # Split out each line on CRLF; the last element will be any remainder. lines = self._buffer.split(b'\r\n') for line in lines[:-1]: self.process_line(line.decode().split()) # Any extra text (possibly empty) will be saved for the next receive. self._buffer = lines[-1] return
# Transmit a status string to the client. It is encoded as UTF-8 and a CRLF ending appended. def _send(self, status): data = status.encode() + b'\r\n' self.transport.write(data) self.transport.flush() self.logger.debug('Status sent: %r', data) return # Utility functions def _motor_is_in_range(self, motor): # This checks the 1-based packet data against the axes limits. return (motor > 0) and (motor <= self.number_of_axes) # Main entry point for interpreting the DFMoco protocol.
[docs] def process_line(self, tokens): num_tokens = len(tokens) if len(tokens) > 0: if tokens[0] == 'hi': # hello self.send_hello() elif tokens[0] == 'ms': # motor status self.send_motor_status() elif tokens[0] == 'mm': # motor move if num_tokens == 3: motor = int(tokens[1]) position = int(tokens[2]) if self._motor_is_in_range(motor): self.motor_move(motor-1, position) self.send_motor_position(motor-1) elif tokens[0] == 'mp': # motor position query if num_tokens == 2: motor = int(tokens[1]) if self._motor_is_in_range(motor): self.send_motor_position(motor-1) elif tokens[0] == 'sm': # stop motor if num_tokens == 2: motor = int(tokens[1]) if self._motor_is_in_range(motor): self.stop_motor(motor-1) self._send('sm %d' % motor) elif tokens[0] == 'sa': # stop all self.stop_all() self._send('sa') elif tokens[0] == 'jm': # jog motor toward a destination, which is either a very large positive or negative integer if num_tokens == 3: motor = int(tokens[1]) destination = int(tokens[2]) direction = 1 if destination > 0 else -1 if self._motor_is_in_range(motor): self.jog_motor(motor-1, direction) self._send('jm %d' % motor) elif tokens[0] == 'im': # inch motor very slowly motor toward a destination, which is either a very large positive or negative integer if num_tokens == 3: motor = int(tokens[1]) destination = int(tokens[2]) direction = 1 if destination > 0 else -1 if self._motor_is_in_range(motor): self.inch_motor(motor-1, direction) self._send('im %d' % motor) elif tokens[0] == 'pr': # pulse rate if num_tokens == 3: motor = int(tokens[1]) speed = int(tokens[2]) if self._motor_is_in_range(motor): self.pulse_rate(motor-1, speed) self._send('pr %d %d' % (motor, speed)) elif tokens[0] == 'zm': # zero motor if num_tokens == 2: motor = int(tokens[1]) if self._motor_is_in_range(motor): self.zero_motor(motor-1) self._send('zm %d' % motor) elif tokens[0] == 'np': # new position if num_tokens == 2: motor = int(tokens[1]) position = int(tokens[2]) if self._motor_is_in_range(motor): self.new_position(motor-1, position) self._send('np %d %d' % (motor, position)) return
# Methods to transmit status messages to client.
[docs] def send_hello(self): self._send('hi %d %d %s' % (self.major_version, self.number_of_axes, self.full_version_string)) return
[docs] def send_motor_status(self): """Transmit the motor status response with a binary string generated from self.moving.""" self._send('ms ' + ''.join(['1' if flag else '0' for flag in self.moving])) return
[docs] def send_motor_position(self, motor): """Transmit a 'mp' Motor Position status. :param motor: motor index, starting with zero """ self._send('mp %d %d' % (motor+1, self.positions[motor])) return
# Overridable methods to handle specific client requests.
[docs] def motor_move(self, motor, position): """Process a 'mm' Motor Move command. :param motor: motor index, starting with zero :param position: target position in pulses. """ self.positions[motor] = position self.targets[motor] = position return
[docs] def stop_motor(self, motor): """Process a 'sm' Stop Motor command. :param motor: motor index, starting with zero """ self.moving[motor] = False return
[docs] def stop_all(self): """Process a 'sa' Stop All command.""" self.moving = [False]*self.number_of_axes return
[docs] def jog_motor(self, motor, direction): """Process a 'jm' Jog Motor command. :param motor: motor index, starting with zero :param direction: -1 or 1 to indicate the direction of travel. """ self.targets[motor] += 10*direction self.positions[motor] = self.targets[motor] return
[docs] def inch_motor(self, motor, direction): """Process a 'im' Inch Motor command. :param motor: motor index, starting with zero :param direction: -1 or 1 to indicate the direction of travel. """ self.targets[motor] += direction self.positions[motor] = self.targets[motor] return
[docs] def pulse_rate(self, motor, speed): """Process a 'pr' Pulse Rate command. :param motor: motor index, starting with zero :param speed: maximum rate in steps/second """ self.speeds[motor] = speed return
[docs] def zero_motor(self, motor): """Process a 'zm' Zero Motor command. This resets the position count without physical movement. :param motor: motor index, starting with zero """ self.targets[motor] = 0 self.positions[motor] = 0 return
[docs] def new_position(self, motor, position): """Process a 'np' New Position command. This sets the position count without physical movement. :param motor: motor index, starting with zero :param position: target position in pulses """ self.targets[motor] = position self.positions[motor] = position return
################################################################
[docs]class DFMocoClientProtocol(ase.base.Protocol): """Communicate with a motion control device using the DFMoco protocol. This class is intended to be subclassed for specific applications. """ major_version = 1 full_version_string = '1.3.0' def __init__(self): self.transport = None #: a file-like bidirectional object for the communication with the motion control device, e.g. socket or serial port self.number_of_axes = 0 #: the number of axes reported by the server self._input_messages = 0 self._buffer = b'' self._recreate_arrays() self._requests = collections.deque() # keep track of pending requests for data # configure logging self.logger = logging.getLogger(__name__) #: logging object for controlling or capturing an event log self.logger.setLevel(logging.DEBUG) return def _recreate_arrays(self): self.targets = [0]*self.number_of_axes #: array of commanded stepper positions self.positions = [0]*self.number_of_axes #: array of most recently reported stepper positions self.speeds = [0]*self.number_of_axes #: array of maximum stepper speeds in pulses/second self.moving = [False]*self.number_of_axes #: array of flags indicating whether each axis is in motion return
[docs] def is_protocol_ready(self): return self.number_of_axes > 0
# Utility functions def _motor_is_in_range(self, motor): # This checks the 1-based packet data against the axes limits. return (motor > 0) and (motor <= self.number_of_axes) # The following methods are set up to simplify future compatibility with Python 3 asyncio.
[docs] def connection_made(self, transport): self.logger.info("Connection opened.") self.transport = transport return
[docs] def data_received(self, data): """Callback to receive a new bytestring of data, not necessarily line-terminated.""" self.logger.debug('Status received: %r', data) # Manage the possibility of partial reads by appending new data to any previously received partial line. self._buffer += data # If at least one full line has been received: if b'\r\n' in self._buffer: # Split out each line on CRLF; the last element will be any remainder. lines = self._buffer.split(b'\r\n') for line in lines[:-1]: self.logger.debug('Processing line: %r', line) self.process_line(line.decode().split()) # Any extra text (possibly empty) will be saved for the next receive. self._buffer = lines[-1] return
# Transmit a command string to the server. It is encoded as UTF-8 and a CRLF ending appended. def _send(self, command): data = command.encode() + b'\r\n' self.transport.write(data) self.transport.flush() self.logger.debug('Command sent: %r', data) return # Main entry point for interpreting the DFMoco protocol.
[docs] def process_line(self, tokens): """Protocol interpreter to process one status input line, supplied as a list of token strings with whitespace removed.""" num_tokens = len(tokens) if len(tokens) > 0: if tokens[0] == 'hi': # hello if num_tokens == 4: self.major_version = int(tokens[1]) self.number_of_axes = int(tokens[2]) self.full_version_string = tokens[3] if self.number_of_axes > 1000: self.logger.error("Excessive number of axes specified in server response: %d", self.number_of_axes) self.number_of_axes = 0 else: self._recreate_arrays() elif tokens[0] == 'ms': # motor status if num_tokens == 2: flags = tokens[1] if len(flags) != self.number_of_axes: self.logger.warning("Motor status response length does not match number_of_axes: %d", len(flags)) else: self.moving = [c == '1' for c in flags] try: req = self._requests.popleft() if req[0] == 'get-status': if req[1] is not None: req[1](self.moving) elif req[0] == 'get-moving-state': if req[1] is not None: req[1](any(self.moving)) else: self.logger.warning('Motor status reply received out of sync: %s', flags) self._requests.clear() except IndexError: self.logger.warning('Unrequested motor status received: %s', flags) elif tokens[0] == 'mp': # motor position status if num_tokens == 3: motor = int(tokens[1]) count = int(tokens[2]) if self._motor_is_in_range(motor): self.positions[motor-1] = count # the other responses are command acknowledgements which can be ignored for now return
[docs] def send_hello(self): self._send('hi') return
[docs] def send_motor_move(self, motor, target): if self._motor_is_in_range(motor+1): self.targets[motor] = target self._send('mm %d %d' % (motor+1, self.targets[motor]))
[docs] def send_target(self, motor, target): return self.send_motor_move(motor, target)
[docs] def send_multiple_targets(self, first, targets): for c, pos in enumerate(targets): self.send_motor_move(first + c, pos)
[docs] def send_pulse_rate(self, motor, rate): if self._motor_is_in_range(motor+1): self.speeds[motor] = rate self._send('pr %d %d' % (motor+1, self.speeds[motor]))
[docs] def send_motor_status_request(self, callback=None): self._send('ms') self._requests.append(("get-status", callback))
[docs] def send_get_moving_state(self, callback=None): """Send a request for the trajectory generator status. This is an asynchronous request so a callback function is normally provided to receive the return value. This is an informal protocol implemented by multiple devices to support waiting for completion. :param callback: callable to be called with True if any motors are still moving else False """ self._send('ms') self._requests.append(("get-moving-state", callback))
################################################################