Source code for ase.DFRemoteProtocol

"""Abstract protocols for managing communications for Dragonframe remote operation.

Copyright (c) 2018, Garth Zeglin.  All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""

# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals

# Standard library modules.
import logging, collections

# Import modules from the course library.
import ase.base

################################################################
class DFRemoteServerProtocol(ase.base.Protocol):
    """Communicate as a remote control with Dragonframe using the Simple Interface/TCP protocol.

    Received bytes are accumulated in an internal buffer and split into
    CRLF-terminated lines.  Each complete line is decoded as UTF-8, split into
    whitespace-separated tokens, and passed to :meth:`process_line`, which
    dispatches to one of the overridable handler methods (:meth:`shot_taken`,
    :meth:`position_requested`, :meth:`capture_complete`,
    :meth:`frame_deleted`).
    """

    def __init__(self):
        self.transport = None
        self._buffer = b''

        # configure logging
        self.logger = logging.getLogger(__name__)  #: logging object for controlling or capturing an event log
        self.logger.setLevel(logging.DEBUG)

    def data_received(self, data):
        """Buffer newly received bytes and process every complete line.

        :param data: bytes just received; may contain a partial line, exactly
                     one line, or several CRLF-terminated lines
        """
        self.logger.debug('Message received: %r', data)

        # Manage the possibility of partial reads by appending new data to any
        # previously received partial line.
        self._buffer += data

        # Peel off one complete line at a time.  Each line is removed from the
        # buffer *before* it is dispatched, so an exception raised while
        # handling it can never cause that line (or earlier ones) to be
        # replayed on the next call; lines not yet reached stay buffered.
        while True:
            line, separator, remainder = self._buffer.partition(b'\r\n')
            if not separator:
                # No full line available; keep the partial text for next time.
                break
            self._buffer = remainder

            try:
                tokens = line.decode('utf-8').split()
            except UnicodeDecodeError:
                self.logger.warning('Ignoring undecodable message: %r', line)
                continue

            self.process_line(tokens)

    # Transmit a status string to the client.  It is encoded as UTF-8 and a CRLF ending appended.
    def _send(self, status):
        # The codec is named explicitly so Python 2 and 3 behave identically.
        data = status.encode('utf-8') + b'\r\n'
        self.transport.write(data)
        self.transport.flush()
        self.logger.debug('Message sent: %r', data)

    # Extract the four arguments shared by the SH, PF, and CC messages.
    def _parse_frame_args(self, tokens):
        """Return (frame, exposure, exposure_name, stereo_position) from tokens[1:5].

        Returns None, after logging a warning, if there are fewer than five
        tokens or a numeric field is not a valid integer.
        """
        if len(tokens) >= 5:
            try:
                return int(tokens[1]), int(tokens[2]), tokens[3], int(tokens[4])
            except ValueError:
                pass
        self.logger.warning('Ignoring malformed message: %r', tokens)
        return None

    # Main entry point for interpreting the DFRemote protocol.
    def process_line(self, tokens):
        """Interpret one tokenized message and invoke the matching handler.

        Empty lines and unrecognized message types are ignored.  A malformed
        SH, PF, or CC message is logged and skipped rather than raising.

        :param tokens: list of strings, the whitespace-separated fields of one line
        """
        if not tokens:
            return

        command = tokens[0]

        # Delete message, e.g. 'DE'
        if command == 'DE':
            self.frame_deleted()
            return

        # Shoot Frame message, e.g. 'SH 1 1 X1 0'
        # This indicates the frame/exposure combination which was just shot.
        if command == 'SH':
            handler = self.shot_taken

        # Position Frame message, e.g. 'PF 2 1 X1 0'
        # This appears to indicate the frame/exposure combination to which to move after a shot.
        elif command == 'PF':
            handler = self.position_requested

        # Capture Complete message, e.g. 'CC 1 1 X1 0'
        # This is the final stage of a common SH, PF, CC sequence during a shot.
        elif command == 'CC':
            handler = self.capture_complete

        else:
            return

        args = self._parse_frame_args(tokens)
        if args is not None:
            handler(*args)

    # Methods to transmit messages to Dragonframe.
    def send_shoot_frames(self, count=1):
        """Send an 'S <count>' message.

        :param count: integer formatted into the message; defaults to 1
        """
        self._send('S %d' % (count))

    def send_delete_frame(self):
        """Send a 'D' message."""
        self._send('D')

    def send_toggle_play(self):
        """Send a 'P' message."""
        self._send('P')

    def send_live_view(self):
        """Send an 'L' message."""
        self._send('L')

    # Overridable methods to handle specific client requests.
    def shot_taken(self, frame, exposure, exposure_name, stereo_position):
        """Called for each SH message; the default implementation only logs it."""
        self.logger.info("Shot taken: frame %d, exposure %d, exposure_name %s, stereo_position %d",
                         frame, exposure, exposure_name, stereo_position)

    def position_requested(self, frame, exposure, exposure_name, stereo_position):
        """Called for each PF message; the default implementation only logs it."""
        self.logger.info("Position requested: frame %d, exposure %d, exposure_name %s, stereo_position %d",
                         frame, exposure, exposure_name, stereo_position)

    def capture_complete(self, frame, exposure, exposure_name, stereo_position):
        """Called for each CC message; the default implementation only logs it."""
        self.logger.info("Capture complete: frame %d, exposure %d, exposure_name %s, stereo_position %d",
                         frame, exposure, exposure_name, stereo_position)

    def frame_deleted(self):
        """Called for each DE message; the default implementation only logs it."""
        self.logger.info("Frame deleted")
################################################################