# Source code for ase.DFRemoteProtocol
"""Abstract protocols for managing communications for Dragonframe remote operation.
Copyright (c) 2018, Garth Zeglin. All rights reserved. Licensed under the terms
of the BSD 3-clause license.
"""
# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals
# Standard library modules.
import logging, collections
# Import modules from the course library.
import ase.base
################################################################
class DFRemoteServerProtocol(ase.base.Protocol):
    """Communicate as a remote control with Dragonframe using the Simple
    Interface/TCP protocol.

    Received bytes are accumulated until a CRLF-terminated line is available.
    Each complete line is decoded, split on whitespace, and handed to
    :meth:`process_line`, which dispatches to the overridable event handlers
    :meth:`shot_taken`, :meth:`position_requested`, and :meth:`frame_deleted`.
    The default handlers only write to the event log.

    The ``send_*`` methods transmit single-letter commands to Dragonframe.
    """

    def __init__(self):
        # Nothing in this class assigns a real transport; presumably the
        # connection machinery in ase.base does so -- TODO confirm.
        self.transport = None

        # Bytes received so far which do not yet form a complete line.
        self._buffer = b''

        # configure logging
        self.logger = logging.getLogger(__name__)  #: logging object for controlling or capturing an event log
        self.logger.setLevel(logging.DEBUG)

    def data_received(self, data):
        """Accept a chunk of received bytes and process each complete line.

        New data is appended to any previously received partial line.  Every
        CRLF-terminated line now available is removed from the buffer, decoded,
        split into whitespace-separated tokens, and passed to
        :meth:`process_line`.  Any trailing partial line is kept for the next
        call.

        :param data: bytes newly received from the peer
        """
        self.logger.debug('Message received: %r', data)

        # Manage the possibility of partial reads by appending new data to any
        # previously received partial line.
        self._buffer += data

        while True:
            line, separator, remainder = self._buffer.partition(b'\r\n')
            if not separator:
                # No complete line remains; keep the partial text for later.
                break

            # Consume the line *before* dispatching it.  If a handler raises,
            # the offending line is already gone and the lines after it are
            # still buffered, so nothing is replayed or lost on the next call.
            self._buffer = remainder

            try:
                text = line.decode('utf-8')
            except UnicodeDecodeError:
                # One corrupt line should not prevent later lines from being handled.
                self.logger.warning('Ignoring undecodable message: %r', line)
                continue

            self.process_line(text.split())

    def _send(self, status):
        """Transmit a status string to the client.

        The string is encoded as UTF-8 and a CRLF line ending is appended.

        :param status: text of the message, without line ending
        :raises AttributeError: if ``self.transport`` is still ``None``
        """
        data = status.encode('utf-8') + b'\r\n'
        self.transport.write(data)
        self.transport.flush()
        self.logger.debug('Message sent: %r', data)

    def _parse_frame_fields(self, tokens):
        """Extract the fields shared by the SH, PF, and CC messages.

        :param tokens: list of message tokens, with the message name first
        :return: tuple (frame, exposure, exposure_name, stereo_position), or
                 ``None`` if the message has fewer than five tokens or a
                 numeric field is not a valid integer
        """
        if len(tokens) < 5:
            # Short messages are silently ignored.
            return None

        try:
            return int(tokens[1]), int(tokens[2]), tokens[3], int(tokens[4])
        except ValueError:
            # The peer controls this text; log a bad field instead of raising
            # out of the receive path.
            self.logger.warning('Ignoring malformed %s message: %r', tokens[0], tokens)
            return None

    def process_line(self, tokens):
        """Main entry point for interpreting the DFRemote protocol.

        Unrecognized, empty, too-short, or malformed messages are ignored.

        :param tokens: list of whitespace-separated strings from one received line
        """
        if not tokens:
            return

        command = tokens[0]

        if command == 'SH':
            # Shoot Frame message, e.g. 'SH 1 1 X1 0'
            # This indicates the frame/exposure combination which was just shot.
            fields = self._parse_frame_fields(tokens)
            if fields is not None:
                self.shot_taken(*fields)

        elif command == 'PF':
            # Position Frame message, e.g. 'PF 2 1 X1 0'
            # This appears to indicate the frame/exposure combination to which
            # to move after a shot.
            fields = self._parse_frame_fields(tokens)
            if fields is not None:
                self.position_requested(*fields)

        elif command == 'CC':
            # Capture Complete message, e.g. 'CC 1 1 X1 0'
            # This is the final stage of a common SH, PF, CC sequence during a
            # shot.  No handler is invoked; the fields are only validated so
            # that a malformed message is noted in the log.
            self._parse_frame_fields(tokens)

        elif command == 'DE':
            # Delete message, e.g. 'DE'
            self.frame_deleted()

    # Methods to transmit messages to Dragonframe.

    def send_shoot_frames(self, count=1):
        """Send the 'S' message with a frame count.

        :param count: integer number of frames, formatted into the message
        """
        self._send('S %d' % (count))

    def send_delete_frame(self):
        """Send the 'D' message."""
        self._send('D')

    def send_toggle_play(self):
        """Send the 'P' message."""
        self._send('P')

    def send_live_view(self):
        """Send the 'L' message."""
        self._send('L')

    # Overridable methods to handle specific client requests.

    def shot_taken(self, frame, exposure, exposure_name, stereo_position):
        """Handle a Shoot Frame (SH) message; the default implementation only logs it.

        :param frame: integer frame number
        :param exposure: integer exposure number
        :param exposure_name: exposure name string, e.g. 'X1'
        :param stereo_position: integer stereo position
        """
        self.logger.info("Shot taken: frame %d, exposure %d, exposure_name %s, stereo_position %d", frame, exposure, exposure_name, stereo_position)

    def position_requested(self, frame, exposure, exposure_name, stereo_position):
        """Handle a Position Frame (PF) message; the default implementation only logs it.

        :param frame: integer frame number
        :param exposure: integer exposure number
        :param exposure_name: exposure name string, e.g. 'X1'
        :param stereo_position: integer stereo position
        """
        self.logger.info("Position requested: frame %d, exposure %d, exposure_name %s, stereo_position %d", frame, exposure, exposure_name, stereo_position)

    def frame_deleted(self):
        """Handle a Delete (DE) message; the default implementation only logs it."""
        self.logger.info("Frame deleted")
################################################################